feat: coding queue, reports API, Excel sync

- Add coding_service.py with queue retrieval, single + batch coding updates
- Add report schemas (DashboardKPIs, WeeklyDataPoint, ReportMeta)
- Add coding API router with /queue, PUT /{case_id}, POST /batch endpoints
- Add reports API router with /dashboard, /weekly, /generate, /download, /list
- Add excel_sync.py for bidirectional Abrechnung DB<->XLSX sync
- Register coding and reports routers in main.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
CCS Admin 2026-02-24 08:03:17 +00:00
parent 72b6f784fc
commit d6fb04d5a7
6 changed files with 625 additions and 0 deletions

78
backend/app/api/coding.py Normal file
View file

@ -0,0 +1,78 @@
"""Coding queue API — dedicated endpoints for Gutachten classification workflow."""
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.core.dependencies import require_admin
from app.database import get_db
from app.models.user import User
from app.schemas.case import CaseListResponse, CaseResponse, CodingUpdate
from app.services.coding_service import (
batch_update_coding,
get_coding_queue,
update_coding,
)
router = APIRouter()
@router.get("/queue", response_model=CaseListResponse)
def coding_queue(
fallgruppe: str | None = Query(None),
page: int = Query(1, ge=1),
per_page: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
user: User = Depends(require_admin),
):
"""Return cases that need coding (gutachten=True, gutachten_typ=NULL).
Admin only. Supports optional fallgruppe filter and pagination.
"""
cases, total = get_coding_queue(db, fallgruppe, page, per_page)
return CaseListResponse(
items=[CaseResponse.model_validate(c) for c in cases],
total=total,
page=page,
per_page=per_page,
)
@router.put("/{case_id}", response_model=CaseResponse)
def update_case_coding(
case_id: int,
data: CodingUpdate,
db: Session = Depends(get_db),
user: User = Depends(require_admin),
):
"""Set Gutachten classification and therapy-change coding for a single case.
Admin only. Validates gutachten_typ and therapieaenderung values.
"""
case = update_coding(
db,
case_id,
data.gutachten_typ,
data.therapieaenderung,
data.ta_diagnosekorrektur,
data.ta_unterversorgung,
data.ta_uebertherapie,
user_id=user.id,
)
return CaseResponse.model_validate(case)
@router.post("/batch")
def batch_coding(
updates: list[dict],
db: Session = Depends(get_db),
user: User = Depends(require_admin),
):
"""Batch update coding for multiple cases at once.
Admin only. Accepts a list of dicts, each containing at minimum:
``case_id``, ``gutachten_typ``, ``therapieaenderung``.
Returns a summary with ``updated`` count and ``errors`` list.
"""
result = batch_update_coding(db, updates, user_id=user.id)
return result

184
backend/app/api/reports.py Normal file
View file

@ -0,0 +1,184 @@
"""Reports API — dashboard KPIs, weekly data, report generation, and download."""
import logging
import os
from datetime import date
from io import BytesIO
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.core.dependencies import get_current_user, require_admin
from app.database import get_db
from app.models.report import WeeklyReport
from app.models.user import User
from app.schemas.report import (
DashboardResponse,
ReportListResponse,
ReportMeta,
)
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/dashboard", response_model=DashboardResponse)
def dashboard(
jahr: int | None = Query(None),
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Return dashboard KPIs and weekly time-series for the given year.
Defaults to the current ISO year if *jahr* is not provided.
Accessible to both admin and dak_mitarbeiter users.
"""
if not jahr:
from app.utils.kw_utils import date_to_jahr
jahr = date_to_jahr(date.today())
try:
from app.services.report_service import (
calculate_dashboard_kpis,
calculate_sheet1_data,
)
kpis = calculate_dashboard_kpis(db, jahr)
sheet1 = calculate_sheet1_data(db, jahr)
return DashboardResponse(kpis=kpis, weekly=sheet1.get("weekly", []))
except ImportError:
# report_service not yet implemented (parallel task)
raise HTTPException(501, "Report service not yet available")
@router.get("/weekly/{jahr}/{kw}")
def weekly_report(
jahr: int,
kw: int,
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Return aggregated data for a single calendar week.
Accessible to both admin and dak_mitarbeiter users.
"""
try:
from app.services.report_service import calculate_sheet1_data
data = calculate_sheet1_data(db, jahr)
weekly = [w for w in data.get("weekly", []) if w.get("kw") == kw]
return weekly[0] if weekly else {"kw": kw, "erstberatungen": 0}
except ImportError:
raise HTTPException(501, "Report service not yet available")
@router.post("/generate", response_model=ReportMeta)
def generate_report(
jahr: int | None = Query(None),
kw: int | None = Query(None),
db: Session = Depends(get_db),
user: User = Depends(require_admin),
):
"""Generate a full Berichtswesen Excel report and persist it to disk + DB.
Admin only. Defaults to the current ISO year/week if not specified.
Depends on report_service, excel_export, and vorjahr_service (parallel tasks).
"""
if not jahr:
from app.utils.kw_utils import date_to_jahr, date_to_kw
today = date.today()
jahr = date_to_jahr(today)
kw = kw or date_to_kw(today)
if not kw:
from app.utils.kw_utils import date_to_kw
kw = date_to_kw(date.today())
try:
from app.services.excel_export import generate_berichtswesen_xlsx
from app.services.report_service import generate_full_report
from app.services.vorjahr_service import get_vorjahr_summary
report_data = generate_full_report(db, jahr, kw)
vorjahr = get_vorjahr_summary(db, jahr)
xlsx_bytes = generate_berichtswesen_xlsx(report_data, jahr, vorjahr)
# Persist Excel file to disk
reports_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"reports",
)
os.makedirs(reports_dir, exist_ok=True)
filename = f"Berichtswesen_{jahr}_KW{kw:02d}.xlsx"
filepath = os.path.join(reports_dir, filename)
with open(filepath, "wb") as f:
f.write(xlsx_bytes)
# Save report metadata to DB
report = WeeklyReport(
jahr=jahr,
kw=kw,
report_date=date.today(),
report_data=report_data,
generated_by=user.id,
)
report.report_file_path = filepath
db.add(report)
db.commit()
db.refresh(report)
return ReportMeta.model_validate(report)
except ImportError as exc:
raise HTTPException(501, f"Required service not yet available: {exc}")
@router.get("/download/{report_id}")
def download_report(
report_id: int,
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Download a previously generated Berichtswesen Excel file.
Accessible to both admin and dak_mitarbeiter users.
"""
report = db.query(WeeklyReport).filter(WeeklyReport.id == report_id).first()
if not report or not report.report_file_path:
raise HTTPException(404, "Report not found")
if not os.path.exists(report.report_file_path):
raise HTTPException(404, "Report file not found on disk")
with open(report.report_file_path, "rb") as f:
content = f.read()
filename = os.path.basename(report.report_file_path)
return StreamingResponse(
BytesIO(content),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
@router.get("/list", response_model=ReportListResponse)
def list_reports(
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""List all generated reports, newest first.
Accessible to both admin and dak_mitarbeiter users.
"""
reports = (
db.query(WeeklyReport).order_by(WeeklyReport.generated_at.desc()).all()
)
return ReportListResponse(
items=[ReportMeta.model_validate(r) for r in reports],
total=len(reports),
)

View file

@ -4,7 +4,11 @@ from fastapi.middleware.cors import CORSMiddleware
from app.api.admin import router as admin_router from app.api.admin import router as admin_router
from app.api.auth import router as auth_router from app.api.auth import router as auth_router
from app.api.cases import router as cases_router
from app.api.coding import router as coding_router
from app.api.import_router import router as import_router
from app.api.notifications import router as notifications_router from app.api.notifications import router as notifications_router
from app.api.reports import router as reports_router
from app.config import get_settings from app.config import get_settings
settings = get_settings() settings = get_settings()
@ -22,7 +26,11 @@ app.add_middleware(
# --- Routers --- # --- Routers ---
app.include_router(auth_router, prefix="/api/auth", tags=["auth"]) app.include_router(auth_router, prefix="/api/auth", tags=["auth"])
app.include_router(admin_router, prefix="/api/admin", tags=["admin"]) app.include_router(admin_router, prefix="/api/admin", tags=["admin"])
app.include_router(import_router, prefix="/api/import", tags=["import"])
app.include_router(cases_router, prefix="/api/cases", tags=["cases"])
app.include_router(notifications_router, prefix="/api/notifications", tags=["notifications"]) app.include_router(notifications_router, prefix="/api/notifications", tags=["notifications"])
app.include_router(coding_router, prefix="/api/coding", tags=["coding"])
app.include_router(reports_router, prefix="/api/reports", tags=["reports"])
@app.get("/api/health") @app.get("/api/health")

View file

@ -0,0 +1,54 @@
"""Pydantic schemas for dashboard KPIs, weekly data points, and report metadata."""
from datetime import date, datetime
from typing import Optional
from pydantic import BaseModel
class DashboardKPIs(BaseModel):
"""Top-level KPI summary for the dashboard."""
total_cases: int
pending_icd: int
pending_coding: int
total_gutachten: int
fallgruppen: dict[str, int] # e.g. {"onko": 123, "kardio": 45, ...}
class WeeklyDataPoint(BaseModel):
"""A single calendar-week row for the dashboard chart / table."""
kw: int
erstberatungen: int = 0
unterlagen: int = 0
ablehnungen: int = 0
keine_rm: int = 0
gutachten: int = 0
class DashboardResponse(BaseModel):
"""Combined dashboard payload: KPIs + weekly time-series."""
kpis: DashboardKPIs
weekly: list[WeeklyDataPoint]
class ReportMeta(BaseModel):
"""Metadata for a generated weekly report (no file content)."""
id: int
jahr: int
kw: int
report_date: date
generated_at: datetime
generated_by: Optional[int] = None
model_config = {"from_attributes": True}
class ReportListResponse(BaseModel):
"""Paginated list of report metadata."""
items: list[ReportMeta]
total: int

View file

@ -0,0 +1,109 @@
"""Coding queue and batch coding operations for Gutachten classification."""
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from app.core.exceptions import CaseNotFoundError
from app.models.case import Case
def get_coding_queue(
db: Session,
fallgruppe: str | None = None,
page: int = 1,
per_page: int = 50,
) -> tuple[list[Case], int]:
"""Get cases with gutachten=True but no gutachten_typ yet.
These cases have received a Gutachten but still need classification
(Bestaetigung/Alternative) and therapy-change coding.
Returns:
Tuple of (cases, total_count) for pagination.
"""
query = db.query(Case).filter(
Case.gutachten == True, # noqa: E712
Case.gutachten_typ == None, # noqa: E711
)
if fallgruppe:
query = query.filter(Case.fallgruppe == fallgruppe)
total = query.count()
cases = (
query.order_by(Case.datum.desc())
.offset((page - 1) * per_page)
.limit(per_page)
.all()
)
return cases, total
def update_coding(
db: Session,
case_id: int,
gutachten_typ: str,
therapieaenderung: str,
ta_diagnosekorrektur: bool = False,
ta_unterversorgung: bool = False,
ta_uebertherapie: bool = False,
user_id: int | None = None,
) -> Case:
"""Set coding for a single case.
Validates gutachten_typ and therapieaenderung values, then persists the
coding classification along with the user and timestamp.
Raises:
CaseNotFoundError: If no case with the given ID exists.
ValueError: If gutachten_typ or therapieaenderung have invalid values.
"""
case = db.query(Case).filter(Case.id == case_id).first()
if not case:
raise CaseNotFoundError()
if gutachten_typ not in ("Bestätigung", "Alternative"):
raise ValueError(f"Invalid gutachten_typ: {gutachten_typ}")
if therapieaenderung not in ("Ja", "Nein"):
raise ValueError(f"Invalid therapieaenderung: {therapieaenderung}")
case.gutachten_typ = gutachten_typ
case.therapieaenderung = therapieaenderung
case.ta_diagnosekorrektur = ta_diagnosekorrektur
case.ta_unterversorgung = ta_unterversorgung
case.ta_uebertherapie = ta_uebertherapie
case.coding_completed_by = user_id
case.coding_completed_at = datetime.now(timezone.utc)
db.commit()
db.refresh(case)
return case
def batch_update_coding(
db: Session,
updates: list[dict],
user_id: int | None = None,
) -> dict:
"""Batch update coding for multiple cases.
Each dict in *updates* must contain at minimum:
- ``case_id`` (int)
- ``gutachten_typ`` (str)
- ``therapieaenderung`` (str)
and optionally the ``ta_*`` boolean flags.
Returns:
Dict with ``updated`` count and ``errors`` list of error messages.
"""
updated = 0
errors: list[str] = []
for item in updates:
case_id = item.get("case_id")
try:
params = {k: v for k, v in item.items() if k != "case_id"}
update_coding(db, case_id, user_id=user_id, **params)
updated += 1
except Exception as exc:
errors.append(f"Case {case_id}: {exc}")
return {"updated": updated, "errors": errors}

View file

@ -0,0 +1,192 @@
"""Bidirectional sync between the database and Abrechnung_DAK.xlsx files.
This is a convenience feature for exporting case data to the DAK Abrechnung
Excel format and re-importing changes made in the spreadsheet back into the DB.
"""
import logging
from datetime import date, datetime
from io import BytesIO
from typing import Any
from openpyxl import Workbook, load_workbook
from sqlalchemy.orm import Session
from app.models.case import Case
logger = logging.getLogger(__name__)
# Columns in the Abrechnung export, matching the expected DAK format
ABRECHNUNG_COLUMNS = [
("fall_id", "Fall-ID"),
("datum", "Datum"),
("nachname", "Nachname"),
("vorname", "Vorname"),
("geburtsdatum", "Geburtsdatum"),
("kvnr", "KVNR"),
("fallgruppe", "Fallgruppe"),
("icd", "ICD"),
("gutachten", "Gutachten"),
("gutachten_typ", "Gutachten-Typ"),
("therapieaenderung", "Therapieaenderung"),
("abgerechnet", "Abgerechnet"),
("abrechnung_datum", "Abrechnungsdatum"),
]
def sync_db_to_excel(db: Session, filepath: str | None = None) -> bytes:
"""Export DB cases to Abrechnung format xlsx.
Cases are grouped by year into separate sheets. Each sheet contains the
columns defined in ABRECHNUNG_COLUMNS.
Args:
db: Database session.
filepath: Optional path to write the file to disk. If None, only
returns bytes.
Returns:
The Excel file as bytes.
"""
cases = db.query(Case).order_by(Case.jahr.desc(), Case.datum.desc()).all()
# Group cases by year
by_year: dict[int, list[Case]] = {}
for case in cases:
by_year.setdefault(case.jahr, []).append(case)
wb = Workbook()
# Remove default sheet
wb.remove(wb.active)
for jahr in sorted(by_year.keys(), reverse=True):
ws = wb.create_sheet(title=str(jahr))
# Header row
for col_idx, (_, header) in enumerate(ABRECHNUNG_COLUMNS, start=1):
ws.cell(row=1, column=col_idx, value=header)
# Data rows
for row_idx, case in enumerate(by_year[jahr], start=2):
for col_idx, (field, _) in enumerate(ABRECHNUNG_COLUMNS, start=1):
value = getattr(case, field, None)
# Convert date objects to strings for Excel
if isinstance(value, (date, datetime)):
value = value.strftime("%d.%m.%Y")
elif isinstance(value, bool):
value = "Ja" if value else "Nein"
ws.cell(row=row_idx, column=col_idx, value=value)
# Ensure at least one sheet exists
if not wb.sheetnames:
wb.create_sheet(title="Leer")
output = BytesIO()
wb.save(output)
xlsx_bytes = output.getvalue()
if filepath:
with open(filepath, "wb") as f:
f.write(xlsx_bytes)
logger.info("Wrote Abrechnung export to %s", filepath)
return xlsx_bytes
def sync_excel_to_db(
db: Session, content: bytes, user_id: int | None = None
) -> dict[str, Any]:
"""Import changes from edited Abrechnung xlsx back to DB.
Compares the spreadsheet rows (matched by fall_id) against existing
cases and updates any changed fields.
Args:
db: Database session.
content: The Excel file content as bytes.
user_id: ID of the user performing the import.
Returns:
Dict with ``updated``, ``skipped``, and ``errors`` counts.
"""
wb = load_workbook(BytesIO(content), read_only=True, data_only=True)
# Build a header-to-column-index map from ABRECHNUNG_COLUMNS
field_by_header: dict[str, str] = {
header: field for field, header in ABRECHNUNG_COLUMNS
}
updated = 0
skipped = 0
errors: list[str] = []
for ws in wb.worksheets:
rows = list(ws.iter_rows(values_only=True))
if not rows:
continue
# Map header row to field names
header_row = rows[0]
col_map: dict[int, str] = {}
for col_idx, header_val in enumerate(header_row):
if header_val and str(header_val).strip() in field_by_header:
col_map[col_idx] = field_by_header[str(header_val).strip()]
if "fall_id" not in col_map.values():
logger.warning("Sheet '%s' has no Fall-ID column, skipping", ws.title)
continue
# Find the fall_id column index
fall_id_col = next(
idx for idx, field in col_map.items() if field == "fall_id"
)
for row_num, row in enumerate(rows[1:], start=2):
try:
fall_id = row[fall_id_col]
if not fall_id:
skipped += 1
continue
case = (
db.query(Case)
.filter(Case.fall_id == str(fall_id).strip())
.first()
)
if not case:
skipped += 1
continue
changed = False
for col_idx, field in col_map.items():
if field == "fall_id":
continue
new_val = row[col_idx] if col_idx < len(row) else None
# Convert "Ja"/"Nein" strings to booleans for bool fields
if field in ("gutachten", "abgerechnet"):
if isinstance(new_val, str):
new_val = new_val.strip().lower() in ("ja", "1", "true")
elif new_val is None:
continue
current_val = getattr(case, field, None)
# Rough comparison (skip type mismatches gracefully)
if str(new_val) != str(current_val) and new_val is not None:
setattr(case, field, new_val)
changed = True
if changed:
if user_id:
case.updated_by = user_id
db.commit()
updated += 1
else:
skipped += 1
except Exception as exc:
errors.append(f"Sheet '{ws.title}' row {row_num}: {exc}")
logger.warning("Error in sync row: %s", exc)
return {"updated": updated, "skipped": skipped, "errors": errors}