/**
 * Retention Worker
 *
 * Processes cleanup jobs from the queue.
 */

import { Worker, Job } from 'bullmq'
import { getPayload } from 'payload'
import config from '@payload-config'
import { QUEUE_NAMES, getQueueRedisConnection } from '../queue-service'
import type { RetentionJobData, RetentionJobResult } from '../jobs/retention-job'
import {
  cleanupCollection,
  cleanupExpiredConsentLogs,
  cleanupOrphanedMedia,
  runFullRetention,
} from '../../retention/cleanup-service'
import { getCutoffDate } from '../../retention/retention-config'

/** Milliseconds in one day; used to turn a cutoff date into an age in days. */
const MS_PER_DAY = 1000 * 60 * 60 * 24

/** Retention period (in days) used when a cleanup-collection job carries no cutoff date. */
const DEFAULT_RETENTION_DAYS = 90

/** Upper bound on the number of error messages copied into a job result. */
const MAX_REPORTED_ERRORS = 20

/**
 * Parses the worker concurrency from its environment value.
 *
 * Falls back to 1 when the value is missing, not numeric, or below 1, so a
 * malformed environment variable cannot hand an invalid concurrency to the
 * Worker constructor.
 *
 * @param raw - Raw value of `QUEUE_RETENTION_CONCURRENCY`.
 * @returns A finite integer greater than or equal to 1.
 */
function resolveConcurrency(raw: string | undefined): number {
  const parsed = parseInt(raw || '1', 10)
  if (!Number.isFinite(parsed) || parsed < 1) {
    console.warn(
      `[RetentionWorker] Invalid QUEUE_RETENTION_CONCURRENCY "${raw}", falling back to 1`
    )
    return 1
  }
  return parsed
}

// Worker configuration
const CONCURRENCY = resolveConcurrency(process.env.QUEUE_RETENTION_CONCURRENCY)

/**
 * Converts the cutoff date carried by a job into a `Date`.
 *
 * @param value - Cutoff date as stored in the job data.
 * @returns The parsed date.
 * @throws Error when the value does not describe a valid date; a cleanup must
 *   never run against an Invalid Date.
 */
function parseCutoffDate(value: string | number | Date): Date {
  const parsed = new Date(value)
  if (Number.isNaN(parsed.getTime())) {
    throw new Error(`Invalid cutoffDate for retention job: ${String(value)}`)
  }
  return parsed
}

/**
 * Retention job processor.
 *
 * Dispatches on the job type, runs the matching cleanup routine and
 * summarises the outcome.
 *
 * @param job - Queue job whose data selects and parameterises the cleanup.
 * @returns Summary with deleted/error counts, up to 20 error messages, the
 *   duration in milliseconds and an ISO timestamp.
 * @throws Error for an unknown job type, a missing collection on a
 *   `cleanup-collection` job, or an unparseable cutoff date. Any error raised
 *   while processing is logged and rethrown.
 */
async function processRetentionJob(job: Job<RetentionJobData>): Promise<RetentionJobResult> {
  const { type, collection, cutoffDate, batchSize, dateField, triggeredBy } = job.data
  const startTime = Date.now()

  console.log(`[RetentionWorker] Processing job ${job.id} (type: ${type})`)
  console.log(`[RetentionWorker] Triggered by: ${triggeredBy || 'unknown'}`)

  try {
    // Obtain the Payload instance
    const payload = await getPayload({ config })

    let deletedCount = 0
    let errorCount = 0
    const errors: string[] = []

    switch (type) {
      case 'cleanup-collection': {
        if (!collection) {
          throw new Error('Collection is required for cleanup-collection job')
        }

        // Without an explicit cutoff, fall back to the default retention period.
        const cutoff = cutoffDate
          ? parseCutoffDate(cutoffDate)
          : getCutoffDate(DEFAULT_RETENTION_DAYS)
        const result = await cleanupCollection(payload, collection, cutoff, {
          dateField,
          batchSize,
        })

        deletedCount = result.deletedCount
        errorCount = result.errorCount
        errors.push(...result.errors)
        break
      }

      case 'cleanup-media-orphans': {
        // The media cleanup takes an age in days, so the cutoff date is
        // converted to whole days elapsed since that date (rounded up).
        const minAgeDays = cutoffDate
          ? Math.ceil((Date.now() - parseCutoffDate(cutoffDate).getTime()) / MS_PER_DAY)
          : undefined
        const result = await cleanupOrphanedMedia(payload, {
          batchSize,
          minAgeDays,
        })

        deletedCount = result.deletedCount
        errorCount = result.errorCount
        errors.push(...result.errors)
        break
      }

      case 'retention-full': {
        const result = await runFullRetention(payload)

        deletedCount = result.totalDeleted
        errorCount = result.totalErrors

        // Collect the errors of every partial result
        for (const r of result.results) {
          errors.push(...r.errors)
        }
        if (result.mediaOrphanResult) {
          errors.push(...result.mediaOrphanResult.errors)
        }
        break
      }

      default:
        throw new Error(`Unknown retention job type: ${type}`)
    }

    const duration = Date.now() - startTime

    const jobResult: RetentionJobResult = {
      success: errorCount === 0,
      type,
      collection,
      deletedCount,
      errorCount,
      // Cap the number of reported errors
      errors: errors.length > 0 ? errors.slice(0, MAX_REPORTED_ERRORS) : undefined,
      duration,
      timestamp: new Date().toISOString(),
    }

    console.log(
      `[RetentionWorker] Job ${job.id} completed: ${deletedCount} deleted, ${errorCount} errors, ${duration}ms`
    )

    return jobResult
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    console.error(`[RetentionWorker] Job ${job.id} failed:`, errorMessage)
    // Rethrow so the queue records the job as failed.
    throw error
  }
}

/**
 * Retention worker instance (null while no worker is running)
 */
let retentionWorker: Worker<RetentionJobData, RetentionJobResult> | null = null

/**
 * Starts the retention worker.
 *
 * If a worker already exists, a warning is logged and the existing instance
 * is returned instead of creating a second one.
 *
 * @returns The running worker instance.
 */
export function startRetentionWorker(): Worker<RetentionJobData, RetentionJobResult> {
  if (retentionWorker) {
    console.warn('[RetentionWorker] Worker already running')
    return retentionWorker
  }

  retentionWorker = new Worker<RetentionJobData, RetentionJobResult>(
    QUEUE_NAMES.CLEANUP,
    processRetentionJob,
    {
      connection: getQueueRedisConnection(),
      concurrency: CONCURRENCY,
      // Retention jobs can take a long time
      lockDuration: 300000, // 5 minutes
      stalledInterval: 60000, // 1 minute
      maxStalledCount: 2,
    }
  )

  // Event handlers
  retentionWorker.on('ready', () => {
    console.log(`[RetentionWorker] Ready (concurrency: ${CONCURRENCY})`)
  })

  retentionWorker.on('completed', (job, result) => {
    console.log(
      `[RetentionWorker] Job ${job.id} completed: ${result.deletedCount} deleted in ${result.duration}ms`
    )
  })

  retentionWorker.on('failed', (job, error) => {
    console.error(
      `[RetentionWorker] Job ${job?.id} failed after ${job?.attemptsMade} attempts:`,
      error.message
    )
  })

  retentionWorker.on('stalled', (jobId) => {
    console.warn(`[RetentionWorker] Job ${jobId} stalled`)
  })

  retentionWorker.on('error', (error) => {
    console.error('[RetentionWorker] Error:', error)
  })

  return retentionWorker
}

/**
 * Stops the retention worker.
 *
 * Closes the worker and clears the module-level instance. Does nothing when
 * no worker is running.
 */
export async function stopRetentionWorker(): Promise<void> {
  if (retentionWorker) {
    console.log('[RetentionWorker] Stopping...')
    await retentionWorker.close()
    retentionWorker = null
    console.log('[RetentionWorker] Stopped')
  }
}

/**
 * Returns the worker instance (if active).
 *
 * @returns The current worker, or null when none has been started.
 */
export function getRetentionWorker(): Worker<RetentionJobData, RetentionJobResult> | null {
  return retentionWorker
}