From ce4962e74bbe6c29b3c00958ca5d09040d782017 Mon Sep 17 00:00:00 2001 From: Martin Porwoll Date: Tue, 9 Dec 2025 22:59:17 +0000 Subject: [PATCH] feat: BullMQ queue system for email and PDF processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add BullMQ-based job queue with Redis backend - Implement email worker with tenant-specific SMTP support - Add PDF worker with Playwright for HTML/URL-to-PDF generation - Create /api/generate-pdf endpoint with job status polling - Fix TypeScript errors in Tenants, TenantBreadcrumb, TenantDashboard - Fix type casts in auditAuthEvents and audit-service - Remove credentials from ecosystem.config.cjs (now loaded via dotenv) - Fix ESM __dirname issue with fileURLToPath for PM2 compatibility đŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- docs/INFRASTRUCTURE.md | 33 ++ docs/anleitungen/TODO.md | 131 ++++++- ecosystem.config.cjs | 73 +++- package.json | 1 + pnpm-lock.yaml | 139 ++++++- scripts/run-queue-worker.ts | 100 ++++++ .../admin/queues/[queueName]/jobs/route.ts | 131 +++++++ src/app/(payload)/api/admin/queues/route.ts | 179 +++++++++ src/app/(payload)/api/generate-pdf/route.ts | 339 ++++++++++++++++++ src/app/(payload)/api/send-email/route.ts | 53 ++- src/collections/Tenants.ts | 4 +- src/components/admin/TenantBreadcrumb.tsx | 7 +- src/components/admin/TenantDashboard.tsx | 7 +- src/hooks/auditAuthEvents.ts | 12 +- src/lib/audit/audit-service.ts | 12 +- src/lib/pdf/pdf-service.ts | 322 +++++++++++++++++ src/lib/queue/index.ts | 43 +++ src/lib/queue/jobs/email-job.ts | 150 ++++++++ src/lib/queue/jobs/pdf-job.ts | 201 +++++++++++ src/lib/queue/queue-service.ts | 202 +++++++++++ src/lib/queue/workers/email-worker.ts | 135 +++++++ src/lib/queue/workers/pdf-worker.ts | 172 +++++++++ src/lib/search.ts | 273 ++++++++++++-- 23 files changed, 2623 insertions(+), 96 deletions(-) create mode 100644 scripts/run-queue-worker.ts create mode 100644 src/app/(payload)/api/admin/queues/[queueName]/jobs/route.ts create mode 100644 src/app/(payload)/api/admin/queues/route.ts create mode 100644 src/app/(payload)/api/generate-pdf/route.ts create mode 100644 src/lib/pdf/pdf-service.ts create mode 100644 src/lib/queue/index.ts create mode 100644 src/lib/queue/jobs/email-job.ts create mode 100644 src/lib/queue/jobs/pdf-job.ts create mode 100644 src/lib/queue/queue-service.ts create mode 100644 src/lib/queue/workers/email-worker.ts create mode 100644 src/lib/queue/workers/pdf-worker.ts diff --git a/docs/INFRASTRUCTURE.md b/docs/INFRASTRUCTURE.md index e46e35b..1db94ec 100644 --- a/docs/INFRASTRUCTURE.md +++ b/docs/INFRASTRUCTURE.md @@ -297,6 +297,39 @@ redis-cli flushdb redis-cli monitor ``` +### Background Jobs (geplant) + +**Evaluation (09.12.2025): BullMQ vs Agenda.js** + +| Kriterium | BullMQ | Agenda.js | +|-----------|--------|-----------| +| Database | Redis ✅ | MongoDB ❌ | +| TypeScript | Native ✅ | Begrenzt ⚠ | +| Priority Jobs | Ja ✅ | Nein ❌ | +| Rate Limiting | Ja ✅ | Nein ❌ | +| UI Dashboard | @bull-board ✅ | Keine ❌ | + +**Entscheidung: BullMQ** (Redis bereits vorhanden, TypeScript-native, @bull-board UI) + +**Geplante Konfiguration:** +```bash +# Environment Variables (wenn implementiert) +QUEUE_REDIS_URL=redis://localhost:6379/1 # Separate DB fĂŒr Jobs +QUEUE_CONCURRENCY=5 # Parallele Worker +QUEUE_DEFAULT_RETRY=3 # Wiederholungsversuche +``` + +**PM2 Worker-Konfiguration (geplant):** +```javascript +// ecosystem.config.cjs - Erweiterung +{ + name: 'queue-worker', + script: './scripts/run-queue-worker.ts', + instances: 1, + max_memory_restart: '500M' +} +``` + --- ## Deployment Workflow diff --git a/docs/anleitungen/TODO.md b/docs/anleitungen/TODO.md index bc2daa9..13b18a4 100644 --- a/docs/anleitungen/TODO.md +++ b/docs/anleitungen/TODO.md @@ -253,27 +253,96 @@ ### Mittlere PrioritĂ€t - Performance & Skalierung #### Search Performance -- [ ] **Full-Text-Search aktivieren** - - [ ] `USE_FTS=true` in Production setzen - - [ ] PostgreSQL `to_tsvector`-Indices erstellen - - [ ] Performance-Test mit Produktionsdaten +- [x] **Full-Text-Search aktivieren** (Erledigt: 09.12.2025) + - [x] `USE_FTS=true` in Production gesetzt + - [x] PostgreSQL `to_tsvector`-Indices erstellt: + - `posts_locales_fts_title_idx` (GIN auf title) + - `posts_locales_fts_excerpt_idx` (GIN auf excerpt) + - `posts_locales_fts_combined_idx` (GIN auf title + excerpt) + - `pages_locales_fts_title_idx` (GIN auf title) + - `categories_locales_fts_name_idx` (GIN auf name) + - [x] Deutsche Sprachkonfiguration (`german` config) + - [x] Relevanz-Ranking mit `ts_rank()` + - [x] Prefix-Suche mit `:*` Operator + - [x] Fallback auf ILIKE bei `USE_FTS=false` - [ ] **Redis-Migration fĂŒr Caches** - [ ] Search-Cache von In-Memory auf Redis migrieren - [ ] Rate-Limit-Maps auf Redis migrieren - [ ] Suggestions-Cache auf Redis #### Background Jobs -- [ ] **Queue-System implementieren** - - [ ] BullMQ oder Agenda.js evaluieren - - [ ] E-Mail-Versand ĂŒber Queue (non-blocking) - - [ ] PDF-Generierung ĂŒber Queue - - [ ] Job-Dashboard im Admin +- [x] **Queue-System implementieren** (Erledigt: 09.12.2025) + - [x] BullMQ oder Agenda.js evaluieren → **Empfehlung: BullMQ** + - [x] E-Mail-Versand ĂŒber Queue (non-blocking) + - Queue-Service: `src/lib/queue/queue-service.ts` + - Email-Job: `src/lib/queue/jobs/email-job.ts` + - Email-Worker: `src/lib/queue/workers/email-worker.ts` + - API-Integration: `queued: true` Option in `/api/send-email` + - [x] PDF-Generierung ĂŒber Queue (Erledigt: 09.12.2025) + - PDF-Job: `src/lib/queue/jobs/pdf-job.ts` + - PDF-Service: `src/lib/pdf/pdf-service.ts` (Playwright-basiert) + - PDF-Worker: `src/lib/queue/workers/pdf-worker.ts` + - API-Endpoint: `/api/generate-pdf` (POST fĂŒr Generierung, GET fĂŒr Job-Status) + - UnterstĂŒtzt HTML-zu-PDF und URL-zu-PDF + - PM2-Integration mit konfigurierbaren Workern + - [x] Job-Dashboard im Admin + - Queue-Status: `GET /api/admin/queues` (SuperAdmin only) + - Queue-Jobs: `GET /api/admin/queues/[name]/jobs` + - Queue-Actions: `POST /api/admin/queues` (pause, resume, clean, drain) + + **Evaluation BullMQ vs Agenda.js:** + + | Kriterium | BullMQ | Agenda.js | + |-----------|--------|-----------| + | **Database** | Redis ✅ (bereits vorhanden) | MongoDB ❌ (neue Dependency) | + | **TypeScript** | Native ✅ | Begrenzt ⚠ | + | **Priority Jobs** | Ja ✅ | Nein ❌ | + | **Rate Limiting** | Ja ✅ | Nein ❌ | + | **Delayed Jobs** | Ja ✅ | Ja ✅ | + | **Repeatable Jobs** | Ja ✅ | Ja ✅ | + | **UI Dashboard** | @bull-board ✅ | Keine Built-in ❌ | + | **Weekly Downloads** | 1.6M | 120K | + | **Maintenance** | Aktiv | Weniger aktiv | + + **Entscheidung: BullMQ** wegen: + 1. Redis bereits im Stack (keine neue DB) + 2. Native TypeScript-UnterstĂŒtzung + 3. Priority Jobs & Rate Limiting fĂŒr Multi-Tenant + 4. @bull-board fĂŒr Admin-Dashboard + 5. Höhere AktivitĂ€t und Downloads + + **Implementierungsplan:** + 1. `pnpm add bullmq @bull-board/api @bull-board/express` + 2. Queue-Service (`src/lib/queue/queue-service.ts`) + 3. Job-Definitionen (`src/lib/queue/jobs/`) + 4. Worker-Script (`scripts/run-queue-worker.ts`) + 5. PM2-Integration (separater Prozess) + 6. Admin-Dashboard Route (`/admin/jobs`) + + **Betroffene Dateien fĂŒr E-Mail-Queue:** + - `src/lib/email/tenant-email-service.ts` + - `src/app/(payload)/api/send-email/route.ts` + - `src/app/(payload)/api/test-email/route.ts` + - `src/lib/alerting/alert-service.ts` + - `src/hooks/sendFormNotification.ts` #### Database Optimization -- [ ] **Index-Audit** - - [ ] Composite-Indices fĂŒr lokalisierte Felder (slug + locale) - - [ ] Query-Performance-Analyse - - [ ] EXPLAIN ANALYZE fĂŒr hĂ€ufige Queries +- [x] **Index-Audit** (Erledigt: 09.12.2025) + - [x] Composite-Indices fĂŒr lokalisierte Felder (slug + locale) + - `posts_locales_slug_locale_idx` + - `pages_locales_slug_locale_idx` + - `categories_locales_slug_locale_idx` + - [x] Query-Performance-Analyse + - [x] EXPLAIN ANALYZE fĂŒr hĂ€ufige Queries + - [x] ZusĂ€tzliche Performance-Indexes erstellt: + - `posts_status_tenant_idx` (status + tenant) + - `posts_type_tenant_idx` (type + tenant) + - `posts_published_at_idx` (chronologische Sortierung) + - `posts_is_featured_tenant_idx` (partial index) + - `email_logs_status_tenant_idx`, `email_logs_status_created_at_idx` + - `audit_logs_action_created_at_idx` + - `newsletter_subscribers_status_tenant_idx` + - `consent_logs_created_at_desc_idx` - [ ] **Connection Pooling** - [ ] PgBouncer evaluieren fĂŒr Multi-Instanz-Betrieb @@ -431,7 +500,7 @@ 1. ~~**[KRITISCH]** AuditLogs Collection implementieren~~ ✅ Erledigt 2. **[KRITISCH]** Automatisierte Backups einrichten -3. **[HOCH]** Full-Text-Search aktivieren (USE_FTS=true) +3. ~~**[HOCH]** Full-Text-Search aktivieren (USE_FTS=true)~~ ✅ Erledigt 4. **[HOCH]** Rate-Limits auf Redis migrieren (In-Memory-Fallback funktioniert) 5. ~~**[MITTEL]** CI/CD Pipeline mit GitHub Actions~~ ✅ security.yml erstellt 6. **[MITTEL]** Frontend-Entwicklung starten @@ -457,6 +526,40 @@ - **Admin Login Fix:** Custom Login-Route unterstĂŒtzt nun `_payload` JSON-Feld aus multipart/form-data (Payload Admin Panel Format) - **Dokumentation bereinigt:** Obsolete PROMPT_*.md Instruktionsdateien gelöscht - **CLAUDE.md aktualisiert:** Security-Features, Test Suite, AuditLogs dokumentiert +- **Index-Audit:** 12 neue Performance-Indexes fĂŒr PostgreSQL erstellt + - Composite-Indexes fĂŒr slug+locale auf posts_locales, pages_locales, categories_locales + - Status/Tenant-Indexes fĂŒr posts, email_logs, newsletter_subscribers + - Partial Index fĂŒr Featured Posts + - Chronologische Sortierung fĂŒr published_at, created_at ### 10.12.2025 - **Audit-Fixes:** Vitest auf 3.2.4 aktualisiert, Payload-Mocks im Security-Test ergĂ€nzt + +### 09.12.2025 (Fortsetzung) +- **Full-Text-Search:** PostgreSQL FTS mit GIN-Indexes aktiviert + - 5 FTS-Indexes auf posts_locales, pages_locales, categories_locales + - Deutsche Sprachkonfiguration (`german` config) + - Relevanz-Ranking mit `ts_rank()` + - Feature-Flag `USE_FTS=true` in .env +- **Queue-System Evaluation:** BullMQ vs Agenda.js evaluiert + - **Empfehlung: BullMQ** (Redis-basiert, TypeScript-native, @bull-board UI) + - Implementierungsplan dokumentiert + - Betroffene Dateien identifiziert +- **BullMQ Implementation:** VollstĂ€ndiges Queue-System implementiert + - Queue-Service mit Redis-Connection und Job-Optionen + - Email-Job mit Priority, Delay und Batch-Support + - Email-Worker fĂŒr asynchrone Verarbeitung + - Worker-Script fĂŒr PM2 (`scripts/run-queue-worker.ts`) + - PM2-Konfiguration fĂŒr separaten Worker-Prozess + - Admin-API fĂŒr Queue-Monitoring (`/api/admin/queues`) + - Send-Email API mit `queued: true` Option +- **Audit-Fixes (BullMQ):** FTS und Dependencies bereinigt + - FTS SQL-Fix: `p.published_at` zu SELECT hinzugefĂŒgt (PostgreSQL DISTINCT-Regel) + - Guard fĂŒr fehlende `payload.db.drizzle` in Tests + - Ungenutzte `@bull-board/*` Packages entfernt (53 Dependencies weniger) +- **PDF-Queue-System:** VollstĂ€ndige PDF-Generierung ĂŒber BullMQ + - PDF-Job-Definition mit Priority, Delay und Batch-Support + - PDF-Service mit Playwright (HTML-zu-PDF, URL-zu-PDF) + - PDF-Worker fĂŒr asynchrone Verarbeitung + - REST-API `/api/generate-pdf` mit Auth, CSRF, Rate-Limiting + - PM2-Integration mit konfigurierbaren Workern (`QUEUE_ENABLE_PDF`) diff --git a/ecosystem.config.cjs b/ecosystem.config.cjs index 5338f02..2632222 100644 --- a/ecosystem.config.cjs +++ b/ecosystem.config.cjs @@ -1,22 +1,57 @@ module.exports = { - apps: [{ - name: 'payload', - cwd: '/home/payload/payload-cms', - script: 'pnpm', - args: 'start', - env: { - NODE_ENV: 'production', - PORT: 3000 + apps: [ + // Main Payload CMS App + { + name: 'payload', + cwd: '/home/payload/payload-cms', + script: 'pnpm', + args: 'start', + env: { + NODE_ENV: 'production', + PORT: 3000 + }, + instances: 1, + autorestart: true, + watch: false, + max_memory_restart: '1G', + error_file: '/home/payload/logs/error.log', + out_file: '/home/payload/logs/out.log', + time: true, + max_restarts: 10, + min_uptime: '10s', + restart_delay: 5000 }, - instances: 1, - autorestart: true, - watch: false, - max_memory_restart: '1G', - error_file: '/home/payload/logs/error.log', - out_file: '/home/payload/logs/out.log', - time: true, - max_restarts: 10, - min_uptime: '10s', - restart_delay: 5000 - }] + // Queue Worker (BullMQ - Email + PDF) + { + name: 'queue-worker', + cwd: '/home/payload/payload-cms', + script: 'npx', + args: 'tsx scripts/run-queue-worker.ts', + env: { + NODE_ENV: 'production', + // Credentials werden via dotenv aus .env geladen (siehe run-queue-worker.ts) + // Queue-spezifische Env Vars + QUEUE_EMAIL_CONCURRENCY: '3', + QUEUE_PDF_CONCURRENCY: '2', + QUEUE_DEFAULT_RETRY: '3', + QUEUE_REDIS_DB: '1', + // Worker aktivieren/deaktivieren + QUEUE_ENABLE_EMAIL: 'true', + QUEUE_ENABLE_PDF: 'true', + // PDF-spezifische Optionen + PDF_OUTPUT_DIR: '/tmp/payload-pdfs', + // PDF_GENERATION_DISABLED: 'true', // FĂŒr Tests + }, + instances: 1, + autorestart: true, + watch: false, + max_memory_restart: '768M', // PDF-Generierung braucht mehr RAM + error_file: '/home/payload/logs/queue-worker-error.log', + out_file: '/home/payload/logs/queue-worker-out.log', + time: true, + max_restarts: 10, + min_uptime: '5s', + restart_delay: 3000, + } + ] } diff --git a/package.json b/package.json index 6ddb2d1..daf552e 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "@payloadcms/richtext-lexical": "3.65.0", "@payloadcms/translations": "^3.65.0", "@payloadcms/ui": "3.65.0", + "bullmq": "^5.65.1", "cross-env": "^7.0.3", "dotenv": "16.4.7", "graphql": "^16.8.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e6bf852..d241e11 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -38,6 +38,9 @@ importers: '@payloadcms/ui': specifier: 3.65.0 version: 3.65.0(@types/react@19.1.8)(monaco-editor@0.55.1)(next@15.4.7(@babel/core@7.28.5)(@playwright/test@1.56.1)(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(sass@1.77.4))(payload@3.65.0(graphql@16.12.0)(typescript@5.7.3))(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(typescript@5.7.3) + bullmq: + specifier: ^5.65.1 + version: 5.65.1 cross-env: specifier: ^7.0.3 version: 7.0.3 @@ -1228,6 +1231,36 @@ packages: react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 react-dom: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': + resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==} + cpu: [arm64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': + resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==} + cpu: [x64] + os: [darwin] + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': + resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==} + cpu: [arm64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': + resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==} + cpu: [arm] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': + resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==} + cpu: [x64] + os: [linux] + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': + resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==} + cpu: [x64] + os: [win32] + '@napi-rs/wasm-runtime@0.2.12': resolution: {integrity: sha512-ZVWUcfwY4E/yPitQJl481FjFo3K22D6qF0DuFH6Y/nbnE11GY5uguDxZMGXPQ8WQ0128MXQD7TnfHyK4oWoIJQ==} @@ -2169,6 +2202,9 @@ packages: buffer-from@1.1.2: resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==} + bullmq@5.65.1: + resolution: {integrity: sha512-QgDAzX1G9L5IRy4Orva5CfQTXZT+5K+OfO/kbPrAqN+pmL9LJekCzxijXehlm/u2eXfWPfWvIdJJIqiuz3WJSg==} + busboy@1.6.0: resolution: {integrity: sha512-8SFQbg/0hQ9xy3UNTB0YEnsNBbWfhf7RtnzpL7TkBiTBRfrQ9Fxcnz7VJsleJpyp6rVLvXiuORqjlHi5q+PYuA==} engines: {node: '>=10.16.0'} @@ -2281,6 +2317,10 @@ packages: resolution: {integrity: sha512-AdmX6xUzdNASswsFtmwSt7Vj8po9IuqXm0UXz7QKPuEUmPB4XyjGfaAr2PSuELMwkRMVH1EpIkX5bTZGRB3eCA==} engines: {node: '>=10'} + cron-parser@4.9.0: + resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==} + engines: {node: '>=12.0.0'} + croner@9.1.0: resolution: {integrity: sha512-p9nwwR4qyT5W996vBZhdvBCnMhicY5ytZkR4D1Xj0wuTDEiMnjwR57Q3RXYY/s0EpX6Ay3vgIcfaR+ewGHsi+g==} engines: {node: '>=18.0'} @@ -3288,6 +3328,10 @@ packages: lru-cache@5.1.1: resolution: {integrity: sha512-KpNARQA3Iwv+jTA0utUVVbrh+Jlrr1Fv0e56GGzAFOXN7dk/FviaDW8LHmK52DlcH4WP2n6gI8vN1aesBFgo9w==} + luxon@3.7.2: + resolution: {integrity: sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==} + engines: {node: '>=12'} + lz-string@1.5.0: resolution: {integrity: sha512-h5bgJWpxJNswbU7qCrV0tIKQCaS3blPDrqKWx+QxzuzL1zGUzij9XCWLrSLsJPu5t+eWA/ycetzYAO5IOMcWAQ==} hasBin: true @@ -3432,6 +3476,13 @@ packages: ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} + msgpackr-extract@3.0.3: + resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==} + hasBin: true + + msgpackr@1.11.5: + resolution: {integrity: sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==} + nanoid@3.3.11: resolution: {integrity: sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==} engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1} @@ -3466,10 +3517,17 @@ packages: sass: optional: true + node-abort-controller@3.1.1: + resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==} + node-cron@4.2.1: resolution: {integrity: sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==} engines: {node: '>=6.0.0'} + node-gyp-build-optional-packages@5.2.2: + resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==} + hasBin: true + node-releases@2.0.27: resolution: {integrity: sha512-nmh3lCkYZ3grZvqcCH+fjmQ7X+H0OeZgP40OierEaAptX4XofMh5kwNbWh7lBduUzCcV/8kZ+NDLCwm2iorIlA==} @@ -4341,6 +4399,10 @@ packages: resolution: {integrity: sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==} hasBin: true + uuid@11.1.0: + resolution: {integrity: sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==} + hasBin: true + uuid@9.0.0: resolution: {integrity: sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==} hasBin: true @@ -5829,6 +5891,24 @@ snapshots: react: 19.1.0 react-dom: 19.1.0(react@19.1.0) + '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': + optional: true + + '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': + optional: true + '@napi-rs/wasm-runtime@0.2.12': dependencies: '@emnapi/core': 1.7.1 @@ -7025,6 +7105,18 @@ snapshots: buffer-from@1.1.2: {} + bullmq@5.65.1: + dependencies: + cron-parser: 4.9.0 + ioredis: 5.8.2 + msgpackr: 1.11.5 + node-abort-controller: 3.1.1 + semver: 7.7.3 + tslib: 2.8.1 + uuid: 11.1.0 + transitivePeerDependencies: + - supports-color + busboy@1.6.0: dependencies: streamsearch: 1.1.0 @@ -7137,6 +7229,10 @@ snapshots: path-type: 4.0.0 yaml: 1.10.2 + cron-parser@4.9.0: + dependencies: + luxon: 3.7.2 + croner@9.1.0: {} cross-env@7.0.3: @@ -7470,8 +7566,8 @@ snapshots: '@typescript-eslint/parser': 8.48.0(eslint@9.39.1)(typescript@5.7.3) eslint: 9.39.1 eslint-import-resolver-node: 0.3.9 - eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1) - eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1) + eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.1) + eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1) eslint-plugin-jsx-a11y: 6.10.2(eslint@9.39.1) eslint-plugin-react: 7.37.5(eslint@9.39.1) eslint-plugin-react-hooks: 5.2.0(eslint@9.39.1) @@ -7490,7 +7586,7 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1): + eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.1): dependencies: '@nolyfill/is-core-module': 1.0.39 debug: 4.4.3 @@ -7501,22 +7597,22 @@ snapshots: tinyglobby: 0.2.15 unrs-resolver: 1.11.1 optionalDependencies: - eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1) + eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1) transitivePeerDependencies: - supports-color - eslint-module-utils@2.12.1(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1): + eslint-module-utils@2.12.1(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1): dependencies: debug: 3.2.7 optionalDependencies: '@typescript-eslint/parser': 8.48.0(eslint@9.39.1)(typescript@5.7.3) eslint: 9.39.1 eslint-import-resolver-node: 0.3.9 - eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1) + eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.1) transitivePeerDependencies: - supports-color - eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1): + eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1): dependencies: '@rtsao/scc': 1.1.0 array-includes: 3.1.9 @@ -7527,7 +7623,7 @@ snapshots: doctrine: 2.1.0 eslint: 9.39.1 eslint-import-resolver-node: 0.3.9 - eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1) + eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1) hasown: 2.0.2 is-core-module: 2.16.1 is-glob: 4.0.3 @@ -8243,6 +8339,8 @@ snapshots: dependencies: yallist: 3.1.1 + luxon@3.7.2: {} + lz-string@1.5.0: {} magic-string@0.30.21: @@ -8521,6 +8619,22 @@ snapshots: ms@2.1.3: {} + msgpackr-extract@3.0.3: + dependencies: + node-gyp-build-optional-packages: 5.2.2 + optionalDependencies: + '@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3 + '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3 + optional: true + + msgpackr@1.11.5: + optionalDependencies: + msgpackr-extract: 3.0.3 + nanoid@3.3.11: {} napi-postinstall@0.3.4: {} @@ -8552,8 +8666,15 @@ snapshots: - '@babel/core' - babel-plugin-macros + node-abort-controller@3.1.1: {} + node-cron@4.2.1: {} + node-gyp-build-optional-packages@5.2.2: + dependencies: + detect-libc: 2.1.2 + optional: true + node-releases@2.0.27: {} nodemailer@7.0.11: {} @@ -9595,6 +9716,8 @@ snapshots: uuid@10.0.0: {} + uuid@11.1.0: {} + uuid@9.0.0: {} vfile-message@4.0.3: diff --git a/scripts/run-queue-worker.ts b/scripts/run-queue-worker.ts new file mode 100644 index 0000000..f6996a8 --- /dev/null +++ b/scripts/run-queue-worker.ts @@ -0,0 +1,100 @@ +#!/usr/bin/env node +/** + * Queue Worker Runner + * + * Standalone script zum Starten der Queue Workers. + * Wird von PM2 als separater Prozess gestartet. + * + * Usage: + * npx tsx scripts/run-queue-worker.ts + * # oder via PM2 + * pm2 start ecosystem.config.cjs --only queue-worker + */ + +// Load .env with explicit path BEFORE any other imports +import { config as dotenvConfig } from 'dotenv' +import { resolve, dirname } from 'path' +import { fileURLToPath } from 'url' + +// ES Module equivalent of __dirname +const __filename = fileURLToPath(import.meta.url) +const __dirname = dirname(__filename) + +// Explicitly load .env from project root (handles PM2 cwd issues) +const envPath = resolve(__dirname, '..', '.env') +dotenvConfig({ path: envPath }) + +console.log(`[QueueRunner] Loading env from: ${envPath}`) +console.log(`[QueueRunner] PAYLOAD_SECRET loaded: ${process.env.PAYLOAD_SECRET ? 'yes' : 'NO!'}`) + +// Dynamic imports after dotenv is loaded +async function main() { + const { startEmailWorker, stopEmailWorker } = await import('../src/lib/queue/workers/email-worker') + const { startPdfWorker, stopPdfWorker } = await import('../src/lib/queue/workers/pdf-worker') + + // Konfiguration via Umgebungsvariablen + const ENABLE_EMAIL_WORKER = process.env.QUEUE_ENABLE_EMAIL !== 'false' + const ENABLE_PDF_WORKER = process.env.QUEUE_ENABLE_PDF !== 'false' + + console.log('='.repeat(50)) + console.log('[QueueRunner] Starting queue workers...') + console.log(`[QueueRunner] PID: ${process.pid}`) + console.log(`[QueueRunner] Node: ${process.version}`) + console.log(`[QueueRunner] Email Worker: ${ENABLE_EMAIL_WORKER ? 'enabled' : 'disabled'}`) + console.log(`[QueueRunner] PDF Worker: ${ENABLE_PDF_WORKER ? 'enabled' : 'disabled'}`) + console.log('='.repeat(50)) + + // Workers starten + if (ENABLE_EMAIL_WORKER) { + startEmailWorker() + } + + if (ENABLE_PDF_WORKER) { + startPdfWorker() + } + + // Graceful Shutdown + async function shutdown(signal: string) { + console.log(`\n[QueueRunner] Received ${signal}, shutting down gracefully...`) + + try { + const stopPromises: Promise[] = [] + + if (ENABLE_EMAIL_WORKER) { + stopPromises.push(stopEmailWorker()) + } + if (ENABLE_PDF_WORKER) { + stopPromises.push(stopPdfWorker()) + } + + await Promise.all(stopPromises) + console.log('[QueueRunner] All workers stopped') + process.exit(0) + } catch (error) { + console.error('[QueueRunner] Error during shutdown:', error) + process.exit(1) + } + } + + // Signal Handlers + process.on('SIGTERM', () => shutdown('SIGTERM')) + process.on('SIGINT', () => shutdown('SIGINT')) + + // Unhandled Errors + process.on('unhandledRejection', (reason) => { + console.error('[QueueRunner] Unhandled Rejection:', reason) + }) + + process.on('uncaughtException', (error) => { + console.error('[QueueRunner] Uncaught Exception:', error) + shutdown('uncaughtException') + }) + + // Keep process alive + console.log('[QueueRunner] Workers started, waiting for jobs...') +} + +main().catch((error) => { + console.error('[QueueRunner] Fatal error:', error) + process.exit(1) +}) diff --git a/src/app/(payload)/api/admin/queues/[queueName]/jobs/route.ts b/src/app/(payload)/api/admin/queues/[queueName]/jobs/route.ts new file mode 100644 index 0000000..e77f7cf --- /dev/null +++ b/src/app/(payload)/api/admin/queues/[queueName]/jobs/route.ts @@ -0,0 +1,131 @@ +/** + * Queue Jobs API + * + * Endpunkt zum Abrufen der Jobs einer Queue. + * Nur fĂŒr Super-Admins zugĂ€nglich. + */ + +import { getPayload } from 'payload' +import config from '@payload-config' +import { NextRequest, NextResponse } from 'next/server' +import { getQueue, QUEUE_NAMES } from '@/lib/queue' +import { createSafeLogger } from '@/lib/security' + +const logger = createSafeLogger('API:AdminQueueJobs') + +interface UserWithAdmin { + id: number + isSuperAdmin?: boolean +} + +type JobState = 'waiting' | 'active' | 'completed' | 'failed' | 'delayed' + +/** + * GET /api/admin/queues/[queueName]/jobs + * + * Gibt die Jobs einer Queue zurĂŒck. + * Query-Parameter: + * - status: waiting | active | completed | failed | delayed (default: waiting) + * - start: number (default: 0) + * - end: number (default: 20) + */ +export async function GET( + req: NextRequest, + { params }: { params: Promise<{ queueName: string }> }, +) { + try { + const payload = await getPayload({ config }) + + // Authentifizierung prĂŒfen + const { user } = await payload.auth({ headers: req.headers }) + + if (!user) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const typedUser = user as UserWithAdmin + + // Nur Super-Admins haben Zugriff + if (!typedUser.isSuperAdmin) { + return NextResponse.json( + { error: 'Forbidden - Super admin access required' }, + { status: 403 }, + ) + } + + const { queueName } = await params + + // Queue-Namen validieren + const validQueues = Object.values(QUEUE_NAMES) + if (!validQueues.includes(queueName as typeof validQueues[number])) { + return NextResponse.json( + { error: `Invalid queue name. Valid: ${validQueues.join(', ')}` }, + { status: 400 }, + ) + } + + // Query-Parameter + const { searchParams } = new URL(req.url) + const status = (searchParams.get('status') || 'waiting') as JobState + const start = parseInt(searchParams.get('start') || '0', 10) + const end = parseInt(searchParams.get('end') || '20', 10) + + const validStatuses: JobState[] = ['waiting', 'active', 'completed', 'failed', 'delayed'] + if (!validStatuses.includes(status)) { + return NextResponse.json( + { error: `Invalid status. Valid: ${validStatuses.join(', ')}` }, + { status: 400 }, + ) + } + + const queue = getQueue(queueName as typeof validQueues[number]) + + // Jobs abrufen basierend auf Status + let jobs + switch (status) { + case 'waiting': + jobs = await queue.getWaiting(start, end) + break + case 'active': + jobs = await queue.getActive(start, end) + break + case 'completed': + jobs = await queue.getCompleted(start, end) + break + case 'failed': + jobs = await queue.getFailed(start, end) + break + case 'delayed': + jobs = await queue.getDelayed(start, end) + break + } + + // Jobs fĂŒr Response formatieren + const formattedJobs = jobs.map((job) => ({ + id: job.id, + name: job.name, + data: job.data, + timestamp: job.timestamp, + processedOn: job.processedOn, + finishedOn: job.finishedOn, + attemptsMade: job.attemptsMade, + failedReason: job.failedReason, + returnvalue: job.returnvalue, + progress: job.progress, + })) + + return NextResponse.json({ + queue: queueName, + status, + pagination: { start, end }, + count: formattedJobs.length, + jobs: formattedJobs, + }) + } catch (error) { + logger.error('admin-queue-jobs error', error) + return NextResponse.json( + { error: error instanceof Error ? error.message : 'Unknown error' }, + { status: 500 }, + ) + } +} diff --git a/src/app/(payload)/api/admin/queues/route.ts b/src/app/(payload)/api/admin/queues/route.ts new file mode 100644 index 0000000..ae1494f --- /dev/null +++ b/src/app/(payload)/api/admin/queues/route.ts @@ -0,0 +1,179 @@ +/** + * Queue Admin API + * + * Admin-Endpunkt fĂŒr Queue-Monitoring und -Verwaltung. + * Nur fĂŒr Super-Admins zugĂ€nglich. + */ + +import { getPayload } from 'payload' +import config from '@payload-config' +import { NextRequest, NextResponse } from 'next/server' +import { + getQueuesStatus, + isQueueAvailable, + getQueue, + QUEUE_NAMES, + type QueueStatus, +} from '@/lib/queue' +import { createSafeLogger } from '@/lib/security' + +const logger = createSafeLogger('API:AdminQueues') + +interface UserWithAdmin { + id: number + isSuperAdmin?: boolean +} + +/** + * GET /api/admin/queues + * + * Gibt den Status aller Queues zurĂŒck. + * Nur fĂŒr Super-Admins zugĂ€nglich. + */ +export async function GET(req: NextRequest) { + try { + const payload = await getPayload({ config }) + + // Authentifizierung prĂŒfen + const { user } = await payload.auth({ headers: req.headers }) + + if (!user) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const typedUser = user as UserWithAdmin + + // Nur Super-Admins haben Zugriff + if (!typedUser.isSuperAdmin) { + return NextResponse.json( + { error: 'Forbidden - Super admin access required' }, + { status: 403 }, + ) + } + + // Queue-VerfĂŒgbarkeit prĂŒfen + const available = await isQueueAvailable() + + if (!available) { + return NextResponse.json( + { + available: false, + error: 'Queue system unavailable (Redis connection failed)', + queues: [], + }, + { status: 503 }, + ) + } + + // Queue-Status abrufen + const statuses = await getQueuesStatus() + + return NextResponse.json({ + available: true, + timestamp: new Date().toISOString(), + queues: statuses, + summary: { + totalWaiting: statuses.reduce((sum: number, q: QueueStatus) => sum + q.waiting, 0), + totalActive: statuses.reduce((sum: number, q: QueueStatus) => sum + q.active, 0), + totalCompleted: statuses.reduce((sum: number, q: QueueStatus) => sum + q.completed, 0), + totalFailed: statuses.reduce((sum: number, q: QueueStatus) => sum + q.failed, 0), + totalDelayed: statuses.reduce((sum: number, q: QueueStatus) => sum + q.delayed, 0), + }, + }) + } catch (error) { + logger.error('admin-queues error', error) + return NextResponse.json( + { error: error instanceof Error ? error.message : 'Unknown error' }, + { status: 500 }, + ) + } +} + +/** + * POST /api/admin/queues + * + * Queue-Aktionen ausfĂŒhren (pause, resume, clean, etc.) + * Nur fĂŒr Super-Admins zugĂ€nglich. + */ +export async function POST(req: NextRequest) { + try { + const payload = await getPayload({ config }) + + // Authentifizierung prĂŒfen + const { user } = await payload.auth({ headers: req.headers }) + + if (!user) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const typedUser = user as UserWithAdmin + + // Nur Super-Admins haben Zugriff + if (!typedUser.isSuperAdmin) { + return NextResponse.json( + { error: 'Forbidden - Super admin access required' }, + { status: 403 }, + ) + } + + const body = await req.json() + const { action, queueName } = body + + if (!action || !queueName) { + return NextResponse.json( + { error: 'Missing required fields: action, queueName' }, + { status: 400 }, + ) + } + + // Queue-Namen validieren + const validQueues = Object.values(QUEUE_NAMES) + if (!validQueues.includes(queueName)) { + return NextResponse.json( + { error: `Invalid queue name. Valid: ${validQueues.join(', ')}` }, + { status: 400 }, + ) + } + + const queue = getQueue(queueName) + + switch (action) { + case 'pause': + await queue.pause() + logger.info(`Queue ${queueName} paused by user ${typedUser.id}`) + return NextResponse.json({ success: true, message: `Queue ${queueName} paused` }) + + case 'resume': + await queue.resume() + logger.info(`Queue ${queueName} resumed by user ${typedUser.id}`) + return NextResponse.json({ success: true, message: `Queue ${queueName} resumed` }) + + case 'clean': + const { status = 'completed', grace = 0 } = body + const cleanedCount = await queue.clean(grace, 1000, status) + logger.info(`Queue ${queueName} cleaned (${status}): ${cleanedCount.length} jobs`) + return NextResponse.json({ + success: true, + message: `Cleaned ${cleanedCount.length} ${status} jobs from ${queueName}`, + count: cleanedCount.length, + }) + + case 'drain': + await queue.drain() + logger.info(`Queue ${queueName} drained by user ${typedUser.id}`) + return NextResponse.json({ success: true, message: `Queue ${queueName} drained` }) + + default: + return NextResponse.json( + { error: `Unknown action: ${action}. Valid: pause, resume, clean, drain` }, + { status: 400 }, + ) + } + } catch (error) { + logger.error('admin-queues action error', error) + return NextResponse.json( + { error: error instanceof Error ? error.message : 'Unknown error' }, + { status: 500 }, + ) + } +} diff --git a/src/app/(payload)/api/generate-pdf/route.ts b/src/app/(payload)/api/generate-pdf/route.ts new file mode 100644 index 0000000..596cc13 --- /dev/null +++ b/src/app/(payload)/api/generate-pdf/route.ts @@ -0,0 +1,339 @@ +/** + * PDF Generation API + * + * Endpunkt fĂŒr PDF-Generierung ĂŒber die Queue oder direkt. + * UnterstĂŒtzt HTML-zu-PDF und URL-zu-PDF. + */ + +import { getPayload } from 'payload' +import config from '@payload-config' +import { NextRequest, NextResponse } from 'next/server' +import { enqueuePdf, getPdfJobStatus, getPdfJobResult, isQueueAvailable } from '@/lib/queue' +import { generatePdfFromHtml, generatePdfFromUrl } from '@/lib/pdf/pdf-service' +import { logAccessDenied } from '@/lib/audit/audit-service' +import { + publicApiLimiter, + rateLimitHeaders, + validateIpAccess, + createSafeLogger, + validateCsrf, +} from '@/lib/security' + +const RATE_LIMIT_MAX = 10 +const logger = createSafeLogger('API:GeneratePdf') + +interface UserWithTenants { + id: number + isSuperAdmin?: boolean + tenants?: Array<{ + tenant: { id: number } | number + }> +} + +/** + * PrĂŒft ob User Zugriff auf den angegebenen Tenant hat + */ +function userHasAccessToTenant(user: UserWithTenants, tenantId: number): boolean { + if (user.isSuperAdmin) { + return true + } + + if (!user.tenants || user.tenants.length === 0) { + return false + } + + return user.tenants.some((t) => { + const userTenantId = typeof t.tenant === 'object' ? t.tenant.id : t.tenant + return userTenantId === tenantId + }) +} + +/** + * POST /api/generate-pdf + * + * Generiert ein PDF aus HTML oder URL. + * + * Body: + * - tenantId: number (erforderlich) + * - source: 'html' | 'url' (erforderlich) + * - html?: string (wenn source='html') + * - url?: string (wenn source='url') + * - options?: { format, landscape, margin, printBackground, scale } + * - queued?: boolean (true = async via Queue, false = sync) + * - documentType?: string (invoice, report, export, etc.) + * - filename?: string + */ +export async function POST(req: NextRequest) { + try { + // IP-Allowlist prĂŒfen + const ipCheck = validateIpAccess(req, 'generatePdf') + if (!ipCheck.allowed) { + logger.warn(`IP blocked: ${ipCheck.ip}`, { reason: ipCheck.reason }) + return NextResponse.json({ error: 'Access denied' }, { status: 403 }) + } + + // CSRF-Schutz + const csrfResult = validateCsrf(req) + if (!csrfResult.valid) { + logger.warn('CSRF validation failed', { reason: csrfResult.reason }) + return NextResponse.json({ error: 'CSRF validation failed' }, { status: 403 }) + } + + const payload = await getPayload({ config }) + + // Authentifizierung prĂŒfen + const { user } = await payload.auth({ headers: req.headers }) + + if (!user) { + return NextResponse.json({ error: 'Unauthorized - Login required' }, { status: 401 }) + } + + const typedUser = user as UserWithTenants + + // Rate Limiting + const rateLimit = await publicApiLimiter.check(String(typedUser.id)) + if (!rateLimit.allowed) { + return NextResponse.json( + { + error: 'Rate limit exceeded', + message: `Maximum ${RATE_LIMIT_MAX} requests per minute.`, + }, + { + status: 429, + headers: rateLimitHeaders(rateLimit, RATE_LIMIT_MAX), + }, + ) + } + + const body = await req.json() + const { + tenantId, + source, + html, + url, + options = {}, + queued = true, + documentType, + filename, + priority, + } = body + + // Validierung + if (!tenantId) { + return NextResponse.json({ error: 'Missing required field: tenantId' }, { status: 400 }) + } + + const numericTenantId = Number(tenantId) + if (isNaN(numericTenantId)) { + return NextResponse.json({ error: 'Invalid tenantId: must be a number' }, { status: 400 }) + } + + if (!source || !['html', 'url'].includes(source)) { + return NextResponse.json( + { error: 'Invalid source: must be "html" or "url"' }, + { status: 400 }, + ) + } + + if (source === 'html' && !html) { + return NextResponse.json( + { error: 'Missing required field: html (for source="html")' }, + { status: 400 }, + ) + } + + if (source === 'url' && !url) { + return NextResponse.json( + { error: 'Missing required field: url (for source="url")' }, + { status: 400 }, + ) + } + + // Zugriffskontrolle + if (!userHasAccessToTenant(typedUser, numericTenantId)) { + await logAccessDenied( + payload, + `/api/generate-pdf (tenantId: ${numericTenantId})`, + typedUser.id, + user.email as string, + ) + + return NextResponse.json( + { error: 'Forbidden - You do not have access to this tenant' }, + { status: 403 }, + ) + } + + const rlHeaders = rateLimitHeaders(rateLimit, RATE_LIMIT_MAX) + + // Queued PDF Generation (async) + if (queued) { + const queueAvailable = await isQueueAvailable() + if (!queueAvailable) { + return NextResponse.json( + { error: 'Queue system unavailable. Use queued=false for direct generation.' }, + { status: 503, headers: rlHeaders }, + ) + } + + const job = await enqueuePdf({ + tenantId: numericTenantId, + source, + html, + url, + options, + documentType, + filename, + triggeredBy: String(typedUser.id), + priority: priority || 'normal', + correlationId: `pdf-${Date.now()}-${typedUser.id}`, + }) + + return NextResponse.json( + { + success: true, + queued: true, + message: 'PDF generation job queued successfully', + jobId: job.id, + }, + { headers: rlHeaders }, + ) + } + + // Direct PDF Generation (sync) + let result + if (source === 'html') { + result = await generatePdfFromHtml(html, options) + } else { + result = await generatePdfFromUrl(url, options) + } + + if (!result.success) { + return NextResponse.json( + { success: false, error: result.error }, + { status: 500, headers: rlHeaders }, + ) + } + + // PDF als Base64 zurĂŒckgeben + return NextResponse.json( + { + success: true, + queued: false, + pdf: result.buffer?.toString('base64'), + filename: filename || `document-${Date.now()}.pdf`, + pageCount: result.pageCount, + fileSize: result.fileSize, + duration: result.duration, + }, + { headers: rlHeaders }, + ) + } catch (error) { + logger.error('generate-pdf error', error) + return NextResponse.json( + { error: error instanceof Error ? error.message : 'Unknown error' }, + { status: 500 }, + ) + } +} + +/** + * GET /api/generate-pdf?jobId=... + * + * Gibt den Status/das Ergebnis eines PDF-Jobs zurĂŒck. + */ +export async function GET(req: NextRequest) { + try { + const { searchParams } = new URL(req.url) + const jobId = searchParams.get('jobId') + + // Wenn keine jobId, zeige API-Dokumentation + if (!jobId) { + return NextResponse.json({ + endpoint: '/api/generate-pdf', + methods: { + POST: { + description: 'Generate a PDF from HTML or URL', + authentication: 'Required', + body: { + tenantId: 'number (required)', + source: '"html" | "url" (required)', + html: 'string (required if source="html")', + url: 'string (required if source="url")', + options: { + format: '"A4" | "A3" | "Letter" | "Legal"', + landscape: 'boolean', + margin: '{ top, right, bottom, left }', + printBackground: 'boolean', + scale: 'number', + }, + queued: 'boolean (default: true)', + documentType: 'string (invoice, report, export, etc.)', + filename: 'string', + priority: '"high" | "normal" | "low"', + }, + }, + GET: { + description: 'Get status/result of a PDF generation job', + query: { + jobId: 'string (required)', + }, + }, + }, + examples: { + generateFromHtml: { + tenantId: 1, + source: 'html', + html: '

Hello

World

', + options: { format: 'A4' }, + queued: true, + }, + generateFromUrl: { + tenantId: 1, + source: 'url', + url: 'https://example.com/invoice/123', + queued: false, + }, + }, + }) + } + + const payload = await getPayload({ config }) + + // Authentifizierung prĂŒfen + const { user } = await payload.auth({ headers: req.headers }) + + if (!user) { + return NextResponse.json({ error: 'Unauthorized - Login required' }, { status: 401 }) + } + + // Job-Status abrufen + const status = await getPdfJobStatus(jobId) + + if (!status) { + return NextResponse.json({ error: 'Job not found' }, { status: 404 }) + } + + // Wenn abgeschlossen, Ergebnis mitliefern + if (status.state === 'completed' && status.result) { + return NextResponse.json({ + jobId, + state: status.state, + result: status.result, + }) + } + + return NextResponse.json({ + jobId, + state: status.state, + progress: status.progress, + failedReason: status.failedReason, + }) + } catch (error) { + logger.error('generate-pdf status error', error) + return NextResponse.json( + { error: error instanceof Error ? error.message : 'Unknown error' }, + { status: 500 }, + ) + } +} diff --git a/src/app/(payload)/api/send-email/route.ts b/src/app/(payload)/api/send-email/route.ts index 8876907..026d136 100644 --- a/src/app/(payload)/api/send-email/route.ts +++ b/src/app/(payload)/api/send-email/route.ts @@ -1,6 +1,7 @@ import { getPayload } from 'payload' import config from '@payload-config' import { sendTenantEmail, sendTestEmail } from '@/lib/email/tenant-email-service' +import { enqueueEmail, isQueueAvailable } from '@/lib/queue' import { NextRequest, NextResponse } from 'next/server' import { logRateLimit, logAccessDenied } from '@/lib/audit/audit-service' import { @@ -115,7 +116,7 @@ export async function POST(req: NextRequest) { } const body = await req.json() - const { tenantId, to, subject, html, text, replyTo, test } = body + const { tenantId, to, subject, html, text, replyTo, test, queued, priority } = body // Validierung if (!tenantId) { @@ -190,6 +191,41 @@ export async function POST(req: NextRequest) { ) } + // Queued Email Sending (async via BullMQ) + if (queued) { + const queueAvailable = await isQueueAvailable() + if (!queueAvailable) { + return NextResponse.json( + { error: 'Queue system unavailable. Use queued=false for direct sending.' }, + { status: 503, headers: rlHeaders }, + ) + } + + const job = await enqueueEmail({ + tenantId: numericTenantId, + to, + subject, + html, + text, + replyTo, + source: 'api', + triggeredBy: String(typedUser.id), + priority: priority || 'normal', + correlationId: `api-${Date.now()}-${typedUser.id}`, + }) + + return NextResponse.json( + { + success: true, + queued: true, + message: 'Email queued successfully', + jobId: job.id, + }, + { headers: rlHeaders }, + ) + } + + // Direct Email Sending (synchronous) const result = await sendTenantEmail(payload, numericTenantId, { to, subject, @@ -246,11 +282,14 @@ export async function GET() { text: 'string (optional) - Plain text content', replyTo: 'string (optional) - Reply-to address', test: 'boolean (optional) - Send test email', + queued: 'boolean (optional) - Queue email for async sending via BullMQ', + priority: 'string (optional) - Priority for queued emails: high, normal, low', }, response: { success: 'boolean', - messageId: 'string (on success)', - logId: 'number (email log ID)', + messageId: 'string (on direct send success)', + jobId: 'string (on queued success)', + logId: 'number (email log ID - direct send only)', error: 'string (on failure)', }, examples: { @@ -260,6 +299,14 @@ export async function GET() { subject: 'Hello World', html: '

Hello!

This is a test email.

', }, + sendQueuedEmail: { + tenantId: 1, + to: 'recipient@example.com', + subject: 'Hello World', + html: '

Hello!

This is a queued email.

', + queued: true, + priority: 'high', + }, sendTestEmail: { tenantId: 1, to: 'test@example.com', diff --git a/src/collections/Tenants.ts b/src/collections/Tenants.ts index 98cd094..8c17732 100644 --- a/src/collections/Tenants.ts +++ b/src/collections/Tenants.ts @@ -154,7 +154,7 @@ export const Tenants: CollectionConfig = { hooks: { beforeValidate: [validateSmtpHost], }, - validate: (value, { siblingData }) => { + validate: (value: string | undefined | null, { siblingData }: { siblingData: Record }) => { const emailData = siblingData as { useCustomSmtp?: boolean } if (emailData?.useCustomSmtp && !value) { return 'SMTP Host ist erforderlich' @@ -201,7 +201,7 @@ export const Tenants: CollectionConfig = { width: '50%', description: 'Meist die E-Mail-Adresse', }, - validate: (value, { siblingData }) => { + validate: (value: string | undefined | null, { siblingData }: { siblingData: Record }) => { const smtpData = siblingData as { host?: string } // Nur validieren wenn host gesetzt ist (d.h. SMTP aktiv) if (smtpData?.host && !value) { diff --git a/src/components/admin/TenantBreadcrumb.tsx b/src/components/admin/TenantBreadcrumb.tsx index efbfe61..a8f7d3f 100644 --- a/src/components/admin/TenantBreadcrumb.tsx +++ b/src/components/admin/TenantBreadcrumb.tsx @@ -19,10 +19,15 @@ export const TenantBreadcrumb: React.FC = () => { return null } + // Handle localized labels (Record) or plain strings + const displayLabel = typeof currentTenant.label === 'string' + ? currentTenant.label + : (currentTenant.label as Record)?.de || (currentTenant.label as Record)?.en || String(currentTenant.value) + return (
Aktiver Tenant: - {currentTenant.label} + {displayLabel}
) } diff --git a/src/components/admin/TenantDashboard.tsx b/src/components/admin/TenantDashboard.tsx index 67108a0..fa93d86 100644 --- a/src/components/admin/TenantDashboard.tsx +++ b/src/components/admin/TenantDashboard.tsx @@ -51,7 +51,12 @@ export const TenantDashboard: React.FC = () => { // Aktueller Tenant-Name const currentTenant = options?.find((opt) => opt.value === selectedTenantID) - const tenantName = currentTenant?.label || 'Unbekannt' + // Handle localized labels (Record) or plain strings + const tenantName = currentTenant?.label + ? (typeof currentTenant.label === 'string' + ? currentTenant.label + : (currentTenant.label as Record)?.de || (currentTenant.label as Record)?.en || String(selectedTenantID)) + : 'Unbekannt' const fetchStats = useCallback(async () => { if (!selectedTenantID) { diff --git a/src/hooks/auditAuthEvents.ts b/src/hooks/auditAuthEvents.ts index 31d3a1f..48208ae 100644 --- a/src/hooks/auditAuthEvents.ts +++ b/src/hooks/auditAuthEvents.ts @@ -74,20 +74,22 @@ function getHeaderValue(req: PayloadRequest, headerName: string): string | undef const lowerName = headerName.toLowerCase() // 1. Versuche req.get() (Express Request Methode) - if (typeof (req as { get?: (name: string) => string | undefined }).get === 'function') { - const value = (req as { get: (name: string) => string | undefined }).get(lowerName) + const reqWithGet = req as unknown as { get?: (name: string) => string | undefined } + if (typeof reqWithGet.get === 'function') { + const value = reqWithGet.get(lowerName) if (value) return value } // 2. Versuche headers.get() (Fetch API Headers) - if (req.headers && typeof (req.headers as { get?: (name: string) => string | null }).get === 'function') { - const value = (req.headers as { get: (name: string) => string | null }).get(lowerName) + const headersWithGet = req.headers as unknown as { get?: (name: string) => string | null } + if (req.headers && typeof headersWithGet.get === 'function') { + const value = headersWithGet.get(lowerName) if (value) return value } // 3. Direkter Zugriff auf headers object (IncomingHttpHeaders) if (req.headers && typeof req.headers === 'object') { - const headers = req.headers as Record + const headers = req.headers as unknown as Record const value = headers[lowerName] if (typeof value === 'string') return value if (Array.isArray(value) && value.length > 0) return value[0] diff --git a/src/lib/audit/audit-service.ts b/src/lib/audit/audit-service.ts index a6a451e..5321012 100644 --- a/src/lib/audit/audit-service.ts +++ b/src/lib/audit/audit-service.ts @@ -66,20 +66,22 @@ function getHeaderValue( const lowerName = headerName.toLowerCase() // 1. Versuche req.get() (Express Request Methode) - if (typeof (req as { get?: (name: string) => string | undefined }).get === 'function') { - const value = (req as { get: (name: string) => string | undefined }).get(lowerName) + const reqWithGet = req as unknown as { get?: (name: string) => string | undefined } + if (typeof reqWithGet.get === 'function') { + const value = reqWithGet.get(lowerName) if (value) return value } // 2. Versuche headers.get() (Fetch API Headers) - if (req.headers && typeof (req.headers as { get?: (name: string) => string | null }).get === 'function') { - const value = (req.headers as { get: (name: string) => string | null }).get(lowerName) + const headersWithGet = req.headers as unknown as { get?: (name: string) => string | null } + if (req.headers && typeof headersWithGet.get === 'function') { + const value = headersWithGet.get(lowerName) if (value) return value } // 3. Direkter Zugriff auf headers object (IncomingHttpHeaders) if (req.headers && typeof req.headers === 'object') { - const headers = req.headers as Record + const headers = req.headers as unknown as Record const value = headers[lowerName] if (typeof value === 'string') return value if (Array.isArray(value) && value.length > 0) return value[0] diff --git a/src/lib/pdf/pdf-service.ts b/src/lib/pdf/pdf-service.ts new file mode 100644 index 0000000..5648171 --- /dev/null +++ b/src/lib/pdf/pdf-service.ts @@ -0,0 +1,322 @@ +/** + * PDF Generation Service + * + * Generiert PDFs aus HTML oder URLs mittels Playwright. + * UnterstĂŒtzt verschiedene Formate und Optionen. + */ + +import { chromium, Browser, Page } from 'playwright' +import * as fs from 'fs/promises' +import * as path from 'path' + +// Umgebungsvariablen +const PDF_OUTPUT_DIR = process.env.PDF_OUTPUT_DIR || '/tmp/payload-pdfs' +const PDF_GENERATION_DISABLED = process.env.PDF_GENERATION_DISABLED === 'true' + +// Browser-Instanz (wird wiederverwendet) +let browserInstance: Browser | null = null + +export interface PdfOptions { + format?: 'A4' | 'A3' | 'Letter' | 'Legal' + landscape?: boolean + margin?: { + top?: string + right?: string + bottom?: string + left?: string + } + printBackground?: boolean + scale?: number + headerTemplate?: string + footerTemplate?: string + displayHeaderFooter?: boolean +} + +export interface PdfResult { + success: boolean + buffer?: Buffer + outputPath?: string + filename?: string + pageCount?: number + fileSize?: number + error?: string + duration?: number +} + +/** + * Holt oder erstellt eine Browser-Instanz + */ +async function getBrowser(): Promise { + if (!browserInstance || !browserInstance.isConnected()) { + console.log('[PDF] Launching browser...') + browserInstance = await chromium.launch({ + headless: true, + args: [ + '--no-sandbox', + '--disable-setuid-sandbox', + '--disable-dev-shm-usage', + '--disable-gpu', + ], + }) + } + return browserInstance +} + +/** + * Schließt die Browser-Instanz + */ +export async function closeBrowser(): Promise { + if (browserInstance) { + await browserInstance.close() + browserInstance = null + console.log('[PDF] Browser closed') + } +} + +/** + * Stellt sicher, dass das Output-Verzeichnis existiert + */ +async function ensureOutputDir(): Promise { + try { + await fs.mkdir(PDF_OUTPUT_DIR, { recursive: true }) + } catch { + // Verzeichnis existiert bereits + } +} + +/** + * Generiert ein PDF aus HTML-Content + */ +export async function generatePdfFromHtml( + html: string, + options: PdfOptions = {}, + outputPath?: string, +): Promise { + const startTime = Date.now() + + // Check ob PDF-Generierung deaktiviert ist (Test-Modus) + if (PDF_GENERATION_DISABLED) { + console.log('[PDF] Generation disabled - returning mock result') + return { + success: true, + buffer: Buffer.from('mock-pdf-content'), + filename: 'mock.pdf', + pageCount: 1, + fileSize: 17, + duration: Date.now() - startTime, + } + } + + let page: Page | null = null + + try { + const browser = await getBrowser() + page = await browser.newPage() + + // HTML laden + await page.setContent(html, { waitUntil: 'networkidle' }) + + // PDF-Optionen + const pdfOptions = { + format: options.format || 'A4', + landscape: options.landscape || false, + margin: options.margin || { + top: '20mm', + right: '15mm', + bottom: '20mm', + left: '15mm', + }, + printBackground: options.printBackground !== false, + scale: options.scale || 1, + displayHeaderFooter: options.displayHeaderFooter || false, + headerTemplate: options.headerTemplate || '', + footerTemplate: options.footerTemplate || '', + } + + // PDF generieren + const pdfBuffer = await page.pdf(pdfOptions) + + // Seitenzahl ermitteln (geschĂ€tzt aus DateigrĂ¶ĂŸe oder via PDF-Parser) + // FĂŒr genauere ZĂ€hlung wĂŒrde man einen PDF-Parser brauchen + const pageCount = Math.max(1, Math.ceil(pdfBuffer.length / 50000)) + + const result: PdfResult = { + success: true, + buffer: pdfBuffer, + pageCount, + fileSize: pdfBuffer.length, + duration: Date.now() - startTime, + } + + // Optional: In Datei speichern + if (outputPath) { + await ensureOutputDir() + const fullPath = path.isAbsolute(outputPath) + ? outputPath + : path.join(PDF_OUTPUT_DIR, outputPath) + + await fs.writeFile(fullPath, pdfBuffer) + result.outputPath = fullPath + result.filename = path.basename(fullPath) + console.log(`[PDF] Saved to ${fullPath}`) + } + + console.log(`[PDF] Generated successfully (${pdfBuffer.length} bytes, ~${pageCount} pages, ${result.duration}ms)`) + + return result + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + console.error('[PDF] Generation failed:', errorMessage) + + return { + success: false, + error: errorMessage, + duration: Date.now() - startTime, + } + } finally { + if (page) { + await page.close() + } + } +} + +/** + * Generiert ein PDF von einer URL + */ +export async function generatePdfFromUrl( + url: string, + options: PdfOptions = {}, + outputPath?: string, +): Promise { + const startTime = Date.now() + + // Check ob PDF-Generierung deaktiviert ist (Test-Modus) + if (PDF_GENERATION_DISABLED) { + console.log('[PDF] Generation disabled - returning mock result') + return { + success: true, + buffer: Buffer.from('mock-pdf-content'), + filename: 'mock.pdf', + pageCount: 1, + fileSize: 17, + duration: Date.now() - startTime, + } + } + + let page: Page | null = null + + try { + const browser = await getBrowser() + page = await browser.newPage() + + // URL laden + await page.goto(url, { waitUntil: 'networkidle', timeout: 30000 }) + + // PDF-Optionen + const pdfOptions = { + format: options.format || 'A4', + landscape: options.landscape || false, + margin: options.margin || { + top: '20mm', + right: '15mm', + bottom: '20mm', + left: '15mm', + }, + printBackground: options.printBackground !== false, + scale: options.scale || 1, + displayHeaderFooter: options.displayHeaderFooter || false, + headerTemplate: options.headerTemplate || '', + footerTemplate: options.footerTemplate || '', + } + + // PDF generieren + const pdfBuffer = await page.pdf(pdfOptions) + const pageCount = Math.max(1, Math.ceil(pdfBuffer.length / 50000)) + + const result: PdfResult = { + success: true, + buffer: pdfBuffer, + pageCount, + fileSize: pdfBuffer.length, + duration: Date.now() - startTime, + } + + // Optional: In Datei speichern + if (outputPath) { + await ensureOutputDir() + const fullPath = path.isAbsolute(outputPath) + ? outputPath + : path.join(PDF_OUTPUT_DIR, outputPath) + + await fs.writeFile(fullPath, pdfBuffer) + result.outputPath = fullPath + result.filename = path.basename(fullPath) + console.log(`[PDF] Saved to ${fullPath}`) + } + + console.log(`[PDF] Generated from URL successfully (${pdfBuffer.length} bytes, ~${pageCount} pages, ${result.duration}ms)`) + + return result + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + console.error('[PDF] Generation from URL failed:', errorMessage) + + return { + success: false, + error: errorMessage, + duration: Date.now() - startTime, + } + } finally { + if (page) { + await page.close() + } + } +} + +/** + * Generiert einen eindeutigen Dateinamen fĂŒr eine PDF + */ +export function generatePdfFilename( + prefix: string = 'document', + tenantId?: number, + documentType?: string, +): string { + const timestamp = new Date().toISOString().replace(/[:.]/g, '-') + const parts = [prefix] + + if (documentType) { + parts.push(documentType) + } + if (tenantId) { + parts.push(`tenant-${tenantId}`) + } + parts.push(timestamp) + + return `${parts.join('_')}.pdf` +} + +/** + * Löscht eine generierte PDF-Datei + */ +export async function deletePdfFile(filePath: string): Promise { + try { + await fs.unlink(filePath) + console.log(`[PDF] Deleted ${filePath}`) + return true + } catch { + console.warn(`[PDF] Could not delete ${filePath}`) + return false + } +} + +/** + * Liest eine PDF-Datei als Buffer + */ +export async function readPdfFile(filePath: string): Promise { + try { + return await fs.readFile(filePath) + } catch { + console.warn(`[PDF] Could not read ${filePath}`) + return null + } +} diff --git a/src/lib/queue/index.ts b/src/lib/queue/index.ts new file mode 100644 index 0000000..29c49ea --- /dev/null +++ b/src/lib/queue/index.ts @@ -0,0 +1,43 @@ +/** + * Queue Module Exports + * + * Zentrale Exports fĂŒr das BullMQ Queue-System. + */ + +// Queue Service +export { + QUEUE_NAMES, + type QueueName, + getQueue, + getQueueEvents, + getAllQueues, + closeAllQueues, + isQueueAvailable, + getQueuesStatus, + type QueueStatus, + defaultJobOptions, + getQueueRedisConnection, +} from './queue-service' + +// Email Job +export { + type EmailJobData, + type EmailJobResult, + enqueueEmail, + enqueueDelayedEmail, + enqueueEmailBatch, + isEmailJobPending, + getEmailJobStatus, +} from './jobs/email-job' + +// PDF Job +export { + type PdfJobData, + type PdfJobResult, + enqueuePdf, + enqueueDelayedPdf, + enqueuePdfBatch, + isPdfJobPending, + getPdfJobStatus, + getPdfJobResult, +} from './jobs/pdf-job' diff --git a/src/lib/queue/jobs/email-job.ts b/src/lib/queue/jobs/email-job.ts new file mode 100644 index 0000000..5263240 --- /dev/null +++ b/src/lib/queue/jobs/email-job.ts @@ -0,0 +1,150 @@ +/** + * Email Job Definition + * + * Definiert den E-Mail-Versand-Job fĂŒr die Queue. + * UnterstĂŒtzt Multi-Tenant SMTP-Konfiguration. + */ + +import { Job } from 'bullmq' +import { getQueue, QUEUE_NAMES, defaultJobOptions } from '../queue-service' + +// Job-Daten Typen +export interface EmailJobData { + // E-Mail-Inhalte + to: string | string[] + subject: string + html?: string + text?: string + replyTo?: string + + // Tenant-Kontext + tenantId: number + tenantSlug?: string + + // Metadaten + triggeredBy?: string // User ID oder System + source?: 'api' | 'form' | 'notification' | 'alert' + correlationId?: string // FĂŒr Tracking + + // Optionen + priority?: 'high' | 'normal' | 'low' +} + +export interface EmailJobResult { + success: boolean + messageId?: string + error?: string + timestamp: string + attempts: number +} + +// Priority-Mapping fĂŒr BullMQ (niedrigere Zahl = höhere PrioritĂ€t) +const PRIORITY_MAP = { + high: 1, + normal: 5, + low: 10, +} + +/** + * FĂŒgt einen E-Mail-Job zur Queue hinzu + */ +export async function enqueueEmail(data: EmailJobData): Promise> { + const queue = getQueue(QUEUE_NAMES.EMAIL) + + const jobOptions = { + ...defaultJobOptions, + priority: PRIORITY_MAP[data.priority || 'normal'], + // Job-ID fĂŒr Deduplizierung (optional) + jobId: data.correlationId, + } + + const job = await queue.add('send-email', data, jobOptions) + + console.log(`[EmailQueue] Job ${job.id} queued for ${Array.isArray(data.to) ? data.to.join(', ') : data.to}`) + + return job +} + +/** + * FĂŒgt einen verzögerten E-Mail-Job zur Queue hinzu + */ +export async function enqueueDelayedEmail( + data: EmailJobData, + delayMs: number +): Promise> { + const queue = getQueue(QUEUE_NAMES.EMAIL) + + const job = await queue.add('send-email', data, { + ...defaultJobOptions, + priority: PRIORITY_MAP[data.priority || 'normal'], + delay: delayMs, + }) + + console.log(`[EmailQueue] Delayed job ${job.id} queued (delay: ${delayMs}ms)`) + + return job +} + +/** + * FĂŒgt mehrere E-Mails als Batch zur Queue hinzu + */ +export async function enqueueEmailBatch( + emails: EmailJobData[] +): Promise[]> { + const queue = getQueue(QUEUE_NAMES.EMAIL) + + const jobs = await queue.addBulk( + emails.map((data) => ({ + name: 'send-email', + data, + opts: { + ...defaultJobOptions, + priority: PRIORITY_MAP[data.priority || 'normal'], + }, + })) + ) + + console.log(`[EmailQueue] ${jobs.length} batch jobs queued`) + + return jobs +} + +/** + * PrĂŒft ob ein Job bereits existiert (fĂŒr Deduplizierung) + */ +export async function isEmailJobPending(correlationId: string): Promise { + const queue = getQueue(QUEUE_NAMES.EMAIL) + + const job = await queue.getJob(correlationId) + if (!job) return false + + const state = await job.getState() + return state === 'waiting' || state === 'active' || state === 'delayed' +} + +/** + * Holt den Status eines E-Mail-Jobs + */ +export async function getEmailJobStatus(jobId: string): Promise<{ + state: string + progress: number + result?: EmailJobResult + failedReason?: string +} | null> { + const queue = getQueue(QUEUE_NAMES.EMAIL) + + const job = await queue.getJob(jobId) + if (!job) return null + + const [state, progress] = await Promise.all([ + job.getState(), + job.progress, + ]) + + return { + state, + progress: typeof progress === 'number' ? progress : 0, + result: job.returnvalue as EmailJobResult | undefined, + failedReason: job.failedReason, + } +} diff --git a/src/lib/queue/jobs/pdf-job.ts b/src/lib/queue/jobs/pdf-job.ts new file mode 100644 index 0000000..fefc250 --- /dev/null +++ b/src/lib/queue/jobs/pdf-job.ts @@ -0,0 +1,201 @@ +/** + * PDF Job Definition + * + * Definiert den PDF-Generierungs-Job fĂŒr die Queue. + * UnterstĂŒtzt HTML-zu-PDF Konvertierung und URL-zu-PDF. + */ + +import { Job } from 'bullmq' +import { getQueue, QUEUE_NAMES, defaultJobOptions } from '../queue-service' + +// Job-Daten Typen +export interface PdfJobData { + // Quelle fĂŒr PDF + source: 'html' | 'url' + html?: string // HTML-Content (wenn source='html') + url?: string // URL zum Rendern (wenn source='url') + + // Output-Konfiguration + outputPath?: string // Pfad zum Speichern (optional) + filename?: string // Dateiname fĂŒr Download + + // PDF-Optionen + options?: { + format?: 'A4' | 'A3' | 'Letter' | 'Legal' + landscape?: boolean + margin?: { + top?: string + right?: string + bottom?: string + left?: string + } + printBackground?: boolean + scale?: number + headerTemplate?: string + footerTemplate?: string + displayHeaderFooter?: boolean + } + + // Tenant-Kontext + tenantId: number + tenantSlug?: string + + // Metadaten + triggeredBy?: string // User ID oder System + documentType?: 'invoice' | 'report' | 'export' | 'certificate' | 'other' + correlationId?: string // FĂŒr Tracking + + // Optionen + priority?: 'high' | 'normal' | 'low' +} + +export interface PdfJobResult { + success: boolean + outputPath?: string // Wo die PDF gespeichert wurde + buffer?: string // Base64-encoded PDF (wenn kein outputPath) + filename?: string + pageCount?: number + fileSize?: number // In Bytes + error?: string + timestamp: string + duration?: number // Generierungszeit in ms +} + +// Priority-Mapping fĂŒr BullMQ (niedrigere Zahl = höhere PrioritĂ€t) +const PRIORITY_MAP = { + high: 1, + normal: 5, + low: 10, +} + +/** + * FĂŒgt einen PDF-Job zur Queue hinzu + */ +export async function enqueuePdf(data: PdfJobData): Promise> { + const queue = getQueue(QUEUE_NAMES.PDF) + + // Validierung + if (data.source === 'html' && !data.html) { + throw new Error('HTML content required when source is "html"') + } + if (data.source === 'url' && !data.url) { + throw new Error('URL required when source is "url"') + } + + const jobOptions = { + ...defaultJobOptions, + priority: PRIORITY_MAP[data.priority || 'normal'], + // PDF-Jobs können lĂ€nger dauern + attempts: 2, + backoff: { + type: 'fixed' as const, + delay: 5000, // 5s zwischen Retries + }, + // Job-ID fĂŒr Deduplizierung (optional) + jobId: data.correlationId, + } + + const job = await queue.add('generate-pdf', data, jobOptions) + + console.log(`[PdfQueue] Job ${job.id} queued (source: ${data.source}, type: ${data.documentType || 'other'})`) + + return job +} + +/** + * FĂŒgt einen verzögerten PDF-Job zur Queue hinzu + */ +export async function enqueueDelayedPdf( + data: PdfJobData, + delayMs: number, +): Promise> { + const queue = getQueue(QUEUE_NAMES.PDF) + + const job = await queue.add('generate-pdf', data, { + ...defaultJobOptions, + priority: PRIORITY_MAP[data.priority || 'normal'], + delay: delayMs, + attempts: 2, + }) + + console.log(`[PdfQueue] Delayed job ${job.id} queued (delay: ${delayMs}ms)`) + + return job +} + +/** + * FĂŒgt mehrere PDF-Jobs als Batch zur Queue hinzu + */ +export async function enqueuePdfBatch( + pdfs: PdfJobData[], +): Promise[]> { + const queue = getQueue(QUEUE_NAMES.PDF) + + const jobs = await queue.addBulk( + pdfs.map((data) => ({ + name: 'generate-pdf', + data, + opts: { + ...defaultJobOptions, + priority: PRIORITY_MAP[data.priority || 'normal'], + attempts: 2, + }, + })), + ) + + console.log(`[PdfQueue] ${jobs.length} batch jobs queued`) + + return jobs +} + +/** + * PrĂŒft ob ein Job bereits existiert (fĂŒr Deduplizierung) + */ +export async function isPdfJobPending(correlationId: string): Promise { + const queue = getQueue(QUEUE_NAMES.PDF) + + const job = await queue.getJob(correlationId) + if (!job) return false + + const state = await job.getState() + return state === 'waiting' || state === 'active' || state === 'delayed' +} + +/** + * Holt den Status eines PDF-Jobs + */ +export async function getPdfJobStatus(jobId: string): Promise<{ + state: string + progress: number + result?: PdfJobResult + failedReason?: string +} | null> { + const queue = getQueue(QUEUE_NAMES.PDF) + + const job = await queue.getJob(jobId) + if (!job) return null + + const [state, progress] = await Promise.all([job.getState(), job.progress]) + + return { + state, + progress: typeof progress === 'number' ? progress : 0, + result: job.returnvalue as PdfJobResult | undefined, + failedReason: job.failedReason, + } +} + +/** + * Holt das Ergebnis eines abgeschlossenen PDF-Jobs + */ +export async function getPdfJobResult(jobId: string): Promise { + const queue = getQueue(QUEUE_NAMES.PDF) + + const job = await queue.getJob(jobId) + if (!job) return null + + const state = await job.getState() + if (state !== 'completed') return null + + return job.returnvalue as PdfJobResult +} diff --git a/src/lib/queue/queue-service.ts b/src/lib/queue/queue-service.ts new file mode 100644 index 0000000..6b35d0c --- /dev/null +++ b/src/lib/queue/queue-service.ts @@ -0,0 +1,202 @@ +/** + * BullMQ Queue Service + * + * Zentrale Queue-Verwaltung fĂŒr Background Jobs. + * Verwendet Redis als Backend (gleiche Instanz wie Caching). + */ + +import { Queue, QueueEvents, JobsOptions } from 'bullmq' +import IORedis from 'ioredis' + +// Queue-Namen +export const QUEUE_NAMES = { + EMAIL: 'email', + PDF: 'pdf', + CLEANUP: 'cleanup', +} as const + +export type QueueName = (typeof QUEUE_NAMES)[keyof typeof QUEUE_NAMES] + +// Redis-Konfiguration fĂŒr Queue (separate DB) +const QUEUE_REDIS_DB = parseInt(process.env.QUEUE_REDIS_DB || '1', 10) + +// Gemeinsame Redis-Connection fĂŒr alle Queues +let redisConnection: IORedis | null = null + +/** + * Erstellt oder gibt die existierende Redis-Connection zurĂŒck + */ +export function getQueueRedisConnection(): IORedis { + if (!redisConnection) { + const host = process.env.REDIS_HOST || 'localhost' + const port = parseInt(process.env.REDIS_PORT || '6379', 10) + + redisConnection = new IORedis({ + host, + port, + db: QUEUE_REDIS_DB, + maxRetriesPerRequest: null, // BullMQ requirement + enableReadyCheck: false, + }) + + redisConnection.on('error', (err) => { + console.error('[Queue] Redis connection error:', err.message) + }) + + redisConnection.on('connect', () => { + console.log(`[Queue] Redis connected (db: ${QUEUE_REDIS_DB})`) + }) + } + + return redisConnection +} + +// Queue-Instanzen Cache +const queues = new Map() +const queueEvents = new Map() + +/** + * Standard Job-Optionen + */ +export const defaultJobOptions: JobsOptions = { + attempts: parseInt(process.env.QUEUE_DEFAULT_RETRY || '3', 10), + backoff: { + type: 'exponential', + delay: 1000, // 1s, 2s, 4s, 8s, ... + }, + removeOnComplete: { + count: 100, // Behalte letzte 100 erfolgreiche Jobs + age: 24 * 60 * 60, // Oder 24 Stunden + }, + removeOnFail: { + count: 500, // Behalte letzte 500 fehlgeschlagene Jobs + age: 7 * 24 * 60 * 60, // Oder 7 Tage + }, +} + +/** + * Erstellt oder gibt eine existierende Queue zurĂŒck + */ +export function getQueue(name: QueueName): Queue { + if (!queues.has(name)) { + const queue = new Queue(name, { + connection: getQueueRedisConnection(), + defaultJobOptions, + }) + + queues.set(name, queue) + console.log(`[Queue] Queue "${name}" initialized`) + } + + return queues.get(name)! +} + +/** + * Erstellt oder gibt QueueEvents fĂŒr eine Queue zurĂŒck + */ +export function getQueueEvents(name: QueueName): QueueEvents { + if (!queueEvents.has(name)) { + const events = new QueueEvents(name, { + connection: getQueueRedisConnection(), + }) + + queueEvents.set(name, events) + } + + return queueEvents.get(name)! +} + +/** + * Gibt alle aktiven Queues zurĂŒck (fĂŒr bull-board) + */ +export function getAllQueues(): Queue[] { + // Initialisiere alle Queues falls noch nicht geschehen + Object.values(QUEUE_NAMES).forEach((name) => getQueue(name)) + return Array.from(queues.values()) +} + +/** + * Schließt alle Queue-Verbindungen (fĂŒr graceful shutdown) + */ +export async function closeAllQueues(): Promise { + console.log('[Queue] Closing all queues...') + + // Schließe QueueEvents + const eventEntries = Array.from(queueEvents.values()) + for (const events of eventEntries) { + await events.close() + } + queueEvents.clear() + + // Schließe Queues + const queueEntries = Array.from(queues.values()) + for (const queue of queueEntries) { + await queue.close() + } + queues.clear() + + // Schließe Redis Connection + if (redisConnection) { + await redisConnection.quit() + redisConnection = null + } + + console.log('[Queue] All queues closed') +} + +/** + * PrĂŒft ob Queue-System verfĂŒgbar ist + */ +export async function isQueueAvailable(): Promise { + try { + const connection = getQueueRedisConnection() + const pong = await connection.ping() + return pong === 'PONG' + } catch { + return false + } +} + +/** + * Queue-Status fĂŒr Monitoring + */ +export interface QueueStatus { + name: string + waiting: number + active: number + completed: number + failed: number + delayed: number + paused: boolean +} + +/** + * Gibt den Status aller Queues zurĂŒck + */ +export async function getQueuesStatus(): Promise { + const statuses: QueueStatus[] = [] + + const entries = Array.from(queues.entries()) + for (const [name, queue] of entries) { + const [waiting, active, completed, failed, delayed, isPaused] = await Promise.all([ + queue.getWaitingCount(), + queue.getActiveCount(), + queue.getCompletedCount(), + queue.getFailedCount(), + queue.getDelayedCount(), + queue.isPaused(), + ]) + + statuses.push({ + name, + waiting, + active, + completed, + failed, + delayed, + paused: isPaused, + }) + } + + return statuses +} diff --git a/src/lib/queue/workers/email-worker.ts b/src/lib/queue/workers/email-worker.ts new file mode 100644 index 0000000..f9ffc5a --- /dev/null +++ b/src/lib/queue/workers/email-worker.ts @@ -0,0 +1,135 @@ +/** + * Email Worker + * + * Verarbeitet E-Mail-Jobs aus der Queue. + * Nutzt den bestehenden TenantEmailService. + */ + +import { Worker, Job } from 'bullmq' +import { getPayload } from 'payload' +import config from '@payload-config' +import { QUEUE_NAMES, getQueueRedisConnection } from '../queue-service' +import type { EmailJobData, EmailJobResult } from '../jobs/email-job' +import { sendTenantEmail, type EmailSource } from '../../email/tenant-email-service' + +// Worker-Konfiguration +const CONCURRENCY = parseInt(process.env.QUEUE_EMAIL_CONCURRENCY || '3', 10) + +/** + * E-Mail Job Processor + */ +async function processEmailJob(job: Job): Promise { + const { tenantId, to, subject, html, text, replyTo, source, correlationId } = job.data + + console.log(`[EmailWorker] Processing job ${job.id} for tenant ${tenantId}`) + + try { + // Payload-Instanz holen (fĂŒr DB-Zugriff und Logging) + const payload = await getPayload({ config }) + + // E-Mail ĂŒber bestehenden Service senden + const result = await sendTenantEmail(payload, tenantId, { + to, + subject, + html, + text, + replyTo, + source: (source as EmailSource) || 'system', + metadata: { + queueJobId: job.id, + correlationId, + attempts: job.attemptsMade + 1, + }, + }) + + if (!result.success) { + throw new Error(result.error || 'Unknown email error') + } + + const jobResult: EmailJobResult = { + success: true, + messageId: result.messageId, + timestamp: new Date().toISOString(), + attempts: job.attemptsMade + 1, + } + + console.log(`[EmailWorker] Job ${job.id} completed: ${result.messageId}`) + return jobResult + + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + console.error(`[EmailWorker] Job ${job.id} failed:`, errorMessage) + + // Error werfen damit BullMQ Retry-Logik greift + throw error + } +} + +/** + * Email Worker Instanz + */ +let emailWorker: Worker | null = null + +/** + * Startet den Email Worker + */ +export function startEmailWorker(): Worker { + if (emailWorker) { + console.warn('[EmailWorker] Worker already running') + return emailWorker + } + + emailWorker = new Worker( + QUEUE_NAMES.EMAIL, + processEmailJob, + { + connection: getQueueRedisConnection(), + concurrency: CONCURRENCY, + // Stalled Job Detection + stalledInterval: 30000, // 30s + maxStalledCount: 2, + } + ) + + // Event Handlers + emailWorker.on('ready', () => { + console.log(`[EmailWorker] Ready (concurrency: ${CONCURRENCY})`) + }) + + emailWorker.on('completed', (job, result) => { + console.log(`[EmailWorker] Job ${job.id} completed in ${Date.now() - job.timestamp}ms`) + }) + + emailWorker.on('failed', (job, error) => { + console.error(`[EmailWorker] Job ${job?.id} failed after ${job?.attemptsMade} attempts:`, error.message) + }) + + emailWorker.on('stalled', (jobId) => { + console.warn(`[EmailWorker] Job ${jobId} stalled`) + }) + + emailWorker.on('error', (error) => { + console.error('[EmailWorker] Error:', error) + }) + + return emailWorker +} + +/** + * Stoppt den Email Worker + */ +export async function stopEmailWorker(): Promise { + if (emailWorker) { + console.log('[EmailWorker] Stopping...') + await emailWorker.close() + emailWorker = null + console.log('[EmailWorker] Stopped') + } +} + +/** + * Gibt die Worker-Instanz zurĂŒck (falls aktiv) + */ +export function getEmailWorker(): Worker | null { + return emailWorker +} diff --git a/src/lib/queue/workers/pdf-worker.ts b/src/lib/queue/workers/pdf-worker.ts new file mode 100644 index 0000000..8395c1d --- /dev/null +++ b/src/lib/queue/workers/pdf-worker.ts @@ -0,0 +1,172 @@ +/** + * PDF Worker + * + * Verarbeitet PDF-Generierungs-Jobs aus der Queue. + * Nutzt den PDF-Service fĂŒr die eigentliche Generierung. + */ + +import { Worker, Job } from 'bullmq' +import { QUEUE_NAMES, getQueueRedisConnection } from '../queue-service' +import type { PdfJobData, PdfJobResult } from '../jobs/pdf-job' +import { + generatePdfFromHtml, + generatePdfFromUrl, + generatePdfFilename, + closeBrowser, +} from '../../pdf/pdf-service' + +// Worker-Konfiguration +const CONCURRENCY = parseInt(process.env.QUEUE_PDF_CONCURRENCY || '2', 10) + +/** + * PDF Job Processor + */ +async function processPdfJob(job: Job): Promise { + const { + source, + html, + url, + outputPath, + filename, + options = {}, + tenantId, + documentType, + correlationId, + } = job.data + + console.log(`[PdfWorker] Processing job ${job.id} for tenant ${tenantId} (source: ${source})`) + + const startTime = Date.now() + + try { + // Dateiname generieren falls nicht angegeben + const targetFilename = filename || outputPath || generatePdfFilename('document', tenantId, documentType) + + let result + + if (source === 'html') { + if (!html) { + throw new Error('HTML content is required for source="html"') + } + + result = await generatePdfFromHtml(html, options, outputPath ? targetFilename : undefined) + } else if (source === 'url') { + if (!url) { + throw new Error('URL is required for source="url"') + } + + result = await generatePdfFromUrl(url, options, outputPath ? targetFilename : undefined) + } else { + throw new Error(`Invalid source: ${source}`) + } + + if (!result.success) { + throw new Error(result.error || 'PDF generation failed') + } + + const jobResult: PdfJobResult = { + success: true, + outputPath: result.outputPath, + filename: result.filename || targetFilename, + pageCount: result.pageCount, + fileSize: result.fileSize, + timestamp: new Date().toISOString(), + duration: Date.now() - startTime, + } + + // Buffer als Base64 zurĂŒckgeben wenn kein outputPath + if (!outputPath && result.buffer) { + jobResult.buffer = result.buffer.toString('base64') + } + + console.log(`[PdfWorker] Job ${job.id} completed: ${result.fileSize} bytes, ${result.pageCount} pages`) + + return jobResult + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error' + console.error(`[PdfWorker] Job ${job.id} failed:`, errorMessage) + + // Error werfen damit BullMQ Retry-Logik greift + throw error + } +} + +/** + * PDF Worker Instanz + */ +let pdfWorker: Worker | null = null + +/** + * Startet den PDF Worker + */ +export function startPdfWorker(): Worker { + if (pdfWorker) { + console.warn('[PdfWorker] Worker already running') + return pdfWorker + } + + pdfWorker = new Worker( + QUEUE_NAMES.PDF, + processPdfJob, + { + connection: getQueueRedisConnection(), + concurrency: CONCURRENCY, + // Stalled Job Detection (PDF-Jobs können lĂ€nger dauern) + stalledInterval: 60000, // 60s + maxStalledCount: 1, + // LĂ€ngerer Lock fĂŒr PDF-Generierung + lockDuration: 120000, // 2 Minuten + }, + ) + + // Event Handlers + pdfWorker.on('ready', () => { + console.log(`[PdfWorker] Ready (concurrency: ${CONCURRENCY})`) + }) + + pdfWorker.on('completed', (job, result) => { + console.log( + `[PdfWorker] Job ${job.id} completed in ${Date.now() - job.timestamp}ms (${result.fileSize} bytes)`, + ) + }) + + pdfWorker.on('failed', (job, error) => { + console.error( + `[PdfWorker] Job ${job?.id} failed after ${job?.attemptsMade} attempts:`, + error.message, + ) + }) + + pdfWorker.on('stalled', (jobId) => { + console.warn(`[PdfWorker] Job ${jobId} stalled`) + }) + + pdfWorker.on('error', (error) => { + console.error('[PdfWorker] Error:', error) + }) + + return pdfWorker +} + +/** + * Stoppt den PDF Worker + */ +export async function stopPdfWorker(): Promise { + if (pdfWorker) { + console.log('[PdfWorker] Stopping...') + await pdfWorker.close() + pdfWorker = null + + // Browser schließen + await closeBrowser() + + console.log('[PdfWorker] Stopped') + } +} + +/** + * Gibt die Worker-Instanz zurĂŒck (falls aktiv) + */ +export function getPdfWorker(): Worker | null { + return pdfWorker +} diff --git a/src/lib/search.ts b/src/lib/search.ts index 640212d..ecf2983 100644 --- a/src/lib/search.ts +++ b/src/lib/search.ts @@ -1,8 +1,23 @@ // src/lib/search.ts // Shared search library with caching, rate limiting, and search functions +// Supports both ILIKE (default) and PostgreSQL Full-Text Search (FTS) import type { Payload, Where } from 'payload' import type { Post, Category } from '../payload-types' +import { sql } from '@payloadcms/db-postgres/drizzle' + +// FTS Feature Flag - aktivieren mit USE_FTS=true in .env +const USE_FTS = process.env.USE_FTS === 'true' + +// Type fĂŒr raw SQL Ergebnisse +interface FtsCountRow { + total: string +} + +interface FtsIdRow { + id: number + rank: number +} // ============================================================================ // Types @@ -236,13 +251,150 @@ function generateCacheKey(prefix: string, params: Record): stri return `${prefix}:${JSON.stringify(sortedEntries)}` } +// ============================================================================ +// Full-Text Search (FTS) Functions +// ============================================================================ + +/** + * PostgreSQL FTS-Konfiguration nach Sprache + */ +function getFtsConfig(locale: string): string { + return locale === 'en' ? 'english' : 'german' +} + +/** + * Bereitet Suchquery fĂŒr FTS vor + * Konvertiert "foo bar" zu "foo:* & bar:*" fĂŒr Prefix-Suche + */ +function prepareFtsQuery(query: string): string { + return query + .trim() + .split(/\s+/) + .filter(term => term.length >= 2) + .map(term => `${term}:*`) + .join(' & ') +} + +/** + * Escapet einen String fĂŒr sichere Verwendung in SQL + * Verhindert SQL Injection bei dynamischen Werten + */ +function escapeSqlString(str: string): string { + return str.replace(/'/g, "''") +} + +/** + * FTS-basierte Suche mit PostgreSQL to_tsvector + * Verwendet die GIN-Indexes auf posts_locales + */ +async function searchWithFts( + payload: Payload, + params: SearchParams, +): Promise<{ postIds: number[]; totalCount: number }> { + const { query, tenantId, type, locale = 'de', limit = 10, offset = 0 } = params + + if (!query || query.length < 2) { + return { postIds: [], totalCount: 0 } + } + + const ftsConfig = getFtsConfig(locale) + const ftsQuery = prepareFtsQuery(query) + + if (!ftsQuery) { + return { postIds: [], totalCount: 0 } + } + + // Guard: Check if payload.db exists (may be missing in test mocks) + if (!payload.db || !payload.db.drizzle) { + console.warn('[FTS Search] payload.db.drizzle not available, returning empty results') + return { postIds: [], totalCount: 0 } + } + + // Drizzle-ORM fĂŒr raw SQL Query + const db = payload.db.drizzle + + // Escape Werte fĂŒr sichere SQL-Queries + const safeFtsQuery = escapeSqlString(ftsQuery) + const safeLocale = escapeSqlString(locale) + + // Baue die SQL Query fĂŒr FTS + const conditions: string[] = [] + + // FTS-Bedingung (Title + Excerpt) + conditions.push(`to_tsvector('${ftsConfig}', COALESCE(pl.title, '') || ' ' || COALESCE(pl.excerpt, '')) @@ to_tsquery('${ftsConfig}', '${safeFtsQuery}')`) + + // Locale-Filter + conditions.push(`pl._locale = '${safeLocale}'`) + + // Status-Filter (nur published) + conditions.push(`p.status = 'published'`) + + // Tenant-Filter + if (tenantId && Number.isInteger(tenantId)) { + conditions.push(`p.tenant_id = ${tenantId}`) + } + + // Type-Filter + if (type) { + const safeType = escapeSqlString(type) + conditions.push(`p.type = '${safeType}'`) + } + + const whereClause = conditions.join(' AND ') + + // Count Query fĂŒr Pagination + const countQuery = ` + SELECT COUNT(DISTINCT p.id) as total + FROM posts p + INNER JOIN posts_locales pl ON p.id = pl._parent_id + WHERE ${whereClause} + ` + + // Main Query mit Relevanz-Sortierung + // Note: SELECT DISTINCT requires all ORDER BY columns to be in SELECT list + const mainQuery = ` + SELECT DISTINCT p.id, + ts_rank( + to_tsvector('${ftsConfig}', COALESCE(pl.title, '') || ' ' || COALESCE(pl.excerpt, '')), + to_tsquery('${ftsConfig}', '${safeFtsQuery}') + ) as rank, + p.published_at + FROM posts p + INNER JOIN posts_locales pl ON p.id = pl._parent_id + WHERE ${whereClause} + ORDER BY rank DESC, p.published_at DESC NULLS LAST + LIMIT ${Math.min(limit, 100)} OFFSET ${Math.max(0, offset)} + ` + + try { + // Execute queries mit drizzle-orm sql template + const [countResult, mainResult] = await Promise.all([ + db.execute(sql.raw(countQuery)), + db.execute(sql.raw(mainQuery)), + ]) + + // Extrahiere die Ergebnisse - node-postgres gibt { rows: [...] } zurĂŒck + const countRows = (countResult as unknown as { rows: FtsCountRow[] }).rows || [] + const totalCount = countRows.length > 0 ? parseInt(countRows[0].total, 10) : 0 + + const mainRows = (mainResult as unknown as { rows: FtsIdRow[] }).rows || [] + const postIds = mainRows.map((row) => row.id) + + return { postIds, totalCount } + } catch (error) { + console.error('[FTS Search] SQL Error:', error) + // Fallback zu leeren Ergebnissen bei Fehler + return { postIds: [], totalCount: 0 } + } +} + // ============================================================================ // Search Functions // ============================================================================ /** - * Search posts with ILIKE (Phase 1) - * Future: Add FTS support with USE_FTS=true + * Search posts with ILIKE or FTS (when USE_FTS=true) + * FTS uses PostgreSQL to_tsvector for better performance and relevance */ export async function searchPosts( payload: Payload, @@ -251,31 +403,13 @@ export async function searchPosts( const { query, tenantId, categorySlug, type, locale = 'de', limit = 10, offset = 0 } = params // Check cache - const cacheKey = generateCacheKey('search', { query, tenantId, categorySlug, type, locale, limit, offset }) + const cacheKey = generateCacheKey('search', { query, tenantId, categorySlug, type, locale, limit, offset, fts: USE_FTS }) const cached = searchCache.get(cacheKey) if (cached) { return cached } - // Build where clause - const whereConditions: Where[] = [{ status: { equals: 'published' } }] - - if (tenantId) { - whereConditions.push({ tenant: { equals: tenantId } }) - } - - if (type) { - whereConditions.push({ type: { equals: type } }) - } - - // Search in title and excerpt - if (query && query.length >= 2) { - whereConditions.push({ - or: [{ title: { contains: query } }, { excerpt: { contains: query } }], - }) - } - - // Category filter - need to lookup category first if categorySlug provided + // Category lookup (benötigt fĂŒr beide Methoden) let categoryId: number | undefined if (categorySlug) { const categoryResult = await payload.find({ @@ -290,27 +424,90 @@ export async function searchPosts( }) if (categoryResult.docs.length > 0) { categoryId = categoryResult.docs[0].id - whereConditions.push({ category: { equals: categoryId } }) } } - const where: Where = whereConditions.length > 1 ? { and: whereConditions } : whereConditions[0] + let result: Awaited> + let totalDocs: number - // Execute search - const result = await payload.find({ - collection: 'posts', - where, - locale: locale as 'de' | 'en', - fallbackLocale: 'de', - limit, - page: Math.floor(offset / limit) + 1, - sort: '-publishedAt', - depth: 1, // Include category relation - }) + // Verwende FTS wenn aktiviert und Query vorhanden + if (USE_FTS && query && query.length >= 2) { + // FTS-basierte Suche + const ftsResult = await searchWithFts(payload, { ...params, categorySlug: undefined }) - // Transform results + if (ftsResult.postIds.length === 0) { + // Keine FTS-Ergebnisse + result = { docs: [], totalDocs: 0, page: 1, totalPages: 0, hasNextPage: false, hasPrevPage: false, limit, pagingCounter: 1 } + totalDocs = 0 + } else { + // Lade Posts nach FTS-IDs (mit Category-Filter wenn nötig) + const whereConditions: Where[] = [ + { id: { in: ftsResult.postIds } }, + ] + if (categoryId) { + whereConditions.push({ category: { equals: categoryId } }) + } + + result = await payload.find({ + collection: 'posts', + where: whereConditions.length > 1 ? { and: whereConditions } : whereConditions[0], + locale: locale as 'de' | 'en', + fallbackLocale: 'de', + limit: ftsResult.postIds.length, // Alle gefundenen IDs laden + sort: '-publishedAt', + depth: 1, + }) + + // Sortiere nach FTS-Reihenfolge (Relevanz) + const orderedDocs = ftsResult.postIds + .map(id => result.docs.find(doc => doc.id === id)) + .filter((doc): doc is Post => doc !== undefined) + + result = { ...result, docs: orderedDocs } + totalDocs = categoryId ? result.totalDocs : ftsResult.totalCount + } + } else { + // Standard ILIKE-basierte Suche + const whereConditions: Where[] = [{ status: { equals: 'published' } }] + + if (tenantId) { + whereConditions.push({ tenant: { equals: tenantId } }) + } + + if (type) { + whereConditions.push({ type: { equals: type } }) + } + + // Search in title and excerpt mit ILIKE + if (query && query.length >= 2) { + whereConditions.push({ + or: [{ title: { contains: query } }, { excerpt: { contains: query } }], + }) + } + + if (categoryId) { + whereConditions.push({ category: { equals: categoryId } }) + } + + const where: Where = whereConditions.length > 1 ? { and: whereConditions } : whereConditions[0] + + result = await payload.find({ + collection: 'posts', + where, + locale: locale as 'de' | 'en', + fallbackLocale: 'de', + limit, + page: Math.floor(offset / limit) + 1, + sort: '-publishedAt', + depth: 1, + }) + totalDocs = result.totalDocs + } + + // Transform results - Cast docs to Post[] for proper typing + const posts = result.docs as Post[] const searchResult: SearchResult = { - results: result.docs.map((post) => ({ + results: posts.map((post) => ({ id: post.id, title: post.title, slug: post.slug, @@ -323,7 +520,7 @@ export async function searchPosts( .map((cat) => ({ name: cat.name, slug: cat.slug })) : [], })), - total: result.totalDocs, + total: totalDocs, query, filters: { category: categorySlug,