diff --git a/CLAUDE.md b/CLAUDE.md index dec9ef9..331e90b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -302,6 +302,7 @@ PGPASSWORD="$DB_PASSWORD" psql -h 10.10.181.101 -U payload -d payload_db -c "\dt - **Newsletter Abmeldung:** https://pl.c2sgmbh.de/api/newsletter/unsubscribe (GET/POST) - **Timeline API:** https://pl.c2sgmbh.de/api/timelines (GET, öffentlich, tenant required) - **Workflows API:** https://pl.c2sgmbh.de/api/workflows (GET, öffentlich, tenant required) +- **Data Retention API:** https://pl.c2sgmbh.de/api/retention (GET/POST, Super-Admin erforderlich) ## Security-Features @@ -477,9 +478,102 @@ const status = await getPdfJobStatus(job.id) Über `ecosystem.config.cjs`: - `QUEUE_EMAIL_CONCURRENCY`: Parallele E-Mail-Jobs (default: 3) - `QUEUE_PDF_CONCURRENCY`: Parallele PDF-Jobs (default: 2) +- `QUEUE_RETENTION_CONCURRENCY`: Parallele Retention-Jobs (default: 1) - `QUEUE_DEFAULT_RETRY`: Retry-Versuche (default: 3) - `QUEUE_REDIS_DB`: Redis-Datenbank für Queue (default: 1) +## Data Retention + +Automatische Datenbereinigung für DSGVO-Compliance und Speicheroptimierung. + +### Retention Policies + +| Collection | Retention | Umgebungsvariable | Beschreibung | +|------------|-----------|-------------------|--------------| +| email-logs | 90 Tage | `RETENTION_EMAIL_LOGS_DAYS` | E-Mail-Protokolle | +| audit-logs | 90 Tage | `RETENTION_AUDIT_LOGS_DAYS` | Audit-Trail | +| consent-logs | 3 Jahre | `RETENTION_CONSENT_LOGS_DAYS` | DSGVO: expiresAt-basiert | +| media (orphans) | 30 Tage | `RETENTION_MEDIA_ORPHAN_MIN_AGE_DAYS` | Unreferenzierte Medien | + +### Automatischer Scheduler + +Retention-Jobs laufen täglich um 03:00 Uhr (konfigurierbar via `RETENTION_CRON_SCHEDULE`). 
+ +```bash +# Umgebungsvariablen in .env +RETENTION_EMAIL_LOGS_DAYS=90 +RETENTION_AUDIT_LOGS_DAYS=90 +RETENTION_CONSENT_LOGS_DAYS=1095 +RETENTION_MEDIA_ORPHAN_MIN_AGE_DAYS=30 +RETENTION_CRON_SCHEDULE="0 3 * * *" + +# Worker aktivieren/deaktivieren +QUEUE_ENABLE_RETENTION=true +QUEUE_ENABLE_RETENTION_SCHEDULER=true +``` + +### API-Endpoint `/api/retention` + +**GET - Konfiguration abrufen:** +```bash +curl https://pl.c2sgmbh.de/api/retention \ + -H "Cookie: payload-token=..." +``` + +**GET - Job-Status abfragen:** +```bash +curl "https://pl.c2sgmbh.de/api/retention?jobId=abc123" \ + -H "Cookie: payload-token=..." +``` + +**POST - Manuellen Job auslösen:** +```bash +# Vollständige Retention (alle Policies + Media-Orphans) +curl -X POST https://pl.c2sgmbh.de/api/retention \ + -H "Content-Type: application/json" \ + -H "Cookie: payload-token=..." \ + -d '{"type": "full"}' + +# Einzelne Collection bereinigen +curl -X POST https://pl.c2sgmbh.de/api/retention \ + -H "Content-Type: application/json" \ + -H "Cookie: payload-token=..." \ + -d '{"type": "collection", "collection": "email-logs"}' + +# Nur Media-Orphans bereinigen +curl -X POST https://pl.c2sgmbh.de/api/retention \ + -H "Content-Type: application/json" \ + -H "Cookie: payload-token=..." 
\ + -d '{"type": "media-orphans"}' +``` + +### Architektur + +``` +Scheduler (Cron) + ↓ +Retention Queue (BullMQ) + ↓ +Retention Worker + ↓ +┌─────────────────┬─────────────────┬─────────────────┐ +│ Email-Logs │ Audit-Logs │ Consent-Logs │ +│ (createdAt) │ (createdAt) │ (expiresAt) │ +└─────────────────┴─────────────────┴─────────────────┘ + ↓ +Media-Orphan-Cleanup + ↓ +Cleanup-Ergebnis (Logs) +``` + +### Dateien + +- `src/lib/retention/retention-config.ts` - Zentrale Konfiguration +- `src/lib/retention/cleanup-service.ts` - Lösch-Logik +- `src/lib/queue/jobs/retention-job.ts` - Job-Definition +- `src/lib/queue/workers/retention-worker.ts` - Worker +- `src/app/(payload)/api/retention/route.ts` - API-Endpoint + ## Redis Caching Redis wird für API-Response-Caching und E-Mail-Transporter-Caching verwendet: diff --git a/docs/anleitungen/TODO.md b/docs/anleitungen/TODO.md index 7500c49..f6fcedd 100644 --- a/docs/anleitungen/TODO.md +++ b/docs/anleitungen/TODO.md @@ -28,8 +28,10 @@ | Status | Task | Bereich | |--------|------|---------| | [ ] | Monitoring: Sentry, Prometheus, Grafana | Monitoring | -| [ ] | AuditLogs Retention (90 Tage Cron) | Data Retention | -| [ ] | Email-Log Cleanup Cron | Data Retention | +| [x] | AuditLogs Retention (90 Tage Cron) | Data Retention | +| [x] | Email-Log Cleanup Cron | Data Retention | +| [x] | Media-Orphan-Cleanup | Data Retention | +| [x] | Consent-Logs Archivierung | Data Retention | | [ ] | Dashboard-Widget für Email-Status | Admin UX | | [ ] | TypeScript Strict Mode | Tech Debt | | [x] | E2E Tests für kritische Flows | Testing | @@ -129,11 +131,11 @@ ## Data Retention -- [ ] **Automatische Datenbereinigung** - - [ ] Cron-Job für Email-Log Cleanup (älter als X Tage) - - [ ] AuditLogs Retention Policy (90 Tage) - - [ ] Consent-Logs Archivierung - - [ ] Media-Orphan-Cleanup +- [x] **Automatische Datenbereinigung** *(erledigt: `src/lib/retention/`)* + - [x] Cron-Job für Email-Log Cleanup (90 Tage default) + - [x] AuditLogs Retention 
Policy (90 Tage) + - [x] Consent-Logs Archivierung (3 Jahre, expiresAt-basiert) + - [x] Media-Orphan-Cleanup (30 Tage Mindestalter) --- @@ -227,6 +229,15 @@ ## Changelog ### 15.12.2025 +- **Data Retention System implementiert:** + - Automatische Datenbereinigung für DSGVO-Compliance + - Email-Logs Cleanup (90 Tage default) + - AuditLogs Retention (90 Tage default) + - Consent-Logs Archivierung (3 Jahre, expiresAt-basiert) + - Media-Orphan-Cleanup (unreferenzierte Dateien) + - Scheduler: Täglich um 03:00 Uhr via BullMQ + - API-Endpoint `/api/retention` für manuellen Trigger + - Dateien: `src/lib/retention/`, `src/lib/queue/workers/retention-worker.ts` - **E2E Tests stabilisiert:** - Rate-Limit Handling (429) zu allen API-Tests hinzugefügt - `networkidle` durch `domcontentloaded` + explizite Waits ersetzt diff --git a/scripts/run-queue-worker.ts b/scripts/run-queue-worker.ts index f6996a8..207889a 100644 --- a/scripts/run-queue-worker.ts +++ b/scripts/run-queue-worker.ts @@ -31,10 +31,15 @@ console.log(`[QueueRunner] PAYLOAD_SECRET loaded: ${process.env.PAYLOAD_SECRET ? 
async function main() { const { startEmailWorker, stopEmailWorker } = await import('../src/lib/queue/workers/email-worker') const { startPdfWorker, stopPdfWorker } = await import('../src/lib/queue/workers/pdf-worker') + const { startRetentionWorker, stopRetentionWorker } = await import('../src/lib/queue/workers/retention-worker') + const { scheduleRetentionJobs } = await import('../src/lib/queue/jobs/retention-job') + const { retentionSchedule } = await import('../src/lib/retention/retention-config') // Konfiguration via Umgebungsvariablen const ENABLE_EMAIL_WORKER = process.env.QUEUE_ENABLE_EMAIL !== 'false' const ENABLE_PDF_WORKER = process.env.QUEUE_ENABLE_PDF !== 'false' + const ENABLE_RETENTION_WORKER = process.env.QUEUE_ENABLE_RETENTION !== 'false' + const ENABLE_RETENTION_SCHEDULER = process.env.QUEUE_ENABLE_RETENTION_SCHEDULER !== 'false' console.log('='.repeat(50)) console.log('[QueueRunner] Starting queue workers...') @@ -42,6 +47,8 @@ async function main() { console.log(`[QueueRunner] Node: ${process.version}`) console.log(`[QueueRunner] Email Worker: ${ENABLE_EMAIL_WORKER ? 'enabled' : 'disabled'}`) console.log(`[QueueRunner] PDF Worker: ${ENABLE_PDF_WORKER ? 'enabled' : 'disabled'}`) + console.log(`[QueueRunner] Retention Worker: ${ENABLE_RETENTION_WORKER ? 'enabled' : 'disabled'}`) + console.log(`[QueueRunner] Retention Scheduler: ${ENABLE_RETENTION_SCHEDULER ? 
'enabled' : 'disabled'}`) console.log('='.repeat(50)) // Workers starten @@ -53,6 +60,17 @@ async function main() { startPdfWorker() } + if (ENABLE_RETENTION_WORKER) { + startRetentionWorker() + + // Retention Scheduler starten (nur wenn Worker aktiv) + if (ENABLE_RETENTION_SCHEDULER) { + const cronSchedule = process.env.RETENTION_CRON_SCHEDULE || retentionSchedule.cron + console.log(`[QueueRunner] Scheduling retention jobs with cron: ${cronSchedule}`) + await scheduleRetentionJobs(cronSchedule) + } + } + // Graceful Shutdown async function shutdown(signal: string) { console.log(`\n[QueueRunner] Received ${signal}, shutting down gracefully...`) @@ -66,6 +84,9 @@ async function main() { if (ENABLE_PDF_WORKER) { stopPromises.push(stopPdfWorker()) } + if (ENABLE_RETENTION_WORKER) { + stopPromises.push(stopRetentionWorker()) + } await Promise.all(stopPromises) console.log('[QueueRunner] All workers stopped') diff --git a/src/app/(payload)/api/retention/route.ts b/src/app/(payload)/api/retention/route.ts new file mode 100644 index 0000000..b3a79f0 --- /dev/null +++ b/src/app/(payload)/api/retention/route.ts @@ -0,0 +1,204 @@ +/** + * Data Retention API + * + * Ermöglicht manuelles Auslösen von Retention-Jobs. + * Nur für Super-Admins zugänglich. + */ + +import { NextRequest, NextResponse } from 'next/server' +import { getPayload } from 'payload' +import config from '@payload-config' +import { + enqueueFullRetention, + enqueueCollectionCleanup, + enqueueMediaOrphanCleanup, + getRetentionJobStatus, +} from '@/lib/queue/jobs/retention-job' +import { retentionPolicies, getCutoffDate, mediaOrphanConfig } from '@/lib/retention' + +/** + * GET /api/retention + * + * Gibt die aktuelle Retention-Konfiguration und Job-Status zurück. 
+ */ +export async function GET(request: NextRequest): Promise<NextResponse> { + try { + const payload = await getPayload({ config }) + + // Auth-Check + const authHeader = request.headers.get('authorization') + const cookieHeader = request.headers.get('cookie') + + let user = null + + // Versuche Auth über Header oder Cookie + if (authHeader?.startsWith('Bearer ') || cookieHeader) { + try { + const result = await payload.auth({ + headers: request.headers, + }) + user = result.user + } catch { + // Auth fehlgeschlagen + } + } + + if (!user) { + return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) + } + + // Nur Super-Admins + if (!user.isSuperAdmin) { + return NextResponse.json({ error: 'Super admin access required' }, { status: 403 }) + } + + // Job-Status abfragen falls jobId angegeben + const jobId = request.nextUrl.searchParams.get('jobId') + if (jobId) { + const status = await getRetentionJobStatus(jobId) + if (!status) { + return NextResponse.json({ error: 'Job not found' }, { status: 404 }) + } + return NextResponse.json(status) + } + + // Konfiguration zurückgeben + return NextResponse.json({ + policies: retentionPolicies.map((p) => ({ + name: p.name, + collection: p.collection, + retentionDays: p.retentionDays, + dateField: p.dateField, + description: p.description, + })), + mediaOrphan: { + minAgeDays: mediaOrphanConfig.minAgeDays, + referencingCollections: mediaOrphanConfig.referencingCollections, + }, + environment: { + RETENTION_EMAIL_LOGS_DAYS: process.env.RETENTION_EMAIL_LOGS_DAYS || '90', + RETENTION_AUDIT_LOGS_DAYS: process.env.RETENTION_AUDIT_LOGS_DAYS || '90', + RETENTION_CONSENT_LOGS_DAYS: process.env.RETENTION_CONSENT_LOGS_DAYS || '1095', + RETENTION_MEDIA_ORPHAN_MIN_AGE_DAYS: process.env.RETENTION_MEDIA_ORPHAN_MIN_AGE_DAYS || '30', + RETENTION_CRON_SCHEDULE: process.env.RETENTION_CRON_SCHEDULE || '0 3 * * *', + }, + }) + } catch (error) { + console.error('[RetentionAPI] Error:', error) + return NextResponse.json( + { error: error
instanceof Error ? error.message : 'Internal server error' }, + { status: 500 } + ) + } +} + +/** + * POST /api/retention + * + * Löst einen Retention-Job aus. + * + * Body: + * - type: 'full' | 'collection' | 'media-orphans' + * - collection?: string (für type='collection') + */ +export async function POST(request: NextRequest): Promise<NextResponse> { + try { + const payload = await getPayload({ config }) + + // Auth-Check + const authHeader = request.headers.get('authorization') + const cookieHeader = request.headers.get('cookie') + + let user = null + + if (authHeader?.startsWith('Bearer ') || cookieHeader) { + try { + const result = await payload.auth({ + headers: request.headers, + }) + user = result.user + } catch { + // Auth fehlgeschlagen + } + } + + if (!user) { + return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) + } + + // Nur Super-Admins + if (!user.isSuperAdmin) { + return NextResponse.json({ error: 'Super admin access required' }, { status: 403 }) + } + + // Body parsen + const body = await request.json().catch(() => ({})) + const { type, collection } = body as { type?: string; collection?: string } + + if (!type) { + return NextResponse.json({ error: 'Type is required' }, { status: 400 }) + } + + let job + + switch (type) { + case 'full': + job = await enqueueFullRetention(user.email) + break + + case 'collection': + if (!collection) { + return NextResponse.json({ error: 'Collection is required for type=collection' }, { status: 400 }) + } + + // Prüfe ob Collection in Policies definiert + const policy = retentionPolicies.find((p) => p.collection === collection) + if (!policy) { + return NextResponse.json( + { + error: `Collection '${collection}' not found in retention policies`, + availableCollections: retentionPolicies.map((p) => p.collection), + }, + { status: 400 } + ) + } + + const cutoff = getCutoffDate(policy.retentionDays) + job = await enqueueCollectionCleanup(collection, cutoff, { + batchSize: policy.batchSize, + dateField:
policy.dateField, + triggeredBy: user.email, + }) + break + + case 'media-orphans': + job = await enqueueMediaOrphanCleanup({ + triggeredBy: user.email, + }) + break + + default: + return NextResponse.json( + { + error: `Invalid type: ${type}`, + validTypes: ['full', 'collection', 'media-orphans'], + }, + { status: 400 } + ) + } + + return NextResponse.json({ + success: true, + jobId: job.id, + type, + collection, + message: `Retention job queued successfully. Use GET /api/retention?jobId=${job.id} to check status.`, + }) + } catch (error) { + console.error('[RetentionAPI] Error:', error) + return NextResponse.json( + { error: error instanceof Error ? error.message : 'Internal server error' }, + { status: 500 } + ) + } +} diff --git a/src/lib/queue/jobs/retention-job.ts b/src/lib/queue/jobs/retention-job.ts new file mode 100644 index 0000000..8e2ba04 --- /dev/null +++ b/src/lib/queue/jobs/retention-job.ts @@ -0,0 +1,215 @@ +/** + * Retention Job Definition + * + * Definiert Cleanup-Jobs für Data Retention. 
+ */ + +import { Job } from 'bullmq' +import { getQueue, QUEUE_NAMES } from '../queue-service' + +// Job-Typen +export type RetentionJobType = + | 'cleanup-collection' + | 'cleanup-media-orphans' + | 'retention-full' + +// Job-Daten Typen +export interface RetentionJobData { + type: RetentionJobType + /** Collection-Slug für cleanup-collection */ + collection?: string + /** Cutoff-Datum als ISO-String */ + cutoffDate?: string + /** Batch-Größe für Löschung */ + batchSize?: number + /** Feld für Datum-Vergleich */ + dateField?: string + /** Ausgelöst von (User/System) */ + triggeredBy?: string +} + +export interface RetentionJobResult { + success: boolean + type: RetentionJobType + collection?: string + deletedCount: number + errorCount: number + errors?: string[] + duration: number + timestamp: string +} + +/** + * Fügt einen Collection-Cleanup-Job zur Queue hinzu + */ +export async function enqueueCollectionCleanup( + collection: string, + cutoffDate: Date, + options?: { + batchSize?: number + dateField?: string + triggeredBy?: string + } +): Promise<Job<RetentionJobData>> { + const queue = getQueue(QUEUE_NAMES.CLEANUP) + + const data: RetentionJobData = { + type: 'cleanup-collection', + collection, + cutoffDate: cutoffDate.toISOString(), + batchSize: options?.batchSize || 100, + dateField: options?.dateField || 'createdAt', + triggeredBy: options?.triggeredBy || 'system', + } + + const job = await queue.add('retention', data, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000, + }, + removeOnComplete: { + count: 50, + age: 7 * 24 * 60 * 60, // 7 Tage + }, + removeOnFail: { + count: 100, + age: 30 * 24 * 60 * 60, // 30 Tage + }, + }) + + console.log(`[RetentionJob] Collection cleanup job ${job.id} queued for ${collection}`) + return job +} + +/** + * Fügt einen Media-Orphan-Cleanup-Job zur Queue hinzu + */ +export async function enqueueMediaOrphanCleanup(options?: { + batchSize?: number + minAgeDays?: number + triggeredBy?: string +}): Promise<Job<RetentionJobData>> { + const queue =
getQueue(QUEUE_NAMES.CLEANUP) + + // Cutoff-Datum für Mindestalter + const cutoff = new Date() + cutoff.setDate(cutoff.getDate() - (options?.minAgeDays || 30)) + + const data: RetentionJobData = { + type: 'cleanup-media-orphans', + cutoffDate: cutoff.toISOString(), + batchSize: options?.batchSize || 50, + triggeredBy: options?.triggeredBy || 'system', + } + + const job = await queue.add('retention', data, { + attempts: 3, + backoff: { + type: 'exponential', + delay: 5000, + }, + removeOnComplete: { + count: 50, + age: 7 * 24 * 60 * 60, + }, + removeOnFail: { + count: 100, + age: 30 * 24 * 60 * 60, + }, + }) + + console.log(`[RetentionJob] Media orphan cleanup job ${job.id} queued`) + return job +} + +/** + * Fügt einen vollständigen Retention-Job zur Queue hinzu + * (Führt alle konfigurierten Cleanups durch) + */ +export async function enqueueFullRetention(triggeredBy?: string): Promise<Job<RetentionJobData>> { + const queue = getQueue(QUEUE_NAMES.CLEANUP) + + const data: RetentionJobData = { + type: 'retention-full', + triggeredBy: triggeredBy || 'scheduler', + } + + const job = await queue.add('retention', data, { + attempts: 1, // Full Retention sollte nicht wiederholt werden + removeOnComplete: { + count: 30, + age: 30 * 24 * 60 * 60, + }, + removeOnFail: { + count: 50, + age: 60 * 24 * 60 * 60, + }, + }) + + console.log(`[RetentionJob] Full retention job ${job.id} queued`) + return job +} + +/** + * Plant wiederkehrende Retention-Jobs + */ +export async function scheduleRetentionJobs(cronExpression: string): Promise<void> { + const queue = getQueue(QUEUE_NAMES.CLEANUP) + + // Entferne existierende Scheduler + const repeatableJobs = await queue.getRepeatableJobs() + for (const job of repeatableJobs) { + if (job.name === 'scheduled-retention') { + await queue.removeRepeatableByKey(job.key) + } + } + + // Neuen Scheduler hinzufügen + await queue.add( + 'scheduled-retention', + { + type: 'retention-full', + triggeredBy: 'scheduler', + } as RetentionJobData, + { + repeat: { + pattern:
cronExpression, + }, + removeOnComplete: { + count: 30, + age: 30 * 24 * 60 * 60, + }, + removeOnFail: { + count: 50, + age: 60 * 24 * 60 * 60, + }, + } + ) + + console.log(`[RetentionJob] Scheduled retention job with cron: ${cronExpression}`) +} + +/** + * Holt den Status eines Retention-Jobs + */ +export async function getRetentionJobStatus(jobId: string): Promise<{ + state: string + progress: number + result?: RetentionJobResult + failedReason?: string +} | null> { + const queue = getQueue(QUEUE_NAMES.CLEANUP) + + const job = await queue.getJob(jobId) + if (!job) return null + + const [state, progress] = await Promise.all([job.getState(), job.progress]) + + return { + state, + progress: typeof progress === 'number' ? progress : 0, + result: job.returnvalue as RetentionJobResult | undefined, + failedReason: job.failedReason, + } +} diff --git a/src/lib/queue/workers/retention-worker.ts b/src/lib/queue/workers/retention-worker.ts new file mode 100644 index 0000000..6f91165 --- /dev/null +++ b/src/lib/queue/workers/retention-worker.ts @@ -0,0 +1,191 @@ +/** + * Retention Worker + * + * Verarbeitet Cleanup-Jobs aus der Queue. 
+ */ + +import { Worker, Job } from 'bullmq' +import { getPayload } from 'payload' +import config from '@payload-config' +import { QUEUE_NAMES, getQueueRedisConnection } from '../queue-service' +import type { RetentionJobData, RetentionJobResult } from '../jobs/retention-job' +import { + cleanupCollection, + cleanupExpiredConsentLogs, + cleanupOrphanedMedia, + runFullRetention, +} from '../../retention/cleanup-service' +import { getCutoffDate } from '../../retention/retention-config' + +// Worker-Konfiguration +const CONCURRENCY = parseInt(process.env.QUEUE_RETENTION_CONCURRENCY || '1', 10) + +/** + * Retention Job Processor + */ +async function processRetentionJob(job: Job<RetentionJobData>): Promise<RetentionJobResult> { + const { type, collection, cutoffDate, batchSize, dateField, triggeredBy } = job.data + const startTime = Date.now() + + console.log(`[RetentionWorker] Processing job ${job.id} (type: ${type})`) + console.log(`[RetentionWorker] Triggered by: ${triggeredBy || 'unknown'}`) + + try { + // Payload-Instanz holen + const payload = await getPayload({ config }) + + let deletedCount = 0 + let errorCount = 0 + const errors: string[] = [] + + switch (type) { + case 'cleanup-collection': { + if (!collection) { + throw new Error('Collection is required for cleanup-collection job') + } + + const cutoff = cutoffDate ? new Date(cutoffDate) : getCutoffDate(90) + const result = await cleanupCollection(payload, collection, cutoff, { + dateField, + batchSize, + }) + + deletedCount = result.deletedCount + errorCount = result.errorCount + errors.push(...result.errors) + break + } + + case 'cleanup-media-orphans': { + const result = await cleanupOrphanedMedia(payload, { + batchSize, + minAgeDays: cutoffDate + ?
Math.ceil((Date.now() - new Date(cutoffDate).getTime()) / (1000 * 60 * 60 * 24)) + : undefined, + }) + + deletedCount = result.deletedCount + errorCount = result.errorCount + errors.push(...result.errors) + break + } + + case 'retention-full': { + const result = await runFullRetention(payload) + + deletedCount = result.totalDeleted + errorCount = result.totalErrors + + // Sammle alle Fehler + for (const r of result.results) { + errors.push(...r.errors) + } + if (result.mediaOrphanResult) { + errors.push(...result.mediaOrphanResult.errors) + } + break + } + + default: + throw new Error(`Unknown retention job type: ${type}`) + } + + const duration = Date.now() - startTime + + const jobResult: RetentionJobResult = { + success: errorCount === 0, + type, + collection, + deletedCount, + errorCount, + errors: errors.length > 0 ? errors.slice(0, 20) : undefined, // Limitiere Fehler-Anzahl + duration, + timestamp: new Date().toISOString(), + } + + console.log( + `[RetentionWorker] Job ${job.id} completed: ${deletedCount} deleted, ${errorCount} errors, ${duration}ms` + ) + + return jobResult + } catch (error) { + const errorMessage = error instanceof Error ? 
error.message : 'Unknown error' + console.error(`[RetentionWorker] Job ${job.id} failed:`, errorMessage) + throw error + } +} + +/** + * Retention Worker Instanz + */ +let retentionWorker: Worker<RetentionJobData, RetentionJobResult> | null = null + +/** + * Startet den Retention Worker + */ +export function startRetentionWorker(): Worker<RetentionJobData, RetentionJobResult> { + if (retentionWorker) { + console.warn('[RetentionWorker] Worker already running') + return retentionWorker + } + + retentionWorker = new Worker( + QUEUE_NAMES.CLEANUP, + processRetentionJob, + { + connection: getQueueRedisConnection(), + concurrency: CONCURRENCY, + // Retention Jobs können lange dauern + lockDuration: 300000, // 5 Minuten + stalledInterval: 60000, // 1 Minute + maxStalledCount: 2, + } + ) + + // Event Handlers + retentionWorker.on('ready', () => { + console.log(`[RetentionWorker] Ready (concurrency: ${CONCURRENCY})`) + }) + + retentionWorker.on('completed', (job, result) => { + console.log( + `[RetentionWorker] Job ${job.id} completed: ${result.deletedCount} deleted in ${result.duration}ms` + ) + }) + + retentionWorker.on('failed', (job, error) => { + console.error( + `[RetentionWorker] Job ${job?.id} failed after ${job?.attemptsMade} attempts:`, + error.message + ) + }) + + retentionWorker.on('stalled', (jobId) => { + console.warn(`[RetentionWorker] Job ${jobId} stalled`) + }) + + retentionWorker.on('error', (error) => { + console.error('[RetentionWorker] Error:', error) + }) + + return retentionWorker +} + +/** + * Stoppt den Retention Worker + */ +export async function stopRetentionWorker(): Promise<void> { + if (retentionWorker) { + console.log('[RetentionWorker] Stopping...') + await retentionWorker.close() + retentionWorker = null + console.log('[RetentionWorker] Stopped') + } +} + +/** + * Gibt die Worker-Instanz zurück (falls aktiv) + */ +export function getRetentionWorker(): Worker<RetentionJobData, RetentionJobResult> | null { + return retentionWorker +} diff --git a/src/lib/retention/cleanup-service.ts b/src/lib/retention/cleanup-service.ts new file mode 100644 index
0000000..eaec75a --- /dev/null +++ b/src/lib/retention/cleanup-service.ts @@ -0,0 +1,403 @@ +/** + * Cleanup Service + * + * Führt die eigentliche Datenbereinigung durch. + * Wird vom Retention Worker aufgerufen. + */ + +import type { Payload } from 'payload' +import type { Config } from '@/payload-types' +import { retentionPolicies, getCutoffDate, mediaOrphanConfig } from './retention-config' + +// Type für dynamische Collection-Zugriffe +type CollectionSlug = keyof Config['collections'] + +export interface CleanupResult { + collection: string + deletedCount: number + errorCount: number + errors: string[] + duration: number +} + +export interface MediaOrphanResult { + deletedCount: number + deletedFiles: string[] + errorCount: number + errors: string[] + duration: number +} + +/** + * Löscht alte Einträge aus einer Collection basierend auf dem Datum + */ +export async function cleanupCollection( + payload: Payload, + collection: string, + cutoffDate: Date, + options?: { + dateField?: string + batchSize?: number + } +): Promise<CleanupResult> { + const startTime = Date.now() + const dateField = options?.dateField || 'createdAt' + const batchSize = options?.batchSize || 100 + + const result: CleanupResult = { + collection, + deletedCount: 0, + errorCount: 0, + errors: [], + duration: 0, + } + + console.log(`[CleanupService] Starting cleanup for ${collection}`) + console.log(`[CleanupService] Cutoff date: ${cutoffDate.toISOString()}`) + console.log(`[CleanupService] Date field: ${dateField}, Batch size: ${batchSize}`) + + try { + let hasMore = true + + while (hasMore) { + // Finde alte Einträge + const oldEntries = await payload.find({ + collection: collection as CollectionSlug, + where: { + [dateField]: { + less_than: cutoffDate.toISOString(), + }, + }, + limit: batchSize, + depth: 0, // Keine Relation-Auflösung für Performance + }) + + if (oldEntries.docs.length === 0) { + hasMore = false + break + } + + console.log(`[CleanupService] Found ${oldEntries.docs.length} entries to
delete`) + + // Lösche in Batches + for (const doc of oldEntries.docs) { + try { + await payload.delete({ + collection: collection as CollectionSlug, + id: doc.id, + overrideAccess: true, // System-Löschung + }) + result.deletedCount++ + } catch (error) { + result.errorCount++ + const errorMsg = error instanceof Error ? error.message : 'Unknown error' + result.errors.push(`Failed to delete ${collection}/${doc.id}: ${errorMsg}`) + console.error(`[CleanupService] Error deleting ${collection}/${doc.id}:`, errorMsg) + } + } + + // Prüfe ob es mehr Einträge gibt + hasMore = oldEntries.docs.length === batchSize + } + } catch (error) { + const errorMsg = error instanceof Error ? error.message : 'Unknown error' + result.errors.push(`Query failed: ${errorMsg}`) + result.errorCount++ + console.error(`[CleanupService] Query error for ${collection}:`, errorMsg) + } + + result.duration = Date.now() - startTime + console.log( + `[CleanupService] Cleanup for ${collection} completed: ${result.deletedCount} deleted, ${result.errorCount} errors, ${result.duration}ms` + ) + + return result +} + +/** + * Löscht ConsentLogs basierend auf expiresAt (bereits abgelaufene) + * Spezielle Behandlung für WORM-Collection + */ +export async function cleanupExpiredConsentLogs( + payload: Payload, + batchSize = 50 +): Promise<CleanupResult> { + const startTime = Date.now() + const now = new Date() + + const result: CleanupResult = { + collection: 'consent-logs', + deletedCount: 0, + errorCount: 0, + errors: [], + duration: 0, + } + + console.log(`[CleanupService] Starting consent-logs cleanup (expired before ${now.toISOString()})`) + + try { + let hasMore = true + + while (hasMore) { + // Finde abgelaufene Consent-Logs + const expiredLogs = await payload.find({ + collection: 'consent-logs', + where: { + expiresAt: { + less_than: now.toISOString(), + }, + }, + limit: batchSize, + depth: 0, + }) + + if (expiredLogs.docs.length === 0) { + hasMore = false + break + } + + console.log(`[CleanupService] Found
${expiredLogs.docs.length} expired consent logs`) + + // Lösche via Direct Access (WORM Collection hat delete: false) + // Verwende overrideAccess für System-Löschung + for (const doc of expiredLogs.docs) { + try { + await payload.delete({ + collection: 'consent-logs', + id: doc.id, + overrideAccess: true, // Bypass WORM protection für Retention + }) + result.deletedCount++ + } catch (error) { + result.errorCount++ + const errorMsg = error instanceof Error ? error.message : 'Unknown error' + result.errors.push(`Failed to delete consent-logs/${doc.id}: ${errorMsg}`) + console.error(`[CleanupService] Error deleting consent-logs/${doc.id}:`, errorMsg) + } + } + + hasMore = expiredLogs.docs.length === batchSize + } + } catch (error) { + const errorMsg = error instanceof Error ? error.message : 'Unknown error' + result.errors.push(`Query failed: ${errorMsg}`) + result.errorCount++ + console.error(`[CleanupService] Query error for consent-logs:`, errorMsg) + } + + result.duration = Date.now() - startTime + console.log( + `[CleanupService] Consent-logs cleanup completed: ${result.deletedCount} deleted, ${result.errorCount} errors, ${result.duration}ms` + ) + + return result +} + +/** + * Findet und löscht verwaiste Media-Dateien + * (Dateien, die von keinem Dokument mehr referenziert werden) + */ +export async function cleanupOrphanedMedia( + payload: Payload, + options?: { + minAgeDays?: number + batchSize?: number + } +): Promise<MediaOrphanResult> { + const startTime = Date.now() + const minAgeDays = options?.minAgeDays || mediaOrphanConfig.minAgeDays + const batchSize = options?.batchSize || mediaOrphanConfig.batchSize + + const result: MediaOrphanResult = { + deletedCount: 0, + deletedFiles: [], + errorCount: 0, + errors: [], + duration: 0, + } + + // Cutoff für Mindestalter + const cutoff = getCutoffDate(minAgeDays) + + console.log(`[CleanupService] Starting media orphan cleanup`) + console.log(`[CleanupService] Min age: ${minAgeDays} days (cutoff: ${cutoff.toISOString()})`) + + try {
+ // Hole alle Media älter als Cutoff + let offset = 0 + let hasMore = true + + while (hasMore) { + const mediaItems = await payload.find({ + collection: 'media', + where: { + createdAt: { + less_than: cutoff.toISOString(), + }, + }, + limit: batchSize, + page: Math.floor(offset / batchSize) + 1, + depth: 0, + }) + + if (mediaItems.docs.length === 0) { + hasMore = false + break + } + + console.log(`[CleanupService] Checking ${mediaItems.docs.length} media items for orphans`) + + // Prüfe jedes Media-Item auf Referenzen + for (const media of mediaItems.docs) { + const isOrphan = await checkIfMediaIsOrphan(payload, media.id) + + if (isOrphan) { + try { + // Lösche das Media-Item (Payload löscht auch die Dateien) + await payload.delete({ + collection: 'media', + id: media.id, + overrideAccess: true, + }) + result.deletedCount++ + result.deletedFiles.push( + typeof media.filename === 'string' ? media.filename : String(media.id) + ) + console.log(`[CleanupService] Deleted orphan media: ${media.id}`) + } catch (error) { + result.errorCount++ + const errorMsg = error instanceof Error ? error.message : 'Unknown error' + result.errors.push(`Failed to delete media/${media.id}: ${errorMsg}`) + console.error(`[CleanupService] Error deleting media/${media.id}:`, errorMsg) + } + } + } + + offset += mediaItems.docs.length + hasMore = mediaItems.docs.length === batchSize + } + } catch (error) { + const errorMsg = error instanceof Error ? 
error.message : 'Unknown error' + result.errors.push(`Query failed: ${errorMsg}`) + result.errorCount++ + console.error(`[CleanupService] Query error for media orphans:`, errorMsg) + } + + result.duration = Date.now() - startTime + console.log( + `[CleanupService] Media orphan cleanup completed: ${result.deletedCount} deleted, ${result.errorCount} errors, ${result.duration}ms` + ) + + return result +} + +/** + * Prüft ob ein Media-Item von keinem Dokument referenziert wird + */ +async function checkIfMediaIsOrphan( + payload: Payload, + mediaId: number | string +): Promise<boolean> { + const collections = mediaOrphanConfig.referencingCollections + + for (const collection of collections) { + try { + // Suche nach Referenzen in verschiedenen Feldtypen + // Media kann als relationship, in Blocks, oder in Rich-Text referenziert werden + const references = await payload.find({ + collection: collection as CollectionSlug, + where: { + or: [ + // Direct relationship fields (common patterns) + { image: { equals: mediaId } }, + { featuredImage: { equals: mediaId } }, + { thumbnail: { equals: mediaId } }, + { logo: { equals: mediaId } }, + { avatar: { equals: mediaId } }, + { photo: { equals: mediaId } }, + { cover: { equals: mediaId } }, + { icon: { equals: mediaId } }, + { backgroundImage: { equals: mediaId } }, + { heroImage: { equals: mediaId } }, + { ogImage: { equals: mediaId } }, + // Gallery/Array fields (check if contains) + { 'gallery.image': { equals: mediaId } }, + { 'images.image': { equals: mediaId } }, + { 'slides.image': { equals: mediaId } }, + { 'slides.backgroundImage': { equals: mediaId } }, + { 'slides.mobileBackgroundImage': { equals: mediaId } }, + ], + }, + limit: 1, + depth: 0, + }) + + if (references.totalDocs > 0) { + return false // Hat Referenzen, ist kein Orphan + } + } catch { + // Collection existiert möglicherweise nicht oder Feld nicht vorhanden + // Ignorieren und mit nächster Collection fortfahren + } + } + + return true // Keine Referenzen gefunden
+} + +/** + * Führt alle konfigurierten Retention Policies aus + */ +export async function runFullRetention(payload: Payload): Promise<{ + results: CleanupResult[] + mediaOrphanResult?: MediaOrphanResult + totalDeleted: number + totalErrors: number + duration: number +}> { + const startTime = Date.now() + const results: CleanupResult[] = [] + let totalDeleted = 0 + let totalErrors = 0 + + console.log('[CleanupService] Starting full retention run') + console.log(`[CleanupService] Policies: ${retentionPolicies.map((p) => p.name).join(', ')}`) + + // Führe Collection Cleanups durch + for (const policy of retentionPolicies) { + // ConsentLogs haben spezielle Behandlung + if (policy.collection === 'consent-logs') { + const consentResult = await cleanupExpiredConsentLogs(payload, policy.batchSize) + results.push(consentResult) + totalDeleted += consentResult.deletedCount + totalErrors += consentResult.errorCount + } else { + const cutoff = getCutoffDate(policy.retentionDays) + const result = await cleanupCollection(payload, policy.collection, cutoff, { + dateField: policy.dateField, + batchSize: policy.batchSize, + }) + results.push(result) + totalDeleted += result.deletedCount + totalErrors += result.errorCount + } + } + + // Media Orphan Cleanup + const mediaOrphanResult = await cleanupOrphanedMedia(payload) + totalDeleted += mediaOrphanResult.deletedCount + totalErrors += mediaOrphanResult.errorCount + + const duration = Date.now() - startTime + + console.log( + `[CleanupService] Full retention completed: ${totalDeleted} total deleted, ${totalErrors} total errors, ${duration}ms` + ) + + return { + results, + mediaOrphanResult, + totalDeleted, + totalErrors, + duration, + } +} diff --git a/src/lib/retention/index.ts b/src/lib/retention/index.ts new file mode 100644 index 0000000..e7f92a7 --- /dev/null +++ b/src/lib/retention/index.ts @@ -0,0 +1,25 @@ +/** + * Data Retention Module + * + * Exportiert alle Retention-bezogenen Funktionen. 
+ */ + +// Konfiguration +export { + retentionPolicies, + mediaOrphanConfig, + retentionSchedule, + getRetentionPolicy, + getCutoffDate, + type RetentionPolicy, +} from './retention-config' + +// Cleanup Service +export { + cleanupCollection, + cleanupExpiredConsentLogs, + cleanupOrphanedMedia, + runFullRetention, + type CleanupResult, + type MediaOrphanResult, +} from './cleanup-service' diff --git a/src/lib/retention/retention-config.ts b/src/lib/retention/retention-config.ts new file mode 100644 index 0000000..e7472d1 --- /dev/null +++ b/src/lib/retention/retention-config.ts @@ -0,0 +1,103 @@ +/** + * Data Retention Configuration + * + * Zentrale Konfiguration für Daten-Aufbewahrungsfristen. + * Alle Werte in Tagen. + */ + +export interface RetentionPolicy { + /** Eindeutiger Name für Logging */ + name: string + /** Collection-Slug */ + collection: string + /** Aufbewahrungsfrist in Tagen */ + retentionDays: number + /** Feld für Datum-Vergleich (Standard: createdAt) */ + dateField?: string + /** Batch-Größe für Löschung */ + batchSize?: number + /** Beschreibung für Dokumentation */ + description: string +} + +/** + * Retention Policies für verschiedene Collections + * + * Die Werte können via Umgebungsvariablen überschrieben werden. 
+ */ +export const retentionPolicies: RetentionPolicy[] = [ + { + name: 'email-logs', + collection: 'email-logs', + retentionDays: parseInt(process.env.RETENTION_EMAIL_LOGS_DAYS || '90', 10), + dateField: 'createdAt', + batchSize: 100, + description: 'E-Mail-Logs älter als X Tage löschen', + }, + { + name: 'audit-logs', + collection: 'audit-logs', + retentionDays: parseInt(process.env.RETENTION_AUDIT_LOGS_DAYS || '90', 10), + dateField: 'createdAt', + batchSize: 100, + description: 'Audit-Logs älter als X Tage löschen', + }, + { + name: 'consent-logs', + collection: 'consent-logs', + retentionDays: parseInt(process.env.RETENTION_CONSENT_LOGS_DAYS || '1095', 10), // 3 Jahre + dateField: 'expiresAt', // ConsentLogs haben expiresAt statt createdAt-basierter Retention + batchSize: 50, + description: 'Consent-Logs nach Ablaufdatum löschen (DSGVO: 3 Jahre)', + }, +] + +/** + * Media Orphan Cleanup Konfiguration + */ +export const mediaOrphanConfig = { + /** Mindestalter in Tagen bevor ein Media als Orphan gilt */ + minAgeDays: parseInt(process.env.RETENTION_MEDIA_ORPHAN_MIN_AGE_DAYS || '30', 10), + /** Batch-Größe für Löschung */ + batchSize: 50, + /** Collections, die Media referenzieren können */ + referencingCollections: [ + 'pages', + 'posts', + 'portfolios', + 'team', + 'services', + 'testimonials', + 'faqs', + 'tenants', + 'projects', + 'certifications', + 'bookings', + ], +} + +/** + * Cron-Schedule für Retention Jobs + * Default: Täglich um 03:00 Uhr + */ +export const retentionSchedule = { + cron: process.env.RETENTION_CRON_SCHEDULE || '0 3 * * *', + timezone: process.env.TZ || 'Europe/Berlin', +} + +/** + * Gibt die Retention Policy für eine bestimmte Collection zurück + */ +export function getRetentionPolicy(collectionSlug: string): RetentionPolicy | undefined { + return retentionPolicies.find((p) => p.collection === collectionSlug) +} + +/** + * Berechnet das Cutoff-Datum basierend auf der Retention Policy + */ +export function getCutoffDate(retentionDays: 
number): Date { + const cutoff = new Date() + cutoff.setDate(cutoff.getDate() - retentionDays) + cutoff.setHours(0, 0, 0, 0) + return cutoff +}