dak.c2s/backend/app/services/excel_export.py
CCS Admin 8502c7a7fb fix: use one-decimal percentage format in Sheet 1 (0.0% instead of 0%)
Shows "0,8%" instead of "1%" for small ratios like Ablehnungen/Erstberatungen.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 09:32:32 +00:00

490 lines
19 KiB
Python

"""Excel export service for Berichtswesen files.
Generates .xlsx files in the exact format used by the DAK Zweitmeinungs-Portal
Berichtswesen, matching the structure of historical files (2023-2026).
The workbook contains 4 sheets:
1. Auswertung KW gesamt - Overall weekly summary + year comparison
2. Auswertung nach Fachgebieten - Weekly breakdown by Fallgruppe
3. Auswertung Gutachten - Gutachten details (Alternative/Bestaetigung)
4. Auswertung ICD onko - ICD code frequency listing
"""
from __future__ import annotations
from io import BytesIO
from typing import Any
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, PatternFill
from openpyxl.utils import get_column_letter
# ---------------------------------------------------------------------------
# Styles matching the reference Berichtswesen files
# ---------------------------------------------------------------------------
# Header row fill: theme colour 0 with tint -0.15 (light grey).
# openpyxl cannot replicate theme-based tints portably, so we use a concrete
# grey that closely matches the rendered colour in the original files.
HEADER_FILL = PatternFill(start_color="FFD9D9D9", end_color="FFD9D9D9", fill_type="solid")
# Plain bold font; not referenced by the code visible in this part of the file.
BOLD_FONT = Font(bold=True)
# Font for header cells (used together with HEADER_FILL on header/Summe rows).
HEADER_FONT = Font(bold=True, name="Calibri", size=11)
# Font for the sheet title written to cell A1.
TITLE_FONT = Font(bold=True, name="Calibri", size=12)
# Number format for ratio cells: percentage with one decimal place.
PCT_FORMAT = "0.0%"
# Fallgruppe keys in canonical order
FALLGRUPPEN = ["onko", "kardio", "intensiv", "galle", "sd"]
# Header label shown for each Fallgruppe key.
FALLGRUPPEN_LABELS = {
    "onko": "Fallgruppe onko",
    "kardio": "Fallgruppe kardio",
    "intensiv": "Fallgruppe intensiv",
    "galle": "Fallgruppe Gallenblase",
    "sd": "Fallgruppe Schilddr\u00fcse",
}
# Number of calendar-week rows written per sheet. The sheet writers only look
# up weeks 1..MAX_KW, so an entry with a higher ``kw`` is never output.
MAX_KW = 52 # always output 52 rows
def generate_berichtswesen_xlsx(
    report_data: dict[str, Any],
    jahr: int,
    vorjahr_data: dict[str, Any] | None = None,
) -> bytes:
    """Generate a Berichtswesen Excel file.

    Args:
        report_data: Output from ``report_service.generate_full_report()``.
            Keys read here: ``sheet1``, ``sheet2``, ``sheet3`` and ``sheet5``
            (a dict whose ``icd_codes`` list feeds the ICD sheet). If
            ``sheet5`` carries no ``icd_codes``, a top-level ``icd_codes``
            entry is used instead. Missing or ``None`` entries are treated
            as empty.
        jahr: The report year.
        vorjahr_data: Previous-year data for the year-over-year comparison
            block on Sheet 1. It is passed through unchanged to the Sheet 1
            writer, which reads its ``summary`` mapping.

    Returns:
        The ``.xlsx`` file contents as *bytes*.
    """
    wb = Workbook()

    # ``or {}`` also covers keys that are present but set to None, which a
    # plain ``.get(key, {})`` would hand to the writers unchanged.
    _write_sheet1_kw_gesamt(wb, report_data.get("sheet1") or {}, jahr, vorjahr_data)
    _write_sheet2_fachgebiete(wb, report_data.get("sheet2") or {}, jahr)
    _write_sheet3_gutachten(wb, report_data.get("sheet3") or {}, jahr)

    # ICD codes live inside sheet5; fall back to the top-level key only when
    # sheet5 does not provide them at all (an empty list is a valid result).
    sheet5 = report_data.get("sheet5")
    icd_codes = sheet5.get("icd_codes") if isinstance(sheet5, dict) else None
    if icd_codes is None:
        icd_codes = report_data.get("icd_codes")
    _write_sheet4_icd_onko(wb, icd_codes or [], jahr)

    # Remove the default empty sheet created by Workbook()
    if "Sheet" in wb.sheetnames:
        del wb["Sheet"]

    buf = BytesIO()
    wb.save(buf)
    return buf.getvalue()
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _safe(val: Any, default: int = 0) -> int:
"""Return *val* as int, falling back to *default*."""
if val is None:
return default
try:
return int(val)
except (TypeError, ValueError):
return default
def _pct(numerator: int, denominator: int) -> float | None:
"""Return fraction or *None* if denominator is zero."""
if denominator == 0:
return None
return numerator / denominator
def _apply_header_style(ws, row: int, min_col: int, max_col: int) -> None:
    """Give every cell of *row* from *min_col* to *max_col* the header look.

    Both column bounds are inclusive; each cell receives ``HEADER_FILL`` and
    ``HEADER_FONT``.
    """
    columns = range(min_col, max_col + 1)
    for header_cell in (ws.cell(row=row, column=idx) for idx in columns):
        header_cell.fill = HEADER_FILL
        header_cell.font = HEADER_FONT
def _auto_col_width(ws, min_col: int = 1, max_col: int | None = None, min_width: float = 8.0) -> None:
    """Size columns to fit their longest cell text.

    For each column from *min_col* to *max_col* (inclusive; defaults to the
    sheet's last column) the width becomes the longest ``str(value)`` plus
    two characters of padding, never below *min_width* and never above 35.
    """
    last_col = (ws.max_column or 1) if max_col is None else max_col
    for col_idx in range(min_col, last_col + 1):
        letter = get_column_letter(col_idx)
        row_numbers = range(1, (ws.max_row or 1) + 1)
        values = (ws.cell(row=r, column=col_idx).value for r in row_numbers)
        padded_lengths = (len(str(v)) + 2 for v in values if v is not None)
        # min_width goes first so that it wins ties, as a strict ">" would.
        widest = max([min_width, *padded_lengths])
        # Cap at reasonable width
        ws.column_dimensions[letter].width = min(widest, 35)
def _weeks_lookup(weeks: list[dict]) -> dict[int, dict]:
"""Convert a list of week dicts to ``{kw: dict}`` for fast lookup."""
return {w["kw"]: w for w in weeks} if weeks else {}
# ---------------------------------------------------------------------------
# Sheet 1: Auswertung KW gesamt
# ---------------------------------------------------------------------------
def _write_sheet1_kw_gesamt(
    wb: Workbook,
    data: dict[str, Any],
    jahr: int,
    vorjahr_data: dict[str, Any] | None,
) -> None:
    """Create and fill the 'Auswertung KW gesamt' sheet.

    Reads ``data["summary"]`` for the totals and ``data["weekly"]`` for the
    per-week rows; previous-year totals come from ``vorjahr_data["summary"]``.

    Layout:
        Row 1:      A1 = title
        Row 2:      C2 = <jahr>, E2 = <jahr - 1>
        Rows 3-7:   label in A, count in C, share of Erstberatungen in D,
                    previous-year count in E and its share in F
                    (row 3, the Erstberatungen row itself, has no shares)
        Row 10:     column headers
        Rows 11-62: KW 1-52 data
        Row 63:     Summe row (repeats the summary totals)

    NOTE(review): the weekly rows read ``keine_rm`` while the summary reads
    ``keine_rueckmeldung`` - confirm both names match the report service.
    """
    ws = wb.create_sheet(title="Auswertung KW gesamt")
    current = data.get("summary", {})
    by_week = _weeks_lookup(data.get("weekly", []))

    metric_keys = ("erstberatungen", "ablehnungen", "unterlagen", "keine_rueckmeldung", "gutachten")
    now = {key: _safe(current.get(key)) for key in metric_keys}

    # Previous-year totals; anything that is not a dict counts as "no data".
    prior_source = vorjahr_data or {}
    prior_summary = prior_source.get("summary", {}) if isinstance(prior_source, dict) else {}
    prior = {key: _safe(prior_summary.get(key)) for key in metric_keys}

    # --- Title and year headers ---
    ws["A1"] = "Gesamt\u00fcbersicht"
    ws["A1"].font = TITLE_FONT
    ws["C2"] = jahr
    ws["E2"] = jahr - 1

    # --- Summary rows (3-7) ---
    labelled_metrics = (
        ("Gesamtzahl an Erstberatungen", "erstberatungen"),
        ("Anzahl Ablehnungen", "ablehnungen"),
        ("Anzahl versendeter Unterlagen", "unterlagen"),
        ("Anzahl keine R\u00fcckmeldung", "keine_rueckmeldung"),
        ("Anzahl erstellter Gutachten", "gutachten"),
    )
    for row, (label, key) in enumerate(labelled_metrics, start=3):
        count = now[key]
        prior_count = prior[key]
        if key == "erstberatungen":
            # The base row is the denominator itself, so it carries no share.
            share = prior_share = None
        else:
            share = _pct(count, now["erstberatungen"])
            prior_share = _pct(prior_count, prior["erstberatungen"])

        ws.cell(row=row, column=1, value=label)
        ws.cell(row=row, column=3, value=count)
        if share is not None:
            ws.cell(row=row, column=4, value=share).number_format = PCT_FORMAT
        # A previous-year count of zero leaves the cell empty.
        if prior_count:
            ws.cell(row=row, column=5, value=prior_count)
        if prior_share is not None:
            ws.cell(row=row, column=6, value=prior_share).number_format = PCT_FORMAT

    # --- Column headers (row 10) ---
    column_titles = ["KW", "Anzahl an Erstberatungen", "Unterlagen", "Ablehnung", "Keine R\u00fcckmeldung", "Gutachten"]
    for col_idx, title in enumerate(column_titles, start=1):
        ws.cell(row=10, column=col_idx, value=title)
    _apply_header_style(ws, 10, 1, 6)

    # --- Weekly data (rows 11-62); weeks without an entry are written as 0 ---
    weekly_keys = ("erstberatungen", "unterlagen", "ablehnungen", "keine_rm", "gutachten")
    for kw in range(1, MAX_KW + 1):
        row = 10 + kw  # row 11 = KW 1
        week = by_week.get(kw, {})
        ws.cell(row=row, column=1, value=kw)
        for col_idx, key in enumerate(weekly_keys, start=2):
            ws.cell(row=row, column=col_idx, value=_safe(week.get(key)))

    # --- Summe row (row 63), taken from the summary totals ---
    summe_row = 10 + MAX_KW + 1
    summe_cells = (
        "Summe",
        now["erstberatungen"],
        now["unterlagen"],
        now["ablehnungen"],
        now["keine_rueckmeldung"],
        now["gutachten"],
    )
    for col_idx, content in enumerate(summe_cells, start=1):
        ws.cell(row=summe_row, column=col_idx, value=content)
    _apply_header_style(ws, summe_row, 1, 6)

    # --- Column widths ---
    for letter, width in (("A", 10), ("B", 24), ("C", 12), ("D", 12), ("E", 20), ("F", 12)):
        ws.column_dimensions[letter].width = width
# ---------------------------------------------------------------------------
# Sheet 2: Auswertung nach Fachgebieten
# ---------------------------------------------------------------------------
def _write_sheet2_fachgebiete(
    wb: Workbook,
    data: dict[str, Any],
    jahr: int,
) -> None:
    """Create and fill the 'Auswertung nach Fachgebieten' sheet.

    Reads ``data["weekly"]``; each week entry is looked up per Fallgruppe key
    and supplies ``anzahl``, ``gutachten`` and ``keine_rm``. *jahr* is
    accepted for a uniform writer signature but not used here.

    Layout:
        Row 1:     A1 = title
        Row 3:     merged group headers (B3:D3, E3:G3, H3:J3, K3:M3, N3:P3)
        Row 4:     sub-headers: KW, then Anzahl | Gutachten |
                   Keine Rueckmeldung/Ablehnung for each of the 5 groups
        Rows 5-56: KW 1-52 data
        Row 57:    Summe row (column sums of the rows above, no label)
    """
    ws = wb.create_sheet(title="Auswertung nach Fachgebieten")
    by_week = _weeks_lookup(data.get("weekly", []))

    # --- Title ---
    ws["A1"] = "\u00dcbersicht nach Fallgruppen"
    ws["A1"].font = TITLE_FONT

    first_cols = [2, 5, 8, 11, 14]  # B, E, H, K, N
    groups = list(zip(FALLGRUPPEN, first_cols))
    fields = ("anzahl", "gutachten", "keine_rm")

    # --- Group headers (row 3), each merged across its three columns ---
    for fg, first in groups:
        last = first + 2
        head = ws.cell(row=3, column=first, value=FALLGRUPPEN_LABELS[fg])
        head.fill = HEADER_FILL
        head.font = HEADER_FONT
        head.alignment = Alignment(horizontal="center")
        ws.merge_cells(start_row=3, start_column=first, end_row=3, end_column=last)
        # Fill every cell of the merged range, not just the anchor cell.
        for col_idx in range(first, last + 1):
            ws.cell(row=3, column=col_idx).fill = HEADER_FILL

    # --- Sub-headers (row 4) ---
    ws.cell(row=4, column=1, value="KW")
    sub_titles = ("Anzahl", "Gutachten", "Keine R\u00fcckmeldung/Ablehnung")
    for first in first_cols:
        for offset, title in enumerate(sub_titles):
            ws.cell(row=4, column=first + offset, value=title)
    _apply_header_style(ws, 4, 1, 16)

    # --- Weekly data (rows 5-56), accumulating the column sums on the way ---
    totals = {fg: dict.fromkeys(fields, 0) for fg in FALLGRUPPEN}
    for kw in range(1, MAX_KW + 1):
        row = 4 + kw  # row 5 = KW 1
        week = by_week.get(kw, {})
        ws.cell(row=row, column=1, value=kw)
        for fg, first in groups:
            fg_week = week.get(fg, {})
            for offset, field in enumerate(fields):
                count = _safe(fg_week.get(field))
                ws.cell(row=row, column=first + offset, value=count)
                totals[fg][field] += count

    # --- Summe row (row 57) ---
    summe_row = 4 + MAX_KW + 1
    for fg, first in groups:
        for offset, field in enumerate(fields):
            ws.cell(row=summe_row, column=first + offset, value=totals[fg][field])
    _apply_header_style(ws, summe_row, 1, 16)

    # --- Column widths ---
    ws.column_dimensions["A"].width = 6
    for first in first_cols:
        for offset, width in enumerate((8, 10, 14)):
            ws.column_dimensions[get_column_letter(first + offset)].width = width
# ---------------------------------------------------------------------------
# Sheet 3: Auswertung Gutachten
# ---------------------------------------------------------------------------
def _write_sheet3_gutachten(
    wb: Workbook,
    data: dict[str, Any],
    jahr: int,
) -> None:
    """Create and fill the 'Auswertung Gutachten' sheet.

    Reads ``data["weekly"]``; each week entry is looked up under ``gesamt``
    and under every Fallgruppe key, each supplying ``gutachten``,
    ``alternative`` and ``bestaetigung``. *jahr* is accepted for a uniform
    writer signature but not used here.

    Layout:
        Row 1:     A1 = title
        Row 3:     group headers: "Gesamt" in B3 (unmerged), then the five
                   Fallgruppen at E, H, K, N, Q (first two columns merged)
        Row 4:     sub-headers: KW, then Gutachten | Alternative |
                   Bestaetigung for each of the 6 groups
        Rows 5-56: KW 1-52 data
        Row 57:    Summe row (column sums of the rows above, no label)
    """
    ws = wb.create_sheet(title="Auswertung Gutachten")
    by_week = _weeks_lookup(data.get("weekly", []))

    # --- Title ---
    ws["A1"] = "\u00dcbersicht nach Fallgruppen"
    ws["A1"].font = TITLE_FONT

    # --- Group headers (row 3) ---
    # The Gesamt label sits in B3 without a merge, as in the reference.
    gesamt_head = ws.cell(row=3, column=2, value="Gesamt")
    gesamt_head.fill = HEADER_FILL
    gesamt_head.font = HEADER_FONT
    gesamt_head.alignment = Alignment(horizontal="center")

    # Fallgruppen start at columns E, H, K, N, Q (each 3 cols wide)
    fg_first_cols = [5, 8, 11, 14, 17]
    fg_groups = list(zip(FALLGRUPPEN, fg_first_cols))
    for fg, first in fg_groups:
        head = ws.cell(row=3, column=first, value=FALLGRUPPEN_LABELS[fg])
        head.fill = HEADER_FILL
        head.font = HEADER_FONT
        head.alignment = Alignment(horizontal="center")
        # The reference merges only the first 2 of the 3 columns (E3:F3, ...).
        last = first + 1
        ws.merge_cells(start_row=3, start_column=first, end_row=3, end_column=last)
        for col_idx in range(first, last + 1):
            ws.cell(row=3, column=col_idx).fill = HEADER_FILL

    # --- Sub-headers (row 4): Gesamt block at B, then one block per group ---
    ws.cell(row=4, column=1, value="KW")
    sub_titles = ("Gutachten", "Alternative", "Best\u00e4tigung")
    for first in [2, *fg_first_cols]:
        for offset, title in enumerate(sub_titles):
            ws.cell(row=4, column=first + offset, value=title)
    _apply_header_style(ws, 4, 1, 19)

    # --- Weekly data (rows 5-56), accumulating the column sums on the way ---
    fields = ("gutachten", "alternative", "bestaetigung")
    all_groups = [("gesamt", 2), *fg_groups]
    totals = {group: dict.fromkeys(fields, 0) for group, _ in all_groups}
    for kw in range(1, MAX_KW + 1):
        row = 4 + kw
        week = by_week.get(kw, {})
        ws.cell(row=row, column=1, value=kw)
        for group, first in all_groups:
            group_week = week.get(group, {})
            for offset, field in enumerate(fields):
                count = _safe(group_week.get(field))
                ws.cell(row=row, column=first + offset, value=count)
                totals[group][field] += count

    # --- Summe row (row 57) ---
    summe_row = 4 + MAX_KW + 1
    for group, first in all_groups:
        for offset, field in enumerate(fields):
            ws.cell(row=summe_row, column=first + offset, value=totals[group][field])
    _apply_header_style(ws, summe_row, 1, 19)

    # --- Column widths: narrow KW column, uniform width for B..S ---
    ws.column_dimensions["A"].width = 6
    for col_idx in range(2, 20):
        ws.column_dimensions[get_column_letter(col_idx)].width = 12
# ---------------------------------------------------------------------------
# Sheet 4: Auswertung ICD onko
# ---------------------------------------------------------------------------
def _write_sheet4_icd_onko(
    wb: Workbook,
    icd_data: list[dict[str, Any]],
    jahr: int,
) -> None:
    """Create and fill the 'Auswertung ICD onko' sheet.

    Each entry of *icd_data* supplies ``icd`` (the code; empty string when
    missing) and ``count``. *jahr* is accepted for a uniform writer signature
    but not used here.

    Layout:
        Row 1:  A1 = "ICD", B1 = "Anzahl von ICD" (header font)
        Row 2:  A2 = "ICD", B2 = "AnzahlvonICD" (pivot-table artefact in originals)
        Row 3+: ICD code | count, highest count first
    """
    ws = wb.create_sheet(title="Auswertung ICD onko")

    # Header row (matches reference format exactly)
    for address, text in (("A1", "ICD"), ("B1", "Anzahl von ICD")):
        ws[address] = text
    for address in ("A1", "B1"):
        ws[address].font = HEADER_FONT

    # Second "header" row (artefact from pivot table in originals)
    ws["A2"] = "ICD"
    ws["B2"] = "AnzahlvonICD"

    def _count_of(entry: dict[str, Any]) -> int:
        return _safe(entry.get("count"))

    # Highest count first; the stable sort keeps input order among ties.
    ranked = sorted(icd_data, key=_count_of, reverse=True)
    for row, entry in enumerate(ranked, start=3):
        ws.cell(row=row, column=1, value=entry.get("icd", ""))
        ws.cell(row=row, column=2, value=_count_of(entry))

    # Column widths
    ws.column_dimensions["A"].width = 12
    ws.column_dimensions["B"].width = 16