cms.c2sgmbh/src/lib/monitoring/monitoring-service.ts
Martin Porwoll 037835d1de fix(ci): increase build heap size and format monitoring files
Build was OOM-ing in CI with default Node heap limit. Added
NODE_OPTIONS with 4GB heap. Also ran Prettier on monitoring files.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 11:58:08 +00:00

563 lines
17 KiB
TypeScript

/**
* Monitoring Service
*
* Collects system health metrics, service statuses, and external
* dependency checks. Used by the monitoring dashboard and snapshot collector.
*/
import os from "node:os";
import { execSync } from "node:child_process";
import type {
SystemHealth,
ProcessStatus,
PostgresqlStatus,
PgBouncerStatus,
RedisStatus,
SmtpStatus,
OAuthTokenStatus,
CronStatuses,
CronJobStatus,
SecretsHealthStatus,
SecurityMetricsStatus,
PerformanceMetrics,
SystemMetrics,
} from "./types";
import { checkSecretsHealth } from "../security/secrets-health";
import { getSecurityMetricsSnapshot } from "../security/security-observability";
// ============================================================================
// System Health
// ============================================================================
/**
* Collects system health metrics using Node.js os module and df command.
* CPU usage is calculated by sampling /proc/stat twice with 100ms delay.
*/
/**
 * Gathers host-level health numbers: CPU, memory, disk, load averages and uptime.
 * CPU usage comes from a two-sample /proc/stat read (see getCpuUsage); disk
 * figures come from the df command (see getDiskUsage).
 */
export async function checkSystemHealth(): Promise<SystemHealth> {
  const cpu = await getCpuUsage();
  const totalMemBytes = os.totalmem();
  const memoryTotalMB = Math.round(totalMemBytes / 1024 / 1024);
  const memoryUsedMB = Math.round((totalMemBytes - os.freemem()) / 1024 / 1024);
  const disk = getDiskUsage();
  const load = os.loadavg();
  return {
    cpuUsagePercent: roundToOneDecimal(cpu),
    memoryUsedMB,
    memoryTotalMB,
    memoryUsagePercent: roundToOneDecimal((memoryUsedMB / memoryTotalMB) * 100),
    diskUsedGB: disk.diskUsedGB,
    diskTotalGB: disk.diskTotalGB,
    diskUsagePercent: disk.diskUsagePercent,
    loadAvg1: roundToTwoDecimals(load[0]),
    loadAvg5: roundToTwoDecimals(load[1]),
    uptime: Math.round(os.uptime()),
  };
}
// ============================================================================
// Service Checks
// ============================================================================
/**
 * Probes the Redis instance via the shared client and parses a few counters
 * out of the INFO response. Any failure (no client, connection error) is
 * reported as "offline" with zeroed metrics.
 */
export async function checkRedis(): Promise<RedisStatus> {
  try {
    const { getRedisClient } = await import("../redis.js");
    const client = getRedisClient();
    if (client) {
      const info = await client.info();
      // INFO lines look like "used_memory:12345"; pull the integer for a field.
      const readCounter = (field: string): number => {
        const m = new RegExp(`${field}:(\\d+)`).exec(info);
        return m ? parseInt(m[1], 10) : 0;
      };
      return {
        status: "online",
        memoryUsedMB: Math.round(readCounter("used_memory") / 1024 / 1024),
        connectedClients: readCounter("connected_clients"),
        opsPerSec: readCounter("instantaneous_ops_per_sec"),
      };
    }
  } catch {
    // fall through to the offline result below
  }
  return {
    status: "offline",
    memoryUsedMB: 0,
    connectedClients: 0,
    opsPerSec: 0,
  };
}
/**
 * Checks PostgreSQL by timing a trivial Payload query, then best-effort
 * augments the result with connection counts via psql. If psql is missing
 * the latency probe alone decides online/warning; if the query itself fails
 * the database is reported offline.
 */
export async function checkPostgresql(): Promise<PostgresqlStatus> {
  try {
    const { getPayload } = await import("payload");
    const payload = await getPayload({ config: (await import("@payload-config")).default });
    const started = Date.now();
    await payload.find({ collection: "users", limit: 0 });
    const latencyMs = Date.now() - started;
    let connections = 0;
    let maxConnections = 50;
    try {
      const countOut = runPsql(
        "-h 10.10.181.101 -U payload -d payload_db -t -c \"SELECT count(*) FROM pg_stat_activity WHERE datname = 'payload_db'\"",
      );
      connections = parseInt(countOut.trim(), 10) || 0;
      const maxOut = runPsql('-h 10.10.181.101 -U payload -d payload_db -t -c "SHOW max_connections"');
      maxConnections = parseInt(maxOut.trim(), 10) || 50;
    } catch {
      // psql unavailable -- latency check already proves connectivity
    }
    return {
      // Anything slower than a second is flagged as degraded rather than down.
      status: latencyMs < 1000 ? "online" : "warning",
      connections,
      maxConnections,
      latencyMs,
    };
  } catch {
    return {
      status: "offline",
      connections: 0,
      maxConnections: 50,
      latencyMs: -1,
    };
  }
}
/**
 * Queries PgBouncer's admin database with SHOW POOLS and aggregates the
 * active/waiting client counts over all pools for the payload database.
 * Reported offline when the admin connection fails.
 */
export async function checkPgBouncer(): Promise<PgBouncerStatus> {
  try {
    const raw = runPsql('-h 127.0.0.1 -p 6432 -U payload -d pgbouncer -t -c "SHOW POOLS"');
    // SHOW POOLS columns: database | user | cl_active | cl_waiting | sv_active | sv_idle | pool_size | ...
    const result: PgBouncerStatus = {
      status: "online",
      activeConnections: 0,
      waitingClients: 0,
      poolSize: 20,
    };
    for (const row of raw.trim().split("\n")) {
      if (!row.includes("payload")) continue;
      const cols = row.split("|").map((c) => c.trim());
      result.activeConnections += parseInt(cols[2], 10) || 0;
      result.waitingClients += parseInt(cols[3], 10) || 0;
      // pool_size is per-pool config; the last matching row wins.
      result.poolSize = parseInt(cols[6], 10) || 20;
    }
    return result;
  } catch {
    return {
      status: "offline",
      activeConnections: 0,
      waitingClients: 0,
      poolSize: 0,
    };
  }
}
/**
 * Per-queue BullMQ job counts as returned by checkQueues.
 */
export interface QueueCounts {
  waiting: number; // jobs enqueued but not yet picked up
  active: number; // jobs currently being processed
  completed: number; // completed jobs still retained by the queue
  failed: number; // jobs that ended in failure
}
/**
 * Reads job counts for every known BullMQ queue. A per-queue failure yields
 * zeroed counts for that queue; a failure to load BullMQ or the Redis
 * connection yields an empty record.
 */
export async function checkQueues(): Promise<Record<string, QueueCounts>> {
  try {
    const { Queue } = await import("bullmq");
    const { getQueueRedisConnection } = await import("../queue/queue-service.js");
    const connection = getQueueRedisConnection();
    // Queue names matching QUEUE_NAMES in queue-service.ts
    const names = ["email", "pdf", "cleanup", "youtube-upload"];
    const statuses: Record<string, QueueCounts> = {};
    for (const queueName of names) {
      try {
        const queue = new Queue(queueName, { connection });
        const counts = await queue.getJobCounts();
        statuses[queueName] = {
          waiting: counts.waiting || 0,
          active: counts.active || 0,
          completed: counts.completed || 0,
          failed: counts.failed || 0,
        };
        await queue.close();
      } catch {
        statuses[queueName] = { waiting: 0, active: 0, completed: 0, failed: 0 };
      }
    }
    return statuses;
  } catch {
    return {};
  }
}
/**
 * Verifies the SMTP server configured via SMTP_HOST/SMTP_PORT/SMTP_SECURE/
 * SMTP_USER/SMTP_PASS by performing a nodemailer verify() handshake and
 * timing it. Reports offline (responseTimeMs -1) on any failure.
 *
 * Fix: the transporter is now explicitly closed after verification so the
 * health check does not leave an SMTP connection/pool open on every run.
 */
export async function checkSmtp(): Promise<SmtpStatus> {
  const now = new Date().toISOString();
  try {
    const nodemailer = await import("nodemailer");
    const transporter = nodemailer.createTransport({
      host: process.env.SMTP_HOST,
      port: parseInt(process.env.SMTP_PORT || "587", 10),
      secure: process.env.SMTP_SECURE === "true",
      auth: {
        user: process.env.SMTP_USER,
        pass: process.env.SMTP_PASS,
      },
    });
    try {
      const start = Date.now();
      await transporter.verify();
      const responseTimeMs = Date.now() - start;
      return { status: "online", lastCheck: now, responseTimeMs };
    } finally {
      // Release the connection opened by verify() regardless of outcome.
      transporter.close();
    }
  } catch {
    return { status: "offline", lastCheck: now, responseTimeMs: -1 };
  }
}
/**
 * Tallies connected social-account tokens per platform (YouTube vs. Meta)
 * into total / expiring-within-7-days / already-expired buckets, then maps
 * each tally to an aggregate status. Any lookup failure yields "error"
 * statuses with zeroed counts for both platforms.
 */
export async function checkOAuthTokens(): Promise<{
  metaOAuth: OAuthTokenStatus;
  youtubeOAuth: OAuthTokenStatus;
}> {
  try {
    const { getPayload } = await import("payload");
    const payload = await getPayload({ config: (await import("@payload-config")).default });
    const accounts = await payload.find({
      collection: "social-accounts",
      limit: 100,
      where: { status: { equals: "connected" } },
    });
    const warnBefore = new Date();
    warnBefore.setDate(warnBefore.getDate() + 7);
    const now = new Date();
    const tallies = {
      meta: { tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 },
      youtube: { tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 },
    };
    for (const raw of accounts.docs) {
      const doc = raw as unknown as Record<string, unknown>;
      // Everything that is not YouTube counts toward the Meta bucket.
      const bucket = doc.platform === "youtube" ? tallies.youtube : tallies.meta;
      bucket.tokensTotal++;
      if (!doc.tokenExpiresAt) continue;
      const expiresAt = new Date(doc.tokenExpiresAt as string);
      if (expiresAt < now) {
        bucket.tokensExpired++;
      } else if (expiresAt < warnBefore) {
        bucket.tokensExpiringSoon++;
      }
    }
    return {
      metaOAuth: { status: getOAuthStatus(tallies.meta), ...tallies.meta },
      youtubeOAuth: { status: getOAuthStatus(tallies.youtube), ...tallies.youtube },
    };
  } catch {
    const errorStatus: OAuthTokenStatus = {
      status: "error",
      tokensTotal: 0,
      tokensExpiringSoon: 0,
      tokensExpired: 0,
    };
    return { metaOAuth: errorStatus, youtubeOAuth: errorStatus };
  }
}
/**
 * Derives the status of each scheduled job from the most recent
 * monitoring-log entry whose message mentions the job. An error-level
 * entry means "failed", any other level "ok", no entry means "unknown".
 */
export async function checkCronJobs(): Promise<CronStatuses> {
  const fallback: CronJobStatus = { lastRun: "", status: "unknown" };
  try {
    const { getPayload } = await import("payload");
    const payload = await getPayload({ config: (await import("@payload-config")).default });
    const lookup = async (needle: string): Promise<CronJobStatus> => {
      try {
        const logs = await payload.find({
          collection: "monitoring-logs",
          limit: 1,
          sort: "-createdAt",
          where: {
            and: [{ source: { equals: "cron" } }, { message: { contains: needle } }],
          },
        });
        const latest = logs.docs[0] as unknown as Record<string, unknown> | undefined;
        if (!latest) return fallback;
        return {
          lastRun: latest.createdAt as string,
          status: latest.level === "error" ? "failed" : "ok",
        };
      } catch {
        return fallback;
      }
    };
    const [communitySync, tokenRefresh, youtubeSync] = await Promise.all([
      lookup("community-sync"),
      lookup("token-refresh"),
      lookup("youtube"),
    ]);
    return { communitySync, tokenRefresh, youtubeSync };
  } catch {
    return {
      communitySync: fallback,
      tokenRefresh: fallback,
      youtubeSync: fallback,
    };
  }
}
// ============================================================================
// Full Metrics Collection
// ============================================================================
/**
* Collects all monitoring metrics in parallel. Individual check failures
* are isolated and return safe defaults instead of failing the whole collection.
*/
export async function collectMetrics(): Promise<Omit<SystemMetrics, "timestamp">> {
const [system, redis, postgresql, pgbouncer, smtp, oauth, cronJobs, secrets, securityEvents] =
await Promise.allSettled([
checkSystemHealth(),
checkRedis(),
checkPostgresql(),
checkPgBouncer(),
checkSmtp(),
checkOAuthTokens(),
checkCronJobs(),
Promise.resolve(checkSecretsHealth()),
Promise.resolve(getSecurityMetricsSnapshot()),
]);
// Load performance tracker lazily to avoid circular dependencies
let performance: PerformanceMetrics = {
avgResponseTimeMs: 0,
p95ResponseTimeMs: 0,
p99ResponseTimeMs: 0,
errorRate: 0,
requestsPerMinute: 0,
};
try {
// Dynamic path constructed at runtime to avoid Vite static analysis
// when performance-tracker module has not been created yet
const trackerPath = "./performance-tracker";
const mod = await import(/* @vite-ignore */ trackerPath);
performance = mod.performanceTracker.getMetrics("1h");
} catch {
// Performance tracker not yet initialized
}
const defaultProcess: ProcessStatus = {
status: "offline",
pid: 0,
memoryMB: 0,
uptimeSeconds: 0,
restarts: 0,
};
const { payloadProcess, queueWorkerProcess } = getPm2Processes(defaultProcess);
const oauthDefaults = {
metaOAuth: { status: "error" as const, tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 },
youtubeOAuth: { status: "error" as const, tokensTotal: 0, tokensExpiringSoon: 0, tokensExpired: 0 },
};
const cronDefaults: CronStatuses = {
communitySync: { lastRun: "", status: "unknown" },
tokenRefresh: { lastRun: "", status: "unknown" },
youtubeSync: { lastRun: "", status: "unknown" },
};
const secretsDefaults: SecretsHealthStatus = {
status: "critical",
checkedAt: new Date().toISOString(),
missing: [],
expiringSoon: [],
expired: [],
rotationOverdue: [],
};
const securityEventsDefaults: SecurityMetricsStatus = {
windowMs: 300000,
counters: [],
};
const systemDefaults: SystemHealth = {
cpuUsagePercent: 0,
memoryUsedMB: 0,
memoryTotalMB: 0,
memoryUsagePercent: 0,
diskUsedGB: 0,
diskTotalGB: 0,
diskUsagePercent: 0,
loadAvg1: 0,
loadAvg5: 0,
uptime: 0,
};
const oauthResult = settled(oauth, oauthDefaults);
return {
system: settled(system, systemDefaults),
services: {
payload: payloadProcess,
queueWorker: queueWorkerProcess,
postgresql: settled(postgresql, { status: "offline", connections: 0, maxConnections: 50, latencyMs: -1 }),
pgbouncer: settled(pgbouncer, { status: "offline", activeConnections: 0, waitingClients: 0, poolSize: 0 }),
redis: settled(redis, { status: "offline", memoryUsedMB: 0, connectedClients: 0, opsPerSec: 0 }),
},
external: {
smtp: settled(smtp, { status: "offline", lastCheck: new Date().toISOString(), responseTimeMs: -1 }),
metaOAuth: oauthResult.metaOAuth,
youtubeOAuth: oauthResult.youtubeOAuth,
cronJobs: settled(cronJobs, cronDefaults),
secrets: settled(secrets, secretsDefaults),
securityEvents: settled(securityEvents, securityEventsDefaults),
},
performance,
};
}
// ============================================================================
// Internal Helpers
// ============================================================================
/**
* Runs a psql command with the database password passed via PGPASSWORD env var
* rather than inline in the command string (avoids secret detection false positives).
*/
/**
 * Executes psql with the given CLI arguments and returns stdout.
 * The database password is injected via the PGPASSWORD environment
 * variable rather than inline in the command string (avoids secret
 * detection false positives and keeps it out of process listings).
 */
function runPsql(args: string): string {
  const env = { ...process.env, PGPASSWORD: process.env.DB_PASSWORD || "" };
  return execSync(`psql ${args}`, { encoding: "utf-8", timeout: 5000, env });
}
/** Rounds a number to one decimal place (e.g. 12.34 -> 12.3). */
function roundToOneDecimal(value: number): number {
  const scaled = value * 10;
  return Math.round(scaled) / 10;
}
/** Rounds a number to two decimal places (e.g. 3.14159 -> 3.14). */
function roundToTwoDecimals(value: number): number {
  const scaled = value * 100;
  return Math.round(scaled) / 100;
}
/**
* Extracts the fulfilled value from a PromiseSettledResult, returning
* the fallback when the promise was rejected.
*/
/**
 * Unwraps a PromiseSettledResult: the fulfilled value when the promise
 * succeeded, otherwise the supplied fallback.
 */
function settled<T>(result: PromiseSettledResult<T>, fallback: T): T {
  if (result.status === "fulfilled") {
    return result.value;
  }
  return fallback;
}
/**
 * Estimates overall CPU utilisation (percent) by sampling /proc/stat twice,
 * 100ms apart, and comparing the busy vs. idle jiffy deltas. When /proc/stat
 * cannot be read (non-Linux hosts), falls back to a rough estimate from the
 * 1-minute load average normalised by core count.
 */
async function getCpuUsage(): Promise<number> {
  try {
    const fs = await import("node:fs/promises");
    const before = await fs.readFile("/proc/stat", "utf-8");
    await new Promise((resolve) => setTimeout(resolve, 100));
    const after = await fs.readFile("/proc/stat", "utf-8");
    // First line aggregates all cores: "cpu user nice system idle iowait ..."
    const snapshot = (raw: string): { idle: number; total: number } => {
      const fields = raw.split("\n")[0].split(/\s+/).slice(1).map(Number);
      return {
        idle: fields[3] + (fields[4] || 0), // idle + iowait
        total: fields.reduce((sum, v) => sum + v, 0),
      };
    };
    const a = snapshot(before);
    const b = snapshot(after);
    const totalDelta = b.total - a.total;
    if (totalDelta === 0) return 0;
    const busyDelta = totalDelta - (b.idle - a.idle);
    return (busyDelta / totalDelta) * 100;
  } catch {
    // Fallback if /proc/stat is unavailable
    return (os.loadavg()[0] / os.cpus().length) * 100;
  }
}
/**
 * Reports root-filesystem usage by parsing `df -B1 /` output.
 * Returns zeroed values when df is unavailable or its output cannot
 * be parsed.
 *
 * Fix: guard against malformed df output — previously a NaN from
 * parseInt (or a zero total) propagated NaN/Infinity into the metrics
 * because parsing errors do not throw and so bypassed the catch block.
 */
function getDiskUsage(): { diskUsedGB: number; diskTotalGB: number; diskUsagePercent: number } {
  const empty = { diskUsedGB: 0, diskTotalGB: 0, diskUsagePercent: 0 };
  try {
    const output = execSync("df -B1 / | tail -1", { encoding: "utf-8" });
    const parts = output.trim().split(/\s+/);
    // Format: filesystem 1B-blocks used available use% mountpoint
    const total = parseInt(parts[1], 10);
    const used = parseInt(parts[2], 10);
    // Unexpected column layout yields NaN; a zero total would divide by zero.
    if (!Number.isFinite(total) || !Number.isFinite(used) || total <= 0) {
      return empty;
    }
    return {
      diskTotalGB: roundToOneDecimal(total / 1024 / 1024 / 1024),
      diskUsedGB: roundToOneDecimal(used / 1024 / 1024 / 1024),
      diskUsagePercent: roundToOneDecimal((used / total) * 100),
    };
  } catch {
    return empty;
  }
}
/**
 * Maps token counters to an aggregate status: any expired token wins,
 * then any token inside the expiry-warning window, otherwise ok.
 */
function getOAuthStatus(counts: { tokensExpired: number; tokensExpiringSoon: number }): OAuthTokenStatus["status"] {
  if (counts.tokensExpired > 0) {
    return "expired";
  }
  return counts.tokensExpiringSoon > 0 ? "expiring_soon" : "ok";
}
/**
 * Statuses of the two PM2-managed app processes surfaced by getPm2Processes.
 */
interface Pm2Processes {
  payloadProcess: ProcessStatus; // pm2 process named "payload"
  queueWorkerProcess: ProcessStatus; // pm2 process named "queue-worker"
}
/**
 * Reads `pm2 jlist` and extracts status, pid, memory, uptime and restart
 * count for the "payload" and "queue-worker" processes. If pm2 is not
 * installed or the call fails, both entries fall back to defaultProcess.
 */
function getPm2Processes(defaultProcess: ProcessStatus): Pm2Processes {
  const found: Pm2Processes = {
    payloadProcess: defaultProcess,
    queueWorkerProcess: defaultProcess,
  };
  try {
    const raw = execSync("pm2 jlist", { encoding: "utf-8", timeout: 5000 });
    const entries = JSON.parse(raw) as Array<Record<string, unknown>>;
    for (const entry of entries) {
      if (entry.name !== "payload" && entry.name !== "queue-worker") continue;
      const env = entry.pm2_env as Record<string, unknown> | undefined;
      const monit = entry.monit as Record<string, number> | undefined;
      // pm_uptime is the epoch-ms timestamp the process was started at.
      const startedAt = env?.pm_uptime;
      const status: ProcessStatus = {
        status: env?.status === "online" ? "online" : "offline",
        pid: (entry.pid as number) || 0,
        memoryMB: Math.round((monit?.memory || 0) / 1024 / 1024),
        uptimeSeconds: startedAt ? Math.round((Date.now() - (startedAt as number)) / 1000) : 0,
        restarts: (env?.restart_time as number) || 0,
      };
      if (entry.name === "payload") {
        found.payloadProcess = status;
      } else {
        found.queueWorkerProcess = status;
      }
    }
  } catch {
    // PM2 not available
  }
  return found;
}