dak.c2s/backend/app/services/report_service.py
CCS Admin 5ee1cff0d6 fix: use SQLAlchemy Integer type in .cast() calls in report_service
.cast(int) uses Python's builtin int, which lacks SQLAlchemy's
_isnull attribute. Replace all occurrences with .cast(Integer).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 09:06:48 +00:00

570 lines
17 KiB
Python

"""Report service — all 5 sheet calculations for the DAK Berichtswesen.
Sheet 1: Auswertung KW gesamt — weekly totals + year summary
Sheet 2: Auswertung nach Fachgebieten — per-KW per-Fallgruppe breakdown
Sheet 3: Auswertung Gutachten — per-KW gutachten / alternative / bestaetigung
Sheet 4: Auswertung Therapieaenderungen — per-KW therapy-change metrics
Sheet 5: Auswertung ICD onko — ICD code frequency for onko cases
All queries use SQLAlchemy (not pandas) against the cases / case_icd_codes tables.
"""
from __future__ import annotations
import logging
from typing import Any
from sqlalchemy import Integer, and_, func
from sqlalchemy.orm import Session
from app.models.case import Case, CaseICDCode
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Canonical Fallgruppen in display order. Rows whose fallgruppe is not one of
# these values are skipped by the per-Fallgruppe calculations in this module.
FALLGRUPPEN = ("onko", "kardio", "intensiv", "galle", "sd")
# Number of calendar weeks to include (ISO weeks 1..52; 53 is rare).
# Every weekly list in this module covers exactly KW 1..MAX_KW, so data stored
# under any other KW value (e.g. 53) does not appear in those lists.
MAX_KW = 52
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _int(val: Any) -> int:
"""Safely coerce a query result to int (None -> 0)."""
if val is None:
return 0
return int(val)
def _pct(part: int, total: int) -> float | None:
"""Return part/total as a float, or None if total==0."""
if total == 0:
return None
return round(part / total, 4)
def _empty_weekly_row(kw: int) -> dict:
"""Return a zeroed-out weekly row template for Sheet 1."""
return {
"kw": kw,
"erstberatungen": 0,
"unterlagen": 0,
"ablehnungen": 0,
"keine_rm": 0,
"gutachten": 0,
}
def _empty_fg_weekly_row(kw: int) -> dict:
    """Build a fresh Sheet 2 row for calendar week *kw*.

    Besides ``"kw"`` the row holds one entry per Fallgruppe in ``FALLGRUPPEN``,
    each with its own zeroed ``anzahl`` / ``gutachten`` / ``keine_rm`` dict.
    """
    row: dict[str, Any] = {"kw": kw}
    # The comprehension creates a separate counter dict per Fallgruppe, so
    # updating one group never affects another.
    row.update(
        {name: {"anzahl": 0, "gutachten": 0, "keine_rm": 0} for name in FALLGRUPPEN}
    )
    return row
def _empty_gutachten_weekly_row(kw: int) -> dict:
    """Build a fresh Sheet 3 row for calendar week *kw*.

    The row contains ``"kw"``, then ``"gesamt"``, then every Fallgruppe from
    ``FALLGRUPPEN``; each group maps to its own zeroed
    ``gutachten`` / ``alternative`` / ``bestaetigung`` dict.
    """
    counters = ("gutachten", "alternative", "bestaetigung")
    row: dict[str, Any] = {"kw": kw}
    for name in ("gesamt", *FALLGRUPPEN):
        # dict.fromkeys returns a new dict on every call, one per group.
        row[name] = dict.fromkeys(counters, 0)
    return row
def _empty_ta_weekly_row(kw: int) -> dict:
"""Return a zeroed-out weekly row template for Sheet 4."""
return {
"kw": kw,
"gutachten": 0,
"ta_ja": 0,
"ta_nein": 0,
"diagnosekorrektur": 0,
"unterversorgung": 0,
"uebertherapie": 0,
}
# ---------------------------------------------------------------------------
# Sheet 1: Auswertung KW gesamt
# ---------------------------------------------------------------------------
def calculate_sheet1_data(db: Session, jahr: int) -> dict:
    """Calculate *Auswertung KW gesamt* (Sheet 1) for one report year.

    Args:
        db: SQLAlchemy session used to query ``Case``.
        jahr: Year to report on; only rows with ``Case.jahr == jahr`` count.

    Returns::

        {
            "summary": {
                "erstberatungen": int,
                "ablehnungen": int,
                "unterlagen": int,
                "keine_rueckmeldung": int,
                "gutachten": int,
            },
            "weekly": [
                {"kw": 1, "erstberatungen": X, "unterlagen": X,
                 "ablehnungen": X, "keine_rm": X, "gutachten": X},
                ...  # kw 1..MAX_KW
            ]
        }

    Business rules (matching the Excel formulas):
      * Erstberatungen = total cases for the KW
      * Unterlagen = cases where unterlagen == True
      * Ablehnungen = cases where ablehnung == True
      * Gutachten = cases where gutachten == True
      * Keine RM = Unterlagen - Gutachten (derived, per KW row)

    Cases whose KW is NULL or lies outside ``1..MAX_KW`` are part of neither
    ``weekly`` nor ``summary`` (the summary is summed from the weekly rows).
    A warning with their count is logged so the omission is visible.
    """
    # One query: group by kw, count all cases and sum the three boolean flags.
    rows = (
        db.query(
            Case.kw,
            func.count(Case.id).label("erstberatungen"),
            func.sum(Case.unterlagen.cast(Integer)).label("unterlagen"),
            func.sum(Case.ablehnung.cast(Integer)).label("ablehnungen"),
            func.sum(Case.gutachten.cast(Integer)).label("gutachten"),
        )
        .filter(Case.jahr == jahr)
        .group_by(Case.kw)
        .all()
    )

    # Build a lookup kw -> values, tracking cases that cannot be reported.
    kw_map: dict[int, dict] = {}
    skipped_cases = 0
    for row in rows:
        kw = _int(row.kw)  # a NULL kw becomes 0, which is out of range
        erstberatungen = _int(row.erstberatungen)
        if not 1 <= kw <= MAX_KW:
            skipped_cases += erstberatungen
            continue
        unterlagen = _int(row.unterlagen)
        gutachten = _int(row.gutachten)
        kw_map[kw] = {
            "kw": kw,
            "erstberatungen": erstberatungen,
            "unterlagen": unterlagen,
            "ablehnungen": _int(row.ablehnungen),
            "keine_rm": unterlagen - gutachten,
            "gutachten": gutachten,
        }

    if skipped_cases:
        logger.warning(
            "Sheet 1 for jahr=%s: %d case(s) with KW outside 1..%d are not included",
            jahr,
            skipped_cases,
            MAX_KW,
        )

    # Full 1..MAX_KW list; weeks without data get a zeroed row.
    weekly = [
        kw_map[kw] if kw in kw_map else _empty_weekly_row(kw)
        for kw in range(1, MAX_KW + 1)
    ]

    # Summary = column sums over the weekly rows.
    summary = {
        "erstberatungen": sum(w["erstberatungen"] for w in weekly),
        "ablehnungen": sum(w["ablehnungen"] for w in weekly),
        "unterlagen": sum(w["unterlagen"] for w in weekly),
        "keine_rueckmeldung": sum(w["keine_rm"] for w in weekly),
        "gutachten": sum(w["gutachten"] for w in weekly),
    }
    return {"summary": summary, "weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 2: Auswertung nach Fachgebieten
# ---------------------------------------------------------------------------
def calculate_sheet2_data(db: Session, jahr: int) -> dict:
    """Calculate *Auswertung nach Fachgebieten* (Sheet 2).

    For every KW and every Fallgruppe the result carries the case count
    (Anzahl), the Gutachten count and "Keine RM/Ablehnung".

    Returns::

        {
            "weekly": [
                {
                    "kw": 1,
                    "onko": {"anzahl": X, "gutachten": X, "keine_rm": X},
                    "kardio": {...},
                    "intensiv": {...},
                    "galle": {...},
                    "sd": {...},
                },
                ...
            ]
        }

    Keine RM/Ablehnung = Anzahl - Gutachten (per the Excel formula).
    Rows whose Fallgruppe is not in ``FALLGRUPPEN`` are logged and skipped.
    """
    gutachten_total = func.sum(Case.gutachten.cast(Integer))
    grouped = (
        db.query(
            Case.kw,
            Case.fallgruppe,
            func.count(Case.id).label("anzahl"),
            gutachten_total.label("gutachten"),
        )
        .filter(Case.jahr == jahr)
        .group_by(Case.kw, Case.fallgruppe)
    )

    # week number -> row dict holding one sub-dict per Fallgruppe
    by_week: dict[int, dict] = {}
    for rec in grouped.all():
        week = _int(rec.kw)
        group = rec.fallgruppe
        if group not in FALLGRUPPEN:
            logger.warning("Unknown fallgruppe '%s' in case data, skipping", group)
            continue

        week_row = by_week.get(week)
        if week_row is None:
            week_row = by_week[week] = _empty_fg_weekly_row(week)

        case_count = _int(rec.anzahl)
        gutachten_count = _int(rec.gutachten)
        week_row[group] = {
            "anzahl": case_count,
            "gutachten": gutachten_count,
            "keine_rm": case_count - gutachten_count,
        }

    # Emit KW 1..MAX_KW in order; weeks without data get a zeroed row.
    return {
        "weekly": [
            by_week[week] if week in by_week else _empty_fg_weekly_row(week)
            for week in range(1, MAX_KW + 1)
        ]
    }
# ---------------------------------------------------------------------------
# Sheet 3: Auswertung Gutachten
# ---------------------------------------------------------------------------
def calculate_sheet3_data(db: Session, jahr: int) -> dict:
    """Calculate *Auswertung Gutachten* (Sheet 3).

    Per KW and per group (gesamt + the Fallgruppen in ``FALLGRUPPEN``):
    Gutachten count, Alternative, Bestaetigung. Only cases with
    ``gutachten == True`` in the given year are considered.

    Args:
        db: SQLAlchemy session used to query ``Case``.
        jahr: Year to report on.

    Returns::

        {
            "weekly": [
                {
                    "kw": 1,
                    "gesamt": {"gutachten": X, "alternative": X, "bestaetigung": X},
                    "onko": {...},
                    "kardio": {...},
                    "intensiv": {...},
                    "galle": {...},
                    "sd": {...},
                },
                ...
            ]
        }

    In the Excel:
      - Per Fallgruppe: Gutachten = count, Alternative = count where
        typ='Alternative', Bestaetigung = Gutachten - Alternative
      - Gesamt = sum across all Fallgruppen

    Rows whose Fallgruppe is not in ``FALLGRUPPEN`` are logged and skipped,
    so they do not contribute to ``gesamt`` either.
    """
    rows = (
        db.query(
            Case.kw,
            Case.fallgruppe,
            func.count(Case.id).label("gutachten"),
            func.sum(
                (Case.gutachten_typ == "Alternative").cast(Integer)
            ).label("alternative"),
        )
        .filter(Case.jahr == jahr, Case.gutachten == True)  # noqa: E712
        .group_by(Case.kw, Case.fallgruppe)
        .all()
    )

    kw_map: dict[int, dict] = {}
    for row in rows:
        kw = _int(row.kw)
        fg = row.fallgruppe
        if fg not in FALLGRUPPEN:
            # Same handling as Sheet 2: make the skipped data visible in logs.
            logger.warning("Unknown fallgruppe '%s' in case data, skipping", fg)
            continue
        if kw not in kw_map:
            kw_map[kw] = _empty_gutachten_weekly_row(kw)
        gutachten = _int(row.gutachten)
        alternative = _int(row.alternative)
        kw_map[kw][fg] = {
            "gutachten": gutachten,
            "alternative": alternative,
            "bestaetigung": gutachten - alternative,
        }

    # Compute gesamt (sum of all Fallgruppen per KW).
    for kw_data in kw_map.values():
        total_g = sum(kw_data[fg]["gutachten"] for fg in FALLGRUPPEN)
        total_a = sum(kw_data[fg]["alternative"] for fg in FALLGRUPPEN)
        kw_data["gesamt"] = {
            "gutachten": total_g,
            "alternative": total_a,
            "bestaetigung": total_g - total_a,
        }

    # Full 1..MAX_KW list; weeks without data get a zeroed row.
    weekly = [
        kw_map[kw] if kw in kw_map else _empty_gutachten_weekly_row(kw)
        for kw in range(1, MAX_KW + 1)
    ]
    return {"weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 4: Auswertung Therapieaenderungen
# ---------------------------------------------------------------------------
def calculate_sheet4_data(db: Session, jahr: int) -> dict:
    """Calculate *Auswertung Therapieaenderungen* (Sheet 4).

    Per KW, over cases with ``gutachten == True`` in the given year:
    Gutachten count, TA Ja, TA Nein, Diagnosekorrektur, Unterversorgung,
    Uebertherapie.

    Returns::

        {
            "weekly": [
                {
                    "kw": 1,
                    "gutachten": X,
                    "ta_ja": X,
                    "ta_nein": X,
                    "diagnosekorrektur": X,
                    "unterversorgung": X,
                    "uebertherapie": X,
                },
                ...
            ]
        }
    """

    def flag_total(expression, name):
        """Sum a boolean SQL expression as integers under the label *name*."""
        return func.sum(expression.cast(Integer)).label(name)

    aggregated = (
        db.query(
            Case.kw,
            func.count(Case.id).label("gutachten"),
            flag_total(Case.therapieaenderung == "Ja", "ta_ja"),
            flag_total(Case.therapieaenderung == "Nein", "ta_nein"),
            flag_total(Case.ta_diagnosekorrektur, "diagnosekorrektur"),
            flag_total(Case.ta_unterversorgung, "unterversorgung"),
            flag_total(Case.ta_uebertherapie, "uebertherapie"),
        )
        .filter(Case.jahr == jahr, Case.gutachten == True)  # noqa: E712
        .group_by(Case.kw)
    )

    # Result columns copied into each weekly row, in output order.
    metrics = (
        "gutachten",
        "ta_ja",
        "ta_nein",
        "diagnosekorrektur",
        "unterversorgung",
        "uebertherapie",
    )
    by_week: dict[int, dict] = {}
    for rec in aggregated.all():
        week = _int(rec.kw)
        week_row = {"kw": week}
        for metric in metrics:
            week_row[metric] = _int(getattr(rec, metric))
        by_week[week] = week_row

    # Emit KW 1..MAX_KW in order; weeks without data get a zeroed row.
    return {
        "weekly": [
            by_week[week] if week in by_week else _empty_ta_weekly_row(week)
            for week in range(1, MAX_KW + 1)
        ]
    }
# ---------------------------------------------------------------------------
# Sheet 5: Auswertung ICD onko
# ---------------------------------------------------------------------------
def calculate_sheet5_data(db: Session, jahr: int) -> dict:
    """Calculate *Auswertung ICD onko* (Sheet 5).

    Produces the ICD codes attached to onko cases of the given year together
    with how often each occurs.

    Query: case_icd_codes JOIN cases
           WHERE cases.fallgruppe = 'onko' AND cases.jahr = jahr
           GROUP BY UPPER(icd_code)
           ORDER BY count DESC, icd_code ASC

    Returns::

        {
            "icd_codes": [
                {"icd": "C18", "count": 17},
                {"icd": "C50", "count": 12},
                ...
            ]
        }
    """
    # Codes are upper-cased in SQL so differently-cased spellings share a group.
    code_upper = func.upper(CaseICDCode.icd_code)
    occurrences = func.count(CaseICDCode.id)
    onko_in_year = and_(
        Case.fallgruppe == "onko",
        Case.jahr == jahr,
    )

    ranked = (
        db.query(code_upper.label("icd"), occurrences.label("cnt"))
        .join(Case, CaseICDCode.case_id == Case.id)
        .filter(onko_in_year)
        .group_by(code_upper)
        .order_by(occurrences.desc(), code_upper)
    )

    result = []
    for rec in ranked.all():
        result.append({"icd": rec.icd, "count": _int(rec.cnt)})
    return {"icd_codes": result}
# ---------------------------------------------------------------------------
# Dashboard KPIs
# ---------------------------------------------------------------------------
def calculate_dashboard_kpis(db: Session, jahr: int) -> dict:
    """Calculate live KPIs for the dashboard.

    Args:
        db: SQLAlchemy session used to query ``Case``.
        jahr: Year to report on; every count is restricted to this year.

    Returns::

        {
            "total_cases": int,
            "pending_icd": int,
            "pending_coding": int,
            "total_gutachten": int,
            "total_ablehnungen": int,
            "total_unterlagen": int,
            "fallgruppen": {"onko": X, "kardio": X, "intensiv": X, "galle": X, "sd": X},
            "gutachten_typen": {"alternative": X, "bestaetigung": X, "uncodiert": X},
        }

    ``gutachten_typen["uncodiert"]`` is the total of all Gutachten whose
    ``gutachten_typ`` is neither ``"Alternative"`` nor ``"Bestätigung"``
    (including NULL).
    """

    def count_cases(*criteria) -> int:
        """Count the year's cases that satisfy all additional *criteria*."""
        return (
            db.query(func.count(Case.id))
            .filter(Case.jahr == jahr, *criteria)
            .scalar()
            or 0
        )

    is_gutachten = Case.gutachten == True  # noqa: E712

    # Total cases for the year
    total_cases = count_cases()
    # Cases without ICD codes entered
    pending_icd = count_cases(Case.icd == None)  # noqa: E711
    # Gutachten without gutachten_typ (need coding)
    pending_coding = count_cases(
        is_gutachten,
        Case.gutachten_typ == None,  # noqa: E711
    )
    total_gutachten = count_cases(is_gutachten)
    total_ablehnungen = count_cases(Case.ablehnung == True)  # noqa: E712
    total_unterlagen = count_cases(Case.unterlagen == True)  # noqa: E712

    # Per-Fallgruppe counts; values outside FALLGRUPPEN are ignored.
    fg_rows = (
        db.query(Case.fallgruppe, func.count(Case.id).label("cnt"))
        .filter(Case.jahr == jahr)
        .group_by(Case.fallgruppe)
        .all()
    )
    fallgruppen = {fg: 0 for fg in FALLGRUPPEN}
    for row in fg_rows:
        if row.fallgruppe in fallgruppen:
            fallgruppen[row.fallgruppe] = _int(row.cnt)

    # Gutachten type breakdown
    typ_rows = (
        db.query(Case.gutachten_typ, func.count(Case.id).label("cnt"))
        .filter(Case.jahr == jahr, is_gutachten)
        .group_by(Case.gutachten_typ)
        .all()
    )
    bucket_for_typ = {"Alternative": "alternative", "Bestätigung": "bestaetigung"}
    gutachten_typen = {"alternative": 0, "bestaetigung": 0, "uncodiert": 0}
    for row in typ_rows:
        bucket = bucket_for_typ.get(row.gutachten_typ, "uncodiert")
        # Accumulate rather than assign: several distinct typ values (e.g.
        # NULL and an unexpected string) can fall into the same bucket.
        gutachten_typen[bucket] += _int(row.cnt)

    return {
        "total_cases": total_cases,
        "pending_icd": pending_icd,
        "pending_coding": pending_coding,
        "total_gutachten": total_gutachten,
        "total_ablehnungen": total_ablehnungen,
        "total_unterlagen": total_unterlagen,
        "fallgruppen": fallgruppen,
        "gutachten_typen": gutachten_typen,
    }
# ---------------------------------------------------------------------------
# Full report generation (all 5 sheets)
# ---------------------------------------------------------------------------
def generate_full_report(db: Session, jahr: int, kw: int | None = None) -> dict:
    """Assemble the report data of all 5 sheets for one year.

    *kw* is only echoed back in the result as metadata; it is not passed to
    the sheet calculations, which each receive just ``db`` and ``jahr``.

    Returns::

        {
            "jahr": int,
            "kw": int | None,
            "sheet1": {...},
            "sheet2": {...},
            "sheet3": {...},
            "sheet4": {...},
            "sheet5": {...},
        }
    """
    logger.info("Generating full report for jahr=%d, kw=%s", jahr, kw)

    # Sheets are calculated one after another in this order.
    sheet_calculators = (
        ("sheet1", calculate_sheet1_data),
        ("sheet2", calculate_sheet2_data),
        ("sheet3", calculate_sheet3_data),
        ("sheet4", calculate_sheet4_data),
        ("sheet5", calculate_sheet5_data),
    )
    report: dict = {"jahr": jahr, "kw": kw}
    for key, calculate in sheet_calculators:
        report[key] = calculate(db, jahr)
    return report