dak.c2s/backend/app/services/report_service.py
CCS Admin 95e84a6978 fix: filter report data by max KW when generating reports
Previously, generate_full_report() ignored the kw parameter for data
filtering — it was only stored as metadata. This caused all reports to
contain data up to the latest available KW, making historical reports
(e.g., for KW 8) identical to the current one.

Now all 5 sheet calculation functions accept an optional max_kw parameter.
When generating a report for a specific KW, only cases with kw <= max_kw
are included. Dashboard and vorjahr callers are unaffected (max_kw=None).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 23:16:20 +00:00

610 lines
19 KiB
Python

"""Report service — all 5 sheet calculations for the DAK Berichtswesen.
Sheet 1: Auswertung KW gesamt — weekly totals + year summary
Sheet 2: Auswertung nach Fachgebieten — per-KW per-Fallgruppe breakdown
Sheet 3: Auswertung Gutachten — per-KW gutachten / alternative / bestaetigung
Sheet 4: Auswertung Therapieaenderungen — per-KW therapy-change metrics
Sheet 5: Auswertung ICD onko — ICD code frequency for onko cases
All queries use SQLAlchemy (not pandas) against the cases / case_icd_codes tables.
"""
from __future__ import annotations
import logging
from typing import Any
from sqlalchemy import Integer, and_, func
from sqlalchemy.orm import Session
from app.config import get_settings
from app.models.case import Case, CaseICDCode
# Application settings, resolved once when this module is imported.
settings = get_settings()
logger = logging.getLogger(__name__)
# Canonical Fallgruppen in display order.  Rows carrying any other
# fallgruppe value are skipped by the per-Fallgruppe calculations below.
FALLGRUPPEN = ("onko", "kardio", "intensiv", "galle", "sd")
# Highest calendar week emitted by the weekly sheets (rows are built for
# 1..MAX_KW).  NOTE(review): ISO week 53 is rare but does occur; cases booked
# in KW 53 are not shown in the weekly lists and therefore do not count
# towards the Sheet 1 summary either.
MAX_KW = 52
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _int(val: Any) -> int:
"""Safely coerce a query result to int (None -> 0)."""
if val is None:
return 0
return int(val)
def _pct(part: int, total: int) -> float | None:
"""Return part/total as a float, or None if total==0."""
if total == 0:
return None
return round(part / total, 4)
def _empty_weekly_row(kw: int) -> dict:
"""Return a zeroed-out weekly row template for Sheet 1."""
return {
"kw": kw,
"erstberatungen": 0,
"unterlagen": 0,
"ablehnungen": 0,
"keine_rm": 0,
"gutachten": 0,
}
def _empty_fg_weekly_row(kw: int) -> dict:
    """Build the Sheet 2 placeholder row for calendar week *kw*.

    The row holds ``"kw"`` plus one independent, zeroed counter dict
    (``anzahl`` / ``gutachten`` / ``keine_rm``) per entry of ``FALLGRUPPEN``.
    """
    return {
        "kw": kw,
        # A fresh dict per Fallgruppe so callers can mutate one group
        # without affecting the others.
        **{
            gruppe: {"anzahl": 0, "gutachten": 0, "keine_rm": 0}
            for gruppe in FALLGRUPPEN
        },
    }
def _empty_gutachten_weekly_row(kw: int) -> dict:
    """Build the Sheet 3 placeholder row for calendar week *kw*.

    The row holds ``"kw"``, a ``"gesamt"`` entry and one entry per
    ``FALLGRUPPEN`` member, each an independent zeroed counter dict
    (``gutachten`` / ``alternative`` / ``bestaetigung``).
    """
    blank: dict[str, Any] = {"kw": kw}
    for name in ("gesamt", *FALLGRUPPEN):
        # New dict on every iteration; the groups must not share state.
        blank[name] = dict(gutachten=0, alternative=0, bestaetigung=0)
    return blank
def _empty_ta_weekly_row(kw: int) -> dict:
"""Return a zeroed-out weekly row template for Sheet 4."""
return {
"kw": kw,
"gutachten": 0,
"ta_ja": 0,
"ta_nein": 0,
"diagnosekorrektur": 0,
"unterversorgung": 0,
"uebertherapie": 0,
}
# ---------------------------------------------------------------------------
# Sheet 1: Auswertung KW gesamt
# ---------------------------------------------------------------------------
def calculate_sheet1_data(db: Session, jahr: int, max_kw: int | None = None) -> dict:
    """Compute Sheet 1, *Auswertung KW gesamt*: weekly totals plus a summary.

    Only cases of the configured insurance (``settings.VERSICHERUNG_FILTER``)
    in year *jahr* are considered.  If *max_kw* is given, cases with a
    calendar week greater than *max_kw* are excluded.

    Returns::

        {
            "summary": {
                "erstberatungen": int,
                "ablehnungen": int,
                "unterlagen": int,
                "keine_rueckmeldung": int,
                "gutachten": int,
            },
            "weekly": [
                {"kw": 1, "erstberatungen": X, "unterlagen": X,
                 "ablehnungen": X, "keine_rm": X, "gutachten": X},
                ...  # one row per KW 1..MAX_KW, zero-filled where no data
            ]
        }

    Per-week rules (mirroring the Excel formulas):

    * Erstberatungen = number of cases in the KW
    * Unterlagen     = cases with ``unterlagen`` set
    * Ablehnungen    = cases with ``ablehnung`` set
    * Gutachten      = cases with ``gutachten`` set
    * Keine RM       = Unterlagen - Gutachten

    The summary is the column-wise sum of the emitted weekly rows, so cases
    whose KW lies outside 1..MAX_KW (including a NULL KW, which is coerced
    to 0) contribute to neither part of the result.
    """
    conditions = [Case.versicherung == settings.VERSICHERUNG_FILTER, Case.jahr == jahr]
    if max_kw is not None:
        conditions.append(Case.kw <= max_kw)

    def flag_total(column, label):
        # Booleans are cast to integers so SUM counts the rows where the flag is set.
        return func.sum(column.cast(Integer)).label(label)

    # Single aggregate query: one result row per calendar week.
    query = db.query(
        Case.kw,
        func.count(Case.id).label("erstberatungen"),
        flag_total(Case.unterlagen, "unterlagen"),
        flag_total(Case.ablehnung, "ablehnungen"),
        flag_total(Case.gutachten, "gutachten"),
    )
    per_week = query.filter(*conditions).group_by(Case.kw).all()

    by_kw: dict[int, dict] = {}
    for rec in per_week:
        week = _int(rec.kw)
        n_unterlagen = _int(rec.unterlagen)
        n_gutachten = _int(rec.gutachten)
        by_kw[week] = {
            "kw": week,
            "erstberatungen": _int(rec.erstberatungen),
            "unterlagen": n_unterlagen,
            "ablehnungen": _int(rec.ablehnungen),
            "keine_rm": n_unterlagen - n_gutachten,
            "gutachten": n_gutachten,
        }

    # Dense list for KW 1..MAX_KW; weeks without cases get a zero row.
    weekly = [
        by_kw[week] if week in by_kw else _empty_weekly_row(week)
        for week in range(1, MAX_KW + 1)
    ]

    def column_sum(key):
        return sum(entry[key] for entry in weekly)

    summary = {
        "erstberatungen": column_sum("erstberatungen"),
        "ablehnungen": column_sum("ablehnungen"),
        "unterlagen": column_sum("unterlagen"),
        "keine_rueckmeldung": column_sum("keine_rm"),
        "gutachten": column_sum("gutachten"),
    }
    return {"summary": summary, "weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 2: Auswertung nach Fachgebieten
# ---------------------------------------------------------------------------
def calculate_sheet2_data(db: Session, jahr: int, max_kw: int | None = None) -> dict:
    """Compute Sheet 2, *Auswertung nach Fachgebieten*.

    For every calendar week and every Fallgruppe the number of cases
    (Anzahl), the number of Gutachten and the derived "Keine RM/Ablehnung"
    figure are reported.  Only cases of the configured insurance in year
    *jahr* are considered; if *max_kw* is given, cases with a later KW are
    excluded.

    Returns::

        {
            "weekly": [
                {
                    "kw": 1,
                    "onko": {"anzahl": X, "gutachten": X, "keine_rm": X},
                    "kardio": {...},
                    "intensiv": {...},
                    "galle": {...},
                    "sd": {...},
                },
                ...  # one row per KW 1..MAX_KW
            ]
        }

    Keine RM/Ablehnung = Anzahl - Gutachten (per the Excel formula).
    Result rows whose fallgruppe is not listed in ``FALLGRUPPEN`` are logged
    as a warning and left out.
    """
    conditions = [Case.versicherung == settings.VERSICHERUNG_FILTER, Case.jahr == jahr]
    if max_kw is not None:
        conditions.append(Case.kw <= max_kw)

    grouped = (
        db.query(
            Case.kw,
            Case.fallgruppe,
            func.count(Case.id).label("anzahl"),
            func.sum(Case.gutachten.cast(Integer)).label("gutachten"),
        )
        .filter(*conditions)
        .group_by(Case.kw, Case.fallgruppe)
        .all()
    )

    # week -> row dict holding one counter dict per Fallgruppe
    by_kw: dict[int, dict] = {}
    for rec in grouped:
        week = _int(rec.kw)
        gruppe = rec.fallgruppe
        if gruppe in FALLGRUPPEN:
            week_row = by_kw.get(week)
            if week_row is None:
                week_row = by_kw[week] = _empty_fg_weekly_row(week)
            total = _int(rec.anzahl)
            with_gutachten = _int(rec.gutachten)
            week_row[gruppe] = {
                "anzahl": total,
                "gutachten": with_gutachten,
                "keine_rm": total - with_gutachten,
            }
        else:
            logger.warning("Unknown fallgruppe '%s' in case data, skipping", gruppe)

    weekly = [
        by_kw[week] if week in by_kw else _empty_fg_weekly_row(week)
        for week in range(1, MAX_KW + 1)
    ]
    return {"weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 3: Auswertung Gutachten
# ---------------------------------------------------------------------------
def calculate_sheet3_data(db: Session, jahr: int, max_kw: int | None = None) -> dict:
    """Calculate *Auswertung Gutachten*.

    Per KW, per group (gesamt + the Fallgruppen in ``FALLGRUPPEN``):
    Gutachten count, Alternative, Bestaetigung.  Only cases of the
    configured insurance in year *jahr* that have ``gutachten`` set are
    considered.

    Returns::

        {
            "weekly": [
                {
                    "kw": 1,
                    "gesamt": {"gutachten": X, "alternative": X, "bestaetigung": X},
                    "onko": {...},
                    "kardio": {...},
                    "intensiv": {...},
                    "galle": {...},
                    "sd": {...},
                },
                ...  # one row per KW 1..MAX_KW
            ]
        }

    In the Excel:

    - Per Fallgruppe: Gutachten = count, Alternative = count where
      typ='Alternative', Bestaetigung = Gutachten - Alternative
    - Gesamt = sum across the known Fallgruppen

    Because Bestaetigung is derived by subtraction, Gutachten whose
    ``gutachten_typ`` is anything other than 'Alternative' (including not
    yet coded) are counted as Bestaetigung.

    Rows with a fallgruppe outside ``FALLGRUPPEN`` are logged as a warning
    and excluded (also from ``gesamt``), matching the handling in Sheet 2.

    If *max_kw* is given, only data up to and including that KW is included.
    """
    filters = [
        Case.versicherung == settings.VERSICHERUNG_FILTER,
        Case.jahr == jahr,
        Case.gutachten == True,  # noqa: E712
    ]
    if max_kw is not None:
        filters.append(Case.kw <= max_kw)

    rows = (
        db.query(
            Case.kw,
            Case.fallgruppe,
            func.count(Case.id).label("gutachten"),
            func.sum(
                (Case.gutachten_typ == "Alternative").cast(Integer)
            ).label("alternative"),
        )
        .filter(*filters)
        .group_by(Case.kw, Case.fallgruppe)
        .all()
    )

    kw_map: dict[int, dict] = {}
    for row in rows:
        kw = _int(row.kw)
        fg = row.fallgruppe
        if fg not in FALLGRUPPEN:
            # Previously dropped silently; surface it so a gesamt that is
            # lower than the Sheet 4 gutachten count can be explained.
            logger.warning("Unknown fallgruppe '%s' in gutachten data, skipping", fg)
            continue
        if kw not in kw_map:
            kw_map[kw] = _empty_gutachten_weekly_row(kw)
        gutachten = _int(row.gutachten)
        alternative = _int(row.alternative)
        kw_map[kw][fg] = {
            "gutachten": gutachten,
            "alternative": alternative,
            "bestaetigung": gutachten - alternative,
        }

    # Compute gesamt (sum of all known Fallgruppen per KW)
    for kw_data in kw_map.values():
        total_g = sum(kw_data[fg]["gutachten"] for fg in FALLGRUPPEN)
        total_a = sum(kw_data[fg]["alternative"] for fg in FALLGRUPPEN)
        kw_data["gesamt"] = {
            "gutachten": total_g,
            "alternative": total_a,
            "bestaetigung": total_g - total_a,
        }

    # Dense list for KW 1..MAX_KW; weeks without Gutachten get a zero row.
    weekly = [
        kw_map[kw] if kw in kw_map else _empty_gutachten_weekly_row(kw)
        for kw in range(1, MAX_KW + 1)
    ]
    return {"weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 4: Auswertung Therapieaenderungen
# ---------------------------------------------------------------------------
def calculate_sheet4_data(db: Session, jahr: int, max_kw: int | None = None) -> dict:
    """Compute Sheet 4, *Auswertung Therapieaenderungen*.

    For every calendar week: number of Gutachten, how many of them have
    ``therapieaenderung`` equal to "Ja" resp. "Nein", and how many have the
    ``ta_diagnosekorrektur`` / ``ta_unterversorgung`` / ``ta_uebertherapie``
    flags set.  Only cases of the configured insurance in year *jahr* with
    ``gutachten`` set are considered; if *max_kw* is given, cases with a
    later KW are excluded.

    Returns::

        {
            "weekly": [
                {
                    "kw": 1,
                    "gutachten": X,
                    "ta_ja": X,
                    "ta_nein": X,
                    "diagnosekorrektur": X,
                    "unterversorgung": X,
                    "uebertherapie": X,
                },
                ...  # one row per KW 1..MAX_KW, zero-filled where no data
            ]
        }
    """
    conditions = [
        Case.versicherung == settings.VERSICHERUNG_FILTER,
        Case.jahr == jahr,
        Case.gutachten == True,  # noqa: E712
    ]
    if max_kw is not None:
        conditions.append(Case.kw <= max_kw)

    def answer_total(answer, label):
        # Counts the rows whose therapieaenderung equals *answer*.
        return func.sum((Case.therapieaenderung == answer).cast(Integer)).label(label)

    def flag_total(column, label):
        # Counts the rows where the boolean *column* is set.
        return func.sum(column.cast(Integer)).label(label)

    query = db.query(
        Case.kw,
        func.count(Case.id).label("gutachten"),
        answer_total("Ja", "ta_ja"),
        answer_total("Nein", "ta_nein"),
        flag_total(Case.ta_diagnosekorrektur, "diagnosekorrektur"),
        flag_total(Case.ta_unterversorgung, "unterversorgung"),
        flag_total(Case.ta_uebertherapie, "uebertherapie"),
    )
    per_week = query.filter(*conditions).group_by(Case.kw).all()

    by_kw: dict[int, dict] = {}
    for rec in per_week:
        week = _int(rec.kw)
        by_kw[week] = {
            "kw": week,
            "gutachten": _int(rec.gutachten),
            "ta_ja": _int(rec.ta_ja),
            "ta_nein": _int(rec.ta_nein),
            "diagnosekorrektur": _int(rec.diagnosekorrektur),
            "unterversorgung": _int(rec.unterversorgung),
            "uebertherapie": _int(rec.uebertherapie),
        }

    weekly = [
        by_kw[week] if week in by_kw else _empty_ta_weekly_row(week)
        for week in range(1, MAX_KW + 1)
    ]
    return {"weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 5: Auswertung ICD onko
# ---------------------------------------------------------------------------
def calculate_sheet5_data(db: Session, jahr: int, max_kw: int | None = None) -> dict:
    """Compute Sheet 5, *Auswertung ICD onko*: ICD code frequencies.

    Query outline::

        case_icd_codes JOIN cases
        WHERE cases.versicherung = <configured insurance>
          AND cases.fallgruppe = 'onko'
          AND cases.jahr = jahr
          [AND cases.kw <= max_kw]
        GROUP BY UPPER(icd_code)
        ORDER BY count DESC, UPPER(icd_code) ASC

    Codes are upper-cased before grouping, so differently cased spellings
    of the same code are counted together.

    Returns::

        {
            "icd_codes": [
                {"icd": "C18", "count": 17},
                {"icd": "C50", "count": 12},
                ...
            ]
        }

    If *max_kw* is given, only cases up to and including that KW count.
    """
    conditions = [
        Case.versicherung == settings.VERSICHERUNG_FILTER,
        Case.fallgruppe == "onko",
        Case.jahr == jahr,
    ]
    if max_kw is not None:
        conditions.append(Case.kw <= max_kw)

    query = (
        db.query(
            func.upper(CaseICDCode.icd_code).label("icd"),
            func.count(CaseICDCode.id).label("cnt"),
        )
        .join(Case, CaseICDCode.case_id == Case.id)
        .filter(and_(*conditions))
        .group_by(func.upper(CaseICDCode.icd_code))
        # Most frequent first; ties broken alphabetically by code.
        .order_by(func.count(CaseICDCode.id).desc(), func.upper(CaseICDCode.icd_code))
    )

    icd_codes = []
    for rec in query.all():
        icd_codes.append({"icd": rec.icd, "count": _int(rec.cnt)})
    return {"icd_codes": icd_codes}
# ---------------------------------------------------------------------------
# Dashboard KPIs
# ---------------------------------------------------------------------------
def calculate_dashboard_kpis(db: Session, jahr: int) -> dict:
    """Calculate live KPIs for the dashboard.

    All figures cover the cases of the configured insurance
    (``settings.VERSICHERUNG_FILTER``) in year *jahr*; no KW limit applies.

    Returns::

        {
            "total_cases": int,        # all cases
            "pending_icd": int,        # cases whose icd is NULL
            "pending_coding": int,     # Gutachten whose gutachten_typ is NULL
            "total_gutachten": int,
            "total_ablehnungen": int,
            "total_unterlagen": int,
            "fallgruppen": {"onko": X, "kardio": X, "intensiv": X, "galle": X, "sd": X},
            "gutachten_typen": {"alternative": X, "bestaetigung": X, "uncodiert": X},
        }

    ``gutachten_typen["uncodiert"]`` is the sum over every Gutachten whose
    ``gutachten_typ`` is neither "Alternative" nor "Bestätigung" (NULL,
    empty or any unexpected value).  Cases with a fallgruppe outside
    ``FALLGRUPPEN`` are included in ``total_cases`` but not in
    ``fallgruppen``.
    """
    # Filters shared by every query: this portal's insurance and the year.
    base = (Case.versicherung == settings.VERSICHERUNG_FILTER, Case.jahr == jahr)
    is_gutachten = Case.gutachten == True  # noqa: E712

    def count_cases(*extra) -> int:
        """Count cases matching the base filters plus *extra* conditions."""
        return db.query(func.count(Case.id)).filter(*base, *extra).scalar() or 0

    total_cases = count_cases()
    # Cases without ICD codes entered
    pending_icd = count_cases(Case.icd == None)  # noqa: E711
    # Gutachten without gutachten_typ (need coding)
    pending_coding = count_cases(is_gutachten, Case.gutachten_typ == None)  # noqa: E711
    total_gutachten = count_cases(is_gutachten)
    total_ablehnungen = count_cases(Case.ablehnung == True)  # noqa: E712
    total_unterlagen = count_cases(Case.unterlagen == True)  # noqa: E712

    # Per-Fallgruppe counts
    fg_rows = (
        db.query(Case.fallgruppe, func.count(Case.id).label("cnt"))
        .filter(*base)
        .group_by(Case.fallgruppe)
        .all()
    )
    fallgruppen = {fg: 0 for fg in FALLGRUPPEN}
    for row in fg_rows:
        if row.fallgruppe in fallgruppen:
            fallgruppen[row.fallgruppe] = _int(row.cnt)

    # Gutachten type breakdown
    typ_rows = (
        db.query(Case.gutachten_typ, func.count(Case.id).label("cnt"))
        .filter(*base, is_gutachten)
        .group_by(Case.gutachten_typ)
        .all()
    )
    typ_to_bucket = {"Alternative": "alternative", "Bestätigung": "bestaetigung"}
    gutachten_typen = {"alternative": 0, "bestaetigung": 0, "uncodiert": 0}
    for row in typ_rows:
        bucket = typ_to_bucket.get(row.gutachten_typ, "uncodiert")
        # Accumulate: several distinct gutachten_typ values (NULL, "", ...)
        # can fall into "uncodiert"; assigning would keep only the last one.
        gutachten_typen[bucket] += _int(row.cnt)

    return {
        "total_cases": total_cases,
        "pending_icd": pending_icd,
        "pending_coding": pending_coding,
        "total_gutachten": total_gutachten,
        "total_ablehnungen": total_ablehnungen,
        "total_unterlagen": total_unterlagen,
        "fallgruppen": fallgruppen,
        "gutachten_typen": gutachten_typen,
    }
# ---------------------------------------------------------------------------
# Full report generation (all 5 sheets)
# ---------------------------------------------------------------------------
def generate_full_report(db: Session, jahr: int, kw: int | None = None) -> dict:
    """Assemble the report data of all five sheets for year *jahr*.

    *kw* is forwarded to every sheet calculation as ``max_kw``: when it is
    set, only cases up to and including that calendar week enter the
    report, which allows reproducing the report as of an earlier week.
    With ``kw=None`` no week limit is applied.

    Returns::

        {
            "jahr": int,
            "kw": int | None,
            "sheet1": {...},
            "sheet2": {...},
            "sheet3": {...},
            "sheet4": {...},
            "sheet5": {...},
        }
    """
    logger.info("Generating full report for jahr=%d, kw=%s", jahr, kw)

    # Evaluated in sheet order, exactly one call per sheet.
    sheet_builders = (
        ("sheet1", calculate_sheet1_data),
        ("sheet2", calculate_sheet2_data),
        ("sheet3", calculate_sheet3_data),
        ("sheet4", calculate_sheet4_data),
        ("sheet5", calculate_sheet5_data),
    )
    report: dict[str, Any] = {"jahr": jahr, "kw": kw}
    for key, build in sheet_builders:
        report[key] = build(db, jahr, max_kw=kw)
    return report