cms.c2sgmbh/scripts/run-queue-worker.ts
Martin Porwoll 0615b22188 feat(monitoring): add snapshot collector to queue worker
Periodic metric collection running in the queue-worker PM2 process.
Collects system metrics every 60s (configurable), stores them in
MonitoringSnapshots, and evaluates alert rules against each snapshot.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 00:34:28 +00:00

147 lines
5.3 KiB
JavaScript

#!/usr/bin/env node
/**
* Queue Worker Runner
*
* Standalone script for starting the queue workers.
* Started by PM2 as a separate process.
*
* Usage:
* npx tsx scripts/run-queue-worker.ts
* # or via PM2
* pm2 start ecosystem.config.cjs --only queue-worker
*/
// Load .env with explicit path BEFORE any other imports
import { config as dotenvConfig } from 'dotenv'
import { resolve, dirname } from 'path'
import { fileURLToPath } from 'url'
// ES modules have no built-in __dirname, so derive it from this module's URL.
const __filename = fileURLToPath(import.meta.url)
const __dirname = dirname(__filename)
// Resolve .env relative to this script (one directory up) rather than the
// working directory (handles PM2 cwd issues).
const envPath = resolve(__dirname, '..', '.env')
console.log(`[QueueRunner] Loading env from: ${envPath}`)
// dotenv reports a load failure through the returned `error` field instead of
// throwing, so it has to be checked explicitly.
const { error: envLoadError } = dotenvConfig({ path: envPath })
if (envLoadError) {
  // Deliberately not fatal: required variables may already be present in the
  // process environment. The PAYLOAD_SECRET check below shows the end result.
  console.warn(`[QueueRunner] Could not load ${envPath}: ${envLoadError.message}`)
}
console.log(`[QueueRunner] PAYLOAD_SECRET loaded: ${process.env.PAYLOAD_SECRET ? 'yes' : 'NO!'}`)
// Dynamic imports after dotenv is loaded
/**
 * Starts every enabled queue worker (and the optional monitoring snapshot
 * collector) and installs the process-level signal and error handlers.
 *
 * The worker modules are imported dynamically inside this function so they are
 * evaluated only after the top-level dotenv call has run.
 *
 * Every component is enabled unless its QUEUE_ENABLE_* environment variable is
 * the literal string 'false'.
 *
 * @returns A promise that resolves once all enabled components have been
 *   started and the handlers are registered. It rejects if an import, the
 *   retention scheduling or the collector start fails.
 */
async function main() {
  const { startEmailWorker, stopEmailWorker } = await import('../src/lib/queue/workers/email-worker')
  const { startPdfWorker, stopPdfWorker } = await import('../src/lib/queue/workers/pdf-worker')
  const { startRetentionWorker, stopRetentionWorker } = await import('../src/lib/queue/workers/retention-worker')
  const { startYouTubeUploadWorker, stopYouTubeUploadWorker } = await import('../src/lib/queue/workers/youtube-upload-worker')
  const { scheduleRetentionJobs } = await import('../src/lib/queue/jobs/retention-job')
  const { retentionSchedule } = await import('../src/lib/retention/retention-config')

  // Configuration via environment variables
  const ENABLE_EMAIL_WORKER = process.env.QUEUE_ENABLE_EMAIL !== 'false'
  const ENABLE_PDF_WORKER = process.env.QUEUE_ENABLE_PDF !== 'false'
  const ENABLE_RETENTION_WORKER = process.env.QUEUE_ENABLE_RETENTION !== 'false'
  const ENABLE_RETENTION_SCHEDULER = process.env.QUEUE_ENABLE_RETENTION_SCHEDULER !== 'false'
  const ENABLE_YOUTUBE_UPLOAD_WORKER = process.env.QUEUE_ENABLE_YOUTUBE_UPLOAD !== 'false'
  // Monitoring (optional)
  const ENABLE_MONITORING = process.env.QUEUE_ENABLE_MONITORING !== 'false'

  console.log('='.repeat(50))
  console.log('[QueueRunner] Starting queue workers...')
  console.log(`[QueueRunner] PID: ${process.pid}`)
  console.log(`[QueueRunner] Node: ${process.version}`)
  console.log(`[QueueRunner] Email Worker: ${ENABLE_EMAIL_WORKER ? 'enabled' : 'disabled'}`)
  console.log(`[QueueRunner] PDF Worker: ${ENABLE_PDF_WORKER ? 'enabled' : 'disabled'}`)
  console.log(`[QueueRunner] Retention Worker: ${ENABLE_RETENTION_WORKER ? 'enabled' : 'disabled'}`)
  console.log(`[QueueRunner] Retention Scheduler: ${ENABLE_RETENTION_SCHEDULER ? 'enabled' : 'disabled'}`)
  console.log(`[QueueRunner] YouTube Upload Worker: ${ENABLE_YOUTUBE_UPLOAD_WORKER ? 'enabled' : 'disabled'}`)
  console.log(`[QueueRunner] Monitoring Collector: ${ENABLE_MONITORING ? 'enabled' : 'disabled'}`)
  console.log('='.repeat(50))

  // Stop callbacks of everything that has been started, in start order.
  // shutdown() walks this list, so start and stop can never get out of sync.
  const stopTasks: Array<{ label: string; stop: () => unknown }> = []

  // Start workers
  if (ENABLE_EMAIL_WORKER) {
    startEmailWorker()
    stopTasks.push({ label: 'Email Worker', stop: stopEmailWorker })
  }
  if (ENABLE_PDF_WORKER) {
    startPdfWorker()
    stopTasks.push({ label: 'PDF Worker', stop: stopPdfWorker })
  }
  if (ENABLE_RETENTION_WORKER) {
    startRetentionWorker()
    stopTasks.push({ label: 'Retention Worker', stop: stopRetentionWorker })
    // Start the retention scheduler (only when the retention worker is active)
    if (ENABLE_RETENTION_SCHEDULER) {
      const cronSchedule = process.env.RETENTION_CRON_SCHEDULE || retentionSchedule.cron
      console.log(`[QueueRunner] Scheduling retention jobs with cron: ${cronSchedule}`)
      await scheduleRetentionJobs(cronSchedule)
    }
  }
  if (ENABLE_YOUTUBE_UPLOAD_WORKER) {
    startYouTubeUploadWorker()
    stopTasks.push({ label: 'YouTube Upload Worker', stop: stopYouTubeUploadWorker })
  }

  // Monitoring Snapshot Collector
  if (ENABLE_MONITORING) {
    const { startSnapshotCollector, stopSnapshotCollector } = await import('../src/lib/monitoring/snapshot-collector')
    await startSnapshotCollector()
    // Registered only after a successful start; kept in a local list instead of
    // an untyped property on the global object.
    stopTasks.push({ label: 'Monitoring Collector', stop: stopSnapshotCollector })
  }

  // Set on the first shutdown request so later signals or errors do not stop
  // the workers a second time.
  let shuttingDown = false

  /**
   * Stops every started component and terminates the process.
   *
   * @param signal Name of the signal or event that triggered the shutdown (used for logging only).
   * @param exitCode Exit code to use when every component stopped cleanly.
   *   If any component fails to stop, the process exits with 1 instead.
   */
  async function shutdown(signal: string, exitCode = 0) {
    if (shuttingDown) {
      console.log(`[QueueRunner] Received ${signal} while already shutting down, ignoring`)
      return
    }
    shuttingDown = true
    console.log(`\n[QueueRunner] Received ${signal}, shutting down gracefully...`)

    // All stop calls are issued up front and awaited together. Each one has its
    // own error handling so a single failing component cannot cut the others short.
    const outcomes = await Promise.all(
      stopTasks.map(async ({ label, stop }) => {
        try {
          await stop()
          return true
        } catch (error) {
          console.error(`[QueueRunner] Error during shutdown (${label}):`, error)
          return false
        }
      }),
    )

    if (!outcomes.every(Boolean)) {
      process.exit(1)
    }
    console.log('[QueueRunner] All workers stopped')
    process.exit(exitCode)
  }

  // Signal Handlers
  process.on('SIGTERM', () => {
    void shutdown('SIGTERM')
  })
  process.on('SIGINT', () => {
    void shutdown('SIGINT')
  })

  // Unhandled Errors
  process.on('unhandledRejection', (reason) => {
    console.error('[QueueRunner] Unhandled Rejection:', reason)
  })
  process.on('uncaughtException', (error) => {
    console.error('[QueueRunner] Uncaught Exception:', error)
    // Exit non-zero: the process is going down because of a failure, not on request.
    void shutdown('uncaughtException', 1)
  })

  // Nothing blocks here. Presumably the started workers' open handles keep the
  // process alive; confirm against the worker implementations.
  console.log('[QueueRunner] Workers started, waiting for jobs...')
}
// Entry point: any error escaping main() is treated as fatal for the process.
main().catch((fatalError) => {
  console.error('[QueueRunner] Fatal error:', fatalError)
  process.exit(1)
})