#!/usr/bin/env node /** * Queue Worker Runner * * Standalone script zum Starten der Queue Workers. * Wird von PM2 als separater Prozess gestartet. * * Usage: * npx tsx scripts/run-queue-worker.ts * # oder via PM2 * pm2 start ecosystem.config.cjs --only queue-worker */ // Load .env with explicit path BEFORE any other imports import { config as dotenvConfig } from 'dotenv' import { resolve, dirname } from 'path' import { fileURLToPath } from 'url' // ES Module equivalent of __dirname const __filename = fileURLToPath(import.meta.url) const __dirname = dirname(__filename) // Explicitly load .env from project root (handles PM2 cwd issues) const envPath = resolve(__dirname, '..', '.env') dotenvConfig({ path: envPath }) console.log(`[QueueRunner] Loading env from: ${envPath}`) console.log(`[QueueRunner] PAYLOAD_SECRET loaded: ${process.env.PAYLOAD_SECRET ? 'yes' : 'NO!'}`) // Dynamic imports after dotenv is loaded async function main() { const { startEmailWorker, stopEmailWorker } = await import('../src/lib/queue/workers/email-worker') const { startPdfWorker, stopPdfWorker } = await import('../src/lib/queue/workers/pdf-worker') const { startRetentionWorker, stopRetentionWorker } = await import('../src/lib/queue/workers/retention-worker') const { startYouTubeUploadWorker, stopYouTubeUploadWorker } = await import('../src/lib/queue/workers/youtube-upload-worker') const { scheduleRetentionJobs } = await import('../src/lib/queue/jobs/retention-job') const { retentionSchedule } = await import('../src/lib/retention/retention-config') // Konfiguration via Umgebungsvariablen const ENABLE_EMAIL_WORKER = process.env.QUEUE_ENABLE_EMAIL !== 'false' const ENABLE_PDF_WORKER = process.env.QUEUE_ENABLE_PDF !== 'false' const ENABLE_RETENTION_WORKER = process.env.QUEUE_ENABLE_RETENTION !== 'false' const ENABLE_RETENTION_SCHEDULER = process.env.QUEUE_ENABLE_RETENTION_SCHEDULER !== 'false' const ENABLE_YOUTUBE_UPLOAD_WORKER = process.env.QUEUE_ENABLE_YOUTUBE_UPLOAD !== 'false' console.log('='.repeat(50)) console.log('[QueueRunner] Starting queue workers...') console.log(`[QueueRunner] PID: ${process.pid}`) console.log(`[QueueRunner] Node: ${process.version}`) console.log(`[QueueRunner] Email Worker: ${ENABLE_EMAIL_WORKER ? 'enabled' : 'disabled'}`) console.log(`[QueueRunner] PDF Worker: ${ENABLE_PDF_WORKER ? 'enabled' : 'disabled'}`) console.log(`[QueueRunner] Retention Worker: ${ENABLE_RETENTION_WORKER ? 'enabled' : 'disabled'}`) console.log(`[QueueRunner] Retention Scheduler: ${ENABLE_RETENTION_SCHEDULER ? 'enabled' : 'disabled'}`) console.log(`[QueueRunner] YouTube Upload Worker: ${ENABLE_YOUTUBE_UPLOAD_WORKER ? 'enabled' : 'disabled'}`) console.log('='.repeat(50)) // Workers starten if (ENABLE_EMAIL_WORKER) { startEmailWorker() } if (ENABLE_PDF_WORKER) { startPdfWorker() } if (ENABLE_RETENTION_WORKER) { startRetentionWorker() // Retention Scheduler starten (nur wenn Worker aktiv) if (ENABLE_RETENTION_SCHEDULER) { const cronSchedule = process.env.RETENTION_CRON_SCHEDULE || retentionSchedule.cron console.log(`[QueueRunner] Scheduling retention jobs with cron: ${cronSchedule}`) await scheduleRetentionJobs(cronSchedule) } } if (ENABLE_YOUTUBE_UPLOAD_WORKER) { startYouTubeUploadWorker() } // Graceful Shutdown async function shutdown(signal: string) { console.log(`\n[QueueRunner] Received ${signal}, shutting down gracefully...`) try { const stopPromises: Promise[] = [] if (ENABLE_EMAIL_WORKER) { stopPromises.push(stopEmailWorker()) } if (ENABLE_PDF_WORKER) { stopPromises.push(stopPdfWorker()) } if (ENABLE_RETENTION_WORKER) { stopPromises.push(stopRetentionWorker()) } if (ENABLE_YOUTUBE_UPLOAD_WORKER) { stopPromises.push(stopYouTubeUploadWorker()) } await Promise.all(stopPromises) console.log('[QueueRunner] All workers stopped') process.exit(0) } catch (error) { console.error('[QueueRunner] Error during shutdown:', error) process.exit(1) } } // Signal Handlers process.on('SIGTERM', () => shutdown('SIGTERM')) process.on('SIGINT', () => shutdown('SIGINT')) // Unhandled Errors process.on('unhandledRejection', (reason) => { console.error('[QueueRunner] Unhandled Rejection:', reason) }) process.on('uncaughtException', (error) => { console.error('[QueueRunner] Uncaught Exception:', error) shutdown('uncaughtException') }) // Keep process alive console.log('[QueueRunner] Workers started, waiting for jobs...') } main().catch((error) => { console.error('[QueueRunner] Fatal error:', error) process.exit(1) })