/**
 * Email Worker
 *
 * Processes email jobs from the queue.
 * Uses the existing TenantEmailService.
 */

import { Worker, Job } from 'bullmq'
import { getPayload } from 'payload'
import config from '@payload-config'
import { QUEUE_NAMES, getQueueRedisConnection } from '../queue-service'
import type { EmailJobData, EmailJobResult } from '../jobs/email-job'
import { sendTenantEmail, type EmailSource } from '../../email/tenant-email-service'

// Worker configuration
const DEFAULT_CONCURRENCY = 3

/**
 * Parses the configured worker concurrency.
 *
 * An unset or empty value yields the default. A value that does not parse to
 * a positive integer (e.g. "abc", "0", "-2") also yields the default, with a
 * warning, instead of handing NaN or a non-positive number to the Worker.
 *
 * @param raw - Raw value of the QUEUE_EMAIL_CONCURRENCY environment variable.
 * @returns A positive integer concurrency.
 */
function resolveConcurrency(raw: string | undefined): number {
  // `||` (not `??`) on purpose: an empty string should also select the default.
  const parsed = Number.parseInt(raw || String(DEFAULT_CONCURRENCY), 10)

  if (Number.isInteger(parsed) && parsed >= 1) {
    return parsed
  }

  console.warn(
    `[EmailWorker] Invalid QUEUE_EMAIL_CONCURRENCY "${raw}", falling back to ${DEFAULT_CONCURRENCY}`
  )
  return DEFAULT_CONCURRENCY
}

const CONCURRENCY = resolveConcurrency(process.env.QUEUE_EMAIL_CONCURRENCY)

/**
 * Email job processor.
 *
 * Obtains a Payload instance, sends the email described by `job.data` through
 * `sendTenantEmail`, and returns a result record for the job.
 *
 * @param job - Queue job carrying the email payload.
 * @returns Result record with the message id, an ISO timestamp and the attempt number.
 * @throws Rethrows any error from sending (or an Error built from a
 *         non-successful send result) so that the BullMQ retry logic applies.
 */
async function processEmailJob(job: Job<EmailJobData>): Promise<EmailJobResult> {
  const { tenantId, to, subject, html, text, replyTo, source, correlationId } = job.data

  console.log(`[EmailWorker] Processing job ${job.id} for tenant ${tenantId}`)

  try {
    // Get the Payload instance (for DB access and logging)
    const payload = await getPayload({ config })

    // Send the email through the existing service
    const result = await sendTenantEmail(payload, tenantId, {
      to,
      subject,
      html,
      text,
      replyTo,
      source: (source as EmailSource) || 'system',
      metadata: {
        queueJobId: job.id,
        correlationId,
        // attemptsMade counts attempts before this one, hence the + 1
        attempts: job.attemptsMade + 1,
      },
    })

    if (!result.success) {
      throw new Error(result.error || 'Unknown email error')
    }

    const jobResult: EmailJobResult = {
      success: true,
      messageId: result.messageId,
      timestamp: new Date().toISOString(),
      attempts: job.attemptsMade + 1,
    }

    console.log(`[EmailWorker] Job ${job.id} completed: ${result.messageId}`)

    return jobResult
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    console.error(`[EmailWorker] Job ${job.id} failed:`, errorMessage)

    // Rethrow so that the BullMQ retry logic applies
    throw error
  }
}

/**
 * Email worker instance (null while no worker is running)
 */
let emailWorker: Worker<EmailJobData, EmailJobResult> | null = null

/**
 * Starts the email worker.
 *
 * If a worker is already running, a warning is logged and the existing
 * instance is returned instead of creating a second one.
 *
 * @returns The running worker instance.
 */
export function startEmailWorker(): Worker<EmailJobData, EmailJobResult> {
  if (emailWorker) {
    console.warn('[EmailWorker] Worker already running')
    return emailWorker
  }

  const worker = new Worker<EmailJobData, EmailJobResult>(
    QUEUE_NAMES.EMAIL,
    processEmailJob,
    {
      connection: getQueueRedisConnection(),
      concurrency: CONCURRENCY,
      // Stalled job detection
      stalledInterval: 30000, // 30s
      maxStalledCount: 2,
    }
  )

  // Event handlers
  worker.on('ready', () => {
    console.log(`[EmailWorker] Ready (concurrency: ${CONCURRENCY})`)
  })

  worker.on('completed', (job) => {
    // NOTE(review): job.timestamp is presumably the job's creation time, so
    // this duration would include time spent waiting in the queue - confirm.
    console.log(`[EmailWorker] Job ${job.id} completed in ${Date.now() - job.timestamp}ms`)
  })

  worker.on('failed', (job, error) => {
    // job may be undefined here, hence the optional chaining
    console.error(`[EmailWorker] Job ${job?.id} failed after ${job?.attemptsMade} attempts:`, error.message)
  })

  worker.on('stalled', (jobId) => {
    console.warn(`[EmailWorker] Job ${jobId} stalled`)
  })

  worker.on('error', (error) => {
    console.error('[EmailWorker] Error:', error)
  })

  emailWorker = worker
  return worker
}

/**
 * Stops the email worker.
 *
 * Closes the running worker, if any, and clears the module-level instance.
 * Does nothing when no worker is running.
 */
export async function stopEmailWorker(): Promise<void> {
  if (emailWorker) {
    console.log('[EmailWorker] Stopping...')
    await emailWorker.close()
    emailWorker = null
    console.log('[EmailWorker] Stopped')
  }
}

/**
 * Returns the worker instance (if active).
 *
 * @returns The running worker, or null when none has been started.
 */
export function getEmailWorker(): Worker<EmailJobData, EmailJobResult> | null {
  return emailWorker
}