import { Worker, type Job } from 'bullmq' import type Redis from 'ioredis' import { getLogger } from '../lib/logger.js' import { MessageDeduplicator } from '../lib/deduplication.js' import { OutgoingRateLimiter } from '../lib/rate-limiter.js' import type { MessageRouter } from '../bot/MessageRouter.js' import type { WhatsAppClient } from '../whatsapp/WhatsAppClient.js' import { INCOMING_MESSAGE_QUEUE, STATUS_UPDATE_QUEUE, type IncomingMessageJobData, type IncomingMessageJobResult, type StatusUpdateJobData, } from './message-job.js' const log = getLogger('message-worker') interface MessageWorkerDeps { redis: Redis messageRouter: MessageRouter whatsappClient: WhatsAppClient } export class MessageWorkerManager { private incomingWorker: Worker | null = null private statusWorker: Worker | null = null private deduplicator: MessageDeduplicator private rateLimiter: OutgoingRateLimiter constructor(private deps: MessageWorkerDeps) { this.deduplicator = new MessageDeduplicator(deps.redis) this.rateLimiter = new OutgoingRateLimiter(deps.redis) } async start(): Promise { const { redis, messageRouter, whatsappClient } = this.deps // Incoming message worker this.incomingWorker = new Worker( INCOMING_MESSAGE_QUEUE, async (job: Job) => { const { message } = job.data const { messageId, from } = message log.info( { jobId: job.id, messageId, from, attempt: job.attemptsMade + 1 }, 'Processing incoming message', ) // Deduplicate if (await this.deduplicator.isDuplicate(messageId)) { return { action: 'skipped', reason: 'duplicate' } } // Mark as read try { await whatsappClient.markAsRead(messageId) } catch (err) { log.warn({ messageId, error: (err as Error).message }, 'Failed to mark as read') } // Rate limit outgoing before routing (router may send messages) await this.rateLimiter.acquire() // Route and process const result = await messageRouter.route(message) return { action: result.action, reason: 'reason' in result ? (result.reason as string) : undefined, responseLength: result.action === 'bot_response' ? result.response.text.length : undefined, } }, { connection: redis, concurrency: 3, stalledInterval: 30_000, maxStalledCount: 2, }, ) // Status update worker this.statusWorker = new Worker( STATUS_UPDATE_QUEUE, async (job: Job) => { const { status } = job.data log.debug( { messageId: status.id, status: status.status }, 'Message status update', ) if (status.status === 'failed' && status.errors?.length) { log.error( { messageId: status.id, errors: status.errors }, 'Message delivery failed', ) } }, { connection: redis, concurrency: 5, }, ) // Event handlers this.incomingWorker.on('completed', (job) => { log.debug( { jobId: job.id, result: job.returnvalue }, 'Message job completed', ) }) this.incomingWorker.on('failed', (job, err) => { log.error( { jobId: job?.id, error: err.message, attempts: job?.attemptsMade }, 'Message job failed', ) }) this.incomingWorker.on('stalled', (jobId) => { log.warn({ jobId }, 'Message job stalled') }) log.info('Message workers started') } async stop(): Promise { await this.incomingWorker?.close() await this.statusWorker?.close() log.info('Message workers stopped') } }