cms.c2sgmbh/src/lib/queue/queue-service.ts
Martin Porwoll e904f0949b fix(queue): resolve queue-worker crash-loop via Redis auth and PM2 config
Redis requires authentication but IORedis connections were not passing a
password, causing immediate NOAUTH failures and a PM2 crash-loop (1900+
restarts). Additionally, the PM2 config used `npx` as the script entry
which caused instability.

- Add REDIS_PASSWORD support to queue-service.ts and redis.ts
- Change PM2 script from npx wrapper to direct tsx CLI entry point
- Add explicit exec_mode: 'fork' to prevent cluster mode issues

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 17:46:54 +00:00

204 lines
4.7 KiB
TypeScript

/**
* BullMQ Queue Service
*
* Zentrale Queue-Verwaltung für Background Jobs.
* Verwendet Redis als Backend (gleiche Instanz wie Caching).
*/
import { Queue, QueueEvents, JobsOptions } from 'bullmq'
import IORedis from 'ioredis'
// Queue-Namen
export const QUEUE_NAMES = {
EMAIL: 'email',
PDF: 'pdf',
CLEANUP: 'cleanup',
YOUTUBE_UPLOAD: 'youtube-upload',
} as const
export type QueueName = (typeof QUEUE_NAMES)[keyof typeof QUEUE_NAMES]
// Redis-Konfiguration für Queue (separate DB)
const QUEUE_REDIS_DB = parseInt(process.env.QUEUE_REDIS_DB || '1', 10)
// Gemeinsame Redis-Connection für alle Queues
let redisConnection: IORedis | null = null
/**
* Erstellt oder gibt die existierende Redis-Connection zurück
*/
export function getQueueRedisConnection(): IORedis {
if (!redisConnection) {
const host = process.env.REDIS_HOST || 'localhost'
const port = parseInt(process.env.REDIS_PORT || '6379', 10)
redisConnection = new IORedis({
host,
port,
db: QUEUE_REDIS_DB,
password: process.env.REDIS_PASSWORD || undefined,
maxRetriesPerRequest: null, // BullMQ requirement
enableReadyCheck: false,
})
redisConnection.on('error', (err) => {
console.error('[Queue] Redis connection error:', err.message)
})
redisConnection.on('connect', () => {
console.log(`[Queue] Redis connected (db: ${QUEUE_REDIS_DB})`)
})
}
return redisConnection
}
// Queue-Instanzen Cache
const queues = new Map<QueueName, Queue>()
const queueEvents = new Map<QueueName, QueueEvents>()
/**
* Standard Job-Optionen
*/
export const defaultJobOptions: JobsOptions = {
attempts: parseInt(process.env.QUEUE_DEFAULT_RETRY || '3', 10),
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s, 8s, ...
},
removeOnComplete: {
count: 100, // Behalte letzte 100 erfolgreiche Jobs
age: 24 * 60 * 60, // Oder 24 Stunden
},
removeOnFail: {
count: 500, // Behalte letzte 500 fehlgeschlagene Jobs
age: 7 * 24 * 60 * 60, // Oder 7 Tage
},
}
/**
* Erstellt oder gibt eine existierende Queue zurück
*/
export function getQueue(name: QueueName): Queue {
if (!queues.has(name)) {
const queue = new Queue(name, {
connection: getQueueRedisConnection(),
defaultJobOptions,
})
queues.set(name, queue)
console.log(`[Queue] Queue "${name}" initialized`)
}
return queues.get(name)!
}
/**
* Erstellt oder gibt QueueEvents für eine Queue zurück
*/
export function getQueueEvents(name: QueueName): QueueEvents {
if (!queueEvents.has(name)) {
const events = new QueueEvents(name, {
connection: getQueueRedisConnection(),
})
queueEvents.set(name, events)
}
return queueEvents.get(name)!
}
/**
* Gibt alle aktiven Queues zurück (für bull-board)
*/
export function getAllQueues(): Queue[] {
// Initialisiere alle Queues falls noch nicht geschehen
Object.values(QUEUE_NAMES).forEach((name) => getQueue(name))
return Array.from(queues.values())
}
/**
* Schließt alle Queue-Verbindungen (für graceful shutdown)
*/
export async function closeAllQueues(): Promise<void> {
console.log('[Queue] Closing all queues...')
// Schließe QueueEvents
const eventEntries = Array.from(queueEvents.values())
for (const events of eventEntries) {
await events.close()
}
queueEvents.clear()
// Schließe Queues
const queueEntries = Array.from(queues.values())
for (const queue of queueEntries) {
await queue.close()
}
queues.clear()
// Schließe Redis Connection
if (redisConnection) {
await redisConnection.quit()
redisConnection = null
}
console.log('[Queue] All queues closed')
}
/**
* Prüft ob Queue-System verfügbar ist
*/
export async function isQueueAvailable(): Promise<boolean> {
try {
const connection = getQueueRedisConnection()
const pong = await connection.ping()
return pong === 'PONG'
} catch {
return false
}
}
/**
* Queue-Status für Monitoring
*/
export interface QueueStatus {
name: string
waiting: number
active: number
completed: number
failed: number
delayed: number
paused: boolean
}
/**
* Gibt den Status aller Queues zurück
*/
export async function getQueuesStatus(): Promise<QueueStatus[]> {
const statuses: QueueStatus[] = []
const entries = Array.from(queues.entries())
for (const [name, queue] of entries) {
const [waiting, active, completed, failed, delayed, isPaused] = await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getCompletedCount(),
queue.getFailedCount(),
queue.getDelayedCount(),
queue.isPaused(),
])
statuses.push({
name,
waiting,
active,
completed,
failed,
delayed,
paused: isPaused,
})
}
return statuses
}