feat(youtube): add upload worker and API route

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Martin Porwoll 2026-02-14 13:34:28 +00:00
parent e6ef78a08a
commit ce4c21fd0a
2 changed files with 296 additions and 0 deletions

View file

@ -0,0 +1,138 @@
// src/app/(payload)/api/youtube/upload/route.ts
import { getPayload } from 'payload'
import config from '@payload-config'
import { NextRequest, NextResponse } from 'next/server'
import {
enqueueYouTubeUpload,
getYouTubeUploadJobStatus,
} from '@/lib/queue/jobs/youtube-upload-job'
/**
* POST /api/youtube/upload
*
* Erstellt einen YouTube-Upload-Job fuer ein YouTubeContent-Dokument.
* Erwartet { contentId } im Request-Body.
*/
export async function POST(request: NextRequest): Promise<NextResponse> {
try {
const payload = await getPayload({ config })
// Auth pruefen
const { user } = await payload.auth({ headers: request.headers })
if (!user) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const body = await request.json()
const { contentId } = body
if (!contentId) {
return NextResponse.json({ error: 'contentId required' }, { status: 400 })
}
// YouTubeContent laden
const content = await payload.findByID({
collection: 'youtube-content',
id: contentId,
depth: 1,
})
if (!content) {
return NextResponse.json({ error: 'Content not found' }, { status: 404 })
}
const doc = content as Record<string, unknown>
if (!doc.videoFile) {
return NextResponse.json({ error: 'No video file attached' }, { status: 400 })
}
// Felder sicher extrahieren
const channelRaw = doc.channel
const channelId = typeof channelRaw === 'object' && channelRaw !== null
? (channelRaw as { id: number }).id
: channelRaw as number
const videoFileRaw = doc.videoFile
const mediaId = typeof videoFileRaw === 'object' && videoFileRaw !== null
? (videoFileRaw as { id: number }).id
: videoFileRaw as number
const youtube = doc.youtube as {
metadata?: {
youtubeTitle?: string
youtubeDescription?: string
tags?: Array<{ tag?: string }>
visibility?: 'public' | 'unlisted' | 'private'
}
} | undefined
const tags = (youtube?.metadata?.tags || [])
.map((t) => t.tag)
.filter((tag): tag is string => Boolean(tag))
// Upload-Job erstellen
const job = await enqueueYouTubeUpload({
contentId: contentId as number,
channelId,
mediaId,
metadata: {
title: youtube?.metadata?.youtubeTitle || (doc.title as string) || 'Untitled',
description: youtube?.metadata?.youtubeDescription || (doc.description as string) || '',
tags,
visibility: youtube?.metadata?.visibility || 'private',
},
scheduledPublishAt: (doc.scheduledPublishDate as string) || undefined,
triggeredBy: user.id as number,
})
// Status auf upload_scheduled setzen
await payload.update({
collection: 'youtube-content',
id: contentId,
data: { status: 'upload_scheduled' },
})
return NextResponse.json({
success: true,
jobId: job.id,
message: 'Upload-Job erstellt',
})
} catch (error) {
console.error('[YouTube Upload API] Error:', error)
return NextResponse.json({ error: 'Internal Server Error' }, { status: 500 })
}
}
/**
* GET /api/youtube/upload?jobId=xxx
*
* Gibt den Status eines YouTube-Upload-Jobs zurueck.
*/
export async function GET(request: NextRequest): Promise<NextResponse> {
try {
const payload = await getPayload({ config })
// Auth pruefen
const { user } = await payload.auth({ headers: request.headers })
if (!user) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const jobId = request.nextUrl.searchParams.get('jobId')
if (!jobId) {
return NextResponse.json({ error: 'jobId required' }, { status: 400 })
}
const status = await getYouTubeUploadJobStatus(jobId)
if (!status) {
return NextResponse.json({ error: 'Job not found' }, { status: 404 })
}
return NextResponse.json(status)
} catch (error) {
console.error('[YouTube Upload API] Status check error:', error)
return NextResponse.json({ error: 'Internal Server Error' }, { status: 500 })
}
}

View file

@ -0,0 +1,158 @@
/**
* YouTube Upload Worker
*
* Verarbeitet YouTube-Upload-Jobs aus der Queue.
* Nutzt den VideoUploadService fuer die eigentliche YouTube-API-Kommunikation.
*/
import { Worker, Job } from 'bullmq'
import { getPayload } from 'payload'
import config from '@payload-config'
import { QUEUE_NAMES, getQueueRedisConnection } from '../queue-service'
import type { YouTubeUploadJobData, YouTubeUploadJobResult } from '../jobs/youtube-upload-job'
import { VideoUploadService } from '../../integrations/youtube/VideoUploadService'
import { NotificationService } from '../../jobs/NotificationService'
// Worker-Konfiguration
const CONCURRENCY = parseInt(process.env.QUEUE_YOUTUBE_UPLOAD_CONCURRENCY || '1', 10)
/**
* YouTube Upload Job Processor
*/
async function processUploadJob(
job: Job<YouTubeUploadJobData>,
): Promise<YouTubeUploadJobResult> {
const { contentId, mediaId, metadata, scheduledPublishAt, triggeredBy } = job.data
console.log(`[YouTubeUploadWorker] Processing job ${job.id} for content ${contentId}`)
try {
const payload = await getPayload({ config })
const uploadService = new VideoUploadService(payload)
const result = await uploadService.uploadVideo({
mediaId,
metadata,
scheduledPublishAt,
})
if (!result.success) {
throw new Error(result.error || 'Upload failed')
}
// YouTubeContent mit Video-ID und URL aktualisieren
await payload.update({
collection: 'youtube-content',
id: contentId,
data: {
youtube: {
videoId: result.youtubeVideoId,
url: result.youtubeUrl,
},
status: 'published',
actualPublishDate: new Date().toISOString(),
},
})
// Benachrichtigung erstellen
const notificationService = new NotificationService(payload)
await notificationService.createNotification({
recipientId: triggeredBy,
type: 'video_published',
title: `Video "${metadata.title}" erfolgreich hochgeladen`,
message: `YouTube-URL: ${result.youtubeUrl}`,
link: `/admin/collections/youtube-content/${contentId}`,
relatedVideoId: contentId,
})
const jobResult: YouTubeUploadJobResult = {
success: true,
youtubeVideoId: result.youtubeVideoId,
youtubeUrl: result.youtubeUrl,
timestamp: new Date().toISOString(),
}
console.log(`[YouTubeUploadWorker] Job ${job.id} completed: ${result.youtubeUrl}`)
return jobResult
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
console.error(`[YouTubeUploadWorker] Job ${job.id} failed:`, errorMessage)
// Error werfen damit BullMQ Retry-Logik greift
throw error
}
}
/**
* YouTube Upload Worker Instanz
*/
let uploadWorker: Worker<YouTubeUploadJobData, YouTubeUploadJobResult> | null = null
/**
* Startet den YouTube Upload Worker
*/
export function startYouTubeUploadWorker(): Worker<YouTubeUploadJobData, YouTubeUploadJobResult> {
if (uploadWorker) {
console.warn('[YouTubeUploadWorker] Worker already running')
return uploadWorker
}
uploadWorker = new Worker<YouTubeUploadJobData, YouTubeUploadJobResult>(
QUEUE_NAMES.YOUTUBE_UPLOAD,
processUploadJob,
{
connection: getQueueRedisConnection(),
concurrency: CONCURRENCY,
// Uploads dauern laenger - grosszuegigere Stalled-Detection
stalledInterval: 120000, // 2 Minuten
maxStalledCount: 1,
// Laengerer Lock fuer Video-Uploads
lockDuration: 300000, // 5 Minuten
},
)
// Event Handlers
uploadWorker.on('ready', () => {
console.log(`[YouTubeUploadWorker] Ready (concurrency: ${CONCURRENCY})`)
})
uploadWorker.on('completed', (job) => {
console.log(`[YouTubeUploadWorker] Job ${job.id} completed in ${Date.now() - job.timestamp}ms`)
})
uploadWorker.on('failed', (job, error) => {
console.error(
`[YouTubeUploadWorker] Job ${job?.id} failed after ${job?.attemptsMade} attempts:`,
error.message,
)
})
uploadWorker.on('stalled', (jobId) => {
console.warn(`[YouTubeUploadWorker] Job ${jobId} stalled`)
})
uploadWorker.on('error', (error) => {
console.error('[YouTubeUploadWorker] Error:', error)
})
return uploadWorker
}
/**
* Stoppt den YouTube Upload Worker
*/
export async function stopYouTubeUploadWorker(): Promise<void> {
if (uploadWorker) {
console.log('[YouTubeUploadWorker] Stopping...')
await uploadWorker.close()
uploadWorker = null
console.log('[YouTubeUploadWorker] Stopped')
}
}
/**
* Gibt die Worker-Instanz zurueck (falls aktiv)
*/
export function getYouTubeUploadWorker(): Worker<YouTubeUploadJobData, YouTubeUploadJobResult> | null {
return uploadWorker
}