refactor: switch InteractionWriter from direct DB to Payload REST API

Direct DB (pg Pool) not reachable from sv-whatsapp LXC to sv-postgres.
Using Payload REST API via PayloadClient as interim solution.
DATABASE_URL is now optional in config.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Martin Porwoll 2026-03-02 14:12:30 +00:00
parent 8847358507
commit 4b665f8909
3 changed files with 61 additions and 98 deletions

View file

@ -15,8 +15,8 @@ const envSchema = z.object({
PAYLOAD_API_URL: z.string().url().default('http://localhost:3001/api'), PAYLOAD_API_URL: z.string().url().default('http://localhost:3001/api'),
PAYLOAD_API_KEY: z.string().optional(), PAYLOAD_API_KEY: z.string().optional(),
// PostgreSQL // PostgreSQL (optional — direct DB access for high-throughput mode)
DATABASE_URL: z.string().min(1), DATABASE_URL: z.string().optional(),
// Redis // Redis
REDIS_URL: z.string().default('redis://localhost:6379/0'), REDIS_URL: z.string().default('redis://localhost:6379/0'),

View file

@ -1,74 +1,51 @@
import type { Pool } from 'pg'
import { getLogger } from '../lib/logger.js' import { getLogger } from '../lib/logger.js'
import type { NormalizedMessage } from '../whatsapp/types.js' import type { NormalizedMessage } from '../whatsapp/types.js'
import type { LLMResponse } from '../llm/LLMProvider.js' import type { LLMResponse } from '../llm/LLMProvider.js'
import { getConfig } from '../config.js' import type { PayloadClient } from './PayloadClient.js'
const log = getLogger('interaction-writer') const log = getLogger('interaction-writer')
/** /**
* Writes messages directly to the community_interactions table. * Writes messages to community_interactions via Payload REST API.
* Uses direct DB access for speed the Payload REST API adds unnecessary * When direct DB access becomes available, this can be swapped to a
* overhead for high-volume message storage. * pg Pool-based implementation for higher throughput.
*/ */
export class InteractionWriter { export class InteractionWriter {
private platformId: number | null = null constructor(private payloadClient: PayloadClient) {}
constructor(private db: Pool) {}
/**
* Resolve the WhatsApp platform ID from social_platforms table.
* Called once on startup.
*/
async init(): Promise<void> { async init(): Promise<void> {
const result = await this.db.query<{ id: number }>( log.info('InteractionWriter initialized (REST API mode)')
`SELECT id FROM social_platforms WHERE name ILIKE $1 LIMIT 1`,
['whatsapp'],
)
if (result.rows[0]) {
this.platformId = result.rows[0].id
log.info({ platformId: this.platformId }, 'WhatsApp platform resolved')
} else {
log.warn(
'WhatsApp platform not found in social_platforms — interactions will be stored without platform reference',
)
}
} }
async writeIncoming( async writeIncoming(
message: NormalizedMessage, message: NormalizedMessage,
analysis: LLMResponse | null, analysis: LLMResponse | null,
): Promise<number> { ): Promise<number> {
const tenantId = getConfig().CCS_TENANT_ID try {
const doc = await this.payloadClient.create<{ id: number }>(
'community-interactions',
{
type: 'dm',
externalId: message.messageId,
authorName: message.senderName,
authorHandle: message.from,
message: message.text ?? `[${message.type}]`,
messageType: message.type,
...(analysis && {
analysisSentiment: 'neutral',
analysisIsMedicalQuestion: analysis.isMedicalQuestion,
}),
status: 'new',
tenant: 10,
},
)
const result = await this.db.query<{ id: number }>( log.debug({ id: doc.id, messageId: message.messageId }, 'Incoming message stored')
`INSERT INTO community_interactions ( return doc.id
platform_id, type, external_id, } catch (err) {
author_name, author_handle, log.error({ error: (err as Error).message }, 'Failed to store incoming message')
message, message_type, return 0
analysis_sentiment, analysis_is_medical_question, }
status, tenant_id,
created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW(), NOW())
RETURNING id`,
[
this.platformId,
'dm',
message.messageId,
message.senderName,
message.from,
message.text ?? `[${message.type}]`,
message.type,
analysis ? 'neutral' : null,
analysis?.isMedicalQuestion ?? false,
'new',
tenantId,
],
)
const id = result.rows[0].id
log.debug({ id, messageId: message.messageId }, 'Incoming message stored')
return id
} }
async writeOutgoing( async writeOutgoing(
@ -76,34 +53,27 @@ export class InteractionWriter {
text: string, text: string,
replyToMessageId: string, replyToMessageId: string,
): Promise<number> { ): Promise<number> {
const tenantId = getConfig().CCS_TENANT_ID try {
const doc = await this.payloadClient.create<{ id: number }>(
'community-interactions',
{
type: 'dm',
authorName: 'CCS Bot',
authorHandle: 'bot',
message: text,
messageType: 'text',
status: 'responded',
responseText: text,
responseType: 'bot',
tenant: 10,
},
)
const result = await this.db.query<{ id: number }>( log.debug({ id: doc.id, to, replyTo: replyToMessageId }, 'Outgoing message stored')
`INSERT INTO community_interactions ( return doc.id
platform_id, type, } catch (err) {
author_name, author_handle, log.error({ error: (err as Error).message }, 'Failed to store outgoing message')
message, message_type, return 0
status, response_text, response_type, }
tenant_id,
created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
RETURNING id`,
[
this.platformId,
'dm',
'CCS Bot',
'bot',
text,
'text',
'responded',
text,
'bot',
tenantId,
],
)
const id = result.rows[0].id
log.debug({ id, to, replyTo: replyToMessageId }, 'Outgoing message stored')
return id
} }
} }

View file

@ -1,7 +1,6 @@
import Fastify from 'fastify' import Fastify from 'fastify'
import { Queue } from 'bullmq' import { Queue } from 'bullmq'
import Redis from 'ioredis' import Redis from 'ioredis'
import pg from 'pg'
import { loadConfig } from './config.js' import { loadConfig } from './config.js'
import { getLogger } from './lib/logger.js' import { getLogger } from './lib/logger.js'
@ -34,11 +33,6 @@ const redis = new Redis(config.REDIS_URL, {
enableReadyCheck: false, enableReadyCheck: false,
}) })
const dbPool = new pg.Pool({
connectionString: config.DATABASE_URL,
max: 5,
})
// --- Initialize services --- // --- Initialize services ---
const whatsappClient = new WhatsAppClient({ const whatsappClient = new WhatsAppClient({
@ -52,7 +46,7 @@ const conversationManager = new ConversationManager(redis)
const escalationManager = new EscalationManager(conversationManager, whatsappClient) const escalationManager = new EscalationManager(conversationManager, whatsappClient)
const payloadClient = new PayloadClient() const payloadClient = new PayloadClient()
const interactionWriter = new InteractionWriter(dbPool) const interactionWriter = new InteractionWriter(payloadClient)
const rulesLoader = new RulesLoader(payloadClient) const rulesLoader = new RulesLoader(payloadClient)
const templateResolver = new TemplateResolver(payloadClient) const templateResolver = new TemplateResolver(payloadClient)
@ -143,21 +137,21 @@ app.post('/webhook', webhookHandler)
// Health check // Health check
app.get('/health', async () => { app.get('/health', async () => {
const redisOk = redis.status === 'ready' const redisOk = redis.status === 'ready'
let dbOk = false let payloadOk = false
try { try {
await dbPool.query('SELECT 1') await payloadClient.find('users', { limit: '0' })
dbOk = true payloadOk = true
} catch { } catch {
// DB down // Payload API down
} }
const status = redisOk && dbOk ? 'ok' : 'degraded' const status = redisOk && payloadOk ? 'ok' : 'degraded'
return { return {
status, status,
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
services: { services: {
redis: redisOk ? 'ok' : 'down', redis: redisOk ? 'ok' : 'down',
database: dbOk ? 'ok' : 'down', payloadApi: payloadOk ? 'ok' : 'down',
}, },
} }
}) })
@ -166,7 +160,7 @@ app.get('/health', async () => {
async function start(): Promise<void> { async function start(): Promise<void> {
try { try {
// Initialize interaction writer (resolve platform ID) // Initialize interaction writer
await interactionWriter.init() await interactionWriter.init()
// Start workers // Start workers
@ -190,7 +184,6 @@ async function shutdown(): Promise<void> {
await incomingQueue.close() await incomingQueue.close()
await statusQueue.close() await statusQueue.close()
await app.close() await app.close()
await dbPool.end()
redis.disconnect() redis.disconnect()
log.info('Shutdown complete') log.info('Shutdown complete')
process.exit(0) process.exit(0)