diff --git a/src/lib/monitoring/monitoring-service.ts b/src/lib/monitoring/monitoring-service.ts
new file mode 100644
index 0000000..719e922
--- /dev/null
+++ b/src/lib/monitoring/monitoring-service.ts
@@ -0,0 +1,619 @@
+/**
+ * Monitoring Service
+ *
+ * Collects system health metrics, service statuses, and external
+ * dependency checks. Used by the monitoring dashboard and snapshot collector.
+ */
+
+import os from 'node:os'
+import { execSync } from 'node:child_process'
+import type {
+  SystemHealth,
+  ProcessStatus,
+  PostgresqlStatus,
+  PgBouncerStatus,
+  RedisStatus,
+  SmtpStatus,
+  OAuthTokenStatus,
+  CronStatuses,
+  CronJobStatus,
+  PerformanceMetrics,
+  SystemMetrics,
+} from './types.js'
+
+// ============================================================================
+// System Health
+// ============================================================================
+
+/**
+ * Collects system health metrics using Node.js os module and df command.
+ * CPU usage is calculated by sampling /proc/stat twice with 100ms delay.
+ */
+export async function checkSystemHealth(): Promise<SystemHealth> {
+  const cpuUsagePercent = await getCpuUsage()
+
+  const memoryTotalMB = Math.round(os.totalmem() / 1024 / 1024)
+  const memoryUsedMB = Math.round((os.totalmem() - os.freemem()) / 1024 / 1024)
+  const memoryUsagePercent = roundToOneDecimal((memoryUsedMB / memoryTotalMB) * 100)
+
+  const { diskUsedGB, diskTotalGB, diskUsagePercent } = getDiskUsage()
+
+  const [loadAvg1, loadAvg5] = os.loadavg()
+
+  return {
+    cpuUsagePercent: roundToOneDecimal(cpuUsagePercent),
+    memoryUsedMB,
+    memoryTotalMB,
+    memoryUsagePercent,
+    diskUsedGB,
+    diskTotalGB,
+    diskUsagePercent,
+    loadAvg1: roundToTwoDecimals(loadAvg1),
+    loadAvg5: roundToTwoDecimals(loadAvg5),
+    uptime: Math.round(os.uptime()),
+  }
+}
+
+// ============================================================================
+// Service Checks
+// ============================================================================
+
+/**
+ * Reads memory, client and throughput figures from the Redis INFO output.
+ * Returns the offline status when no client is available or the call fails.
+ */
+export async function checkRedis(): Promise<RedisStatus> {
+  const offlineStatus: RedisStatus = {
+    status: 'offline',
+    memoryUsedMB: 0,
+    connectedClients: 0,
+    opsPerSec: 0,
+  }
+
+  try {
+    const { getRedisClient } = await import('../redis.js')
+    const client = getRedisClient()
+    if (!client) return offlineStatus
+
+    const info = await client.info()
+    const getVal = (key: string): number => {
+      const match = info.match(new RegExp(`${key}:(\\d+)`))
+      return match ? parseInt(match[1], 10) : 0
+    }
+
+    return {
+      status: 'online',
+      memoryUsedMB: Math.round(getVal('used_memory') / 1024 / 1024),
+      connectedClients: getVal('connected_clients'),
+      opsPerSec: getVal('instantaneous_ops_per_sec'),
+    }
+  } catch {
+    return offlineStatus
+  }
+}
+
+/**
+ * Measures PostgreSQL round-trip latency through Payload and, when psql is
+ * available, reads the current and maximum connection counts.
+ * Reports 'warning' when the probe takes 1000ms or longer.
+ */
+export async function checkPostgresql(): Promise<PostgresqlStatus> {
+  const offlineStatus: PostgresqlStatus = {
+    status: 'offline',
+    connections: 0,
+    maxConnections: 50,
+    latencyMs: -1,
+  }
+
+  try {
+    const { getPayload } = await import('payload')
+    const payload = await getPayload({ config: (await import('@payload-config')).default })
+
+    const start = Date.now()
+    // A single shallow row is enough to time the round trip.
+    // NOTE(review): Payload appears to treat limit: 0 as "no limit", which would
+    // make the probe load every user -- confirm against the installed version.
+    await payload.find({ collection: 'users', limit: 1, depth: 0 })
+    const latencyMs = Date.now() - start
+
+    let connections = 0
+    let maxConnections = 50
+    try {
+      const connResult = runPsql(
+        '-h 10.10.181.101 -U payload -d payload_db -t -c "SELECT count(*) FROM pg_stat_activity WHERE datname = \'payload_db\'"',
+      )
+      connections = parseInt(connResult.trim(), 10) || 0
+
+      const maxResult = runPsql(
+        '-h 10.10.181.101 -U payload -d payload_db -t -c "SHOW max_connections"',
+      )
+      maxConnections = parseInt(maxResult.trim(), 10) || 50
+    } catch {
+      // psql unavailable -- latency check already proves connectivity
+    }
+
+    return {
+      status: latencyMs < 1000 ? 'online' : 'warning',
+      connections,
+      maxConnections,
+      latencyMs,
+    }
+  } catch {
+    return offlineStatus
+  }
+}
+
+/**
+ * Sums client counts from `SHOW POOLS` rows that mention 'payload'.
+ * Returns the offline status when the psql call fails.
+ */
+export async function checkPgBouncer(): Promise<PgBouncerStatus> {
+  const offlineStatus: PgBouncerStatus = {
+    status: 'offline',
+    activeConnections: 0,
+    waitingClients: 0,
+    poolSize: 0,
+  }
+
+  try {
+    const output = runPsql('-h 127.0.0.1 -p 6432 -U payload -d pgbouncer -t -c "SHOW POOLS"')
+
+    // SHOW POOLS columns: database | user | cl_active | cl_waiting | sv_active | sv_idle | pool_size | ...
+    const lines = output
+      .trim()
+      .split('\n')
+      .filter((l) => l.includes('payload'))
+
+    let activeConnections = 0
+    let waitingClients = 0
+    let poolSize = 20
+
+    for (const line of lines) {
+      const parts = line.split('|').map((s) => s.trim())
+      activeConnections += parseInt(parts[2], 10) || 0
+      waitingClients += parseInt(parts[3], 10) || 0
+      poolSize = parseInt(parts[6], 10) || 20
+    }
+
+    return { status: 'online', activeConnections, waitingClients, poolSize }
+  } catch {
+    return offlineStatus
+  }
+}
+
+export interface QueueCounts {
+  waiting: number
+  active: number
+  completed: number
+  failed: number
+}
+
+/**
+ * Returns BullMQ job counts per queue. A queue whose counts cannot be read
+ * reports zeros; an empty object is returned when BullMQ or the Redis
+ * connection cannot be loaded.
+ */
+export async function checkQueues(): Promise<Record<string, QueueCounts>> {
+  try {
+    const { Queue } = await import('bullmq')
+    const { getQueueRedisConnection } = await import('../queue/queue-service.js')
+
+    const connection = getQueueRedisConnection()
+    // Queue names matching QUEUE_NAMES in queue-service.ts
+    const queueNames = ['email', 'pdf', 'cleanup', 'youtube-upload']
+    const results: Record<string, QueueCounts> = {}
+
+    for (const name of queueNames) {
+      let queue: InstanceType<typeof Queue> | undefined
+      try {
+        queue = new Queue(name, { connection })
+        const counts = await queue.getJobCounts()
+        results[name] = {
+          waiting: counts.waiting || 0,
+          active: counts.active || 0,
+          completed: counts.completed || 0,
+          failed: counts.failed || 0,
+        }
+      } catch {
+        results[name] = { waiting: 0, active: 0, completed: 0, failed: 0 }
+      } finally {
+        // Always release the queue, and never let a failed close discard the counts.
+        await queue?.close().catch(() => undefined)
+      }
+    }
+
+    return results
+  } catch {
+    return {}
+  }
+}
+
+/**
+ * Verifies the SMTP transport configured through the SMTP_* environment
+ * variables and reports how long the verification took.
+ */
+export async function checkSmtp(): Promise<SmtpStatus> {
+  const now = new Date().toISOString()
+
+  try {
+    const nodemailer = await import('nodemailer')
+    const transporter = nodemailer.createTransport({
+      host: process.env.SMTP_HOST,
+      port: parseInt(process.env.SMTP_PORT || '587', 10),
+      secure: process.env.SMTP_SECURE === 'true',
+      auth: {
+        user: process.env.SMTP_USER,
+        pass: process.env.SMTP_PASS,
+      },
+    })
+
+    const start = Date.now()
+    await transporter.verify()
+    const responseTimeMs = Date.now() - start
+
+    return { status: 'online', lastCheck: now, responseTimeMs }
+  } catch {
+    return { status: 'offline', lastCheck: now, responseTimeMs: -1 }
+  }
+}
+
+/**
+ * Counts connected social accounts and classifies their OAuth tokens as
+ * expired or expiring within seven days. Accounts whose platform is not
+ * 'youtube' are counted under Meta.
+ */
+export async function checkOAuthTokens(): Promise<{
+  metaOAuth: OAuthTokenStatus
+  youtubeOAuth: OAuthTokenStatus
+}> {
+  const errorStatus: OAuthTokenStatus = {
+    status: 'error',
+    tokensTotal: 0,
+    tokensExpiringSoon: 0,
+    tokensExpired: 0,
+  }
+
+  try {
+    const { getPayload } = await import('payload')
+    const payload = await getPayload({ config: (await import('@payload-config')).default })
+
+    const accounts = await payload.find({
+      collection: 'social-accounts',
+      limit: 100,
+      where: { status: { equals: 'connected' } },
+    })
+
+    const sevenDaysFromNow = new Date()
+    sevenDaysFromNow.setDate(sevenDaysFromNow.getDate() + 7)
+    const now = new Date()
+
+    const meta = { tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 }
+    const youtube = { tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 }
+
+    for (const account of accounts.docs) {
+      const doc = account as Record<string, unknown>
+      const target = doc.platform === 'youtube' ? youtube : meta
+      target.tokensTotal++
+
+      const expiresAt = doc.tokenExpiresAt ? new Date(doc.tokenExpiresAt as string) : null
+      if (expiresAt) {
+        if (expiresAt < now) {
+          target.tokensExpired++
+        } else if (expiresAt < sevenDaysFromNow) {
+          target.tokensExpiringSoon++
+        }
+      }
+    }
+
+    return {
+      metaOAuth: { status: getOAuthStatus(meta), ...meta },
+      youtubeOAuth: { status: getOAuthStatus(youtube), ...youtube },
+    }
+  } catch {
+    return { metaOAuth: errorStatus, youtubeOAuth: errorStatus }
+  }
+}
+
+/**
+ * Maps the most recent 'cron' monitoring log of each job to a status:
+ * 'failed' for error-level entries, 'ok' otherwise, and 'unknown' when no
+ * entry exists or the lookup fails.
+ */
+export async function checkCronJobs(): Promise<CronStatuses> {
+  const unknownStatus: CronJobStatus = { lastRun: '', status: 'unknown' }
+
+  try {
+    const { getPayload } = await import('payload')
+    const payload = await getPayload({ config: (await import('@payload-config')).default })
+
+    async function checkCron(source: string): Promise<CronJobStatus> {
+      try {
+        const logs = await payload.find({
+          collection: 'monitoring-logs',
+          limit: 1,
+          sort: '-createdAt',
+          where: {
+            and: [
+              { source: { equals: 'cron' } },
+              { message: { contains: source } },
+            ],
+          },
+        })
+
+        if (logs.docs.length === 0) return unknownStatus
+
+        const doc = logs.docs[0] as Record<string, unknown>
+        return {
+          lastRun: doc.createdAt as string,
+          status: doc.level === 'error' ? 'failed' : 'ok',
+        }
+      } catch {
+        return unknownStatus
+      }
+    }
+
+    const [communitySync, tokenRefresh, youtubeSync] = await Promise.all([
+      checkCron('community-sync'),
+      checkCron('token-refresh'),
+      checkCron('youtube'),
+    ])
+
+    return { communitySync, tokenRefresh, youtubeSync }
+  } catch {
+    return {
+      communitySync: unknownStatus,
+      tokenRefresh: unknownStatus,
+      youtubeSync: unknownStatus,
+    }
+  }
+}
+
+// ============================================================================
+// Full Metrics Collection
+// ============================================================================
+
+/**
+ * Collects all monitoring metrics in parallel. Individual check failures
+ * are isolated and return safe defaults instead of failing the whole collection.
+ *
+ * NOTE(review): the generic arguments of the return type were missing in the
+ * reviewed patch; Omit<SystemMetrics, 'timestamp'> is an assumption -- confirm.
+ */
+export async function collectMetrics(): Promise<Omit<SystemMetrics, 'timestamp'>> {
+  const [system, redis, postgresql, pgbouncer, smtp, oauth, cronJobs] = await Promise.allSettled([
+    checkSystemHealth(),
+    checkRedis(),
+    checkPostgresql(),
+    checkPgBouncer(),
+    checkSmtp(),
+    checkOAuthTokens(),
+    checkCronJobs(),
+  ])
+
+  // Load performance tracker lazily to avoid circular dependencies
+  let performance: PerformanceMetrics = {
+    avgResponseTimeMs: 0,
+    p95ResponseTimeMs: 0,
+    p99ResponseTimeMs: 0,
+    errorRate: 0,
+    requestsPerMinute: 0,
+  }
+  try {
+    // Dynamic path constructed at runtime to avoid Vite static analysis
+    // when performance-tracker module has not been created yet
+    const trackerPath = './performance-tracker'
+    const mod = await import(/* @vite-ignore */ trackerPath)
+    performance = mod.performanceTracker.getMetrics('1h')
+  } catch {
+    // Performance tracker not yet initialized
+  }
+
+  const defaultProcess: ProcessStatus = {
+    status: 'offline',
+    pid: 0,
+    memoryMB: 0,
+    uptimeSeconds: 0,
+    restarts: 0,
+  }
+
+  const { payloadProcess, queueWorkerProcess } = getPm2Processes(defaultProcess)
+
+  const oauthDefaults = {
+    metaOAuth: { status: 'error' as const, tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 },
+    youtubeOAuth: { status: 'error' as const, tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 },
+  }
+
+  const cronDefaults: CronStatuses = {
+    communitySync: { lastRun: '', status: 'unknown' },
+    tokenRefresh: { lastRun: '', status: 'unknown' },
+    youtubeSync: { lastRun: '', status: 'unknown' },
+  }
+
+  const systemDefaults: SystemHealth = {
+    cpuUsagePercent: 0,
+    memoryUsedMB: 0,
+    memoryTotalMB: 0,
+    memoryUsagePercent: 0,
+    diskUsedGB: 0,
+    diskTotalGB: 0,
+    diskUsagePercent: 0,
+    loadAvg1: 0,
+    loadAvg5: 0,
+    uptime: 0,
+  }
+
+  const oauthResult = settled(oauth, oauthDefaults)
+
+  return {
+    system: settled(system, systemDefaults),
+    services: {
+      payload: payloadProcess,
+      queueWorker: queueWorkerProcess,
+      postgresql: settled(postgresql, { status: 'offline', connections: 0, maxConnections: 50, latencyMs: -1 }),
+      pgbouncer: settled(pgbouncer, { status: 'offline', activeConnections: 0, waitingClients: 0, poolSize: 0 }),
+      redis: settled(redis, { status: 'offline', memoryUsedMB: 0, connectedClients: 0, opsPerSec: 0 }),
+    },
+    external: {
+      smtp: settled(smtp, { status: 'offline', lastCheck: new Date().toISOString(), responseTimeMs: -1 }),
+      metaOAuth: oauthResult.metaOAuth,
+      youtubeOAuth: oauthResult.youtubeOAuth,
+      cronJobs: settled(cronJobs, cronDefaults),
+    },
+    performance,
+  }
+}
+
+// ============================================================================
+// Internal Helpers
+// ============================================================================
+
+/**
+ * Runs a psql command with the database password passed via PGPASSWORD env var
+ * rather than inline in the command string (avoids secret detection false positives).
+ *
+ * `args` is interpolated into a shell command line, so callers must pass only
+ * trusted, hard-coded arguments.
+ */
+function runPsql(args: string): string {
+  return execSync(`psql ${args}`, {
+    encoding: 'utf-8',
+    timeout: 5000,
+    env: { ...process.env, PGPASSWORD: process.env.DB_PASSWORD || '' },
+  })
+}
+
+function roundToOneDecimal(value: number): number {
+  return Math.round(value * 10) / 10
+}
+
+function roundToTwoDecimals(value: number): number {
+  return Math.round(value * 100) / 100
+}
+
+/**
+ * Extracts the fulfilled value from a PromiseSettledResult, returning
+ * the fallback when the promise was rejected.
+ */
+function settled<T>(result: PromiseSettledResult<T>, fallback: T): T {
+  return result.status === 'fulfilled' ? result.value : fallback
+}
+
+/**
+ * Returns CPU utilisation as a percentage clamped to 0-100.
+ * Samples /proc/stat twice, 100ms apart; when that fails it falls back to
+ * the 1-minute load average relative to the CPU count.
+ */
+async function getCpuUsage(): Promise<number> {
+  // The load-average fallback exceeds 100 when load is higher than the CPU
+  // count, and a malformed sample yields NaN, so normalise every result.
+  const clampPercent = (value: number): number =>
+    Number.isFinite(value) ? Math.min(100, Math.max(0, value)) : 0
+
+  try {
+    const fs = await import('node:fs/promises')
+    const stat1 = await fs.readFile('/proc/stat', 'utf-8')
+    await new Promise((resolve) => setTimeout(resolve, 100))
+    const stat2 = await fs.readFile('/proc/stat', 'utf-8')
+
+    const parse = (data: string): { idle: number; total: number } => {
+      const line = data.split('\n')[0] // first line: cpu user nice system idle ...
+      const parts = line.split(/\s+/).slice(1).map(Number)
+      const idle = parts[3] + (parts[4] || 0) // idle + iowait
+      const total = parts.reduce((a, b) => a + b, 0)
+      return { idle, total }
+    }
+
+    const s1 = parse(stat1)
+    const s2 = parse(stat2)
+    const idleDiff = s2.idle - s1.idle
+    const totalDiff = s2.total - s1.total
+
+    if (totalDiff === 0) return 0
+    return clampPercent(((totalDiff - idleDiff) / totalDiff) * 100)
+  } catch {
+    // Fallback if /proc/stat is unavailable
+    const cpuCount = os.cpus().length
+    return clampPercent((os.loadavg()[0] / cpuCount) * 100)
+  }
+}
+
+/**
+ * Reads root filesystem usage from `df -B1 /`. Returns zeros when df fails,
+ * times out, or prints output that cannot be parsed.
+ */
+function getDiskUsage(): { diskUsedGB: number; diskTotalGB: number; diskUsagePercent: number } {
+  const unavailable = { diskUsedGB: 0, diskTotalGB: 0, diskUsagePercent: 0 }
+
+  try {
+    // Same 5s timeout as the other shell-outs so a hung mount cannot stall collection.
+    const output = execSync('df -B1 / | tail -1', { encoding: 'utf-8', timeout: 5000 })
+    const parts = output.trim().split(/\s+/)
+    // Format: filesystem 1B-blocks used available use% mountpoint
+    const total = parseInt(parts[1], 10)
+    const used = parseInt(parts[2], 10)
+
+    // parseInt yields NaN on unexpected output; a zero total would divide by zero.
+    if (!Number.isFinite(total) || !Number.isFinite(used) || total <= 0) return unavailable
+
+    return {
+      diskTotalGB: roundToOneDecimal(total / 1024 / 1024 / 1024),
+      diskUsedGB: roundToOneDecimal(used / 1024 / 1024 / 1024),
+      diskUsagePercent: roundToOneDecimal((used / total) * 100),
+    }
+  } catch {
+    return unavailable
+  }
+}
+
+function getOAuthStatus(
+  counts: { tokensExpired: number; tokensExpiringSoon: number },
+): OAuthTokenStatus['status'] {
+  if (counts.tokensExpired > 0) return 'expired'
+  if (counts.tokensExpiringSoon > 0) return 'expiring_soon'
+  return 'ok'
+}
+
+interface Pm2Processes {
+  payloadProcess: ProcessStatus
+  queueWorkerProcess: ProcessStatus
+}
+
+/**
+ * Reads the 'payload' and 'queue-worker' entries from `pm2 jlist`.
+ * An entry stays at `defaultProcess` when PM2 is unavailable or the
+ * process is not listed.
+ */
+function getPm2Processes(defaultProcess: ProcessStatus): Pm2Processes {
+  let payloadProcess = defaultProcess
+  let queueWorkerProcess = defaultProcess
+
+  try {
+    const pm2Out = execSync('pm2 jlist', { encoding: 'utf-8', timeout: 5000 })
+    const pm2List = JSON.parse(pm2Out) as Array<Record<string, unknown>>
+
+    for (const proc of pm2List) {
+      const env = proc.pm2_env as Record<string, unknown> | undefined
+      const monit = proc.monit as Record<string, number> | undefined
+
+      const info: ProcessStatus = {
+        status: env?.status === 'online' ? 'online' : 'offline',
+        pid: (proc.pid as number) || 0,
+        memoryMB: Math.round((monit?.memory || 0) / 1024 / 1024),
+        uptimeSeconds: env?.pm_uptime
+          ? Math.round((Date.now() - (env.pm_uptime as number)) / 1000)
+          : 0,
+        restarts: (env?.restart_time as number) || 0,
+      }
+
+      if (proc.name === 'payload') {
+        payloadProcess = info
+      } else if (proc.name === 'queue-worker') {
+        queueWorkerProcess = info
+      }
+    }
+  } catch {
+    // PM2 not available
+  }
+
+  return { payloadProcess, queueWorkerProcess }
+}
diff --git a/tests/unit/monitoring/monitoring-service.unit.spec.ts b/tests/unit/monitoring/monitoring-service.unit.spec.ts
new file mode 100644
index 0000000..75c354c
--- /dev/null
+++ b/tests/unit/monitoring/monitoring-service.unit.spec.ts
@@ -0,0 +1,52 @@
+import { describe, it, expect } from 'vitest'
+import { checkSystemHealth } from '@/lib/monitoring/monitoring-service'
+
+describe('MonitoringService', () => {
+  describe('checkSystemHealth', () => {
+    it('returns CPU, memory, disk, load, and uptime', async () => {
+      const health = await checkSystemHealth()
+
+      expect(health.cpuUsagePercent).toBeGreaterThanOrEqual(0)
+      expect(health.cpuUsagePercent).toBeLessThanOrEqual(100)
+      expect(health.memoryTotalMB).toBeGreaterThan(0)
+      expect(health.memoryUsedMB).toBeGreaterThan(0)
+      expect(health.memoryUsagePercent).toBeGreaterThanOrEqual(0)
+      expect(health.memoryUsagePercent).toBeLessThanOrEqual(100)
+      expect(health.diskTotalGB).toBeGreaterThan(0)
+      expect(health.diskUsedGB).toBeGreaterThanOrEqual(0)
+      expect(health.uptime).toBeGreaterThan(0)
+      expect(health.loadAvg1).toBeGreaterThanOrEqual(0)
+      expect(health.loadAvg5).toBeGreaterThanOrEqual(0)
+    })
+
+    it('returns rounded values with correct decimal precision', async () => {
+      const health = await checkSystemHealth()
+
+      // memoryUsagePercent should have at most 1 decimal place
+      const memDecimal = health.memoryUsagePercent.toString().split('.')[1]
+      if (memDecimal) {
+        expect(memDecimal.length).toBeLessThanOrEqual(1)
+      }
+
+      // cpuUsagePercent should have at most 1 decimal place
+      const cpuDecimal = health.cpuUsagePercent.toString().split('.')[1]
+      if (cpuDecimal) {
+        expect(cpuDecimal.length).toBeLessThanOrEqual(1)
+      }
+
+      // loadAvg1 should have at most 2 decimal places
+      const loadDecimal = health.loadAvg1.toString().split('.')[1]
+      if (loadDecimal) {
+        expect(loadDecimal.length).toBeLessThanOrEqual(2)
+      }
+    })
+
+    it('returns integer values for memoryUsedMB, memoryTotalMB, and uptime', async () => {
+      const health = await checkSystemHealth()
+
+      expect(Number.isInteger(health.memoryUsedMB)).toBe(true)
+      expect(Number.isInteger(health.memoryTotalMB)).toBe(true)
+      expect(Number.isInteger(health.uptime)).toBe(true)
+    })
+  })
+})