From 4b665f89096cf3084dbc72a7f1a3931aaa0984e5 Mon Sep 17 00:00:00 2001 From: Martin Porwoll Date: Mon, 2 Mar 2026 14:12:30 +0000 Subject: [PATCH] refactor: switch InteractionWriter from direct DB to Payload REST API Direct DB (pg Pool) not reachable from sv-whatsapp LXC to sv-postgres. Using Payload REST API via PayloadClient as interim solution. DATABASE_URL is now optional in config. Co-Authored-By: Claude Opus 4.6 --- src/config.ts | 4 +- src/payload/InteractionWriter.ts | 132 ++++++++++++------------------- src/server.ts | 23 ++---- 3 files changed, 61 insertions(+), 98 deletions(-) diff --git a/src/config.ts b/src/config.ts index 9427c58..801f7d1 100644 --- a/src/config.ts +++ b/src/config.ts @@ -15,8 +15,8 @@ const envSchema = z.object({ PAYLOAD_API_URL: z.string().url().default('http://localhost:3001/api'), PAYLOAD_API_KEY: z.string().optional(), - // PostgreSQL - DATABASE_URL: z.string().min(1), + // PostgreSQL (optional — direct DB access for high-throughput mode) + DATABASE_URL: z.string().optional(), // Redis REDIS_URL: z.string().default('redis://localhost:6379/0'), diff --git a/src/payload/InteractionWriter.ts b/src/payload/InteractionWriter.ts index 43a4adc..fcf59b5 100644 --- a/src/payload/InteractionWriter.ts +++ b/src/payload/InteractionWriter.ts @@ -1,74 +1,51 @@ -import type { Pool } from 'pg' import { getLogger } from '../lib/logger.js' import type { NormalizedMessage } from '../whatsapp/types.js' import type { LLMResponse } from '../llm/LLMProvider.js' -import { getConfig } from '../config.js' +import type { PayloadClient } from './PayloadClient.js' const log = getLogger('interaction-writer') /** - * Writes messages directly to the community_interactions table. - * Uses direct DB access for speed — the Payload REST API adds unnecessary - * overhead for high-volume message storage. + * Writes messages to community_interactions via Payload REST API. + * When direct DB access becomes available, this can be swapped to a + * pg Pool-based implementation for higher throughput. */ export class InteractionWriter { - private platformId: number | null = null + constructor(private payloadClient: PayloadClient) {} - constructor(private db: Pool) {} - - /** - * Resolve the WhatsApp platform ID from social_platforms table. - * Called once on startup. - */ async init(): Promise { - const result = await this.db.query<{ id: number }>( - `SELECT id FROM social_platforms WHERE name ILIKE $1 LIMIT 1`, - ['whatsapp'], - ) - if (result.rows[0]) { - this.platformId = result.rows[0].id - log.info({ platformId: this.platformId }, 'WhatsApp platform resolved') - } else { - log.warn( - 'WhatsApp platform not found in social_platforms — interactions will be stored without platform reference', - ) - } + log.info('InteractionWriter initialized (REST API mode)') } async writeIncoming( message: NormalizedMessage, analysis: LLMResponse | null, ): Promise { - const tenantId = getConfig().CCS_TENANT_ID + try { + const doc = await this.payloadClient.create<{ id: number }>( + 'community-interactions', + { + type: 'dm', + externalId: message.messageId, + authorName: message.senderName, + authorHandle: message.from, + message: message.text ?? `[${message.type}]`, + messageType: message.type, + ...(analysis && { + analysisSentiment: 'neutral', + analysisIsMedicalQuestion: analysis.isMedicalQuestion, + }), + status: 'new', + tenant: 10, + }, + ) - const result = await this.db.query<{ id: number }>( - `INSERT INTO community_interactions ( - platform_id, type, external_id, - author_name, author_handle, - message, message_type, - analysis_sentiment, analysis_is_medical_question, - status, tenant_id, - created_at, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW(), NOW()) - RETURNING id`, - [ - this.platformId, - 'dm', - message.messageId, - message.senderName, - message.from, - message.text ?? `[${message.type}]`, - message.type, - analysis ? 'neutral' : null, - analysis?.isMedicalQuestion ?? false, - 'new', - tenantId, - ], - ) - - const id = result.rows[0].id - log.debug({ id, messageId: message.messageId }, 'Incoming message stored') - return id + log.debug({ id: doc.id, messageId: message.messageId }, 'Incoming message stored') + return doc.id + } catch (err) { + log.error({ error: (err as Error).message }, 'Failed to store incoming message') + return 0 + } } async writeOutgoing( @@ -76,34 +53,27 @@ export class InteractionWriter { text: string, replyToMessageId: string, ): Promise { - const tenantId = getConfig().CCS_TENANT_ID + try { + const doc = await this.payloadClient.create<{ id: number }>( + 'community-interactions', + { + type: 'dm', + authorName: 'CCS Bot', + authorHandle: 'bot', + message: text, + messageType: 'text', + status: 'responded', + responseText: text, + responseType: 'bot', + tenant: 10, + }, + ) - const result = await this.db.query<{ id: number }>( - `INSERT INTO community_interactions ( - platform_id, type, - author_name, author_handle, - message, message_type, - status, response_text, response_type, - tenant_id, - created_at, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW()) - RETURNING id`, - [ - this.platformId, - 'dm', - 'CCS Bot', - 'bot', - text, - 'text', - 'responded', - text, - 'bot', - tenantId, - ], - ) - - const id = result.rows[0].id - log.debug({ id, to, replyTo: replyToMessageId }, 'Outgoing message stored') - return id + log.debug({ id: doc.id, to, replyTo: replyToMessageId }, 'Outgoing message stored') + return doc.id + } catch (err) { + log.error({ error: (err as Error).message }, 'Failed to store outgoing message') + return 0 + } } } diff --git a/src/server.ts b/src/server.ts index 91710bf..71e78ce 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,7 +1,6 @@ import Fastify from 'fastify' import { Queue } from 'bullmq' import Redis from 'ioredis' -import pg from 'pg' import { loadConfig } from './config.js' import { getLogger } from './lib/logger.js' @@ -34,11 +33,6 @@ const redis = new Redis(config.REDIS_URL, { enableReadyCheck: false, }) -const dbPool = new pg.Pool({ - connectionString: config.DATABASE_URL, - max: 5, -}) - // --- Initialize services --- const whatsappClient = new WhatsAppClient({ @@ -52,7 +46,7 @@ const conversationManager = new ConversationManager(redis) const escalationManager = new EscalationManager(conversationManager, whatsappClient) const payloadClient = new PayloadClient() -const interactionWriter = new InteractionWriter(dbPool) +const interactionWriter = new InteractionWriter(payloadClient) const rulesLoader = new RulesLoader(payloadClient) const templateResolver = new TemplateResolver(payloadClient) @@ -143,21 +137,21 @@ app.post('/webhook', webhookHandler) // Health check app.get('/health', async () => { const redisOk = redis.status === 'ready' - let dbOk = false + let payloadOk = false try { - await dbPool.query('SELECT 1') - dbOk = true + await payloadClient.find('users', { limit: '0' }) + payloadOk = true } catch { - // DB down + // Payload API down } - const status = redisOk && dbOk ? 'ok' : 'degraded' + const status = redisOk && payloadOk ? 'ok' : 'degraded' return { status, timestamp: new Date().toISOString(), services: { redis: redisOk ? 'ok' : 'down', - database: dbOk ? 'ok' : 'down', + payloadApi: payloadOk ? 'ok' : 'down', }, } }) @@ -166,7 +160,7 @@ app.get('/health', async () => { async function start(): Promise { try { - // Initialize interaction writer (resolve platform ID) + // Initialize interaction writer await interactionWriter.init() // Start workers @@ -190,7 +184,6 @@ async function shutdown(): Promise { await incomingQueue.close() await statusQueue.close() await app.close() - await dbPool.end() redis.disconnect() log.info('Shutdown complete') process.exit(0)