dak.c2s/backend/app/api/reports.py
CCS Admin 42fd1808c4 feat: add admin-only billing status donut and top gutachter charts to Dashboard
Extends DashboardKPIs with total_abgerechnet/pending_abrechnung. Adds
new GET /reports/dashboard/top-gutachter endpoint (admin-only). Frontend
shows Abrechnungsstatus donut + Gutachter-Verteilung progress bars in a
new third row, visible only to admins.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 15:55:30 +00:00

600 lines
19 KiB
Python

"""Reports API — dashboard KPIs, weekly data, report generation, and download."""
import logging
import os
from collections import defaultdict
from datetime import date
from io import BytesIO
from fastapi import APIRouter, Depends, File, HTTPException, Query, Request, UploadFile, status
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.core.dependencies import get_current_user, require_admin
from app.database import get_db
from app.models.report import WeeklyReport
from app.models.user import User
from app.schemas.report import (
DashboardResponse,
ReportListResponse,
ReportMeta,
)
from app.config import get_settings
from app.services.audit_service import log_action
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router on which every report endpoint in this module is registered.
router = APIRouter()
@router.get("/dashboard", response_model=DashboardResponse)
def dashboard(
    jahr: int | None = Query(None),
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Build the dashboard payload: KPIs, previous-year KPIs and weekly series.

    When *jahr* is omitted (or falsy) the year is derived from today's date
    via ``date_to_jahr``. Any authenticated user may call this endpoint.

    Raises:
        HTTPException: 501 when an ``ImportError`` surfaces while loading or
            running the report service.
    """
    from app.utils.kw_utils import date_to_jahr

    selected_year = jahr or date_to_jahr(date.today())

    try:
        from sqlalchemy import func as sa_func

        from app.models.case import Case
        from app.services.report_service import (
            calculate_dashboard_kpis,
            calculate_sheet1_data,
        )

        # Highest calendar week with data in the selected year. It is handed
        # to the previous-year calculation as ``max_kw`` so both years are
        # compared over the same week range.
        latest_kw_query = db.query(sa_func.max(Case.kw)).filter(
            Case.versicherung == get_settings().VERSICHERUNG_FILTER,
            Case.jahr == selected_year,
        )
        latest_kw = latest_kw_query.scalar()

        current_kpis = calculate_dashboard_kpis(db, selected_year)
        previous_kpis = calculate_dashboard_kpis(
            db, selected_year - 1, max_kw=latest_kw
        )
        weekly_series = calculate_sheet1_data(db, selected_year).get("weekly", [])
        return DashboardResponse(
            kpis=current_kpis,
            prev_kpis=previous_kpis,
            weekly=weekly_series,
        )
    except ImportError:
        # The report service may not be deployed yet (developed in parallel).
        raise HTTPException(501, "Report service not yet available")
@router.get("/dashboard/yearly-comparison")
def yearly_comparison(
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the multi-year calendar-week comparison for the dashboard chart.

    The numbers come straight from ``calculate_yearly_kw_comparison`` and are
    wrapped under the ``"data"`` key. Which years are covered is decided by
    that service, not by this endpoint. Any authenticated user may call it.
    """
    from app.services.report_service import calculate_yearly_kw_comparison

    return {"data": calculate_yearly_kw_comparison(db)}
@router.get("/dashboard/top-icd")
def top_icd(
    jahr: int | None = Query(None),
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the most frequent ICD codes of a year under the ``"items"`` key.

    When *jahr* is omitted (or falsy) the year is derived from today's date
    via ``date_to_jahr``. The ranking itself (including how many entries it
    contains) is produced by ``calculate_top_icd``. Any authenticated user
    may call this endpoint.
    """
    from app.services.report_service import calculate_top_icd
    from app.utils.kw_utils import date_to_jahr

    selected_year = jahr or date_to_jahr(date.today())
    return {"items": calculate_top_icd(db, selected_year)}
@router.get("/dashboard/top-gutachter")
def top_gutachter(
    jahr: int | None = Query(None),
    db: Session = Depends(get_db),
    user: User = Depends(require_admin),
):
    """Return the Gutachter ranking of a year under the ``"items"`` key.

    Guarded by the ``require_admin`` dependency. When *jahr* is omitted (or
    falsy) the year is derived from today's date via ``date_to_jahr``. The
    ranking itself (including its length) is produced by
    ``calculate_top_gutachter``.
    """
    from app.services.report_service import calculate_top_gutachter
    from app.utils.kw_utils import date_to_jahr

    selected_year = jahr or date_to_jahr(date.today())
    return {"items": calculate_top_gutachter(db, selected_year)}
@router.get("/gutachten-statistik")
def gutachten_statistik(
    jahr: int | None = Query(None),
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the Gutachten statistics for a year, as built by the service.

    When *jahr* is omitted (or falsy) the year is derived from today's date
    via ``date_to_jahr``. The result of ``calculate_gutachten_statistik`` is
    returned unchanged. Any authenticated user may call this endpoint.
    """
    from app.services.report_service import calculate_gutachten_statistik
    from app.utils.kw_utils import date_to_jahr

    selected_year = jahr or date_to_jahr(date.today())
    return calculate_gutachten_statistik(db, selected_year)
@router.get("/weekly/{jahr}/{kw}")
def weekly_report(
    jahr: int,
    kw: int,
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the aggregated entry for one calendar week of a year.

    The full-year sheet-1 data is calculated and the first weekly entry whose
    ``"kw"`` equals *kw* is returned. When no entry matches, a stub with zero
    ``erstberatungen`` is returned instead. Any authenticated user may call
    this endpoint.

    Raises:
        HTTPException: 501 when an ``ImportError`` surfaces while loading or
            running the report service.
    """
    try:
        from app.services.report_service import calculate_sheet1_data

        weekly_entries = calculate_sheet1_data(db, jahr).get("weekly", [])
        matching = (entry for entry in weekly_entries if entry.get("kw") == kw)
        return next(matching, {"kw": kw, "erstberatungen": 0})
    except ImportError:
        raise HTTPException(501, "Report service not yet available")
@router.post("/generate", response_model=ReportMeta)
def generate_report(
    jahr: int | None = Query(None),
    kw: int | None = Query(None),
    report_type: str = Query("gesamt"),
    db: Session = Depends(get_db),
    user: User = Depends(require_admin),
):
    """Generate a Berichtswesen Excel report and persist it to disk and DB.

    Guarded by the ``require_admin`` dependency. Missing (or falsy) *jahr* /
    *kw* values default to the year / calendar week of today's date.
    *report_type* must be a key of ``REPORT_TYPES`` (default ``gesamt``).

    The workbook is written to the ``reports`` directory three levels above
    this file, and the matching ``WeeklyReport`` row (same jahr / kw /
    report_type) is updated or created.

    Raises:
        HTTPException: 422 for an unknown *report_type* or a *kw* outside
            1..53; 501 when an ``ImportError`` surfaces while loading or
            running the report services.
    """
    from app.services.report_service import REPORT_TYPES, REPORT_TYPE_LABELS

    if report_type not in REPORT_TYPES:
        raise HTTPException(422, f"Unknown report_type: {report_type}")

    if not jahr or not kw:
        from app.utils.kw_utils import date_to_jahr, date_to_kw

        today = date.today()
        jahr = jahr or date_to_jahr(today)
        kw = kw or date_to_kw(today)

    # Reject impossible calendar weeks before anything is written to disk or
    # the database (same bounds as the Wochenübersicht endpoint).
    if not 1 <= kw <= 53:
        raise HTTPException(422, "kw must be between 1 and 53")

    try:
        from app.services.excel_export import generate_berichtswesen_xlsx
        from app.services.report_service import generate_full_report
        from app.services.vorjahr_service import get_vorjahr_summary

        fallgruppen = list(REPORT_TYPES[report_type])
        report_data = generate_full_report(db, jahr, kw, report_type=report_type)

        # The cached Vorjahr summary is only used for the "gesamt" report;
        # filtered report types pass None to the Excel generator.
        vorjahr = get_vorjahr_summary(db, jahr) if report_type == "gesamt" else None
        xlsx_bytes = generate_berichtswesen_xlsx(
            report_data, jahr, vorjahr, fallgruppen=fallgruppen,
        )

        # Persist the Excel file to disk.
        reports_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
            "reports",
        )
        os.makedirs(reports_dir, exist_ok=True)
        type_label = REPORT_TYPE_LABELS.get(report_type, report_type)
        if report_type != "gesamt":
            filename = f"Berichtswesen_{type_label}_{jahr}_KW{kw:02d}.xlsx"
        else:
            filename = f"Berichtswesen_{jahr}_KW{kw:02d}.xlsx"
        filepath = os.path.join(reports_dir, filename)
        with open(filepath, "wb") as f:
            f.write(xlsx_bytes)

        # Upsert report metadata (replace if same jahr/kw/report_type exists).
        report = (
            db.query(WeeklyReport)
            .filter(
                WeeklyReport.jahr == jahr,
                WeeklyReport.kw == kw,
                WeeklyReport.report_type == report_type,
            )
            .first()
        )
        if report:
            report.report_date = date.today()
            report.report_data = report_data
            report.report_file_path = filepath
            report.generated_by = user.id
        else:
            report = WeeklyReport(
                jahr=jahr,
                kw=kw,
                report_type=report_type,
                report_date=date.today(),
                report_data=report_data,
                generated_by=user.id,
            )
            report.report_file_path = filepath
            db.add(report)
        db.commit()
        db.refresh(report)

        log_action(
            db,
            user_id=user.id,
            action="report_generated",
            entity_type="report",
            entity_id=report.id,
            new_values={
                "jahr": jahr,
                "kw": kw,
                "report_type": report_type,
                "filename": filename,
            },
        )
        return ReportMeta.model_validate(report)
    except ImportError as exc:
        # Keep the original ImportError attached for server-side diagnostics.
        raise HTTPException(
            501, f"Required service not yet available: {exc}"
        ) from exc
@router.get("/download/{report_id}")
def download_report(
    report_id: int,
    request: Request,
    token: str | None = Query(None),
    db: Session = Depends(get_db),
):
    """Download a previously generated Excel report file.

    Authentication is resolved manually so the URL can be opened directly in
    a browser tab: the ``?token=`` query parameter is used when present,
    otherwise the ``Authorization: Bearer`` header.

    The file is served with ``FileResponse``, which reads it from disk instead
    of loading it into memory first. It also encodes non-ASCII filenames
    (e.g. "Wochenübersicht ...") in the ``Content-Disposition`` header.

    Raises:
        HTTPException: 401 when no token is supplied, the token cannot be
            decoded, or no active user matches it; 404 when the report row,
            its stored path, or the file on disk is missing.
    """
    from fastapi.responses import FileResponse
    from jose import JWTError

    from app.core.security import decode_access_token

    # Resolve the token from the query parameter first, then the header.
    raw_token = token
    if not raw_token:
        auth = request.headers.get("authorization", "")
        if auth.lower().startswith("bearer "):
            raw_token = auth[7:]
    if not raw_token:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Not authenticated")

    try:
        payload = decode_access_token(raw_token)
        user_id = int(payload["sub"])
    except (JWTError, KeyError, TypeError, ValueError) as exc:
        # TypeError covers a token whose "sub" claim is null / not int-like.
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token") from exc

    user = db.query(User).filter(User.id == user_id, User.is_active == True).first()  # noqa: E712
    if not user:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found")

    report = db.query(WeeklyReport).filter(WeeklyReport.id == report_id).first()
    if not report or not report.report_file_path:
        raise HTTPException(404, "Report not found")

    file_path = report.report_file_path
    # isfile (not exists) so a stray directory at that path is a 404 too.
    if not os.path.isfile(file_path):
        raise HTTPException(404, "Report file not found on disk")

    return FileResponse(
        file_path,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        filename=os.path.basename(file_path),
    )
@router.get("/{report_id}/data")
def get_report_data(
    report_id: int,
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the ``report_data`` stored on a report row.

    Any authenticated user may call this endpoint.

    Raises:
        HTTPException: 404 when the report does not exist or its
            ``report_data`` is empty.
    """
    lookup = db.query(WeeklyReport).filter(WeeklyReport.id == report_id)
    report = lookup.first()
    if not report:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Report not found")

    stored_data = report.report_data
    if not stored_data:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Report data not available")
    return stored_data
@router.delete("/delete")
def delete_reports(
    ids: list[int],
    db: Session = Depends(get_db),
    user: User = Depends(require_admin),
):
    """Delete one or more reports by ID, including their files on disk.

    Guarded by the ``require_admin`` dependency. Unknown IDs are ignored.

    The database rows are deleted and committed first; the files are removed
    afterwards on a best-effort basis. A failed commit or an undeletable file
    therefore cannot leave a row pointing at a file that is already gone.

    Returns:
        ``{"deleted": n}`` where *n* is the number of report rows removed.
    """
    if not ids:
        return {"deleted": 0}

    # One query for all requested IDs instead of one query per ID.
    reports = db.query(WeeklyReport).filter(WeeklyReport.id.in_(ids)).all()
    if not reports:
        return {"deleted": 0}

    # Capture paths before the commit; the instances are gone afterwards.
    file_paths = [r.report_file_path for r in reports if r.report_file_path]
    deleted = len(reports)

    for report in reports:
        db.delete(report)
    db.commit()

    for path in file_paths:
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already missing on disk; nothing left to clean up.
            continue
        except OSError:
            # The DB is already consistent; leave the orphan file and log it.
            logger.warning("Could not remove report file %s", path, exc_info=True)

    log_action(
        db,
        user_id=user.id,
        action="reports_deleted",
        entity_type="report",
        new_values={"ids": ids, "deleted": deleted},
    )
    return {"deleted": deleted}
@router.post("/wochenuebersicht/generate", response_model=ReportMeta)
def generate_wochenuebersicht(
    export_type: str = Query(...),
    jahr: int = Query(...),
    kw_von: int = Query(...),
    kw_bis: int = Query(...),
    db: Session = Depends(get_db),
    user: User = Depends(require_admin),
):
    """Generate a Wochenübersicht Excel file and persist it to disk and DB.

    Guarded by the ``require_admin`` dependency. Cases of *jahr* within the
    calendar weeks *kw_von*..*kw_bis* are selected, restricted to the
    Fallgruppen of *export_type* and the configured Versicherung. The
    workbook is written to the ``reports`` directory, and the ``WeeklyReport``
    row keyed by (jahr, kw_bis, ``wochenuebersicht_<export_type>``) is updated
    or created. ``report_data`` holds a per-week preview of the cases.

    Raises:
        HTTPException: 422 for an unknown *export_type*, a week outside
            1..53, or ``kw_von`` greater than ``kw_bis``.
    """
    from app.config import get_settings
    from app.models.case import Case
    from app.services.wochenuebersicht_export import (
        WOCHENUEBERSICHT_TYPES,
        generate_wochenuebersicht_xlsx,
    )

    settings = get_settings()

    # --- Input validation -------------------------------------------------
    if export_type not in WOCHENUEBERSICHT_TYPES:
        raise HTTPException(422, f"Unknown export_type: {export_type}")
    if any(not 1 <= week <= 53 for week in (kw_von, kw_bis)):
        raise HTTPException(422, "kw_von and kw_bis must be between 1 and 53")
    if kw_bis < kw_von:
        raise HTTPException(422, "kw_von must be <= kw_bis")

    type_cfg = WOCHENUEBERSICHT_TYPES[export_type]
    fallgruppen = type_cfg["fallgruppen"]

    # --- Load the cases and group them by calendar week --------------------
    case_filters = (
        Case.jahr == jahr,
        Case.kw >= kw_von,
        Case.kw <= kw_bis,
        Case.fallgruppe.in_(fallgruppen),
        Case.versicherung == settings.VERSICHERUNG_FILTER,
    )
    cases = (
        db.query(Case)
        .filter(*case_filters)
        .order_by(Case.kw, Case.datum)
        .all()
    )
    case_count = len(cases)

    cases_by_kw: dict[int, list[Case]] = defaultdict(list)
    for entry in cases:
        cases_by_kw[entry.kw].append(entry)

    xlsx_bytes = generate_wochenuebersicht_xlsx(cases_by_kw, export_type, jahr)

    # --- Persist the Excel file to disk -----------------------------------
    project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    reports_dir = os.path.join(project_root, "reports")
    os.makedirs(reports_dir, exist_ok=True)
    infix = type_cfg["filename_infix"]
    filename = f"Wochenübersicht {infix} KW{kw_von:02d}-{kw_bis:02d}_{jahr}.xlsx"
    filepath = os.path.join(reports_dir, filename)
    with open(filepath, "wb") as out:
        out.write(xlsx_bytes)

    # report_type carries both the "wochenuebersicht" prefix and the variant.
    report_type = f"wochenuebersicht_{export_type}"

    # Existing row for the same jahr / kw_bis / report_type, if any.
    report = (
        db.query(WeeklyReport)
        .filter(
            WeeklyReport.jahr == jahr,
            WeeklyReport.kw == kw_bis,
            WeeklyReport.report_type == report_type,
        )
        .first()
    )

    # --- Per-week preview data for the frontend ----------------------------
    fg1, fg2 = fallgruppen
    fg1_label, fg2_label = type_cfg["fg_labels"]

    def preview_row(c):
        """Flatten one case into the dict stored in the report preview."""
        stopped = bool(c.ablehnung or c.abbruch)
        return {
            "kvnr": c.kvnr or "",
            "datum": c.datum.strftime("%d.%m.%Y") if c.datum else "",
            "erstgespraech": True,
            "abbruch": stopped,
            # Documents only count when the case was neither rejected nor aborted.
            "unterlagen": bool(c.unterlagen and not stopped),
            "gutachten": bool(c.gutachten),
            "fg1": c.fallgruppe == fg1,
            "fg2": c.fallgruppe == fg2,
            "icd": c.icd or "",
        }

    # Newest week first; inside a week, cases without a date sort first.
    weeks_data = [
        {
            "kw": week,
            "cases": [
                preview_row(c)
                for c in sorted(
                    cases_by_kw[week], key=lambda c: (c.datum or date.min)
                )
            ],
        }
        for week in sorted(cases_by_kw, reverse=True)
    ]

    report_data = {
        "export_type": export_type,
        "kw_von": kw_von,
        "kw_bis": kw_bis,
        "case_count": case_count,
        "fg1_label": fg1_label,
        "fg2_label": fg2_label,
        "weeks": weeks_data,
    }

    # --- Upsert the report row ---------------------------------------------
    if not report:
        report = WeeklyReport(
            jahr=jahr,
            kw=kw_bis,
            report_type=report_type,
            report_date=date.today(),
            report_data=report_data,
            generated_by=user.id,
        )
        report.report_file_path = filepath
        db.add(report)
    else:
        report.report_date = date.today()
        report.report_data = report_data
        report.report_file_path = filepath
        report.generated_by = user.id
    db.commit()
    db.refresh(report)

    audit_values = {
        "export_type": export_type,
        "jahr": jahr,
        "kw_von": kw_von,
        "kw_bis": kw_bis,
        "case_count": case_count,
        "filename": filename,
    }
    log_action(
        db,
        user_id=user.id,
        action="wochenuebersicht_generated",
        entity_type="report",
        entity_id=report.id,
        new_values=audit_values,
    )
    return ReportMeta.model_validate(report)
@router.post("/wochenuebersicht/upload-icd")
async def upload_wochenuebersicht_icd(
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Upload a filled-in Wochenübersicht Excel file and import its ICD codes.

    Any authenticated user may call this endpoint. The file content is handed
    to ``import_icd_from_xlsx`` and the service result is returned unchanged.

    Raises:
        HTTPException: 400 when the filename lacks an Excel extension or the
            import fails; 413 when the upload exceeds
            ``settings.MAX_UPLOAD_SIZE``.
    """
    from app.config import get_settings
    from app.services.icd_service import import_icd_from_xlsx

    settings = get_settings()

    # NOTE(review): ".xls" is accepted although the message and the importer
    # name only mention .xlsx — confirm the importer can read legacy .xls.
    if not file.filename or not file.filename.lower().endswith((".xlsx", ".xls")):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Datei muss eine Excel-Datei (.xlsx) sein",
        )

    # Read at most one byte beyond the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large file into memory.
    content = await file.read(settings.MAX_UPLOAD_SIZE + 1)
    if len(content) > settings.MAX_UPLOAD_SIZE:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=f"Datei überschreitet die maximale Größe von {settings.MAX_UPLOAD_SIZE // (1024 * 1024)}MB",
        )

    try:
        result = import_icd_from_xlsx(db, content, user.id)
    except Exception as exc:
        # API boundary: log the full traceback, answer with a 400.
        logger.exception("Wochenübersicht ICD import failed for %s", file.filename)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"ICD-Import fehlgeschlagen: {exc}",
        ) from exc

    log_action(
        db,
        user_id=user.id,
        action="wochenuebersicht_icd_uploaded",
        entity_type="import",
        new_values={
            "filename": file.filename,
            "updated": result["updated"],
            "errors": len(result["errors"]),
        },
    )
    return result
@router.get("/list", response_model=ReportListResponse)
def list_reports(
    report_type_prefix: str | None = Query(None),
    db: Session = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """List generated reports, newest first (by ``generated_at``).

    Any authenticated user may call this endpoint. When *report_type_prefix*
    is given, only reports whose ``report_type`` starts with that exact text
    are returned (e.g. ``wochenuebersicht``); otherwise all reports are
    listed.
    """
    query = db.query(WeeklyReport)
    if report_type_prefix:
        # autoescape keeps "%" and "_" in the user-supplied prefix literal;
        # without it "_" would match any single character in the LIKE pattern.
        query = query.filter(
            WeeklyReport.report_type.startswith(report_type_prefix, autoescape=True)
        )
    reports = query.order_by(WeeklyReport.generated_at.desc()).all()
    return ReportListResponse(
        items=[ReportMeta.model_validate(r) for r in reports],
        total=len(reports),
    )