feat(monitoring): add monitoring service with system health and service checks

Implements checkSystemHealth (CPU, memory, disk, load), service checks
(Redis, PostgreSQL, PgBouncer, SMTP, queues, OAuth, cron), and the
collectMetrics aggregator that gathers all metrics in parallel.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Martin Porwoll 2026-02-15 00:26:28 +00:00
parent ee981c32bc
commit 4907371715
2 changed files with 601 additions and 0 deletions

View file

@ -0,0 +1,549 @@
/**
* Monitoring Service
*
* Collects system health metrics, service statuses, and external
* dependency checks. Used by the monitoring dashboard and snapshot collector.
*/
import os from 'node:os'
import { execSync } from 'node:child_process'
import type {
SystemHealth,
ProcessStatus,
PostgresqlStatus,
PgBouncerStatus,
RedisStatus,
SmtpStatus,
OAuthTokenStatus,
CronStatuses,
CronJobStatus,
PerformanceMetrics,
SystemMetrics,
} from './types.js'
// ============================================================================
// System Health
// ============================================================================
/**
 * Builds a point-in-time snapshot of host health.
 *
 * CPU usage is delegated to getCpuUsage(), disk figures to getDiskUsage(),
 * and memory, load average and uptime are read from the Node.js `os` module.
 * Percentages are rounded to one decimal, load averages to two decimals and
 * uptime to whole seconds.
 */
export async function checkSystemHealth(): Promise<SystemHealth> {
  // Bytes -> whole mebibytes.
  const toWholeMB = (bytes: number): number => Math.round(bytes / 1024 / 1024)

  const rawCpuPercent = await getCpuUsage()

  const memoryTotalMB = toWholeMB(os.totalmem())
  const memoryUsedMB = toWholeMB(os.totalmem() - os.freemem())
  // Derived from the already-rounded MB figures, so it matches what is displayed.
  const memoryUsagePercent = roundToOneDecimal((memoryUsedMB / memoryTotalMB) * 100)

  const disk = getDiskUsage()
  const [oneMinuteLoad, fiveMinuteLoad] = os.loadavg()

  return {
    cpuUsagePercent: roundToOneDecimal(rawCpuPercent),
    memoryUsedMB,
    memoryTotalMB,
    memoryUsagePercent,
    diskUsedGB: disk.diskUsedGB,
    diskTotalGB: disk.diskTotalGB,
    diskUsagePercent: disk.diskUsagePercent,
    loadAvg1: roundToTwoDecimals(oneMinuteLoad),
    loadAvg5: roundToTwoDecimals(fiveMinuteLoad),
    uptime: Math.round(os.uptime()),
  }
}
// ============================================================================
// Service Checks
// ============================================================================
/**
 * Reports Redis availability plus a few counters parsed from the INFO output.
 *
 * Returns an all-zero 'offline' status when no client is available or when
 * anything in the check throws.
 */
export async function checkRedis(): Promise<RedisStatus> {
  const unreachable: RedisStatus = {
    status: 'offline',
    memoryUsedMB: 0,
    connectedClients: 0,
    opsPerSec: 0,
  }

  try {
    const redisModule = await import('../redis.js')
    const client = redisModule.getRedisClient()
    if (!client) {
      return unreachable
    }

    const infoText = await client.info()

    // Pulls the integer that follows `<field>:` in the INFO text; 0 when absent.
    const readInt = (field: string): number => {
      const hit = infoText.match(new RegExp(`${field}:(\\d+)`))
      if (!hit) {
        return 0
      }
      return parseInt(hit[1], 10)
    }

    const usedBytes = readInt('used_memory')
    const clientCount = readInt('connected_clients')
    const opsRate = readInt('instantaneous_ops_per_sec')

    return {
      status: 'online',
      memoryUsedMB: Math.round(usedBytes / 1024 / 1024),
      connectedClients: clientCount,
      opsPerSec: opsRate,
    }
  } catch {
    return unreachable
  }
}
/**
 * Checks PostgreSQL reachability and latency through Payload, and enriches
 * the result with connection counts read via the `psql` CLI when available.
 *
 * @returns 'online' when the probe query finishes in under one second,
 *   'warning' when it is slower, and an 'offline' placeholder
 *   (latencyMs -1) when Payload cannot be loaded or the query throws.
 */
export async function checkPostgresql(): Promise<PostgresqlStatus> {
  const offlineStatus: PostgresqlStatus = {
    status: 'offline',
    connections: 0,
    maxConnections: 50,
    latencyMs: -1,
  }
  // Connection flags shared by both psql statistics queries below.
  const psqlTarget = '-h 10.10.181.101 -U payload -d payload_db -t'

  try {
    const { getPayload } = await import('payload')
    const payload = await getPayload({ config: (await import('@payload-config')).default })

    const start = Date.now()
    // Keep the probe cheap: Payload treats `limit: 0` as "no limit", which
    // would load every user document just to time a round trip. One row with
    // no relationship population is enough to prove connectivity.
    await payload.find({ collection: 'users', limit: 1, depth: 0 })
    const latencyMs = Date.now() - start

    let connections = 0
    let maxConnections = 50
    try {
      const connResult = runPsql(
        `${psqlTarget} -c "SELECT count(*) FROM pg_stat_activity WHERE datname = 'payload_db'"`,
      )
      connections = parseInt(connResult.trim(), 10) || 0

      const maxResult = runPsql(`${psqlTarget} -c "SHOW max_connections"`)
      maxConnections = parseInt(maxResult.trim(), 10) || 50
    } catch {
      // psql unavailable -- latency check already proves connectivity
    }

    return {
      status: latencyMs < 1000 ? 'online' : 'warning',
      connections,
      maxConnections,
      latencyMs,
    }
  } catch {
    return offlineStatus
  }
}
/**
 * Queries the PgBouncer admin console (`SHOW POOLS`) through psql and sums
 * the client counters of every row that mentions "payload".
 *
 * Column positions (2 = cl_active, 3 = cl_waiting, 6 = pool size) follow the
 * layout assumed by the original author.
 * NOTE(review): confirm these indexes against the deployed PgBouncer version.
 *
 * Any failure (psql missing, connection refused, timeout) yields an
 * all-zero 'offline' status.
 */
export async function checkPgBouncer(): Promise<PgBouncerStatus> {
  try {
    const rawPools = runPsql('-h 127.0.0.1 -p 6432 -U payload -d pgbouncer -t -c "SHOW POOLS"')
    const poolRows = rawPools
      .trim()
      .split('\n')
      .filter((row) => row.includes('payload'))

    const totals = poolRows.reduce(
      (acc, row) => {
        const cells = row.split('|').map((cell) => cell.trim())
        return {
          active: acc.active + (parseInt(cells[2], 10) || 0),
          waiting: acc.waiting + (parseInt(cells[3], 10) || 0),
          // Not summed: the last matching row wins, defaulting to 20.
          size: parseInt(cells[6], 10) || 20,
        }
      },
      { active: 0, waiting: 0, size: 20 },
    )

    return {
      status: 'online',
      activeConnections: totals.active,
      waitingClients: totals.waiting,
      poolSize: totals.size,
    }
  } catch {
    return {
      status: 'offline',
      activeConnections: 0,
      waitingClients: 0,
      poolSize: 0,
    }
  }
}
/**
 * Per-queue job totals as reported by checkQueues(), which fills each field
 * from the matching key of the queue's job counts (0 when the key is absent
 * or the queue could not be inspected).
 */
export interface QueueCounts {
// Jobs counted under the "waiting" key.
waiting: number
// Jobs counted under the "active" key.
active: number
// Jobs counted under the "completed" key.
completed: number
// Jobs counted under the "failed" key.
failed: number
}
/**
 * Reads job counts for each known BullMQ queue.
 *
 * Queues are inspected one at a time. A queue that cannot be inspected is
 * reported with all-zero counts; if BullMQ or the queue service cannot be
 * loaded at all, an empty object is returned.
 *
 * @returns Map of queue name to its waiting/active/completed/failed counts.
 */
export async function checkQueues(): Promise<Record<string, QueueCounts>> {
  try {
    const { Queue } = await import('bullmq')
    const { getQueueRedisConnection } = await import('../queue/queue-service.js')
    const connection = getQueueRedisConnection()

    // Queue names matching QUEUE_NAMES in queue-service.ts
    const queueNames = ['email', 'pdf', 'cleanup', 'youtube-upload']
    const results: Record<string, QueueCounts> = {}

    for (const name of queueNames) {
      try {
        const queue = new Queue(name, { connection })
        try {
          const counts = await queue.getJobCounts()
          results[name] = {
            waiting: counts.waiting ?? 0,
            active: counts.active ?? 0,
            completed: counts.completed ?? 0,
            failed: counts.failed ?? 0,
          }
        } finally {
          // Always release the queue handle, even when getJobCounts() throws;
          // otherwise every failed check leaks a Redis connection. Closing is
          // best-effort so a close error cannot discard counts already read.
          await queue.close().catch(() => undefined)
        }
      } catch {
        results[name] = { waiting: 0, active: 0, completed: 0, failed: 0 }
      }
    }

    return results
  } catch {
    return {}
  }
}
/**
 * Verifies that the configured SMTP server accepts a connection (and the
 * configured credentials) using nodemailer's `verify()`.
 *
 * Connection settings come from SMTP_HOST, SMTP_PORT (default 587),
 * SMTP_SECURE, SMTP_USER and SMTP_PASS.
 *
 * @returns 'online' with the measured verify duration, or 'offline' with
 *   responseTimeMs -1 when nodemailer cannot be loaded or verification fails.
 *   `lastCheck` is the ISO timestamp taken when the check started.
 */
export async function checkSmtp(): Promise<SmtpStatus> {
  // Bound every phase of the SMTP handshake. Without explicit timeouts an
  // unreachable host can stall the check for nodemailer's multi-minute defaults.
  const timeoutMs = 5000
  const now = new Date().toISOString()

  try {
    const nodemailer = await import('nodemailer')
    const transporter = nodemailer.createTransport({
      host: process.env.SMTP_HOST,
      // Fall back to 587 when SMTP_PORT is unset or not a number.
      port: parseInt(process.env.SMTP_PORT || '587', 10) || 587,
      secure: process.env.SMTP_SECURE === 'true',
      auth: {
        user: process.env.SMTP_USER,
        pass: process.env.SMTP_PASS,
      },
      connectionTimeout: timeoutMs,
      greetingTimeout: timeoutMs,
      socketTimeout: timeoutMs,
    })

    const start = Date.now()
    await transporter.verify()
    const responseTimeMs = Date.now() - start

    return { status: 'online', lastCheck: now, responseTimeMs }
  } catch {
    return { status: 'offline', lastCheck: now, responseTimeMs: -1 }
  }
}
/**
 * Summarises OAuth token expiry for connected social accounts.
 *
 * Up to 100 accounts with status 'connected' are loaded through Payload.
 * Accounts whose platform is 'youtube' are tallied under `youtubeOAuth`;
 * every other platform is tallied under `metaOAuth`. A token counts as
 * expired when its expiry lies in the past and as expiring soon when it
 * falls within the next seven days. Accounts without `tokenExpiresAt` only
 * contribute to the total.
 *
 * On any failure both entries are reported with status 'error' and zero counts.
 */
export async function checkOAuthTokens(): Promise<{
  metaOAuth: OAuthTokenStatus
  youtubeOAuth: OAuthTokenStatus
}> {
  const errorStatus: OAuthTokenStatus = {
    status: 'error',
    tokensTotal: 0,
    tokensExpiringSoon: 0,
    tokensExpired: 0,
  }

  try {
    const payloadModule = await import('payload')
    const payload = await payloadModule.getPayload({
      config: (await import('@payload-config')).default,
    })

    const accounts = await payload.find({
      collection: 'social-accounts',
      limit: 100,
      where: { status: { equals: 'connected' } },
    })

    const sevenDaysFromNow = new Date()
    sevenDaysFromNow.setDate(sevenDaysFromNow.getDate() + 7)
    const now = new Date()

    const metaCounts = { tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 }
    const youtubeCounts = { tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 }

    for (const rawAccount of accounts.docs) {
      const fields = rawAccount as Record<string, unknown>
      const bucket = fields.platform === 'youtube' ? youtubeCounts : metaCounts
      bucket.tokensTotal += 1

      if (!fields.tokenExpiresAt) {
        continue
      }
      const expiry = new Date(fields.tokenExpiresAt as string)
      if (expiry < now) {
        bucket.tokensExpired += 1
      } else if (expiry < sevenDaysFromNow) {
        bucket.tokensExpiringSoon += 1
      }
    }

    return {
      metaOAuth: { status: getOAuthStatus(metaCounts), ...metaCounts },
      youtubeOAuth: { status: getOAuthStatus(youtubeCounts), ...youtubeCounts },
    }
  } catch {
    return { metaOAuth: errorStatus, youtubeOAuth: errorStatus }
  }
}
/**
 * Derives the last-run state of each cron job from the most recent
 * 'monitoring-logs' entry whose source is 'cron' and whose message contains
 * the job's keyword.
 *
 * A job is 'failed' when that entry's level is 'error', 'ok' otherwise, and
 * 'unknown' (empty lastRun) when no entry exists or the lookup throws.
 */
export async function checkCronJobs(): Promise<CronStatuses> {
  const unknownStatus: CronJobStatus = { lastRun: '', status: 'unknown' }

  try {
    const payloadModule = await import('payload')
    const payload = await payloadModule.getPayload({
      config: (await import('@payload-config')).default,
    })

    // Looks up the newest cron log entry mentioning `keyword`.
    const latestRunFor = async (keyword: string): Promise<CronJobStatus> => {
      try {
        const logs = await payload.find({
          collection: 'monitoring-logs',
          limit: 1,
          sort: '-createdAt',
          where: {
            and: [
              { source: { equals: 'cron' } },
              { message: { contains: keyword } },
            ],
          },
        })

        if (logs.docs.length === 0) {
          return unknownStatus
        }

        const newest = logs.docs[0] as Record<string, unknown>
        const failed = newest.level === 'error'
        return {
          lastRun: newest.createdAt as string,
          status: failed ? 'failed' : 'ok',
        }
      } catch {
        return unknownStatus
      }
    }

    const [communitySync, tokenRefresh, youtubeSync] = await Promise.all([
      latestRunFor('community-sync'),
      latestRunFor('token-refresh'),
      latestRunFor('youtube'),
    ])

    return { communitySync, tokenRefresh, youtubeSync }
  } catch {
    return {
      communitySync: unknownStatus,
      tokenRefresh: unknownStatus,
      youtubeSync: unknownStatus,
    }
  }
}
// ============================================================================
// Full Metrics Collection
// ============================================================================
/**
 * Runs the individual health checks concurrently and assembles the combined
 * metrics object (everything in SystemMetrics except the timestamp).
 *
 * Promise.allSettled is used so that a rejected check is replaced by a
 * neutral placeholder instead of aborting the whole collection. Performance
 * figures and PM2 process states are gathered afterwards, each with its own
 * fallback.
 */
export async function collectMetrics(): Promise<Omit<SystemMetrics, 'timestamp'>> {
  const [system, redis, postgresql, pgbouncer, smtp, oauth, cronJobs] = await Promise.allSettled([
    checkSystemHealth(),
    checkRedis(),
    checkPostgresql(),
    checkPgBouncer(),
    checkSmtp(),
    checkOAuthTokens(),
    checkCronJobs(),
  ])

  // Zeroed figures are kept whenever the tracker module cannot be loaded.
  let performance: PerformanceMetrics = {
    avgResponseTimeMs: 0,
    p95ResponseTimeMs: 0,
    p99ResponseTimeMs: 0,
    errorRate: 0,
    requestsPerMinute: 0,
  }
  try {
    // The specifier is held in a variable and the import is tagged with
    // @vite-ignore so Vite does not try to resolve the tracker statically;
    // loading it lazily also avoids circular dependencies.
    const trackerPath = './performance-tracker'
    const trackerModule = await import(/* @vite-ignore */ trackerPath)
    performance = trackerModule.performanceTracker.getMetrics('1h')
  } catch {
    // Performance tracker not yet initialized
  }

  const offlineProcess: ProcessStatus = {
    status: 'offline',
    pid: 0,
    memoryMB: 0,
    uptimeSeconds: 0,
    restarts: 0,
  }
  const pm2 = getPm2Processes(offlineProcess)

  // Placeholders used only when the corresponding check promise rejected.
  const systemFallback: SystemHealth = {
    cpuUsagePercent: 0,
    memoryUsedMB: 0,
    memoryTotalMB: 0,
    memoryUsagePercent: 0,
    diskUsedGB: 0,
    diskTotalGB: 0,
    diskUsagePercent: 0,
    loadAvg1: 0,
    loadAvg5: 0,
    uptime: 0,
  }
  const oauthFallback = {
    metaOAuth: { status: 'error' as const, tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 },
    youtubeOAuth: { status: 'error' as const, tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 },
  }
  const cronFallback: CronStatuses = {
    communitySync: { lastRun: '', status: 'unknown' },
    tokenRefresh: { lastRun: '', status: 'unknown' },
    youtubeSync: { lastRun: '', status: 'unknown' },
  }

  const oauthOutcome = settled(oauth, oauthFallback)

  return {
    system: settled(system, systemFallback),
    services: {
      payload: pm2.payloadProcess,
      queueWorker: pm2.queueWorkerProcess,
      postgresql: settled(postgresql, {
        status: 'offline',
        connections: 0,
        maxConnections: 50,
        latencyMs: -1,
      }),
      pgbouncer: settled(pgbouncer, {
        status: 'offline',
        activeConnections: 0,
        waitingClients: 0,
        poolSize: 0,
      }),
      redis: settled(redis, {
        status: 'offline',
        memoryUsedMB: 0,
        connectedClients: 0,
        opsPerSec: 0,
      }),
    },
    external: {
      smtp: settled(smtp, {
        status: 'offline',
        lastCheck: new Date().toISOString(),
        responseTimeMs: -1,
      }),
      metaOAuth: oauthOutcome.metaOAuth,
      youtubeOAuth: oauthOutcome.youtubeOAuth,
      cronJobs: settled(cronJobs, cronFallback),
    },
    performance,
  }
}
// ============================================================================
// Internal Helpers
// ============================================================================
/**
 * Synchronously executes `psql` with the given argument string and returns
 * its stdout as UTF-8 text.
 *
 * The database password is handed to the child process through the
 * PGPASSWORD environment variable (taken from DB_PASSWORD, empty when unset)
 * so it never appears in the command line. The call is aborted after five
 * seconds; failures surface as exceptions thrown by execSync.
 */
function runPsql(args: string): string {
  const password = process.env.DB_PASSWORD || ''
  const childEnv = { ...process.env, PGPASSWORD: password }
  const command = `psql ${args}`

  return execSync(command, {
    encoding: 'utf-8',
    timeout: 5000,
    env: childEnv,
  })
}
/** Rounds `value` to the nearest tenth. */
function roundToOneDecimal(value: number): number {
  const tenths = Math.round(value * 10)
  return tenths / 10
}
/** Rounds `value` to the nearest hundredth. */
function roundToTwoDecimals(value: number): number {
  const hundredths = Math.round(value * 100)
  return hundredths / 100
}
/**
 * Unwraps one entry of a Promise.allSettled() result: the resolved value
 * when the promise fulfilled, otherwise the supplied fallback.
 */
function settled<T>(result: PromiseSettledResult<T>, fallback: T): T {
  if (result.status === 'rejected') {
    return fallback
  }
  return result.value
}
/**
 * Estimates overall CPU utilisation as a percentage in the range 0-100.
 *
 * Primary method: read the aggregate `cpu` line of /proc/stat twice, 100 ms
 * apart, and compute the share of non-idle time in between.
 * Fallback (when /proc/stat is missing or malformed): 1-minute load average
 * divided by the number of CPUs, clamped to 100 because load can exceed the
 * core count.
 *
 * @returns A finite percentage between 0 and 100 (unrounded).
 */
async function getCpuUsage(): Promise<number> {
  const clampPercent = (value: number): number =>
    Number.isFinite(value) ? Math.min(100, Math.max(0, value)) : 0

  try {
    const fs = await import('node:fs/promises')

    // Reads the counters of the aggregate line: cpu user nice system idle iowait irq softirq steal guest guest_nice
    const sample = async (): Promise<{ idle: number; total: number }> => {
      const data = await fs.readFile('/proc/stat', 'utf-8')
      const line = data.split('\n')[0] ?? ''
      const parts = line.trim().split(/\s+/).slice(1).map(Number)
      if (parts.length < 4 || parts.some((n) => Number.isNaN(n))) {
        throw new Error('Unexpected /proc/stat format')
      }
      const idle = (parts[3] ?? 0) + (parts[4] ?? 0) // idle + iowait
      // Only the first eight fields are summed: guest and guest_nice are
      // already included in user and nice, so adding them would count that
      // time twice.
      const total = parts.slice(0, 8).reduce((a, b) => a + b, 0)
      return { idle, total }
    }

    const s1 = await sample()
    await new Promise((resolve) => setTimeout(resolve, 100))
    const s2 = await sample()

    const idleDiff = s2.idle - s1.idle
    const totalDiff = s2.total - s1.total
    // No ticks elapsed (or counters went backwards): report idle instead of dividing by zero.
    if (totalDiff <= 0) return 0
    return clampPercent(((totalDiff - idleDiff) / totalDiff) * 100)
  } catch {
    // Fallback if /proc/stat is unavailable. os.cpus() may be empty on some
    // platforms, so never divide by zero.
    const cpuCount = os.cpus().length || 1
    const load1 = os.loadavg()[0] ?? 0
    return clampPercent((load1 / cpuCount) * 100)
  }
}
/**
 * Reads usage of the root filesystem from `df`.
 *
 * @returns Total and used space in GiB plus the used percentage, each rounded
 *   to one decimal. All zeros are returned when `df` fails, times out, or its
 *   output cannot be parsed.
 */
function getDiskUsage(): { diskUsedGB: number; diskTotalGB: number; diskUsagePercent: number } {
  const unavailable = { diskUsedGB: 0, diskTotalGB: 0, diskUsagePercent: 0 }
  try {
    // -B1 reports sizes in bytes; -P forces the POSIX one-line-per-filesystem
    // layout so a long device name cannot wrap and shift the columns.
    // The timeout matches the other shell calls in this file so a hung mount
    // cannot block the event loop indefinitely.
    const output = execSync('df -B1 -P / | tail -1', { encoding: 'utf-8', timeout: 5000 })
    const parts = output.trim().split(/\s+/)
    // Format: filesystem 1B-blocks used available use% mountpoint
    const total = parseInt(parts[1] ?? '', 10)
    const used = parseInt(parts[2] ?? '', 10)

    // Unparsable output or a zero-sized filesystem would otherwise yield
    // NaN/Infinity in the returned metrics.
    if (!Number.isFinite(total) || !Number.isFinite(used) || total <= 0) {
      return unavailable
    }

    return {
      diskTotalGB: roundToOneDecimal(total / 1024 / 1024 / 1024),
      diskUsedGB: roundToOneDecimal(used / 1024 / 1024 / 1024),
      diskUsagePercent: roundToOneDecimal((used / total) * 100),
    }
  } catch {
    return unavailable
  }
}
/**
 * Maps token counters to an overall status: any expired token wins, then any
 * token that is expiring soon, otherwise 'ok'.
 */
function getOAuthStatus(
  counts: { tokensExpired: number; tokensExpiringSoon: number },
): OAuthTokenStatus['status'] {
  const { tokensExpired, tokensExpiringSoon } = counts
  if (tokensExpired > 0) {
    return 'expired'
  }
  return tokensExpiringSoon > 0 ? 'expiring_soon' : 'ok'
}
/**
 * Result of getPm2Processes(): the status of the two PM2-managed processes
 * this service reports on.
 */
interface Pm2Processes {
// Status of the PM2 process named 'payload'.
payloadProcess: ProcessStatus
// Status of the PM2 process named 'queue-worker'.
queueWorkerProcess: ProcessStatus
}
/**
 * Reads `pm2 jlist` and extracts the status of the 'payload' and
 * 'queue-worker' processes.
 *
 * @param defaultProcess Status reported for a process that is not listed, or
 *   for both processes when PM2 is unavailable or its output cannot be parsed.
 */
function getPm2Processes(defaultProcess: ProcessStatus): Pm2Processes {
  const found: Pm2Processes = {
    payloadProcess: defaultProcess,
    queueWorkerProcess: defaultProcess,
  }

  // Converts one raw `pm2 jlist` entry into a ProcessStatus.
  const describeProcess = (entry: Record<string, unknown>): ProcessStatus => {
    const pm2Env = entry.pm2_env as Record<string, unknown> | undefined
    const monitor = entry.monit as Record<string, number> | undefined

    // pm_uptime is treated as the start timestamp in milliseconds.
    let uptimeSeconds = 0
    if (pm2Env?.pm_uptime) {
      uptimeSeconds = Math.round((Date.now() - (pm2Env.pm_uptime as number)) / 1000)
    }

    return {
      status: pm2Env?.status === 'online' ? 'online' : 'offline',
      pid: (entry.pid as number) || 0,
      memoryMB: Math.round((monitor?.memory || 0) / 1024 / 1024),
      uptimeSeconds,
      restarts: (pm2Env?.restart_time as number) || 0,
    }
  }

  try {
    const rawList = execSync('pm2 jlist', { encoding: 'utf-8', timeout: 5000 })
    const entries = JSON.parse(rawList) as Array<Record<string, unknown>>

    for (const entry of entries) {
      const status = describeProcess(entry)
      switch (entry.name) {
        case 'payload':
          found.payloadProcess = status
          break
        case 'queue-worker':
          found.queueWorkerProcess = status
          break
        default:
          break
      }
    }
  } catch {
    // PM2 not available
  }

  return { payloadProcess: found.payloadProcess, queueWorkerProcess: found.queueWorkerProcess }
}

View file

@ -0,0 +1,52 @@
import { describe, it, expect } from 'vitest'
import { checkSystemHealth } from '@/lib/monitoring/monitoring-service'
describe('MonitoringService', () => {
  // Number of digits after the decimal point in the default string form of
  // `value` (0 when there is no fractional part).
  const fractionDigits = (value: number): number => (value.toString().split('.')[1] ?? '').length

  describe('checkSystemHealth', () => {
    it('returns CPU, memory, disk, load, and uptime', async () => {
      const snapshot = await checkSystemHealth()

      // Percentages stay within 0-100.
      expect(snapshot.cpuUsagePercent).toBeGreaterThanOrEqual(0)
      expect(snapshot.cpuUsagePercent).toBeLessThanOrEqual(100)

      // Memory figures are present and plausible.
      expect(snapshot.memoryTotalMB).toBeGreaterThan(0)
      expect(snapshot.memoryUsedMB).toBeGreaterThan(0)
      expect(snapshot.memoryUsagePercent).toBeGreaterThanOrEqual(0)
      expect(snapshot.memoryUsagePercent).toBeLessThanOrEqual(100)

      // Disk figures are present.
      expect(snapshot.diskTotalGB).toBeGreaterThan(0)
      expect(snapshot.diskUsedGB).toBeGreaterThanOrEqual(0)

      // Uptime and load averages are non-negative.
      expect(snapshot.uptime).toBeGreaterThan(0)
      expect(snapshot.loadAvg1).toBeGreaterThanOrEqual(0)
      expect(snapshot.loadAvg5).toBeGreaterThanOrEqual(0)
    })

    it('returns rounded values with correct decimal precision', async () => {
      const snapshot = await checkSystemHealth()

      // Percentages carry at most one decimal place.
      expect(fractionDigits(snapshot.memoryUsagePercent)).toBeLessThanOrEqual(1)
      expect(fractionDigits(snapshot.cpuUsagePercent)).toBeLessThanOrEqual(1)

      // Load averages carry at most two decimal places.
      expect(fractionDigits(snapshot.loadAvg1)).toBeLessThanOrEqual(2)
    })

    it('returns integer values for memoryUsedMB, memoryTotalMB, and uptime', async () => {
      const { memoryUsedMB, memoryTotalMB, uptime } = await checkSystemHealth()

      for (const wholeNumber of [memoryUsedMB, memoryTotalMB, uptime]) {
        expect(Number.isInteger(wholeNumber)).toBe(true)
      }
    })
  })
})