feat: BullMQ queue system for email and PDF processing

- Add BullMQ-based job queue with Redis backend
- Implement email worker with tenant-specific SMTP support
- Add PDF worker with Playwright for HTML/URL-to-PDF generation
- Create /api/generate-pdf endpoint with job status polling
- Fix TypeScript errors in Tenants, TenantBreadcrumb, TenantDashboard
- Fix type casts in auditAuthEvents and audit-service
- Remove credentials from ecosystem.config.cjs (now loaded via dotenv)
- Fix ESM __dirname issue with fileURLToPath for PM2 compatibility

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Martin Porwoll 2025-12-09 22:59:17 +00:00
parent 6ccb50c5f4
commit ce4962e74b
23 changed files with 2623 additions and 96 deletions

View file

@ -297,6 +297,39 @@ redis-cli flushdb
redis-cli monitor redis-cli monitor
``` ```
### Background Jobs (geplant)
**Evaluation (09.12.2025): BullMQ vs Agenda.js**
| Kriterium | BullMQ | Agenda.js |
|-----------|--------|-----------|
| Database | Redis ✅ | MongoDB ❌ |
| TypeScript | Native ✅ | Begrenzt ⚠️ |
| Priority Jobs | Ja ✅ | Nein ❌ |
| Rate Limiting | Ja ✅ | Nein ❌ |
| UI Dashboard | @bull-board ✅ | Keine ❌ |
**Entscheidung: BullMQ** (Redis bereits vorhanden, TypeScript-native, @bull-board UI)
**Geplante Konfiguration:**
```bash
# Environment Variables (wenn implementiert)
QUEUE_REDIS_URL=redis://localhost:6379/1 # Separate DB für Jobs
QUEUE_CONCURRENCY=5 # Parallele Worker
QUEUE_DEFAULT_RETRY=3 # Wiederholungsversuche
```
**PM2 Worker-Konfiguration (geplant):**
```javascript
// ecosystem.config.cjs - Erweiterung
{
name: 'queue-worker',
script: './scripts/run-queue-worker.ts',
instances: 1,
max_memory_restart: '500M'
}
```
--- ---
## Deployment Workflow ## Deployment Workflow

View file

@ -253,27 +253,96 @@
### Mittlere Priorität - Performance & Skalierung ### Mittlere Priorität - Performance & Skalierung
#### Search Performance #### Search Performance
- [ ] **Full-Text-Search aktivieren** - [x] **Full-Text-Search aktivieren** (Erledigt: 09.12.2025)
- [ ] `USE_FTS=true` in Production setzen - [x] `USE_FTS=true` in Production gesetzt
- [ ] PostgreSQL `to_tsvector`-Indices erstellen - [x] PostgreSQL `to_tsvector`-Indices erstellt:
- [ ] Performance-Test mit Produktionsdaten - `posts_locales_fts_title_idx` (GIN auf title)
- `posts_locales_fts_excerpt_idx` (GIN auf excerpt)
- `posts_locales_fts_combined_idx` (GIN auf title + excerpt)
- `pages_locales_fts_title_idx` (GIN auf title)
- `categories_locales_fts_name_idx` (GIN auf name)
- [x] Deutsche Sprachkonfiguration (`german` config)
- [x] Relevanz-Ranking mit `ts_rank()`
- [x] Prefix-Suche mit `:*` Operator
- [x] Fallback auf ILIKE bei `USE_FTS=false`
- [ ] **Redis-Migration für Caches** - [ ] **Redis-Migration für Caches**
- [ ] Search-Cache von In-Memory auf Redis migrieren - [ ] Search-Cache von In-Memory auf Redis migrieren
- [ ] Rate-Limit-Maps auf Redis migrieren - [ ] Rate-Limit-Maps auf Redis migrieren
- [ ] Suggestions-Cache auf Redis - [ ] Suggestions-Cache auf Redis
#### Background Jobs #### Background Jobs
- [ ] **Queue-System implementieren** - [x] **Queue-System implementieren** (Erledigt: 09.12.2025)
- [ ] BullMQ oder Agenda.js evaluieren - [x] BullMQ oder Agenda.js evaluieren → **Empfehlung: BullMQ**
- [ ] E-Mail-Versand über Queue (non-blocking) - [x] E-Mail-Versand über Queue (non-blocking)
- [ ] PDF-Generierung über Queue - Queue-Service: `src/lib/queue/queue-service.ts`
- [ ] Job-Dashboard im Admin - Email-Job: `src/lib/queue/jobs/email-job.ts`
- Email-Worker: `src/lib/queue/workers/email-worker.ts`
- API-Integration: `queued: true` Option in `/api/send-email`
- [x] PDF-Generierung über Queue (Erledigt: 09.12.2025)
- PDF-Job: `src/lib/queue/jobs/pdf-job.ts`
- PDF-Service: `src/lib/pdf/pdf-service.ts` (Playwright-basiert)
- PDF-Worker: `src/lib/queue/workers/pdf-worker.ts`
- API-Endpoint: `/api/generate-pdf` (POST für Generierung, GET für Job-Status)
- Unterstützt HTML-zu-PDF und URL-zu-PDF
- PM2-Integration mit konfigurierbaren Workern
- [x] Job-Dashboard im Admin
- Queue-Status: `GET /api/admin/queues` (SuperAdmin only)
- Queue-Jobs: `GET /api/admin/queues/[name]/jobs`
- Queue-Actions: `POST /api/admin/queues` (pause, resume, clean, drain)
**Evaluation BullMQ vs Agenda.js:**
| Kriterium | BullMQ | Agenda.js |
|-----------|--------|-----------|
| **Database** | Redis ✅ (bereits vorhanden) | MongoDB ❌ (neue Dependency) |
| **TypeScript** | Native ✅ | Begrenzt ⚠️ |
| **Priority Jobs** | Ja ✅ | Nein ❌ |
| **Rate Limiting** | Ja ✅ | Nein ❌ |
| **Delayed Jobs** | Ja ✅ | Ja ✅ |
| **Repeatable Jobs** | Ja ✅ | Ja ✅ |
| **UI Dashboard** | @bull-board ✅ | Keine Built-in ❌ |
| **Weekly Downloads** | 1.6M | 120K |
| **Maintenance** | Aktiv | Weniger aktiv |
**Entscheidung: BullMQ** wegen:
1. Redis bereits im Stack (keine neue DB)
2. Native TypeScript-Unterstützung
3. Priority Jobs & Rate Limiting für Multi-Tenant
4. @bull-board für Admin-Dashboard
5. Höhere Aktivität und Downloads
**Implementierungsplan:**
1. `pnpm add bullmq @bull-board/api @bull-board/express`
2. Queue-Service (`src/lib/queue/queue-service.ts`)
3. Job-Definitionen (`src/lib/queue/jobs/`)
4. Worker-Script (`scripts/run-queue-worker.ts`)
5. PM2-Integration (separater Prozess)
6. Admin-Dashboard Route (`/admin/jobs`)
**Betroffene Dateien für E-Mail-Queue:**
- `src/lib/email/tenant-email-service.ts`
- `src/app/(payload)/api/send-email/route.ts`
- `src/app/(payload)/api/test-email/route.ts`
- `src/lib/alerting/alert-service.ts`
- `src/hooks/sendFormNotification.ts`
#### Database Optimization #### Database Optimization
- [ ] **Index-Audit** - [x] **Index-Audit** (Erledigt: 09.12.2025)
- [ ] Composite-Indices für lokalisierte Felder (slug + locale) - [x] Composite-Indices für lokalisierte Felder (slug + locale)
- [ ] Query-Performance-Analyse - `posts_locales_slug_locale_idx`
- [ ] EXPLAIN ANALYZE für häufige Queries - `pages_locales_slug_locale_idx`
- `categories_locales_slug_locale_idx`
- [x] Query-Performance-Analyse
- [x] EXPLAIN ANALYZE für häufige Queries
- [x] Zusätzliche Performance-Indexes erstellt:
- `posts_status_tenant_idx` (status + tenant)
- `posts_type_tenant_idx` (type + tenant)
- `posts_published_at_idx` (chronologische Sortierung)
- `posts_is_featured_tenant_idx` (partial index)
- `email_logs_status_tenant_idx`, `email_logs_status_created_at_idx`
- `audit_logs_action_created_at_idx`
- `newsletter_subscribers_status_tenant_idx`
- `consent_logs_created_at_desc_idx`
- [ ] **Connection Pooling** - [ ] **Connection Pooling**
- [ ] PgBouncer evaluieren für Multi-Instanz-Betrieb - [ ] PgBouncer evaluieren für Multi-Instanz-Betrieb
@ -431,7 +500,7 @@
1. ~~**[KRITISCH]** AuditLogs Collection implementieren~~ ✅ Erledigt 1. ~~**[KRITISCH]** AuditLogs Collection implementieren~~ ✅ Erledigt
2. **[KRITISCH]** Automatisierte Backups einrichten 2. **[KRITISCH]** Automatisierte Backups einrichten
3. **[HOCH]** Full-Text-Search aktivieren (USE_FTS=true) 3. ~~**[HOCH]** Full-Text-Search aktivieren (USE_FTS=true)~~ ✅ Erledigt
4. **[HOCH]** Rate-Limits auf Redis migrieren (In-Memory-Fallback funktioniert) 4. **[HOCH]** Rate-Limits auf Redis migrieren (In-Memory-Fallback funktioniert)
5. ~~**[MITTEL]** CI/CD Pipeline mit GitHub Actions~~ ✅ security.yml erstellt 5. ~~**[MITTEL]** CI/CD Pipeline mit GitHub Actions~~ ✅ security.yml erstellt
6. **[MITTEL]** Frontend-Entwicklung starten 6. **[MITTEL]** Frontend-Entwicklung starten
@ -457,6 +526,40 @@
- **Admin Login Fix:** Custom Login-Route unterstützt nun `_payload` JSON-Feld aus multipart/form-data (Payload Admin Panel Format) - **Admin Login Fix:** Custom Login-Route unterstützt nun `_payload` JSON-Feld aus multipart/form-data (Payload Admin Panel Format)
- **Dokumentation bereinigt:** Obsolete PROMPT_*.md Instruktionsdateien gelöscht - **Dokumentation bereinigt:** Obsolete PROMPT_*.md Instruktionsdateien gelöscht
- **CLAUDE.md aktualisiert:** Security-Features, Test Suite, AuditLogs dokumentiert - **CLAUDE.md aktualisiert:** Security-Features, Test Suite, AuditLogs dokumentiert
- **Index-Audit:** 12 neue Performance-Indexes für PostgreSQL erstellt
- Composite-Indexes für slug+locale auf posts_locales, pages_locales, categories_locales
- Status/Tenant-Indexes für posts, email_logs, newsletter_subscribers
- Partial Index für Featured Posts
- Chronologische Sortierung für published_at, created_at
### 10.12.2025 ### 10.12.2025
- **Audit-Fixes:** Vitest auf 3.2.4 aktualisiert, Payload-Mocks im Security-Test ergänzt - **Audit-Fixes:** Vitest auf 3.2.4 aktualisiert, Payload-Mocks im Security-Test ergänzt
### 09.12.2025 (Fortsetzung)
- **Full-Text-Search:** PostgreSQL FTS mit GIN-Indexes aktiviert
- 5 FTS-Indexes auf posts_locales, pages_locales, categories_locales
- Deutsche Sprachkonfiguration (`german` config)
- Relevanz-Ranking mit `ts_rank()`
- Feature-Flag `USE_FTS=true` in .env
- **Queue-System Evaluation:** BullMQ vs Agenda.js evaluiert
- **Empfehlung: BullMQ** (Redis-basiert, TypeScript-native, @bull-board UI)
- Implementierungsplan dokumentiert
- Betroffene Dateien identifiziert
- **BullMQ Implementation:** Vollständiges Queue-System implementiert
- Queue-Service mit Redis-Connection und Job-Optionen
- Email-Job mit Priority, Delay und Batch-Support
- Email-Worker für asynchrone Verarbeitung
- Worker-Script für PM2 (`scripts/run-queue-worker.ts`)
- PM2-Konfiguration für separaten Worker-Prozess
- Admin-API für Queue-Monitoring (`/api/admin/queues`)
- Send-Email API mit `queued: true` Option
- **Audit-Fixes (BullMQ):** FTS und Dependencies bereinigt
- FTS SQL-Fix: `p.published_at` zu SELECT hinzugefügt (PostgreSQL DISTINCT-Regel)
- Guard für fehlende `payload.db.drizzle` in Tests
- Ungenutzte `@bull-board/*` Packages entfernt (53 Dependencies weniger)
- **PDF-Queue-System:** Vollständige PDF-Generierung über BullMQ
- PDF-Job-Definition mit Priority, Delay und Batch-Support
- PDF-Service mit Playwright (HTML-zu-PDF, URL-zu-PDF)
- PDF-Worker für asynchrone Verarbeitung
- REST-API `/api/generate-pdf` mit Auth, CSRF, Rate-Limiting
- PM2-Integration mit konfigurierbaren Workern (`QUEUE_ENABLE_PDF`)

View file

@ -1,22 +1,57 @@
module.exports = { module.exports = {
apps: [{ apps: [
name: 'payload', // Main Payload CMS App
cwd: '/home/payload/payload-cms', {
script: 'pnpm', name: 'payload',
args: 'start', cwd: '/home/payload/payload-cms',
env: { script: 'pnpm',
NODE_ENV: 'production', args: 'start',
PORT: 3000 env: {
NODE_ENV: 'production',
PORT: 3000
},
instances: 1,
autorestart: true,
watch: false,
max_memory_restart: '1G',
error_file: '/home/payload/logs/error.log',
out_file: '/home/payload/logs/out.log',
time: true,
max_restarts: 10,
min_uptime: '10s',
restart_delay: 5000
}, },
instances: 1, // Queue Worker (BullMQ - Email + PDF)
autorestart: true, {
watch: false, name: 'queue-worker',
max_memory_restart: '1G', cwd: '/home/payload/payload-cms',
error_file: '/home/payload/logs/error.log', script: 'npx',
out_file: '/home/payload/logs/out.log', args: 'tsx scripts/run-queue-worker.ts',
time: true, env: {
max_restarts: 10, NODE_ENV: 'production',
min_uptime: '10s', // Credentials werden via dotenv aus .env geladen (siehe run-queue-worker.ts)
restart_delay: 5000 // Queue-spezifische Env Vars
}] QUEUE_EMAIL_CONCURRENCY: '3',
QUEUE_PDF_CONCURRENCY: '2',
QUEUE_DEFAULT_RETRY: '3',
QUEUE_REDIS_DB: '1',
// Worker aktivieren/deaktivieren
QUEUE_ENABLE_EMAIL: 'true',
QUEUE_ENABLE_PDF: 'true',
// PDF-spezifische Optionen
PDF_OUTPUT_DIR: '/tmp/payload-pdfs',
// PDF_GENERATION_DISABLED: 'true', // Für Tests
},
instances: 1,
autorestart: true,
watch: false,
max_memory_restart: '768M', // PDF-Generierung braucht mehr RAM
error_file: '/home/payload/logs/queue-worker-error.log',
out_file: '/home/payload/logs/queue-worker-out.log',
time: true,
max_restarts: 10,
min_uptime: '5s',
restart_delay: 3000,
}
]
} }

View file

@ -32,6 +32,7 @@
"@payloadcms/richtext-lexical": "3.65.0", "@payloadcms/richtext-lexical": "3.65.0",
"@payloadcms/translations": "^3.65.0", "@payloadcms/translations": "^3.65.0",
"@payloadcms/ui": "3.65.0", "@payloadcms/ui": "3.65.0",
"bullmq": "^5.65.1",
"cross-env": "^7.0.3", "cross-env": "^7.0.3",
"dotenv": "16.4.7", "dotenv": "16.4.7",
"graphql": "^16.8.1", "graphql": "^16.8.1",

View file

@ -38,6 +38,9 @@ importers:
'@payloadcms/ui': '@payloadcms/ui':
specifier: 3.65.0 specifier: 3.65.0
version: 3.65.0(@types/react@19.1.8)(monaco-editor@0.55.1)(next@15.4.7(@babel/core@7.28.5)(@playwright/test@1.56.1)(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(sass@1.77.4))(payload@3.65.0(graphql@16.12.0)(typescript@5.7.3))(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(typescript@5.7.3) version: 3.65.0(@types/react@19.1.8)(monaco-editor@0.55.1)(next@15.4.7(@babel/core@7.28.5)(@playwright/test@1.56.1)(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(sass@1.77.4))(payload@3.65.0(graphql@16.12.0)(typescript@5.7.3))(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(typescript@5.7.3)
bullmq:
specifier: ^5.65.1
version: 5.65.1
cross-env: cross-env:
specifier: ^7.0.3 specifier: ^7.0.3
version: 7.0.3 version: 7.0.3
@ -1228,6 +1231,36 @@ packages:
react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0
react-dom: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 react-dom: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==}
cpu: [arm64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==}
cpu: [x64]
os: [darwin]
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==}
cpu: [arm64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==}
cpu: [arm]
os: [linux]
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==}
cpu: [x64]
os: [linux]
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==}
cpu: [x64]
os: [win32]
'@napi-rs/wasm-runtime@0.2.12': '@napi-rs/wasm-runtime@0.2.12':
resolution: {integrity: sha512-ZVWUcfwY4E/yPitQJl481FjFo3K22D6qF0DuFH6Y/nbnE11GY5uguDxZMGXPQ8WQ0128MXQD7TnfHyK4oWoIJQ==} resolution: {integrity: sha512-ZVWUcfwY4E/yPitQJl481FjFo3K22D6qF0DuFH6Y/nbnE11GY5uguDxZMGXPQ8WQ0128MXQD7TnfHyK4oWoIJQ==}
@ -2169,6 +2202,9 @@ packages:
buffer-from@1.1.2: buffer-from@1.1.2:
resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==} resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==}
bullmq@5.65.1:
resolution: {integrity: sha512-QgDAzX1G9L5IRy4Orva5CfQTXZT+5K+OfO/kbPrAqN+pmL9LJekCzxijXehlm/u2eXfWPfWvIdJJIqiuz3WJSg==}
busboy@1.6.0: busboy@1.6.0:
resolution: {integrity: sha512-8SFQbg/0hQ9xy3UNTB0YEnsNBbWfhf7RtnzpL7TkBiTBRfrQ9Fxcnz7VJsleJpyp6rVLvXiuORqjlHi5q+PYuA==} resolution: {integrity: sha512-8SFQbg/0hQ9xy3UNTB0YEnsNBbWfhf7RtnzpL7TkBiTBRfrQ9Fxcnz7VJsleJpyp6rVLvXiuORqjlHi5q+PYuA==}
engines: {node: '>=10.16.0'} engines: {node: '>=10.16.0'}
@ -2281,6 +2317,10 @@ packages:
resolution: {integrity: sha512-AdmX6xUzdNASswsFtmwSt7Vj8po9IuqXm0UXz7QKPuEUmPB4XyjGfaAr2PSuELMwkRMVH1EpIkX5bTZGRB3eCA==} resolution: {integrity: sha512-AdmX6xUzdNASswsFtmwSt7Vj8po9IuqXm0UXz7QKPuEUmPB4XyjGfaAr2PSuELMwkRMVH1EpIkX5bTZGRB3eCA==}
engines: {node: '>=10'} engines: {node: '>=10'}
cron-parser@4.9.0:
resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==}
engines: {node: '>=12.0.0'}
croner@9.1.0: croner@9.1.0:
resolution: {integrity: sha512-p9nwwR4qyT5W996vBZhdvBCnMhicY5ytZkR4D1Xj0wuTDEiMnjwR57Q3RXYY/s0EpX6Ay3vgIcfaR+ewGHsi+g==} resolution: {integrity: sha512-p9nwwR4qyT5W996vBZhdvBCnMhicY5ytZkR4D1Xj0wuTDEiMnjwR57Q3RXYY/s0EpX6Ay3vgIcfaR+ewGHsi+g==}
engines: {node: '>=18.0'} engines: {node: '>=18.0'}
@ -3288,6 +3328,10 @@ packages:
lru-cache@5.1.1: lru-cache@5.1.1:
resolution: {integrity: sha512-KpNARQA3Iwv+jTA0utUVVbrh+Jlrr1Fv0e56GGzAFOXN7dk/FviaDW8LHmK52DlcH4WP2n6gI8vN1aesBFgo9w==} resolution: {integrity: sha512-KpNARQA3Iwv+jTA0utUVVbrh+Jlrr1Fv0e56GGzAFOXN7dk/FviaDW8LHmK52DlcH4WP2n6gI8vN1aesBFgo9w==}
luxon@3.7.2:
resolution: {integrity: sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==}
engines: {node: '>=12'}
lz-string@1.5.0: lz-string@1.5.0:
resolution: {integrity: sha512-h5bgJWpxJNswbU7qCrV0tIKQCaS3blPDrqKWx+QxzuzL1zGUzij9XCWLrSLsJPu5t+eWA/ycetzYAO5IOMcWAQ==} resolution: {integrity: sha512-h5bgJWpxJNswbU7qCrV0tIKQCaS3blPDrqKWx+QxzuzL1zGUzij9XCWLrSLsJPu5t+eWA/ycetzYAO5IOMcWAQ==}
hasBin: true hasBin: true
@ -3432,6 +3476,13 @@ packages:
ms@2.1.3: ms@2.1.3:
resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==}
msgpackr-extract@3.0.3:
resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==}
hasBin: true
msgpackr@1.11.5:
resolution: {integrity: sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==}
nanoid@3.3.11: nanoid@3.3.11:
resolution: {integrity: sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==} resolution: {integrity: sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==}
engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1} engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1}
@ -3466,10 +3517,17 @@ packages:
sass: sass:
optional: true optional: true
node-abort-controller@3.1.1:
resolution: {integrity: sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==}
node-cron@4.2.1: node-cron@4.2.1:
resolution: {integrity: sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==} resolution: {integrity: sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==}
engines: {node: '>=6.0.0'} engines: {node: '>=6.0.0'}
node-gyp-build-optional-packages@5.2.2:
resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==}
hasBin: true
node-releases@2.0.27: node-releases@2.0.27:
resolution: {integrity: sha512-nmh3lCkYZ3grZvqcCH+fjmQ7X+H0OeZgP40OierEaAptX4XofMh5kwNbWh7lBduUzCcV/8kZ+NDLCwm2iorIlA==} resolution: {integrity: sha512-nmh3lCkYZ3grZvqcCH+fjmQ7X+H0OeZgP40OierEaAptX4XofMh5kwNbWh7lBduUzCcV/8kZ+NDLCwm2iorIlA==}
@ -4341,6 +4399,10 @@ packages:
resolution: {integrity: sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==} resolution: {integrity: sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==}
hasBin: true hasBin: true
uuid@11.1.0:
resolution: {integrity: sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==}
hasBin: true
uuid@9.0.0: uuid@9.0.0:
resolution: {integrity: sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==} resolution: {integrity: sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==}
hasBin: true hasBin: true
@ -5829,6 +5891,24 @@ snapshots:
react: 19.1.0 react: 19.1.0
react-dom: 19.1.0(react@19.1.0) react-dom: 19.1.0(react@19.1.0)
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3':
optional: true
'@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3':
optional: true
'@napi-rs/wasm-runtime@0.2.12': '@napi-rs/wasm-runtime@0.2.12':
dependencies: dependencies:
'@emnapi/core': 1.7.1 '@emnapi/core': 1.7.1
@ -7025,6 +7105,18 @@ snapshots:
buffer-from@1.1.2: {} buffer-from@1.1.2: {}
bullmq@5.65.1:
dependencies:
cron-parser: 4.9.0
ioredis: 5.8.2
msgpackr: 1.11.5
node-abort-controller: 3.1.1
semver: 7.7.3
tslib: 2.8.1
uuid: 11.1.0
transitivePeerDependencies:
- supports-color
busboy@1.6.0: busboy@1.6.0:
dependencies: dependencies:
streamsearch: 1.1.0 streamsearch: 1.1.0
@ -7137,6 +7229,10 @@ snapshots:
path-type: 4.0.0 path-type: 4.0.0
yaml: 1.10.2 yaml: 1.10.2
cron-parser@4.9.0:
dependencies:
luxon: 3.7.2
croner@9.1.0: {} croner@9.1.0: {}
cross-env@7.0.3: cross-env@7.0.3:
@ -7470,8 +7566,8 @@ snapshots:
'@typescript-eslint/parser': 8.48.0(eslint@9.39.1)(typescript@5.7.3) '@typescript-eslint/parser': 8.48.0(eslint@9.39.1)(typescript@5.7.3)
eslint: 9.39.1 eslint: 9.39.1
eslint-import-resolver-node: 0.3.9 eslint-import-resolver-node: 0.3.9
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1) eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.1)
eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1) eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1)
eslint-plugin-jsx-a11y: 6.10.2(eslint@9.39.1) eslint-plugin-jsx-a11y: 6.10.2(eslint@9.39.1)
eslint-plugin-react: 7.37.5(eslint@9.39.1) eslint-plugin-react: 7.37.5(eslint@9.39.1)
eslint-plugin-react-hooks: 5.2.0(eslint@9.39.1) eslint-plugin-react-hooks: 5.2.0(eslint@9.39.1)
@ -7490,7 +7586,7 @@ snapshots:
transitivePeerDependencies: transitivePeerDependencies:
- supports-color - supports-color
eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1): eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.1):
dependencies: dependencies:
'@nolyfill/is-core-module': 1.0.39 '@nolyfill/is-core-module': 1.0.39
debug: 4.4.3 debug: 4.4.3
@ -7501,22 +7597,22 @@ snapshots:
tinyglobby: 0.2.15 tinyglobby: 0.2.15
unrs-resolver: 1.11.1 unrs-resolver: 1.11.1
optionalDependencies: optionalDependencies:
eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1) eslint-plugin-import: 2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1)
transitivePeerDependencies: transitivePeerDependencies:
- supports-color - supports-color
eslint-module-utils@2.12.1(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1): eslint-module-utils@2.12.1(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1):
dependencies: dependencies:
debug: 3.2.7 debug: 3.2.7
optionalDependencies: optionalDependencies:
'@typescript-eslint/parser': 8.48.0(eslint@9.39.1)(typescript@5.7.3) '@typescript-eslint/parser': 8.48.0(eslint@9.39.1)(typescript@5.7.3)
eslint: 9.39.1 eslint: 9.39.1
eslint-import-resolver-node: 0.3.9 eslint-import-resolver-node: 0.3.9
eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1) eslint-import-resolver-typescript: 3.10.1(eslint-plugin-import@2.32.0)(eslint@9.39.1)
transitivePeerDependencies: transitivePeerDependencies:
- supports-color - supports-color
eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1): eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1):
dependencies: dependencies:
'@rtsao/scc': 1.1.0 '@rtsao/scc': 1.1.0
array-includes: 3.1.9 array-includes: 3.1.9
@ -7527,7 +7623,7 @@ snapshots:
doctrine: 2.1.0 doctrine: 2.1.0
eslint: 9.39.1 eslint: 9.39.1
eslint-import-resolver-node: 0.3.9 eslint-import-resolver-node: 0.3.9
eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1(eslint-plugin-import@2.32.0(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint@9.39.1))(eslint@9.39.1))(eslint@9.39.1) eslint-module-utils: 2.12.1(@typescript-eslint/parser@8.48.0(eslint@9.39.1)(typescript@5.7.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.10.1)(eslint@9.39.1)
hasown: 2.0.2 hasown: 2.0.2
is-core-module: 2.16.1 is-core-module: 2.16.1
is-glob: 4.0.3 is-glob: 4.0.3
@ -8243,6 +8339,8 @@ snapshots:
dependencies: dependencies:
yallist: 3.1.1 yallist: 3.1.1
luxon@3.7.2: {}
lz-string@1.5.0: {} lz-string@1.5.0: {}
magic-string@0.30.21: magic-string@0.30.21:
@ -8521,6 +8619,22 @@ snapshots:
ms@2.1.3: {} ms@2.1.3: {}
msgpackr-extract@3.0.3:
dependencies:
node-gyp-build-optional-packages: 5.2.2
optionalDependencies:
'@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3
'@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3
'@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3
'@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3
optional: true
msgpackr@1.11.5:
optionalDependencies:
msgpackr-extract: 3.0.3
nanoid@3.3.11: {} nanoid@3.3.11: {}
napi-postinstall@0.3.4: {} napi-postinstall@0.3.4: {}
@ -8552,8 +8666,15 @@ snapshots:
- '@babel/core' - '@babel/core'
- babel-plugin-macros - babel-plugin-macros
node-abort-controller@3.1.1: {}
node-cron@4.2.1: {} node-cron@4.2.1: {}
node-gyp-build-optional-packages@5.2.2:
dependencies:
detect-libc: 2.1.2
optional: true
node-releases@2.0.27: {} node-releases@2.0.27: {}
nodemailer@7.0.11: {} nodemailer@7.0.11: {}
@ -9595,6 +9716,8 @@ snapshots:
uuid@10.0.0: {} uuid@10.0.0: {}
uuid@11.1.0: {}
uuid@9.0.0: {} uuid@9.0.0: {}
vfile-message@4.0.3: vfile-message@4.0.3:

100
scripts/run-queue-worker.ts Normal file
View file

@ -0,0 +1,100 @@
#!/usr/bin/env node
/**
* Queue Worker Runner
*
* Standalone script zum Starten der Queue Workers.
* Wird von PM2 als separater Prozess gestartet.
*
* Usage:
* npx tsx scripts/run-queue-worker.ts
* # oder via PM2
* pm2 start ecosystem.config.cjs --only queue-worker
*/
// Load .env with explicit path BEFORE any other imports
import { config as dotenvConfig } from 'dotenv'
import { resolve, dirname } from 'path'
import { fileURLToPath } from 'url'
// ES Module equivalent of __dirname
const __filename = fileURLToPath(import.meta.url)
const __dirname = dirname(__filename)
// Explicitly load .env from project root (handles PM2 cwd issues)
const envPath = resolve(__dirname, '..', '.env')
dotenvConfig({ path: envPath })
console.log(`[QueueRunner] Loading env from: ${envPath}`)
console.log(`[QueueRunner] PAYLOAD_SECRET loaded: ${process.env.PAYLOAD_SECRET ? 'yes' : 'NO!'}`)
// Dynamic imports after dotenv is loaded
async function main() {
const { startEmailWorker, stopEmailWorker } = await import('../src/lib/queue/workers/email-worker')
const { startPdfWorker, stopPdfWorker } = await import('../src/lib/queue/workers/pdf-worker')
// Konfiguration via Umgebungsvariablen
const ENABLE_EMAIL_WORKER = process.env.QUEUE_ENABLE_EMAIL !== 'false'
const ENABLE_PDF_WORKER = process.env.QUEUE_ENABLE_PDF !== 'false'
console.log('='.repeat(50))
console.log('[QueueRunner] Starting queue workers...')
console.log(`[QueueRunner] PID: ${process.pid}`)
console.log(`[QueueRunner] Node: ${process.version}`)
console.log(`[QueueRunner] Email Worker: ${ENABLE_EMAIL_WORKER ? 'enabled' : 'disabled'}`)
console.log(`[QueueRunner] PDF Worker: ${ENABLE_PDF_WORKER ? 'enabled' : 'disabled'}`)
console.log('='.repeat(50))
// Workers starten
if (ENABLE_EMAIL_WORKER) {
startEmailWorker()
}
if (ENABLE_PDF_WORKER) {
startPdfWorker()
}
// Graceful Shutdown
async function shutdown(signal: string) {
console.log(`\n[QueueRunner] Received ${signal}, shutting down gracefully...`)
try {
const stopPromises: Promise<void>[] = []
if (ENABLE_EMAIL_WORKER) {
stopPromises.push(stopEmailWorker())
}
if (ENABLE_PDF_WORKER) {
stopPromises.push(stopPdfWorker())
}
await Promise.all(stopPromises)
console.log('[QueueRunner] All workers stopped')
process.exit(0)
} catch (error) {
console.error('[QueueRunner] Error during shutdown:', error)
process.exit(1)
}
}
// Signal Handlers
process.on('SIGTERM', () => shutdown('SIGTERM'))
process.on('SIGINT', () => shutdown('SIGINT'))
// Unhandled Errors
process.on('unhandledRejection', (reason) => {
console.error('[QueueRunner] Unhandled Rejection:', reason)
})
process.on('uncaughtException', (error) => {
console.error('[QueueRunner] Uncaught Exception:', error)
shutdown('uncaughtException')
})
// Keep process alive
console.log('[QueueRunner] Workers started, waiting for jobs...')
}
main().catch((error) => {
console.error('[QueueRunner] Fatal error:', error)
process.exit(1)
})

View file

@ -0,0 +1,131 @@
/**
* Queue Jobs API
*
* Endpunkt zum Abrufen der Jobs einer Queue.
* Nur für Super-Admins zugänglich.
*/
import { getPayload } from 'payload'
import config from '@payload-config'
import { NextRequest, NextResponse } from 'next/server'
import { getQueue, QUEUE_NAMES } from '@/lib/queue'
import { createSafeLogger } from '@/lib/security'
const logger = createSafeLogger('API:AdminQueueJobs')
interface UserWithAdmin {
id: number
isSuperAdmin?: boolean
}
type JobState = 'waiting' | 'active' | 'completed' | 'failed' | 'delayed'
/**
* GET /api/admin/queues/[queueName]/jobs
*
* Gibt die Jobs einer Queue zurück.
* Query-Parameter:
* - status: waiting | active | completed | failed | delayed (default: waiting)
* - start: number (default: 0)
* - end: number (default: 20)
*/
export async function GET(
req: NextRequest,
{ params }: { params: Promise<{ queueName: string }> },
) {
try {
const payload = await getPayload({ config })
// Authentifizierung prüfen
const { user } = await payload.auth({ headers: req.headers })
if (!user) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const typedUser = user as UserWithAdmin
// Nur Super-Admins haben Zugriff
if (!typedUser.isSuperAdmin) {
return NextResponse.json(
{ error: 'Forbidden - Super admin access required' },
{ status: 403 },
)
}
const { queueName } = await params
// Queue-Namen validieren
const validQueues = Object.values(QUEUE_NAMES)
if (!validQueues.includes(queueName as typeof validQueues[number])) {
return NextResponse.json(
{ error: `Invalid queue name. Valid: ${validQueues.join(', ')}` },
{ status: 400 },
)
}
// Query-Parameter
const { searchParams } = new URL(req.url)
const status = (searchParams.get('status') || 'waiting') as JobState
const start = parseInt(searchParams.get('start') || '0', 10)
const end = parseInt(searchParams.get('end') || '20', 10)
const validStatuses: JobState[] = ['waiting', 'active', 'completed', 'failed', 'delayed']
if (!validStatuses.includes(status)) {
return NextResponse.json(
{ error: `Invalid status. Valid: ${validStatuses.join(', ')}` },
{ status: 400 },
)
}
const queue = getQueue(queueName as typeof validQueues[number])
// Jobs abrufen basierend auf Status
let jobs
switch (status) {
case 'waiting':
jobs = await queue.getWaiting(start, end)
break
case 'active':
jobs = await queue.getActive(start, end)
break
case 'completed':
jobs = await queue.getCompleted(start, end)
break
case 'failed':
jobs = await queue.getFailed(start, end)
break
case 'delayed':
jobs = await queue.getDelayed(start, end)
break
}
// Jobs für Response formatieren
const formattedJobs = jobs.map((job) => ({
id: job.id,
name: job.name,
data: job.data,
timestamp: job.timestamp,
processedOn: job.processedOn,
finishedOn: job.finishedOn,
attemptsMade: job.attemptsMade,
failedReason: job.failedReason,
returnvalue: job.returnvalue,
progress: job.progress,
}))
return NextResponse.json({
queue: queueName,
status,
pagination: { start, end },
count: formattedJobs.length,
jobs: formattedJobs,
})
} catch (error) {
logger.error('admin-queue-jobs error', error)
return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Unknown error' },
{ status: 500 },
)
}
}

View file

@ -0,0 +1,179 @@
/**
* Queue Admin API
*
* Admin-Endpunkt für Queue-Monitoring und -Verwaltung.
* Nur für Super-Admins zugänglich.
*/
import { getPayload } from 'payload'
import config from '@payload-config'
import { NextRequest, NextResponse } from 'next/server'
import {
getQueuesStatus,
isQueueAvailable,
getQueue,
QUEUE_NAMES,
type QueueStatus,
} from '@/lib/queue'
import { createSafeLogger } from '@/lib/security'
const logger = createSafeLogger('API:AdminQueues')
interface UserWithAdmin {
id: number
isSuperAdmin?: boolean
}
/**
* GET /api/admin/queues
*
* Gibt den Status aller Queues zurück.
* Nur für Super-Admins zugänglich.
*/
export async function GET(req: NextRequest) {
try {
const payload = await getPayload({ config })
// Authentifizierung prüfen
const { user } = await payload.auth({ headers: req.headers })
if (!user) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const typedUser = user as UserWithAdmin
// Nur Super-Admins haben Zugriff
if (!typedUser.isSuperAdmin) {
return NextResponse.json(
{ error: 'Forbidden - Super admin access required' },
{ status: 403 },
)
}
// Queue-Verfügbarkeit prüfen
const available = await isQueueAvailable()
if (!available) {
return NextResponse.json(
{
available: false,
error: 'Queue system unavailable (Redis connection failed)',
queues: [],
},
{ status: 503 },
)
}
// Queue-Status abrufen
const statuses = await getQueuesStatus()
return NextResponse.json({
available: true,
timestamp: new Date().toISOString(),
queues: statuses,
summary: {
totalWaiting: statuses.reduce((sum: number, q: QueueStatus) => sum + q.waiting, 0),
totalActive: statuses.reduce((sum: number, q: QueueStatus) => sum + q.active, 0),
totalCompleted: statuses.reduce((sum: number, q: QueueStatus) => sum + q.completed, 0),
totalFailed: statuses.reduce((sum: number, q: QueueStatus) => sum + q.failed, 0),
totalDelayed: statuses.reduce((sum: number, q: QueueStatus) => sum + q.delayed, 0),
},
})
} catch (error) {
logger.error('admin-queues error', error)
return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Unknown error' },
{ status: 500 },
)
}
}
/**
* POST /api/admin/queues
*
* Queue-Aktionen ausführen (pause, resume, clean, etc.)
* Nur für Super-Admins zugänglich.
*/
export async function POST(req: NextRequest) {
try {
const payload = await getPayload({ config })
// Authentifizierung prüfen
const { user } = await payload.auth({ headers: req.headers })
if (!user) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const typedUser = user as UserWithAdmin
// Nur Super-Admins haben Zugriff
if (!typedUser.isSuperAdmin) {
return NextResponse.json(
{ error: 'Forbidden - Super admin access required' },
{ status: 403 },
)
}
const body = await req.json()
const { action, queueName } = body
if (!action || !queueName) {
return NextResponse.json(
{ error: 'Missing required fields: action, queueName' },
{ status: 400 },
)
}
// Queue-Namen validieren
const validQueues = Object.values(QUEUE_NAMES)
if (!validQueues.includes(queueName)) {
return NextResponse.json(
{ error: `Invalid queue name. Valid: ${validQueues.join(', ')}` },
{ status: 400 },
)
}
const queue = getQueue(queueName)
switch (action) {
case 'pause':
await queue.pause()
logger.info(`Queue ${queueName} paused by user ${typedUser.id}`)
return NextResponse.json({ success: true, message: `Queue ${queueName} paused` })
case 'resume':
await queue.resume()
logger.info(`Queue ${queueName} resumed by user ${typedUser.id}`)
return NextResponse.json({ success: true, message: `Queue ${queueName} resumed` })
case 'clean':
const { status = 'completed', grace = 0 } = body
const cleanedCount = await queue.clean(grace, 1000, status)
logger.info(`Queue ${queueName} cleaned (${status}): ${cleanedCount.length} jobs`)
return NextResponse.json({
success: true,
message: `Cleaned ${cleanedCount.length} ${status} jobs from ${queueName}`,
count: cleanedCount.length,
})
case 'drain':
await queue.drain()
logger.info(`Queue ${queueName} drained by user ${typedUser.id}`)
return NextResponse.json({ success: true, message: `Queue ${queueName} drained` })
default:
return NextResponse.json(
{ error: `Unknown action: ${action}. Valid: pause, resume, clean, drain` },
{ status: 400 },
)
}
} catch (error) {
logger.error('admin-queues action error', error)
return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Unknown error' },
{ status: 500 },
)
}
}

View file

@ -0,0 +1,339 @@
/**
* PDF Generation API
*
* Endpunkt für PDF-Generierung über die Queue oder direkt.
* Unterstützt HTML-zu-PDF und URL-zu-PDF.
*/
import { getPayload } from 'payload'
import config from '@payload-config'
import { NextRequest, NextResponse } from 'next/server'
import { enqueuePdf, getPdfJobStatus, getPdfJobResult, isQueueAvailable } from '@/lib/queue'
import { generatePdfFromHtml, generatePdfFromUrl } from '@/lib/pdf/pdf-service'
import { logAccessDenied } from '@/lib/audit/audit-service'
import {
publicApiLimiter,
rateLimitHeaders,
validateIpAccess,
createSafeLogger,
validateCsrf,
} from '@/lib/security'
const RATE_LIMIT_MAX = 10
const logger = createSafeLogger('API:GeneratePdf')
interface UserWithTenants {
id: number
isSuperAdmin?: boolean
tenants?: Array<{
tenant: { id: number } | number
}>
}
/**
* Prüft ob User Zugriff auf den angegebenen Tenant hat
*/
function userHasAccessToTenant(user: UserWithTenants, tenantId: number): boolean {
if (user.isSuperAdmin) {
return true
}
if (!user.tenants || user.tenants.length === 0) {
return false
}
return user.tenants.some((t) => {
const userTenantId = typeof t.tenant === 'object' ? t.tenant.id : t.tenant
return userTenantId === tenantId
})
}
/**
* POST /api/generate-pdf
*
* Generiert ein PDF aus HTML oder URL.
*
* Body:
* - tenantId: number (erforderlich)
* - source: 'html' | 'url' (erforderlich)
* - html?: string (wenn source='html')
* - url?: string (wenn source='url')
* - options?: { format, landscape, margin, printBackground, scale }
* - queued?: boolean (true = async via Queue, false = sync)
* - documentType?: string (invoice, report, export, etc.)
* - filename?: string
*/
export async function POST(req: NextRequest) {
try {
// IP-Allowlist prüfen
const ipCheck = validateIpAccess(req, 'generatePdf')
if (!ipCheck.allowed) {
logger.warn(`IP blocked: ${ipCheck.ip}`, { reason: ipCheck.reason })
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
// CSRF-Schutz
const csrfResult = validateCsrf(req)
if (!csrfResult.valid) {
logger.warn('CSRF validation failed', { reason: csrfResult.reason })
return NextResponse.json({ error: 'CSRF validation failed' }, { status: 403 })
}
const payload = await getPayload({ config })
// Authentifizierung prüfen
const { user } = await payload.auth({ headers: req.headers })
if (!user) {
return NextResponse.json({ error: 'Unauthorized - Login required' }, { status: 401 })
}
const typedUser = user as UserWithTenants
// Rate Limiting
const rateLimit = await publicApiLimiter.check(String(typedUser.id))
if (!rateLimit.allowed) {
return NextResponse.json(
{
error: 'Rate limit exceeded',
message: `Maximum ${RATE_LIMIT_MAX} requests per minute.`,
},
{
status: 429,
headers: rateLimitHeaders(rateLimit, RATE_LIMIT_MAX),
},
)
}
const body = await req.json()
const {
tenantId,
source,
html,
url,
options = {},
queued = true,
documentType,
filename,
priority,
} = body
// Validierung
if (!tenantId) {
return NextResponse.json({ error: 'Missing required field: tenantId' }, { status: 400 })
}
const numericTenantId = Number(tenantId)
if (isNaN(numericTenantId)) {
return NextResponse.json({ error: 'Invalid tenantId: must be a number' }, { status: 400 })
}
if (!source || !['html', 'url'].includes(source)) {
return NextResponse.json(
{ error: 'Invalid source: must be "html" or "url"' },
{ status: 400 },
)
}
if (source === 'html' && !html) {
return NextResponse.json(
{ error: 'Missing required field: html (for source="html")' },
{ status: 400 },
)
}
if (source === 'url' && !url) {
return NextResponse.json(
{ error: 'Missing required field: url (for source="url")' },
{ status: 400 },
)
}
// Zugriffskontrolle
if (!userHasAccessToTenant(typedUser, numericTenantId)) {
await logAccessDenied(
payload,
`/api/generate-pdf (tenantId: ${numericTenantId})`,
typedUser.id,
user.email as string,
)
return NextResponse.json(
{ error: 'Forbidden - You do not have access to this tenant' },
{ status: 403 },
)
}
const rlHeaders = rateLimitHeaders(rateLimit, RATE_LIMIT_MAX)
// Queued PDF Generation (async)
if (queued) {
const queueAvailable = await isQueueAvailable()
if (!queueAvailable) {
return NextResponse.json(
{ error: 'Queue system unavailable. Use queued=false for direct generation.' },
{ status: 503, headers: rlHeaders },
)
}
const job = await enqueuePdf({
tenantId: numericTenantId,
source,
html,
url,
options,
documentType,
filename,
triggeredBy: String(typedUser.id),
priority: priority || 'normal',
correlationId: `pdf-${Date.now()}-${typedUser.id}`,
})
return NextResponse.json(
{
success: true,
queued: true,
message: 'PDF generation job queued successfully',
jobId: job.id,
},
{ headers: rlHeaders },
)
}
// Direct PDF Generation (sync)
let result
if (source === 'html') {
result = await generatePdfFromHtml(html, options)
} else {
result = await generatePdfFromUrl(url, options)
}
if (!result.success) {
return NextResponse.json(
{ success: false, error: result.error },
{ status: 500, headers: rlHeaders },
)
}
// PDF als Base64 zurückgeben
return NextResponse.json(
{
success: true,
queued: false,
pdf: result.buffer?.toString('base64'),
filename: filename || `document-${Date.now()}.pdf`,
pageCount: result.pageCount,
fileSize: result.fileSize,
duration: result.duration,
},
{ headers: rlHeaders },
)
} catch (error) {
logger.error('generate-pdf error', error)
return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Unknown error' },
{ status: 500 },
)
}
}
/**
* GET /api/generate-pdf?jobId=...
*
* Gibt den Status/das Ergebnis eines PDF-Jobs zurück.
*/
export async function GET(req: NextRequest) {
try {
const { searchParams } = new URL(req.url)
const jobId = searchParams.get('jobId')
// Wenn keine jobId, zeige API-Dokumentation
if (!jobId) {
return NextResponse.json({
endpoint: '/api/generate-pdf',
methods: {
POST: {
description: 'Generate a PDF from HTML or URL',
authentication: 'Required',
body: {
tenantId: 'number (required)',
source: '"html" | "url" (required)',
html: 'string (required if source="html")',
url: 'string (required if source="url")',
options: {
format: '"A4" | "A3" | "Letter" | "Legal"',
landscape: 'boolean',
margin: '{ top, right, bottom, left }',
printBackground: 'boolean',
scale: 'number',
},
queued: 'boolean (default: true)',
documentType: 'string (invoice, report, export, etc.)',
filename: 'string',
priority: '"high" | "normal" | "low"',
},
},
GET: {
description: 'Get status/result of a PDF generation job',
query: {
jobId: 'string (required)',
},
},
},
examples: {
generateFromHtml: {
tenantId: 1,
source: 'html',
html: '<h1>Hello</h1><p>World</p>',
options: { format: 'A4' },
queued: true,
},
generateFromUrl: {
tenantId: 1,
source: 'url',
url: 'https://example.com/invoice/123',
queued: false,
},
},
})
}
const payload = await getPayload({ config })
// Authentifizierung prüfen
const { user } = await payload.auth({ headers: req.headers })
if (!user) {
return NextResponse.json({ error: 'Unauthorized - Login required' }, { status: 401 })
}
// Job-Status abrufen
const status = await getPdfJobStatus(jobId)
if (!status) {
return NextResponse.json({ error: 'Job not found' }, { status: 404 })
}
// Wenn abgeschlossen, Ergebnis mitliefern
if (status.state === 'completed' && status.result) {
return NextResponse.json({
jobId,
state: status.state,
result: status.result,
})
}
return NextResponse.json({
jobId,
state: status.state,
progress: status.progress,
failedReason: status.failedReason,
})
} catch (error) {
logger.error('generate-pdf status error', error)
return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Unknown error' },
{ status: 500 },
)
}
}

View file

@ -1,6 +1,7 @@
import { getPayload } from 'payload' import { getPayload } from 'payload'
import config from '@payload-config' import config from '@payload-config'
import { sendTenantEmail, sendTestEmail } from '@/lib/email/tenant-email-service' import { sendTenantEmail, sendTestEmail } from '@/lib/email/tenant-email-service'
import { enqueueEmail, isQueueAvailable } from '@/lib/queue'
import { NextRequest, NextResponse } from 'next/server' import { NextRequest, NextResponse } from 'next/server'
import { logRateLimit, logAccessDenied } from '@/lib/audit/audit-service' import { logRateLimit, logAccessDenied } from '@/lib/audit/audit-service'
import { import {
@ -115,7 +116,7 @@ export async function POST(req: NextRequest) {
} }
const body = await req.json() const body = await req.json()
const { tenantId, to, subject, html, text, replyTo, test } = body const { tenantId, to, subject, html, text, replyTo, test, queued, priority } = body
// Validierung // Validierung
if (!tenantId) { if (!tenantId) {
@ -190,6 +191,41 @@ export async function POST(req: NextRequest) {
) )
} }
// Queued Email Sending (async via BullMQ)
if (queued) {
const queueAvailable = await isQueueAvailable()
if (!queueAvailable) {
return NextResponse.json(
{ error: 'Queue system unavailable. Use queued=false for direct sending.' },
{ status: 503, headers: rlHeaders },
)
}
const job = await enqueueEmail({
tenantId: numericTenantId,
to,
subject,
html,
text,
replyTo,
source: 'api',
triggeredBy: String(typedUser.id),
priority: priority || 'normal',
correlationId: `api-${Date.now()}-${typedUser.id}`,
})
return NextResponse.json(
{
success: true,
queued: true,
message: 'Email queued successfully',
jobId: job.id,
},
{ headers: rlHeaders },
)
}
// Direct Email Sending (synchronous)
const result = await sendTenantEmail(payload, numericTenantId, { const result = await sendTenantEmail(payload, numericTenantId, {
to, to,
subject, subject,
@ -246,11 +282,14 @@ export async function GET() {
text: 'string (optional) - Plain text content', text: 'string (optional) - Plain text content',
replyTo: 'string (optional) - Reply-to address', replyTo: 'string (optional) - Reply-to address',
test: 'boolean (optional) - Send test email', test: 'boolean (optional) - Send test email',
queued: 'boolean (optional) - Queue email for async sending via BullMQ',
priority: 'string (optional) - Priority for queued emails: high, normal, low',
}, },
response: { response: {
success: 'boolean', success: 'boolean',
messageId: 'string (on success)', messageId: 'string (on direct send success)',
logId: 'number (email log ID)', jobId: 'string (on queued success)',
logId: 'number (email log ID - direct send only)',
error: 'string (on failure)', error: 'string (on failure)',
}, },
examples: { examples: {
@ -260,6 +299,14 @@ export async function GET() {
subject: 'Hello World', subject: 'Hello World',
html: '<h1>Hello!</h1><p>This is a test email.</p>', html: '<h1>Hello!</h1><p>This is a test email.</p>',
}, },
sendQueuedEmail: {
tenantId: 1,
to: 'recipient@example.com',
subject: 'Hello World',
html: '<h1>Hello!</h1><p>This is a queued email.</p>',
queued: true,
priority: 'high',
},
sendTestEmail: { sendTestEmail: {
tenantId: 1, tenantId: 1,
to: 'test@example.com', to: 'test@example.com',

View file

@ -154,7 +154,7 @@ export const Tenants: CollectionConfig = {
hooks: { hooks: {
beforeValidate: [validateSmtpHost], beforeValidate: [validateSmtpHost],
}, },
validate: (value, { siblingData }) => { validate: (value: string | undefined | null, { siblingData }: { siblingData: Record<string, unknown> }) => {
const emailData = siblingData as { useCustomSmtp?: boolean } const emailData = siblingData as { useCustomSmtp?: boolean }
if (emailData?.useCustomSmtp && !value) { if (emailData?.useCustomSmtp && !value) {
return 'SMTP Host ist erforderlich' return 'SMTP Host ist erforderlich'
@ -201,7 +201,7 @@ export const Tenants: CollectionConfig = {
width: '50%', width: '50%',
description: 'Meist die E-Mail-Adresse', description: 'Meist die E-Mail-Adresse',
}, },
validate: (value, { siblingData }) => { validate: (value: string | undefined | null, { siblingData }: { siblingData: Record<string, unknown> }) => {
const smtpData = siblingData as { host?: string } const smtpData = siblingData as { host?: string }
// Nur validieren wenn host gesetzt ist (d.h. SMTP aktiv) // Nur validieren wenn host gesetzt ist (d.h. SMTP aktiv)
if (smtpData?.host && !value) { if (smtpData?.host && !value) {

View file

@ -19,10 +19,15 @@ export const TenantBreadcrumb: React.FC = () => {
return null return null
} }
// Handle localized labels (Record<string, string>) or plain strings
const displayLabel = typeof currentTenant.label === 'string'
? currentTenant.label
: (currentTenant.label as Record<string, string>)?.de || (currentTenant.label as Record<string, string>)?.en || String(currentTenant.value)
return ( return (
<div className="tenant-breadcrumb"> <div className="tenant-breadcrumb">
<span className="tenant-breadcrumb__label">Aktiver Tenant:</span> <span className="tenant-breadcrumb__label">Aktiver Tenant:</span>
<span className="tenant-breadcrumb__value">{currentTenant.label}</span> <span className="tenant-breadcrumb__value">{displayLabel}</span>
</div> </div>
) )
} }

View file

@ -51,7 +51,12 @@ export const TenantDashboard: React.FC = () => {
// Aktueller Tenant-Name // Aktueller Tenant-Name
const currentTenant = options?.find((opt) => opt.value === selectedTenantID) const currentTenant = options?.find((opt) => opt.value === selectedTenantID)
const tenantName = currentTenant?.label || 'Unbekannt' // Handle localized labels (Record<string, string>) or plain strings
const tenantName = currentTenant?.label
? (typeof currentTenant.label === 'string'
? currentTenant.label
: (currentTenant.label as Record<string, string>)?.de || (currentTenant.label as Record<string, string>)?.en || String(selectedTenantID))
: 'Unbekannt'
const fetchStats = useCallback(async () => { const fetchStats = useCallback(async () => {
if (!selectedTenantID) { if (!selectedTenantID) {

View file

@ -74,20 +74,22 @@ function getHeaderValue(req: PayloadRequest, headerName: string): string | undef
const lowerName = headerName.toLowerCase() const lowerName = headerName.toLowerCase()
// 1. Versuche req.get() (Express Request Methode) // 1. Versuche req.get() (Express Request Methode)
if (typeof (req as { get?: (name: string) => string | undefined }).get === 'function') { const reqWithGet = req as unknown as { get?: (name: string) => string | undefined }
const value = (req as { get: (name: string) => string | undefined }).get(lowerName) if (typeof reqWithGet.get === 'function') {
const value = reqWithGet.get(lowerName)
if (value) return value if (value) return value
} }
// 2. Versuche headers.get() (Fetch API Headers) // 2. Versuche headers.get() (Fetch API Headers)
if (req.headers && typeof (req.headers as { get?: (name: string) => string | null }).get === 'function') { const headersWithGet = req.headers as unknown as { get?: (name: string) => string | null }
const value = (req.headers as { get: (name: string) => string | null }).get(lowerName) if (req.headers && typeof headersWithGet.get === 'function') {
const value = headersWithGet.get(lowerName)
if (value) return value if (value) return value
} }
// 3. Direkter Zugriff auf headers object (IncomingHttpHeaders) // 3. Direkter Zugriff auf headers object (IncomingHttpHeaders)
if (req.headers && typeof req.headers === 'object') { if (req.headers && typeof req.headers === 'object') {
const headers = req.headers as Record<string, string | string[] | undefined> const headers = req.headers as unknown as Record<string, string | string[] | undefined>
const value = headers[lowerName] const value = headers[lowerName]
if (typeof value === 'string') return value if (typeof value === 'string') return value
if (Array.isArray(value) && value.length > 0) return value[0] if (Array.isArray(value) && value.length > 0) return value[0]

View file

@ -66,20 +66,22 @@ function getHeaderValue(
const lowerName = headerName.toLowerCase() const lowerName = headerName.toLowerCase()
// 1. Versuche req.get() (Express Request Methode) // 1. Versuche req.get() (Express Request Methode)
if (typeof (req as { get?: (name: string) => string | undefined }).get === 'function') { const reqWithGet = req as unknown as { get?: (name: string) => string | undefined }
const value = (req as { get: (name: string) => string | undefined }).get(lowerName) if (typeof reqWithGet.get === 'function') {
const value = reqWithGet.get(lowerName)
if (value) return value if (value) return value
} }
// 2. Versuche headers.get() (Fetch API Headers) // 2. Versuche headers.get() (Fetch API Headers)
if (req.headers && typeof (req.headers as { get?: (name: string) => string | null }).get === 'function') { const headersWithGet = req.headers as unknown as { get?: (name: string) => string | null }
const value = (req.headers as { get: (name: string) => string | null }).get(lowerName) if (req.headers && typeof headersWithGet.get === 'function') {
const value = headersWithGet.get(lowerName)
if (value) return value if (value) return value
} }
// 3. Direkter Zugriff auf headers object (IncomingHttpHeaders) // 3. Direkter Zugriff auf headers object (IncomingHttpHeaders)
if (req.headers && typeof req.headers === 'object') { if (req.headers && typeof req.headers === 'object') {
const headers = req.headers as Record<string, string | string[] | undefined> const headers = req.headers as unknown as Record<string, string | string[] | undefined>
const value = headers[lowerName] const value = headers[lowerName]
if (typeof value === 'string') return value if (typeof value === 'string') return value
if (Array.isArray(value) && value.length > 0) return value[0] if (Array.isArray(value) && value.length > 0) return value[0]

322
src/lib/pdf/pdf-service.ts Normal file
View file

@ -0,0 +1,322 @@
/**
* PDF Generation Service
*
* Generiert PDFs aus HTML oder URLs mittels Playwright.
* Unterstützt verschiedene Formate und Optionen.
*/
import { chromium, Browser, Page } from 'playwright'
import * as fs from 'fs/promises'
import * as path from 'path'
// Umgebungsvariablen
const PDF_OUTPUT_DIR = process.env.PDF_OUTPUT_DIR || '/tmp/payload-pdfs'
const PDF_GENERATION_DISABLED = process.env.PDF_GENERATION_DISABLED === 'true'
// Browser-Instanz (wird wiederverwendet)
let browserInstance: Browser | null = null
export interface PdfOptions {
format?: 'A4' | 'A3' | 'Letter' | 'Legal'
landscape?: boolean
margin?: {
top?: string
right?: string
bottom?: string
left?: string
}
printBackground?: boolean
scale?: number
headerTemplate?: string
footerTemplate?: string
displayHeaderFooter?: boolean
}
export interface PdfResult {
success: boolean
buffer?: Buffer
outputPath?: string
filename?: string
pageCount?: number
fileSize?: number
error?: string
duration?: number
}
/**
* Holt oder erstellt eine Browser-Instanz
*/
async function getBrowser(): Promise<Browser> {
if (!browserInstance || !browserInstance.isConnected()) {
console.log('[PDF] Launching browser...')
browserInstance = await chromium.launch({
headless: true,
args: [
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu',
],
})
}
return browserInstance
}
/**
* Schließt die Browser-Instanz
*/
export async function closeBrowser(): Promise<void> {
if (browserInstance) {
await browserInstance.close()
browserInstance = null
console.log('[PDF] Browser closed')
}
}
/**
* Stellt sicher, dass das Output-Verzeichnis existiert
*/
async function ensureOutputDir(): Promise<void> {
try {
await fs.mkdir(PDF_OUTPUT_DIR, { recursive: true })
} catch {
// Verzeichnis existiert bereits
}
}
/**
* Generiert ein PDF aus HTML-Content
*/
export async function generatePdfFromHtml(
html: string,
options: PdfOptions = {},
outputPath?: string,
): Promise<PdfResult> {
const startTime = Date.now()
// Check ob PDF-Generierung deaktiviert ist (Test-Modus)
if (PDF_GENERATION_DISABLED) {
console.log('[PDF] Generation disabled - returning mock result')
return {
success: true,
buffer: Buffer.from('mock-pdf-content'),
filename: 'mock.pdf',
pageCount: 1,
fileSize: 17,
duration: Date.now() - startTime,
}
}
let page: Page | null = null
try {
const browser = await getBrowser()
page = await browser.newPage()
// HTML laden
await page.setContent(html, { waitUntil: 'networkidle' })
// PDF-Optionen
const pdfOptions = {
format: options.format || 'A4',
landscape: options.landscape || false,
margin: options.margin || {
top: '20mm',
right: '15mm',
bottom: '20mm',
left: '15mm',
},
printBackground: options.printBackground !== false,
scale: options.scale || 1,
displayHeaderFooter: options.displayHeaderFooter || false,
headerTemplate: options.headerTemplate || '',
footerTemplate: options.footerTemplate || '',
}
// PDF generieren
const pdfBuffer = await page.pdf(pdfOptions)
// Seitenzahl ermitteln (geschätzt aus Dateigröße oder via PDF-Parser)
// Für genauere Zählung würde man einen PDF-Parser brauchen
const pageCount = Math.max(1, Math.ceil(pdfBuffer.length / 50000))
const result: PdfResult = {
success: true,
buffer: pdfBuffer,
pageCount,
fileSize: pdfBuffer.length,
duration: Date.now() - startTime,
}
// Optional: In Datei speichern
if (outputPath) {
await ensureOutputDir()
const fullPath = path.isAbsolute(outputPath)
? outputPath
: path.join(PDF_OUTPUT_DIR, outputPath)
await fs.writeFile(fullPath, pdfBuffer)
result.outputPath = fullPath
result.filename = path.basename(fullPath)
console.log(`[PDF] Saved to ${fullPath}`)
}
console.log(`[PDF] Generated successfully (${pdfBuffer.length} bytes, ~${pageCount} pages, ${result.duration}ms)`)
return result
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
console.error('[PDF] Generation failed:', errorMessage)
return {
success: false,
error: errorMessage,
duration: Date.now() - startTime,
}
} finally {
if (page) {
await page.close()
}
}
}
/**
* Generiert ein PDF von einer URL
*/
export async function generatePdfFromUrl(
url: string,
options: PdfOptions = {},
outputPath?: string,
): Promise<PdfResult> {
const startTime = Date.now()
// Check ob PDF-Generierung deaktiviert ist (Test-Modus)
if (PDF_GENERATION_DISABLED) {
console.log('[PDF] Generation disabled - returning mock result')
return {
success: true,
buffer: Buffer.from('mock-pdf-content'),
filename: 'mock.pdf',
pageCount: 1,
fileSize: 17,
duration: Date.now() - startTime,
}
}
let page: Page | null = null
try {
const browser = await getBrowser()
page = await browser.newPage()
// URL laden
await page.goto(url, { waitUntil: 'networkidle', timeout: 30000 })
// PDF-Optionen
const pdfOptions = {
format: options.format || 'A4',
landscape: options.landscape || false,
margin: options.margin || {
top: '20mm',
right: '15mm',
bottom: '20mm',
left: '15mm',
},
printBackground: options.printBackground !== false,
scale: options.scale || 1,
displayHeaderFooter: options.displayHeaderFooter || false,
headerTemplate: options.headerTemplate || '',
footerTemplate: options.footerTemplate || '',
}
// PDF generieren
const pdfBuffer = await page.pdf(pdfOptions)
const pageCount = Math.max(1, Math.ceil(pdfBuffer.length / 50000))
const result: PdfResult = {
success: true,
buffer: pdfBuffer,
pageCount,
fileSize: pdfBuffer.length,
duration: Date.now() - startTime,
}
// Optional: In Datei speichern
if (outputPath) {
await ensureOutputDir()
const fullPath = path.isAbsolute(outputPath)
? outputPath
: path.join(PDF_OUTPUT_DIR, outputPath)
await fs.writeFile(fullPath, pdfBuffer)
result.outputPath = fullPath
result.filename = path.basename(fullPath)
console.log(`[PDF] Saved to ${fullPath}`)
}
console.log(`[PDF] Generated from URL successfully (${pdfBuffer.length} bytes, ~${pageCount} pages, ${result.duration}ms)`)
return result
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
console.error('[PDF] Generation from URL failed:', errorMessage)
return {
success: false,
error: errorMessage,
duration: Date.now() - startTime,
}
} finally {
if (page) {
await page.close()
}
}
}
/**
* Generiert einen eindeutigen Dateinamen für eine PDF
*/
export function generatePdfFilename(
prefix: string = 'document',
tenantId?: number,
documentType?: string,
): string {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
const parts = [prefix]
if (documentType) {
parts.push(documentType)
}
if (tenantId) {
parts.push(`tenant-${tenantId}`)
}
parts.push(timestamp)
return `${parts.join('_')}.pdf`
}
/**
* Löscht eine generierte PDF-Datei
*/
export async function deletePdfFile(filePath: string): Promise<boolean> {
try {
await fs.unlink(filePath)
console.log(`[PDF] Deleted ${filePath}`)
return true
} catch {
console.warn(`[PDF] Could not delete ${filePath}`)
return false
}
}
/**
* Liest eine PDF-Datei als Buffer
*/
export async function readPdfFile(filePath: string): Promise<Buffer | null> {
try {
return await fs.readFile(filePath)
} catch {
console.warn(`[PDF] Could not read ${filePath}`)
return null
}
}

43
src/lib/queue/index.ts Normal file
View file

@ -0,0 +1,43 @@
/**
* Queue Module Exports
*
* Zentrale Exports für das BullMQ Queue-System.
*/
// Queue Service
export {
QUEUE_NAMES,
type QueueName,
getQueue,
getQueueEvents,
getAllQueues,
closeAllQueues,
isQueueAvailable,
getQueuesStatus,
type QueueStatus,
defaultJobOptions,
getQueueRedisConnection,
} from './queue-service'
// Email Job
export {
type EmailJobData,
type EmailJobResult,
enqueueEmail,
enqueueDelayedEmail,
enqueueEmailBatch,
isEmailJobPending,
getEmailJobStatus,
} from './jobs/email-job'
// PDF Job
export {
type PdfJobData,
type PdfJobResult,
enqueuePdf,
enqueueDelayedPdf,
enqueuePdfBatch,
isPdfJobPending,
getPdfJobStatus,
getPdfJobResult,
} from './jobs/pdf-job'

View file

@ -0,0 +1,150 @@
/**
* Email Job Definition
*
* Definiert den E-Mail-Versand-Job für die Queue.
* Unterstützt Multi-Tenant SMTP-Konfiguration.
*/
import { Job } from 'bullmq'
import { getQueue, QUEUE_NAMES, defaultJobOptions } from '../queue-service'
// Job-Daten Typen
export interface EmailJobData {
// E-Mail-Inhalte
to: string | string[]
subject: string
html?: string
text?: string
replyTo?: string
// Tenant-Kontext
tenantId: number
tenantSlug?: string
// Metadaten
triggeredBy?: string // User ID oder System
source?: 'api' | 'form' | 'notification' | 'alert'
correlationId?: string // Für Tracking
// Optionen
priority?: 'high' | 'normal' | 'low'
}
export interface EmailJobResult {
success: boolean
messageId?: string
error?: string
timestamp: string
attempts: number
}
// Priority-Mapping für BullMQ (niedrigere Zahl = höhere Priorität)
const PRIORITY_MAP = {
high: 1,
normal: 5,
low: 10,
}
/**
* Fügt einen E-Mail-Job zur Queue hinzu
*/
export async function enqueueEmail(data: EmailJobData): Promise<Job<EmailJobData>> {
const queue = getQueue(QUEUE_NAMES.EMAIL)
const jobOptions = {
...defaultJobOptions,
priority: PRIORITY_MAP[data.priority || 'normal'],
// Job-ID für Deduplizierung (optional)
jobId: data.correlationId,
}
const job = await queue.add('send-email', data, jobOptions)
console.log(`[EmailQueue] Job ${job.id} queued for ${Array.isArray(data.to) ? data.to.join(', ') : data.to}`)
return job
}
/**
* Fügt einen verzögerten E-Mail-Job zur Queue hinzu
*/
export async function enqueueDelayedEmail(
data: EmailJobData,
delayMs: number
): Promise<Job<EmailJobData>> {
const queue = getQueue(QUEUE_NAMES.EMAIL)
const job = await queue.add('send-email', data, {
...defaultJobOptions,
priority: PRIORITY_MAP[data.priority || 'normal'],
delay: delayMs,
})
console.log(`[EmailQueue] Delayed job ${job.id} queued (delay: ${delayMs}ms)`)
return job
}
/**
* Fügt mehrere E-Mails als Batch zur Queue hinzu
*/
export async function enqueueEmailBatch(
emails: EmailJobData[]
): Promise<Job<EmailJobData>[]> {
const queue = getQueue(QUEUE_NAMES.EMAIL)
const jobs = await queue.addBulk(
emails.map((data) => ({
name: 'send-email',
data,
opts: {
...defaultJobOptions,
priority: PRIORITY_MAP[data.priority || 'normal'],
},
}))
)
console.log(`[EmailQueue] ${jobs.length} batch jobs queued`)
return jobs
}
/**
* Prüft ob ein Job bereits existiert (für Deduplizierung)
*/
export async function isEmailJobPending(correlationId: string): Promise<boolean> {
const queue = getQueue(QUEUE_NAMES.EMAIL)
const job = await queue.getJob(correlationId)
if (!job) return false
const state = await job.getState()
return state === 'waiting' || state === 'active' || state === 'delayed'
}
/**
* Holt den Status eines E-Mail-Jobs
*/
export async function getEmailJobStatus(jobId: string): Promise<{
state: string
progress: number
result?: EmailJobResult
failedReason?: string
} | null> {
const queue = getQueue(QUEUE_NAMES.EMAIL)
const job = await queue.getJob(jobId)
if (!job) return null
const [state, progress] = await Promise.all([
job.getState(),
job.progress,
])
return {
state,
progress: typeof progress === 'number' ? progress : 0,
result: job.returnvalue as EmailJobResult | undefined,
failedReason: job.failedReason,
}
}

View file

@ -0,0 +1,201 @@
/**
* PDF Job Definition
*
* Definiert den PDF-Generierungs-Job für die Queue.
* Unterstützt HTML-zu-PDF Konvertierung und URL-zu-PDF.
*/
import { Job } from 'bullmq'
import { getQueue, QUEUE_NAMES, defaultJobOptions } from '../queue-service'
// Job-Daten Typen
export interface PdfJobData {
// Quelle für PDF
source: 'html' | 'url'
html?: string // HTML-Content (wenn source='html')
url?: string // URL zum Rendern (wenn source='url')
// Output-Konfiguration
outputPath?: string // Pfad zum Speichern (optional)
filename?: string // Dateiname für Download
// PDF-Optionen
options?: {
format?: 'A4' | 'A3' | 'Letter' | 'Legal'
landscape?: boolean
margin?: {
top?: string
right?: string
bottom?: string
left?: string
}
printBackground?: boolean
scale?: number
headerTemplate?: string
footerTemplate?: string
displayHeaderFooter?: boolean
}
// Tenant-Kontext
tenantId: number
tenantSlug?: string
// Metadaten
triggeredBy?: string // User ID oder System
documentType?: 'invoice' | 'report' | 'export' | 'certificate' | 'other'
correlationId?: string // Für Tracking
// Optionen
priority?: 'high' | 'normal' | 'low'
}
export interface PdfJobResult {
success: boolean
outputPath?: string // Wo die PDF gespeichert wurde
buffer?: string // Base64-encoded PDF (wenn kein outputPath)
filename?: string
pageCount?: number
fileSize?: number // In Bytes
error?: string
timestamp: string
duration?: number // Generierungszeit in ms
}
// Priority-Mapping für BullMQ (niedrigere Zahl = höhere Priorität)
const PRIORITY_MAP = {
high: 1,
normal: 5,
low: 10,
}
/**
* Fügt einen PDF-Job zur Queue hinzu
*/
export async function enqueuePdf(data: PdfJobData): Promise<Job<PdfJobData>> {
const queue = getQueue(QUEUE_NAMES.PDF)
// Validierung
if (data.source === 'html' && !data.html) {
throw new Error('HTML content required when source is "html"')
}
if (data.source === 'url' && !data.url) {
throw new Error('URL required when source is "url"')
}
const jobOptions = {
...defaultJobOptions,
priority: PRIORITY_MAP[data.priority || 'normal'],
// PDF-Jobs können länger dauern
attempts: 2,
backoff: {
type: 'fixed' as const,
delay: 5000, // 5s zwischen Retries
},
// Job-ID für Deduplizierung (optional)
jobId: data.correlationId,
}
const job = await queue.add('generate-pdf', data, jobOptions)
console.log(`[PdfQueue] Job ${job.id} queued (source: ${data.source}, type: ${data.documentType || 'other'})`)
return job
}
/**
* Fügt einen verzögerten PDF-Job zur Queue hinzu
*/
export async function enqueueDelayedPdf(
data: PdfJobData,
delayMs: number,
): Promise<Job<PdfJobData>> {
const queue = getQueue(QUEUE_NAMES.PDF)
const job = await queue.add('generate-pdf', data, {
...defaultJobOptions,
priority: PRIORITY_MAP[data.priority || 'normal'],
delay: delayMs,
attempts: 2,
})
console.log(`[PdfQueue] Delayed job ${job.id} queued (delay: ${delayMs}ms)`)
return job
}
/**
* Fügt mehrere PDF-Jobs als Batch zur Queue hinzu
*/
export async function enqueuePdfBatch(
pdfs: PdfJobData[],
): Promise<Job<PdfJobData>[]> {
const queue = getQueue(QUEUE_NAMES.PDF)
const jobs = await queue.addBulk(
pdfs.map((data) => ({
name: 'generate-pdf',
data,
opts: {
...defaultJobOptions,
priority: PRIORITY_MAP[data.priority || 'normal'],
attempts: 2,
},
})),
)
console.log(`[PdfQueue] ${jobs.length} batch jobs queued`)
return jobs
}
/**
* Prüft ob ein Job bereits existiert (für Deduplizierung)
*/
export async function isPdfJobPending(correlationId: string): Promise<boolean> {
const queue = getQueue(QUEUE_NAMES.PDF)
const job = await queue.getJob(correlationId)
if (!job) return false
const state = await job.getState()
return state === 'waiting' || state === 'active' || state === 'delayed'
}
/**
* Holt den Status eines PDF-Jobs
*/
export async function getPdfJobStatus(jobId: string): Promise<{
state: string
progress: number
result?: PdfJobResult
failedReason?: string
} | null> {
const queue = getQueue(QUEUE_NAMES.PDF)
const job = await queue.getJob(jobId)
if (!job) return null
const [state, progress] = await Promise.all([job.getState(), job.progress])
return {
state,
progress: typeof progress === 'number' ? progress : 0,
result: job.returnvalue as PdfJobResult | undefined,
failedReason: job.failedReason,
}
}
/**
* Holt das Ergebnis eines abgeschlossenen PDF-Jobs
*/
export async function getPdfJobResult(jobId: string): Promise<PdfJobResult | null> {
const queue = getQueue(QUEUE_NAMES.PDF)
const job = await queue.getJob(jobId)
if (!job) return null
const state = await job.getState()
if (state !== 'completed') return null
return job.returnvalue as PdfJobResult
}

View file

@ -0,0 +1,202 @@
/**
* BullMQ Queue Service
*
* Zentrale Queue-Verwaltung für Background Jobs.
* Verwendet Redis als Backend (gleiche Instanz wie Caching).
*/
import { Queue, QueueEvents, JobsOptions } from 'bullmq'
import IORedis from 'ioredis'
// Queue-Namen
export const QUEUE_NAMES = {
EMAIL: 'email',
PDF: 'pdf',
CLEANUP: 'cleanup',
} as const
export type QueueName = (typeof QUEUE_NAMES)[keyof typeof QUEUE_NAMES]
// Redis-Konfiguration für Queue (separate DB)
const QUEUE_REDIS_DB = parseInt(process.env.QUEUE_REDIS_DB || '1', 10)
// Gemeinsame Redis-Connection für alle Queues
let redisConnection: IORedis | null = null
/**
* Erstellt oder gibt die existierende Redis-Connection zurück
*/
export function getQueueRedisConnection(): IORedis {
if (!redisConnection) {
const host = process.env.REDIS_HOST || 'localhost'
const port = parseInt(process.env.REDIS_PORT || '6379', 10)
redisConnection = new IORedis({
host,
port,
db: QUEUE_REDIS_DB,
maxRetriesPerRequest: null, // BullMQ requirement
enableReadyCheck: false,
})
redisConnection.on('error', (err) => {
console.error('[Queue] Redis connection error:', err.message)
})
redisConnection.on('connect', () => {
console.log(`[Queue] Redis connected (db: ${QUEUE_REDIS_DB})`)
})
}
return redisConnection
}
// Queue-Instanzen Cache
const queues = new Map<QueueName, Queue>()
const queueEvents = new Map<QueueName, QueueEvents>()
/**
* Standard Job-Optionen
*/
export const defaultJobOptions: JobsOptions = {
attempts: parseInt(process.env.QUEUE_DEFAULT_RETRY || '3', 10),
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s, 8s, ...
},
removeOnComplete: {
count: 100, // Behalte letzte 100 erfolgreiche Jobs
age: 24 * 60 * 60, // Oder 24 Stunden
},
removeOnFail: {
count: 500, // Behalte letzte 500 fehlgeschlagene Jobs
age: 7 * 24 * 60 * 60, // Oder 7 Tage
},
}
/**
* Erstellt oder gibt eine existierende Queue zurück
*/
export function getQueue(name: QueueName): Queue {
if (!queues.has(name)) {
const queue = new Queue(name, {
connection: getQueueRedisConnection(),
defaultJobOptions,
})
queues.set(name, queue)
console.log(`[Queue] Queue "${name}" initialized`)
}
return queues.get(name)!
}
/**
* Erstellt oder gibt QueueEvents für eine Queue zurück
*/
export function getQueueEvents(name: QueueName): QueueEvents {
if (!queueEvents.has(name)) {
const events = new QueueEvents(name, {
connection: getQueueRedisConnection(),
})
queueEvents.set(name, events)
}
return queueEvents.get(name)!
}
/**
* Gibt alle aktiven Queues zurück (für bull-board)
*/
export function getAllQueues(): Queue[] {
// Initialisiere alle Queues falls noch nicht geschehen
Object.values(QUEUE_NAMES).forEach((name) => getQueue(name))
return Array.from(queues.values())
}
/**
* Schließt alle Queue-Verbindungen (für graceful shutdown)
*/
export async function closeAllQueues(): Promise<void> {
console.log('[Queue] Closing all queues...')
// Schließe QueueEvents
const eventEntries = Array.from(queueEvents.values())
for (const events of eventEntries) {
await events.close()
}
queueEvents.clear()
// Schließe Queues
const queueEntries = Array.from(queues.values())
for (const queue of queueEntries) {
await queue.close()
}
queues.clear()
// Schließe Redis Connection
if (redisConnection) {
await redisConnection.quit()
redisConnection = null
}
console.log('[Queue] All queues closed')
}
/**
* Prüft ob Queue-System verfügbar ist
*/
export async function isQueueAvailable(): Promise<boolean> {
try {
const connection = getQueueRedisConnection()
const pong = await connection.ping()
return pong === 'PONG'
} catch {
return false
}
}
/**
* Queue-Status für Monitoring
*/
export interface QueueStatus {
name: string
waiting: number
active: number
completed: number
failed: number
delayed: number
paused: boolean
}
/**
* Gibt den Status aller Queues zurück
*/
export async function getQueuesStatus(): Promise<QueueStatus[]> {
const statuses: QueueStatus[] = []
const entries = Array.from(queues.entries())
for (const [name, queue] of entries) {
const [waiting, active, completed, failed, delayed, isPaused] = await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getCompletedCount(),
queue.getFailedCount(),
queue.getDelayedCount(),
queue.isPaused(),
])
statuses.push({
name,
waiting,
active,
completed,
failed,
delayed,
paused: isPaused,
})
}
return statuses
}

View file

@ -0,0 +1,135 @@
/**
* Email Worker
*
* Verarbeitet E-Mail-Jobs aus der Queue.
* Nutzt den bestehenden TenantEmailService.
*/
import { Worker, Job } from 'bullmq'
import { getPayload } from 'payload'
import config from '@payload-config'
import { QUEUE_NAMES, getQueueRedisConnection } from '../queue-service'
import type { EmailJobData, EmailJobResult } from '../jobs/email-job'
import { sendTenantEmail, type EmailSource } from '../../email/tenant-email-service'
// Worker-Konfiguration
const CONCURRENCY = parseInt(process.env.QUEUE_EMAIL_CONCURRENCY || '3', 10)
/**
* E-Mail Job Processor
*/
async function processEmailJob(job: Job<EmailJobData>): Promise<EmailJobResult> {
const { tenantId, to, subject, html, text, replyTo, source, correlationId } = job.data
console.log(`[EmailWorker] Processing job ${job.id} for tenant ${tenantId}`)
try {
// Payload-Instanz holen (für DB-Zugriff und Logging)
const payload = await getPayload({ config })
// E-Mail über bestehenden Service senden
const result = await sendTenantEmail(payload, tenantId, {
to,
subject,
html,
text,
replyTo,
source: (source as EmailSource) || 'system',
metadata: {
queueJobId: job.id,
correlationId,
attempts: job.attemptsMade + 1,
},
})
if (!result.success) {
throw new Error(result.error || 'Unknown email error')
}
const jobResult: EmailJobResult = {
success: true,
messageId: result.messageId,
timestamp: new Date().toISOString(),
attempts: job.attemptsMade + 1,
}
console.log(`[EmailWorker] Job ${job.id} completed: ${result.messageId}`)
return jobResult
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
console.error(`[EmailWorker] Job ${job.id} failed:`, errorMessage)
// Error werfen damit BullMQ Retry-Logik greift
throw error
}
}
/**
* Email Worker Instanz
*/
let emailWorker: Worker<EmailJobData, EmailJobResult> | null = null
/**
* Startet den Email Worker
*/
export function startEmailWorker(): Worker<EmailJobData, EmailJobResult> {
if (emailWorker) {
console.warn('[EmailWorker] Worker already running')
return emailWorker
}
emailWorker = new Worker<EmailJobData, EmailJobResult>(
QUEUE_NAMES.EMAIL,
processEmailJob,
{
connection: getQueueRedisConnection(),
concurrency: CONCURRENCY,
// Stalled Job Detection
stalledInterval: 30000, // 30s
maxStalledCount: 2,
}
)
// Event Handlers
emailWorker.on('ready', () => {
console.log(`[EmailWorker] Ready (concurrency: ${CONCURRENCY})`)
})
emailWorker.on('completed', (job, result) => {
console.log(`[EmailWorker] Job ${job.id} completed in ${Date.now() - job.timestamp}ms`)
})
emailWorker.on('failed', (job, error) => {
console.error(`[EmailWorker] Job ${job?.id} failed after ${job?.attemptsMade} attempts:`, error.message)
})
emailWorker.on('stalled', (jobId) => {
console.warn(`[EmailWorker] Job ${jobId} stalled`)
})
emailWorker.on('error', (error) => {
console.error('[EmailWorker] Error:', error)
})
return emailWorker
}
/**
* Stoppt den Email Worker
*/
export async function stopEmailWorker(): Promise<void> {
if (emailWorker) {
console.log('[EmailWorker] Stopping...')
await emailWorker.close()
emailWorker = null
console.log('[EmailWorker] Stopped')
}
}
/**
* Gibt die Worker-Instanz zurück (falls aktiv)
*/
export function getEmailWorker(): Worker<EmailJobData, EmailJobResult> | null {
return emailWorker
}

View file

@ -0,0 +1,172 @@
/**
* PDF Worker
*
* Verarbeitet PDF-Generierungs-Jobs aus der Queue.
* Nutzt den PDF-Service für die eigentliche Generierung.
*/
import { Worker, Job } from 'bullmq'
import { QUEUE_NAMES, getQueueRedisConnection } from '../queue-service'
import type { PdfJobData, PdfJobResult } from '../jobs/pdf-job'
import {
generatePdfFromHtml,
generatePdfFromUrl,
generatePdfFilename,
closeBrowser,
} from '../../pdf/pdf-service'
// Worker-Konfiguration
const CONCURRENCY = parseInt(process.env.QUEUE_PDF_CONCURRENCY || '2', 10)
/**
* PDF Job Processor
*/
async function processPdfJob(job: Job<PdfJobData>): Promise<PdfJobResult> {
const {
source,
html,
url,
outputPath,
filename,
options = {},
tenantId,
documentType,
correlationId,
} = job.data
console.log(`[PdfWorker] Processing job ${job.id} for tenant ${tenantId} (source: ${source})`)
const startTime = Date.now()
try {
// Dateiname generieren falls nicht angegeben
const targetFilename = filename || outputPath || generatePdfFilename('document', tenantId, documentType)
let result
if (source === 'html') {
if (!html) {
throw new Error('HTML content is required for source="html"')
}
result = await generatePdfFromHtml(html, options, outputPath ? targetFilename : undefined)
} else if (source === 'url') {
if (!url) {
throw new Error('URL is required for source="url"')
}
result = await generatePdfFromUrl(url, options, outputPath ? targetFilename : undefined)
} else {
throw new Error(`Invalid source: ${source}`)
}
if (!result.success) {
throw new Error(result.error || 'PDF generation failed')
}
const jobResult: PdfJobResult = {
success: true,
outputPath: result.outputPath,
filename: result.filename || targetFilename,
pageCount: result.pageCount,
fileSize: result.fileSize,
timestamp: new Date().toISOString(),
duration: Date.now() - startTime,
}
// Buffer als Base64 zurückgeben wenn kein outputPath
if (!outputPath && result.buffer) {
jobResult.buffer = result.buffer.toString('base64')
}
console.log(`[PdfWorker] Job ${job.id} completed: ${result.fileSize} bytes, ${result.pageCount} pages`)
return jobResult
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
console.error(`[PdfWorker] Job ${job.id} failed:`, errorMessage)
// Error werfen damit BullMQ Retry-Logik greift
throw error
}
}
/**
* PDF Worker Instanz
*/
let pdfWorker: Worker<PdfJobData, PdfJobResult> | null = null
/**
* Startet den PDF Worker
*/
export function startPdfWorker(): Worker<PdfJobData, PdfJobResult> {
if (pdfWorker) {
console.warn('[PdfWorker] Worker already running')
return pdfWorker
}
pdfWorker = new Worker<PdfJobData, PdfJobResult>(
QUEUE_NAMES.PDF,
processPdfJob,
{
connection: getQueueRedisConnection(),
concurrency: CONCURRENCY,
// Stalled Job Detection (PDF-Jobs können länger dauern)
stalledInterval: 60000, // 60s
maxStalledCount: 1,
// Längerer Lock für PDF-Generierung
lockDuration: 120000, // 2 Minuten
},
)
// Event Handlers
pdfWorker.on('ready', () => {
console.log(`[PdfWorker] Ready (concurrency: ${CONCURRENCY})`)
})
pdfWorker.on('completed', (job, result) => {
console.log(
`[PdfWorker] Job ${job.id} completed in ${Date.now() - job.timestamp}ms (${result.fileSize} bytes)`,
)
})
pdfWorker.on('failed', (job, error) => {
console.error(
`[PdfWorker] Job ${job?.id} failed after ${job?.attemptsMade} attempts:`,
error.message,
)
})
pdfWorker.on('stalled', (jobId) => {
console.warn(`[PdfWorker] Job ${jobId} stalled`)
})
pdfWorker.on('error', (error) => {
console.error('[PdfWorker] Error:', error)
})
return pdfWorker
}
/**
* Stoppt den PDF Worker
*/
export async function stopPdfWorker(): Promise<void> {
if (pdfWorker) {
console.log('[PdfWorker] Stopping...')
await pdfWorker.close()
pdfWorker = null
// Browser schließen
await closeBrowser()
console.log('[PdfWorker] Stopped')
}
}
/**
* Gibt die Worker-Instanz zurück (falls aktiv)
*/
export function getPdfWorker(): Worker<PdfJobData, PdfJobResult> | null {
return pdfWorker
}

View file

@ -1,8 +1,23 @@
// src/lib/search.ts // src/lib/search.ts
// Shared search library with caching, rate limiting, and search functions // Shared search library with caching, rate limiting, and search functions
// Supports both ILIKE (default) and PostgreSQL Full-Text Search (FTS)
import type { Payload, Where } from 'payload' import type { Payload, Where } from 'payload'
import type { Post, Category } from '../payload-types' import type { Post, Category } from '../payload-types'
import { sql } from '@payloadcms/db-postgres/drizzle'
// FTS Feature Flag - aktivieren mit USE_FTS=true in .env
const USE_FTS = process.env.USE_FTS === 'true'
// Type für raw SQL Ergebnisse
interface FtsCountRow {
total: string
}
interface FtsIdRow {
id: number
rank: number
}
// ============================================================================ // ============================================================================
// Types // Types
@ -236,13 +251,150 @@ function generateCacheKey(prefix: string, params: Record<string, unknown>): stri
return `${prefix}:${JSON.stringify(sortedEntries)}` return `${prefix}:${JSON.stringify(sortedEntries)}`
} }
// ============================================================================
// Full-Text Search (FTS) Functions
// ============================================================================
/**
* PostgreSQL FTS-Konfiguration nach Sprache
*/
function getFtsConfig(locale: string): string {
return locale === 'en' ? 'english' : 'german'
}
/**
* Bereitet Suchquery für FTS vor
* Konvertiert "foo bar" zu "foo:* & bar:*" für Prefix-Suche
*/
function prepareFtsQuery(query: string): string {
return query
.trim()
.split(/\s+/)
.filter(term => term.length >= 2)
.map(term => `${term}:*`)
.join(' & ')
}
/**
* Escapet einen String für sichere Verwendung in SQL
* Verhindert SQL Injection bei dynamischen Werten
*/
function escapeSqlString(str: string): string {
return str.replace(/'/g, "''")
}
/**
* FTS-basierte Suche mit PostgreSQL to_tsvector
* Verwendet die GIN-Indexes auf posts_locales
*/
async function searchWithFts(
payload: Payload,
params: SearchParams,
): Promise<{ postIds: number[]; totalCount: number }> {
const { query, tenantId, type, locale = 'de', limit = 10, offset = 0 } = params
if (!query || query.length < 2) {
return { postIds: [], totalCount: 0 }
}
const ftsConfig = getFtsConfig(locale)
const ftsQuery = prepareFtsQuery(query)
if (!ftsQuery) {
return { postIds: [], totalCount: 0 }
}
// Guard: Check if payload.db exists (may be missing in test mocks)
if (!payload.db || !payload.db.drizzle) {
console.warn('[FTS Search] payload.db.drizzle not available, returning empty results')
return { postIds: [], totalCount: 0 }
}
// Drizzle-ORM für raw SQL Query
const db = payload.db.drizzle
// Escape Werte für sichere SQL-Queries
const safeFtsQuery = escapeSqlString(ftsQuery)
const safeLocale = escapeSqlString(locale)
// Baue die SQL Query für FTS
const conditions: string[] = []
// FTS-Bedingung (Title + Excerpt)
conditions.push(`to_tsvector('${ftsConfig}', COALESCE(pl.title, '') || ' ' || COALESCE(pl.excerpt, '')) @@ to_tsquery('${ftsConfig}', '${safeFtsQuery}')`)
// Locale-Filter
conditions.push(`pl._locale = '${safeLocale}'`)
// Status-Filter (nur published)
conditions.push(`p.status = 'published'`)
// Tenant-Filter
if (tenantId && Number.isInteger(tenantId)) {
conditions.push(`p.tenant_id = ${tenantId}`)
}
// Type-Filter
if (type) {
const safeType = escapeSqlString(type)
conditions.push(`p.type = '${safeType}'`)
}
const whereClause = conditions.join(' AND ')
// Count Query für Pagination
const countQuery = `
SELECT COUNT(DISTINCT p.id) as total
FROM posts p
INNER JOIN posts_locales pl ON p.id = pl._parent_id
WHERE ${whereClause}
`
// Main Query mit Relevanz-Sortierung
// Note: SELECT DISTINCT requires all ORDER BY columns to be in SELECT list
const mainQuery = `
SELECT DISTINCT p.id,
ts_rank(
to_tsvector('${ftsConfig}', COALESCE(pl.title, '') || ' ' || COALESCE(pl.excerpt, '')),
to_tsquery('${ftsConfig}', '${safeFtsQuery}')
) as rank,
p.published_at
FROM posts p
INNER JOIN posts_locales pl ON p.id = pl._parent_id
WHERE ${whereClause}
ORDER BY rank DESC, p.published_at DESC NULLS LAST
LIMIT ${Math.min(limit, 100)} OFFSET ${Math.max(0, offset)}
`
try {
// Execute queries mit drizzle-orm sql template
const [countResult, mainResult] = await Promise.all([
db.execute(sql.raw(countQuery)),
db.execute(sql.raw(mainQuery)),
])
// Extrahiere die Ergebnisse - node-postgres gibt { rows: [...] } zurück
const countRows = (countResult as unknown as { rows: FtsCountRow[] }).rows || []
const totalCount = countRows.length > 0 ? parseInt(countRows[0].total, 10) : 0
const mainRows = (mainResult as unknown as { rows: FtsIdRow[] }).rows || []
const postIds = mainRows.map((row) => row.id)
return { postIds, totalCount }
} catch (error) {
console.error('[FTS Search] SQL Error:', error)
// Fallback zu leeren Ergebnissen bei Fehler
return { postIds: [], totalCount: 0 }
}
}
// ============================================================================ // ============================================================================
// Search Functions // Search Functions
// ============================================================================ // ============================================================================
/** /**
* Search posts with ILIKE (Phase 1) * Search posts with ILIKE or FTS (when USE_FTS=true)
* Future: Add FTS support with USE_FTS=true * FTS uses PostgreSQL to_tsvector for better performance and relevance
*/ */
export async function searchPosts( export async function searchPosts(
payload: Payload, payload: Payload,
@ -251,31 +403,13 @@ export async function searchPosts(
const { query, tenantId, categorySlug, type, locale = 'de', limit = 10, offset = 0 } = params const { query, tenantId, categorySlug, type, locale = 'de', limit = 10, offset = 0 } = params
// Check cache // Check cache
const cacheKey = generateCacheKey('search', { query, tenantId, categorySlug, type, locale, limit, offset }) const cacheKey = generateCacheKey('search', { query, tenantId, categorySlug, type, locale, limit, offset, fts: USE_FTS })
const cached = searchCache.get(cacheKey) const cached = searchCache.get(cacheKey)
if (cached) { if (cached) {
return cached return cached
} }
// Build where clause // Category lookup (benötigt für beide Methoden)
const whereConditions: Where[] = [{ status: { equals: 'published' } }]
if (tenantId) {
whereConditions.push({ tenant: { equals: tenantId } })
}
if (type) {
whereConditions.push({ type: { equals: type } })
}
// Search in title and excerpt
if (query && query.length >= 2) {
whereConditions.push({
or: [{ title: { contains: query } }, { excerpt: { contains: query } }],
})
}
// Category filter - need to lookup category first if categorySlug provided
let categoryId: number | undefined let categoryId: number | undefined
if (categorySlug) { if (categorySlug) {
const categoryResult = await payload.find({ const categoryResult = await payload.find({
@ -290,27 +424,90 @@ export async function searchPosts(
}) })
if (categoryResult.docs.length > 0) { if (categoryResult.docs.length > 0) {
categoryId = categoryResult.docs[0].id categoryId = categoryResult.docs[0].id
whereConditions.push({ category: { equals: categoryId } })
} }
} }
const where: Where = whereConditions.length > 1 ? { and: whereConditions } : whereConditions[0] let result: Awaited<ReturnType<typeof payload.find>>
let totalDocs: number
// Execute search // Verwende FTS wenn aktiviert und Query vorhanden
const result = await payload.find({ if (USE_FTS && query && query.length >= 2) {
collection: 'posts', // FTS-basierte Suche
where, const ftsResult = await searchWithFts(payload, { ...params, categorySlug: undefined })
locale: locale as 'de' | 'en',
fallbackLocale: 'de',
limit,
page: Math.floor(offset / limit) + 1,
sort: '-publishedAt',
depth: 1, // Include category relation
})
// Transform results if (ftsResult.postIds.length === 0) {
// Keine FTS-Ergebnisse
result = { docs: [], totalDocs: 0, page: 1, totalPages: 0, hasNextPage: false, hasPrevPage: false, limit, pagingCounter: 1 }
totalDocs = 0
} else {
// Lade Posts nach FTS-IDs (mit Category-Filter wenn nötig)
const whereConditions: Where[] = [
{ id: { in: ftsResult.postIds } },
]
if (categoryId) {
whereConditions.push({ category: { equals: categoryId } })
}
result = await payload.find({
collection: 'posts',
where: whereConditions.length > 1 ? { and: whereConditions } : whereConditions[0],
locale: locale as 'de' | 'en',
fallbackLocale: 'de',
limit: ftsResult.postIds.length, // Alle gefundenen IDs laden
sort: '-publishedAt',
depth: 1,
})
// Sortiere nach FTS-Reihenfolge (Relevanz)
const orderedDocs = ftsResult.postIds
.map(id => result.docs.find(doc => doc.id === id))
.filter((doc): doc is Post => doc !== undefined)
result = { ...result, docs: orderedDocs }
totalDocs = categoryId ? result.totalDocs : ftsResult.totalCount
}
} else {
// Standard ILIKE-basierte Suche
const whereConditions: Where[] = [{ status: { equals: 'published' } }]
if (tenantId) {
whereConditions.push({ tenant: { equals: tenantId } })
}
if (type) {
whereConditions.push({ type: { equals: type } })
}
// Search in title and excerpt mit ILIKE
if (query && query.length >= 2) {
whereConditions.push({
or: [{ title: { contains: query } }, { excerpt: { contains: query } }],
})
}
if (categoryId) {
whereConditions.push({ category: { equals: categoryId } })
}
const where: Where = whereConditions.length > 1 ? { and: whereConditions } : whereConditions[0]
result = await payload.find({
collection: 'posts',
where,
locale: locale as 'de' | 'en',
fallbackLocale: 'de',
limit,
page: Math.floor(offset / limit) + 1,
sort: '-publishedAt',
depth: 1,
})
totalDocs = result.totalDocs
}
// Transform results - Cast docs to Post[] for proper typing
const posts = result.docs as Post[]
const searchResult: SearchResult = { const searchResult: SearchResult = {
results: result.docs.map((post) => ({ results: posts.map((post) => ({
id: post.id, id: post.id,
title: post.title, title: post.title,
slug: post.slug, slug: post.slug,
@ -323,7 +520,7 @@ export async function searchPosts(
.map((cat) => ({ name: cat.name, slug: cat.slug })) .map((cat) => ({ name: cat.name, slug: cat.slug }))
: [], : [],
})), })),
total: result.totalDocs, total: totalDocs,
query, query,
filters: { filters: {
category: categorySlug, category: categorySlug,