diff --git a/src/app/(payload)/api/youtube/upload/route.ts b/src/app/(payload)/api/youtube/upload/route.ts new file mode 100644 index 0000000..485ca54 --- /dev/null +++ b/src/app/(payload)/api/youtube/upload/route.ts @@ -0,0 +1,138 @@ +// src/app/(payload)/api/youtube/upload/route.ts + +import { getPayload } from 'payload' +import config from '@payload-config' +import { NextRequest, NextResponse } from 'next/server' +import { + enqueueYouTubeUpload, + getYouTubeUploadJobStatus, +} from '@/lib/queue/jobs/youtube-upload-job' + +/** + * POST /api/youtube/upload + * + * Erstellt einen YouTube-Upload-Job fuer ein YouTubeContent-Dokument. + * Erwartet { contentId } im Request-Body. + */ +export async function POST(request: NextRequest): Promise { + try { + const payload = await getPayload({ config }) + + // Auth pruefen + const { user } = await payload.auth({ headers: request.headers }) + if (!user) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const body = await request.json() + const { contentId } = body + + if (!contentId) { + return NextResponse.json({ error: 'contentId required' }, { status: 400 }) + } + + // YouTubeContent laden + const content = await payload.findByID({ + collection: 'youtube-content', + id: contentId, + depth: 1, + }) + + if (!content) { + return NextResponse.json({ error: 'Content not found' }, { status: 404 }) + } + + const doc = content as Record + + if (!doc.videoFile) { + return NextResponse.json({ error: 'No video file attached' }, { status: 400 }) + } + + // Felder sicher extrahieren + const channelRaw = doc.channel + const channelId = typeof channelRaw === 'object' && channelRaw !== null + ? (channelRaw as { id: number }).id + : channelRaw as number + + const videoFileRaw = doc.videoFile + const mediaId = typeof videoFileRaw === 'object' && videoFileRaw !== null + ? (videoFileRaw as { id: number }).id + : videoFileRaw as number + + const youtube = doc.youtube as { + metadata?: { + youtubeTitle?: string + youtubeDescription?: string + tags?: Array<{ tag?: string }> + visibility?: 'public' | 'unlisted' | 'private' + } + } | undefined + + const tags = (youtube?.metadata?.tags || []) + .map((t) => t.tag) + .filter((tag): tag is string => Boolean(tag)) + + // Upload-Job erstellen + const job = await enqueueYouTubeUpload({ + contentId: contentId as number, + channelId, + mediaId, + metadata: { + title: youtube?.metadata?.youtubeTitle || (doc.title as string) || 'Untitled', + description: youtube?.metadata?.youtubeDescription || (doc.description as string) || '', + tags, + visibility: youtube?.metadata?.visibility || 'private', + }, + scheduledPublishAt: (doc.scheduledPublishDate as string) || undefined, + triggeredBy: user.id as number, + }) + + // Status auf upload_scheduled setzen + await payload.update({ + collection: 'youtube-content', + id: contentId, + data: { status: 'upload_scheduled' }, + }) + + return NextResponse.json({ + success: true, + jobId: job.id, + message: 'Upload-Job erstellt', + }) + } catch (error) { + console.error('[YouTube Upload API] Error:', error) + return NextResponse.json({ error: 'Internal Server Error' }, { status: 500 }) + } +} + +/** + * GET /api/youtube/upload?jobId=xxx + * + * Gibt den Status eines YouTube-Upload-Jobs zurueck. + */ +export async function GET(request: NextRequest): Promise { + try { + const payload = await getPayload({ config }) + + // Auth pruefen + const { user } = await payload.auth({ headers: request.headers }) + if (!user) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const jobId = request.nextUrl.searchParams.get('jobId') + if (!jobId) { + return NextResponse.json({ error: 'jobId required' }, { status: 400 }) + } + + const status = await getYouTubeUploadJobStatus(jobId) + if (!status) { + return NextResponse.json({ error: 'Job not found' }, { status: 404 }) + } + + return NextResponse.json(status) + } catch (error) { + console.error('[YouTube Upload API] Status check error:', error) + return NextResponse.json({ error: 'Internal Server Error' }, { status: 500 }) + } +} diff --git a/src/lib/queue/workers/youtube-upload-worker.ts b/src/lib/queue/workers/youtube-upload-worker.ts new file mode 100644 index 0000000..24b3729 --- /dev/null +++ b/src/lib/queue/workers/youtube-upload-worker.ts @@ -0,0 +1,158 @@ +/** + * YouTube Upload Worker + * + * Verarbeitet YouTube-Upload-Jobs aus der Queue. + * Nutzt den VideoUploadService fuer die eigentliche YouTube-API-Kommunikation. + */ + +import { Worker, Job } from 'bullmq' +import { getPayload } from 'payload' +import config from '@payload-config' +import { QUEUE_NAMES, getQueueRedisConnection } from '../queue-service' +import type { YouTubeUploadJobData, YouTubeUploadJobResult } from '../jobs/youtube-upload-job' +import { VideoUploadService } from '../../integrations/youtube/VideoUploadService' +import { NotificationService } from '../../jobs/NotificationService' + +// Worker-Konfiguration +const CONCURRENCY = parseInt(process.env.QUEUE_YOUTUBE_UPLOAD_CONCURRENCY || '1', 10) + +/** + * YouTube Upload Job Processor + */ +async function processUploadJob( + job: Job, +): Promise { + const { contentId, mediaId, metadata, scheduledPublishAt, triggeredBy } = job.data + + console.log(`[YouTubeUploadWorker] Processing job ${job.id} for content ${contentId}`) + + try { + const payload = await getPayload({ config }) + const uploadService = new VideoUploadService(payload) + + const result = await uploadService.uploadVideo({ + mediaId, + metadata, + scheduledPublishAt, + }) + + if (!result.success) { + throw new Error(result.error || 'Upload failed') + } + + // YouTubeContent mit Video-ID und URL aktualisieren + await payload.update({ + collection: 'youtube-content', + id: contentId, + data: { + youtube: { + videoId: result.youtubeVideoId, + url: result.youtubeUrl, + }, + status: 'published', + actualPublishDate: new Date().toISOString(), + }, + }) + + // Benachrichtigung erstellen + const notificationService = new NotificationService(payload) + await notificationService.createNotification({ + recipientId: triggeredBy, + type: 'video_published', + title: `Video "${metadata.title}" erfolgreich hochgeladen`, + message: `YouTube-URL: ${result.youtubeUrl}`, + link: `/admin/collections/youtube-content/${contentId}`, + relatedVideoId: contentId, + }) + + const jobResult: YouTubeUploadJobResult = { + success: true, + youtubeVideoId: result.youtubeVideoId, + youtubeUrl: result.youtubeUrl, + timestamp: new Date().toISOString(), + } + + console.log(`[YouTubeUploadWorker] Job ${job.id} completed: ${result.youtubeUrl}`) + return jobResult + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + console.error(`[YouTubeUploadWorker] Job ${job.id} failed:`, errorMessage) + + // Error werfen damit BullMQ Retry-Logik greift + throw error + } +} + +/** + * YouTube Upload Worker Instanz + */ +let uploadWorker: Worker | null = null + +/** + * Startet den YouTube Upload Worker + */ +export function startYouTubeUploadWorker(): Worker { + if (uploadWorker) { + console.warn('[YouTubeUploadWorker] Worker already running') + return uploadWorker + } + + uploadWorker = new Worker( + QUEUE_NAMES.YOUTUBE_UPLOAD, + processUploadJob, + { + connection: getQueueRedisConnection(), + concurrency: CONCURRENCY, + // Uploads dauern laenger - grosszuegigere Stalled-Detection + stalledInterval: 120000, // 2 Minuten + maxStalledCount: 1, + // Laengerer Lock fuer Video-Uploads + lockDuration: 300000, // 5 Minuten + }, + ) + + // Event Handlers + uploadWorker.on('ready', () => { + console.log(`[YouTubeUploadWorker] Ready (concurrency: ${CONCURRENCY})`) + }) + + uploadWorker.on('completed', (job) => { + console.log(`[YouTubeUploadWorker] Job ${job.id} completed in ${Date.now() - job.timestamp}ms`) + }) + + uploadWorker.on('failed', (job, error) => { + console.error( + `[YouTubeUploadWorker] Job ${job?.id} failed after ${job?.attemptsMade} attempts:`, + error.message, + ) + }) + + uploadWorker.on('stalled', (jobId) => { + console.warn(`[YouTubeUploadWorker] Job ${jobId} stalled`) + }) + + uploadWorker.on('error', (error) => { + console.error('[YouTubeUploadWorker] Error:', error) + }) + + return uploadWorker +} + +/** + * Stoppt den YouTube Upload Worker + */ +export async function stopYouTubeUploadWorker(): Promise { + if (uploadWorker) { + console.log('[YouTubeUploadWorker] Stopping...') + await uploadWorker.close() + uploadWorker = null + console.log('[YouTubeUploadWorker] Stopped') + } +} + +/** + * Gibt die Worker-Instanz zurueck (falls aktiv) + */ +export function getYouTubeUploadWorker(): Worker | null { + return uploadWorker +}