dak.c2s/backend/app/api/reports.py
CCS Admin 6f6a721973 feat: add Wochenübersicht export + ICD import auto-detect
- New Excel export service for weekly DAK summary sheets (c2s / c2s_g_s variants)
- New API endpoint GET /reports/wochenuebersicht (admin-only)
- ICD import auto-detects format (coding template vs. Wochenübersicht KVNR-based)
- New admin frontend page with download form
- Route + sidebar navigation entry

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 13:28:17 +00:00

377 lines
12 KiB
Python

"""Reports API — dashboard KPIs, weekly data, report generation, and download."""
import logging
import os
from collections import defaultdict
from datetime import date
from io import BytesIO
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.core.dependencies import get_current_user, require_admin
from app.database import get_db
from app.models.report import WeeklyReport
from app.models.user import User
from app.schemas.report import (
DashboardResponse,
ReportListResponse,
ReportMeta,
)
from app.services.audit_service import log_action
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router instance on which every endpoint in this module is registered
# through the ``@router.<method>(...)`` decorators.
router = APIRouter()
@router.get("/dashboard", response_model=DashboardResponse)
def dashboard(
    jahr: int | None = Query(None),
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Serve the dashboard payload: KPIs plus the weekly time-series of a year.

    When *jahr* is omitted (or falsy) the year is derived from today's date
    via ``date_to_jahr``. Accessible to both admin and dak_mitarbeiter users.

    Responds with HTTP 501 when the report service cannot be imported.
    """
    if not jahr:
        from app.utils.kw_utils import date_to_jahr

        jahr = date_to_jahr(date.today())

    try:
        # Imported lazily so that a missing report service turns into a 501
        # response instead of breaking the import of this module.
        from app.services.report_service import (
            calculate_dashboard_kpis as compute_kpis,
            calculate_sheet1_data as compute_sheet1,
        )

        kpi_values = compute_kpis(db, jahr)
        weekly_rows = compute_sheet1(db, jahr).get("weekly", [])
        return DashboardResponse(kpis=kpi_values, weekly=weekly_rows)
    except ImportError:
        # report_service not yet implemented (parallel task)
        raise HTTPException(501, "Report service not yet available")
@router.get("/weekly/{jahr}/{kw}")
def weekly_report(
    jahr: int,
    kw: int,
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Look up the aggregated figures of one calendar week.

    The year's sheet-1 data is computed and the first weekly entry whose
    ``"kw"`` equals *kw* is returned. If no entry matches, a placeholder
    with zero ``erstberatungen`` is returned instead.

    Accessible to both admin and dak_mitarbeiter users. Responds with
    HTTP 501 when the report service cannot be imported.
    """
    try:
        from app.services.report_service import calculate_sheet1_data

        sheet = calculate_sheet1_data(db, jahr)
        for entry in sheet.get("weekly", []):
            if entry.get("kw") == kw:
                return entry
        # No data recorded for the requested week.
        return {"kw": kw, "erstberatungen": 0}
    except ImportError:
        raise HTTPException(501, "Report service not yet available")
@router.post("/generate", response_model=ReportMeta)
def generate_report(
    jahr: int | None = Query(None),
    kw: int | None = Query(None),
    report_type: str = Query("gesamt"),
    db: Session = Depends(get_db),
    user: User = Depends(require_admin),
):
    """Generate a full Berichtswesen Excel report and persist it to disk + DB.

    Admin only. Defaults to the current ISO year/week if not specified.
    Accepts *report_type* to generate filtered reports (``onko_intensiv``,
    ``galle_schild``, or the default ``gesamt``).

    The workbook is written to the ``reports`` directory three levels above
    this file, and the ``WeeklyReport`` row for the same
    (jahr, kw, report_type) is updated or created.

    Responds with HTTP 422 for an unknown *report_type* or a *kw* outside
    1..53, and with HTTP 501 when a required service cannot be imported.
    """
    from app.services.report_service import REPORT_TYPES, REPORT_TYPE_LABELS

    if report_type not in REPORT_TYPES:
        raise HTTPException(422, f"Unknown report_type: {report_type}")

    # Fill in whichever of year / week the caller left out from today's date.
    if not jahr or not kw:
        from app.utils.kw_utils import date_to_jahr, date_to_kw

        today = date.today()
        jahr = jahr or date_to_jahr(today)
        kw = kw or date_to_kw(today)

    # Same bounds as the Wochenübersicht endpoint; without this check a value
    # such as kw=99 would end up in the file name and in the database row.
    if not 1 <= kw <= 53:
        raise HTTPException(422, "kw must be between 1 and 53")

    try:
        from app.services.excel_export import generate_berichtswesen_xlsx
        from app.services.report_service import generate_full_report
        from app.services.vorjahr_service import get_vorjahr_summary

        fallgruppen = list(REPORT_TYPES[report_type])
        report_data = generate_full_report(db, jahr, kw, report_type=report_type)

        # The cached Vorjahr summary is only passed for "gesamt"; filtered
        # reports pass None.
        # NOTE(review): the previous comment said filtered reports use a
        # "live calculation" -- presumably done inside the export/report
        # service; confirm.
        vorjahr = get_vorjahr_summary(db, jahr) if report_type == "gesamt" else None
        xlsx_bytes = generate_berichtswesen_xlsx(
            report_data, jahr, vorjahr, fallgruppen=fallgruppen,
        )

        # Persist Excel file to disk
        reports_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
            "reports",
        )
        os.makedirs(reports_dir, exist_ok=True)

        type_label = REPORT_TYPE_LABELS.get(report_type, report_type)
        if report_type != "gesamt":
            filename = f"Berichtswesen_{type_label}_{jahr}_KW{kw:02d}.xlsx"
        else:
            filename = f"Berichtswesen_{jahr}_KW{kw:02d}.xlsx"
        filepath = os.path.join(reports_dir, filename)
        with open(filepath, "wb") as f:
            f.write(xlsx_bytes)

        # Upsert report metadata (replace if same jahr/kw/report_type exists)
        report = (
            db.query(WeeklyReport)
            .filter(
                WeeklyReport.jahr == jahr,
                WeeklyReport.kw == kw,
                WeeklyReport.report_type == report_type,
            )
            .first()
        )
        if report:
            report.report_date = date.today()
            report.report_data = report_data
            report.report_file_path = filepath
            report.generated_by = user.id
        else:
            report = WeeklyReport(
                jahr=jahr,
                kw=kw,
                report_type=report_type,
                report_date=date.today(),
                report_data=report_data,
                generated_by=user.id,
            )
            report.report_file_path = filepath
            db.add(report)

        db.commit()
        db.refresh(report)

        log_action(
            db,
            user_id=user.id,
            action="report_generated",
            entity_type="report",
            entity_id=report.id,
            new_values={"jahr": jahr, "kw": kw, "report_type": report_type, "filename": filename},
        )
        return ReportMeta.model_validate(report)
    except ImportError as exc:
        # Keep the original import failure attached for server-side debugging.
        raise HTTPException(501, f"Required service not yet available: {exc}") from exc
@router.get("/download/{report_id}")
def download_report(
    report_id: int,
    request: Request,
    token: str | None = Query(None),
    db: Session = Depends(get_db),
):
    """Download a previously generated Berichtswesen Excel file.

    Supports both ``Authorization: Bearer`` header and ``?token=`` query
    parameter so the browser can open the URL directly in a new tab. The
    query parameter takes precedence when both are present.

    Responds with HTTP 401 when no token is supplied, the token cannot be
    decoded, or it does not resolve to an active user; with HTTP 404 when
    the report, its stored path, or the file on disk is missing.
    """
    from app.core.security import decode_access_token
    from jose import JWTError

    # Resolve token from header or query param
    raw_token = token
    if not raw_token:
        auth = request.headers.get("authorization", "")
        if auth.lower().startswith("bearer "):
            raw_token = auth[len("bearer "):].strip()
    if not raw_token:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Not authenticated")

    try:
        payload = decode_access_token(raw_token)
        user_id = int(payload["sub"])
    except (JWTError, KeyError, TypeError, ValueError) as exc:
        # TypeError covers a null "sub" claim or a non-subscriptable payload,
        # which would otherwise surface as an unhandled 500.
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token") from exc

    user = db.query(User).filter(User.id == user_id, User.is_active == True).first()  # noqa: E712
    if not user:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found")

    report = db.query(WeeklyReport).filter(WeeklyReport.id == report_id).first()
    if not report or not report.report_file_path:
        raise HTTPException(404, "Report not found")

    # Open directly instead of checking os.path.exists() first: the file can
    # be removed between the check and the open (e.g. by the delete endpoint),
    # which would otherwise produce a 500 instead of a 404.
    try:
        with open(report.report_file_path, "rb") as f:
            content = f.read()
    except (FileNotFoundError, NotADirectoryError) as exc:
        raise HTTPException(404, "Report file not found on disk") from exc

    filename = os.path.basename(report.report_file_path)
    return StreamingResponse(
        BytesIO(content),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
@router.get("/{report_id}/data")
def get_report_data(
    report_id: int,
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Fetch the ``report_data`` JSON stored for one report.

    Accessible to both admin and dak_mitarbeiter users. Responds with
    HTTP 404 when the report does not exist or has no stored data.
    """
    record = (
        db.query(WeeklyReport)
        .filter(WeeklyReport.id == report_id)
        .first()
    )
    if not record:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Report not found")

    stored = record.report_data
    if not stored:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Report data not available")
    return stored
@router.delete("/delete")
def delete_reports(
    ids: list[int],
    db: Session = Depends(get_db),
    user: User = Depends(require_admin),
):
    """Delete one or more reports by ID (admin only).

    Removes both the database record and the file on disk. IDs that do not
    match a report are ignored. The database rows are committed first and
    the files are removed afterwards on a best-effort basis, so a failed
    commit or a file-system error can no longer leave rows that point at
    files which have already been deleted.

    Returns ``{"deleted": <number of removed reports>}``.
    """
    # One IN query instead of one query per id; skipped for an empty list.
    reports = (
        db.query(WeeklyReport).filter(WeeklyReport.id.in_(ids)).all()
        if ids
        else []
    )

    # Capture the paths before the rows are deleted and expired by the commit.
    file_paths = [r.report_file_path for r in reports if r.report_file_path]
    for report in reports:
        db.delete(report)

    deleted = len(reports)
    if deleted:
        db.commit()

        for path in file_paths:
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone -- nothing left to clean up.
                pass
            except OSError:
                # The DB row is gone; log the leftover file rather than
                # failing the whole request.
                logger.warning("Could not remove report file %s", path, exc_info=True)

        log_action(
            db,
            user_id=user.id,
            action="reports_deleted",
            entity_type="report",
            new_values={"ids": ids, "deleted": deleted},
        )
    return {"deleted": deleted}
@router.get("/wochenuebersicht")
def download_wochenuebersicht(
    export_type: str = Query(...),
    jahr: int = Query(...),
    kw_von: int = Query(...),
    kw_bis: int = Query(...),
    db: Session = Depends(get_db),
    user: User = Depends(require_admin),
):
    """Build a Wochenübersicht Excel workbook for DAK and return it as a download.

    Admin only. Cases of year *jahr* within the inclusive week range
    *kw_von*..*kw_bis* are selected for the Fallgruppen configured for
    *export_type*, grouped by calendar week and handed to the exporter.

    Responds with HTTP 422 for an unknown *export_type*, for week numbers
    outside 1..53, or when *kw_von* is greater than *kw_bis*.
    """
    from app.config import get_settings
    from app.models.case import Case
    from app.services.wochenuebersicht_export import (
        WOCHENUEBERSICHT_TYPES,
        generate_wochenuebersicht_xlsx,
    )

    settings = get_settings()

    # --- request validation -------------------------------------------------
    if export_type not in WOCHENUEBERSICHT_TYPES:
        raise HTTPException(422, f"Unknown export_type: {export_type}")
    if not 1 <= kw_von <= 53 or not 1 <= kw_bis <= 53:
        raise HTTPException(422, "kw_von and kw_bis must be between 1 and 53")
    if kw_von > kw_bis:
        raise HTTPException(422, "kw_von must be <= kw_bis")

    export_cfg = WOCHENUEBERSICHT_TYPES[export_type]

    # --- load matching cases, ordered by week and then date ------------------
    conditions = (
        Case.jahr == jahr,
        Case.kw >= kw_von,
        Case.kw <= kw_bis,
        Case.fallgruppe.in_(export_cfg["fallgruppen"]),
        Case.versicherung == settings.VERSICHERUNG_FILTER,
    )
    selected = db.query(Case).filter(*conditions).order_by(Case.kw, Case.datum).all()

    # Group by calendar week; a defaultdict is kept as the container type
    # passed on to the exporter.
    cases_by_kw: dict[int, list[Case]] = defaultdict(list)
    for row in selected:
        cases_by_kw[row.kw].append(row)

    workbook_bytes = generate_wochenuebersicht_xlsx(cases_by_kw, export_type, jahr)

    # File name carries the last week of the range and the two-digit year.
    name_infix = export_cfg["filename_infix"]
    filename = f"Wochenübersicht {name_infix} KW{kw_bis:02d}{jahr % 100:02d}.xlsx"

    audit_values = {
        "export_type": export_type,
        "jahr": jahr,
        "kw_von": kw_von,
        "kw_bis": kw_bis,
        "case_count": len(selected),
    }
    log_action(
        db,
        user_id=user.id,
        action="wochenuebersicht_downloaded",
        entity_type="report",
        new_values=audit_values,
    )

    # NOTE(review): the file name contains a non-ASCII character ("ü") sent
    # raw in the header value; consider an RFC 6266 ``filename*`` parameter
    # if clients mis-decode it -- confirm against the frontend first.
    response_headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
    return StreamingResponse(
        BytesIO(workbook_bytes),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers=response_headers,
    )
@router.get("/list", response_model=ReportListResponse)
def list_reports(
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return metadata for every generated report, most recent first.

    Ordering is by ``generated_at`` descending. Accessible to both admin
    and dak_mitarbeiter users.
    """
    newest_first = db.query(WeeklyReport).order_by(WeeklyReport.generated_at.desc())
    rows = newest_first.all()
    metas = [ReportMeta.model_validate(row) for row in rows]
    return ReportListResponse(items=metas, total=len(rows))