"""Excel export service for Berichtswesen files. Generates .xlsx files in the exact format used by the DAK Zweitmeinungs-Portal Berichtswesen, matching the structure of historical files (2023-2026). The workbook contains 4 sheets: 1. Auswertung KW gesamt - Overall weekly summary + year comparison 2. Auswertung nach Fachgebieten - Weekly breakdown by Fallgruppe 3. Auswertung Gutachten - Gutachten details (Alternative/Bestaetigung) 4. Auswertung ICD onko - ICD code frequency listing """ from __future__ import annotations from io import BytesIO from typing import Any from openpyxl import Workbook from openpyxl.styles import Alignment, Font, PatternFill from openpyxl.utils import get_column_letter # --------------------------------------------------------------------------- # Styles matching the reference Berichtswesen files # --------------------------------------------------------------------------- # Header row fill: theme colour 0 with tint -0.15 (light grey). # openpyxl cannot replicate theme-based tints portably, so we use a concrete # grey that closely matches the rendered colour in the original files. HEADER_FILL = PatternFill(start_color="FFD9D9D9", end_color="FFD9D9D9", fill_type="solid") BOLD_FONT = Font(bold=True) HEADER_FONT = Font(bold=True, name="Calibri", size=11) TITLE_FONT = Font(bold=True, name="Calibri", size=12) PCT_FORMAT = "0.0%" # Fallgruppe keys in canonical order FALLGRUPPEN = ["onko", "kardio", "intensiv", "galle", "sd"] FALLGRUPPEN_LABELS = { "onko": "Fallgruppe onko", "kardio": "Fallgruppe kardio", "intensiv": "Fallgruppe intensiv", "galle": "Fallgruppe Gallenblase", "sd": "Fallgruppe Schilddr\u00fcse", } MAX_KW = 52 # always output 52 rows def generate_berichtswesen_xlsx( report_data: dict[str, Any], jahr: int, vorjahr_data: dict[str, Any] | None = None, ) -> bytes: """Generate a Berichtswesen Excel file. Args: report_data: Output from ``report_service.generate_full_report()``. Expected keys: ``sheet1``, ``sheet2``, ``sheet3``, ``icd_codes``. jahr: The report year. vorjahr_data: Previous-year summary data for the year-over-year comparison block on Sheet 1. Structure same as *report_data* ``sheet1`` (i.e. containing ``totals`` and ``weeks``). Returns: The ``.xlsx`` file contents as *bytes*. """ wb = Workbook() _write_sheet1_kw_gesamt(wb, report_data.get("sheet1", {}), jahr, vorjahr_data) _write_sheet2_fachgebiete(wb, report_data.get("sheet2", {}), jahr) _write_sheet3_gutachten(wb, report_data.get("sheet3", {}), jahr) # ICD codes live inside sheet5 sheet5 = report_data.get("sheet5", {}) icd_codes = sheet5.get("icd_codes", []) if isinstance(sheet5, dict) else [] _write_sheet4_icd_onko(wb, icd_codes, jahr) # Remove the default empty sheet created by Workbook() if "Sheet" in wb.sheetnames: del wb["Sheet"] buf = BytesIO() wb.save(buf) return buf.getvalue() # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _safe(val: Any, default: int = 0) -> int: """Return *val* as int, falling back to *default*.""" if val is None: return default try: return int(val) except (TypeError, ValueError): return default def _pct(numerator: int, denominator: int) -> float | None: """Return fraction or *None* if denominator is zero.""" if denominator == 0: return None return numerator / denominator def _apply_header_style(ws, row: int, min_col: int, max_col: int) -> None: """Apply header fill + bold font to a row range.""" for col in range(min_col, max_col + 1): cell = ws.cell(row=row, column=col) cell.fill = HEADER_FILL cell.font = HEADER_FONT def _auto_col_width(ws, min_col: int = 1, max_col: int | None = None, min_width: float = 8.0) -> None: """Set column widths based on content length, with a sensible minimum.""" if max_col is None: max_col = ws.max_column or 1 for col in range(min_col, max_col + 1): max_len = min_width col_letter = get_column_letter(col) for row in range(1, (ws.max_row or 1) + 1): val = ws.cell(row=row, column=col).value if val is not None: cell_len = len(str(val)) + 2 if cell_len > max_len: max_len = cell_len # Cap at reasonable width ws.column_dimensions[col_letter].width = min(max_len, 35) def _weeks_lookup(weeks: list[dict]) -> dict[int, dict]: """Convert a list of week dicts to ``{kw: dict}`` for fast lookup.""" return {w["kw"]: w for w in weeks} if weeks else {} # --------------------------------------------------------------------------- # Sheet 1: Auswertung KW gesamt # --------------------------------------------------------------------------- def _write_sheet1_kw_gesamt( wb: Workbook, data: dict[str, Any], jahr: int, vorjahr_data: dict[str, Any] | None, ) -> None: """Write the 'Auswertung KW gesamt' sheet. Layout (matching reference files): Row 1: A1 = "Gesamtuebersicht" Row 2: C2 = , E2 = Row 3: A3 = "Gesamtzahl an Erstberatungen", C3 = count, E3 = vorjahr Row 4: A4 = "Anzahl Ablehnungen", C4 = count, D4 = %, E4 = vorjahr, F4 = % Row 5: A5 = "Anzahl versendeter Unterlagen", C5 = count, D5 = %, E5 = vorjahr, F5 = % Row 6: A6 = "Anzahl keine Rueckmeldung", C6 = count, D6 = %, E6 = vorjahr, F6 = % Row 7: A7 = "Anzahl erstellter Gutachten", C7 = count, D7 = %, E7 = vorjahr, F7 = % Row 8-9: (empty) Row 10: Column headers (KW | Erstberatungen | Unterlagen | Ablehnung | Keine Rueckmeldung | Gutachten) Row 11-62: KW 1-52 data Row 63: Summe row """ ws = wb.create_sheet(title="Auswertung KW gesamt") totals = data.get("summary", {}) weeks = _weeks_lookup(data.get("weekly", [])) erst = _safe(totals.get("erstberatungen")) abl = _safe(totals.get("ablehnungen")) unt = _safe(totals.get("unterlagen")) keine_rm = _safe(totals.get("keine_rueckmeldung")) gutachten = _safe(totals.get("gutachten")) # Vorjahr totals vj = vorjahr_data or {} vj_totals = vj.get("summary", {}) if isinstance(vj, dict) else {} vj_erst = _safe(vj_totals.get("erstberatungen")) vj_abl = _safe(vj_totals.get("ablehnungen")) vj_unt = _safe(vj_totals.get("unterlagen")) vj_keine = _safe(vj_totals.get("keine_rueckmeldung")) vj_ga = _safe(vj_totals.get("gutachten")) # --- Title --- ws["A1"] = "Gesamt\u00fcbersicht" ws["A1"].font = TITLE_FONT # --- Year headers --- ws["C2"] = jahr ws["E2"] = jahr - 1 # --- Summary rows --- summary_rows = [ ("Gesamtzahl an Erstberatungen", erst, None, vj_erst, None), ("Anzahl Ablehnungen", abl, _pct(abl, erst), vj_abl, _pct(vj_abl, vj_erst)), ("Anzahl versendeter Unterlagen", unt, _pct(unt, erst), vj_unt, _pct(vj_unt, vj_erst)), ("Anzahl keine R\u00fcckmeldung", keine_rm, _pct(keine_rm, erst), vj_keine, _pct(vj_keine, vj_erst)), ("Anzahl erstellter Gutachten", gutachten, _pct(gutachten, erst), vj_ga, _pct(vj_ga, vj_erst)), ] for i, (label, val, pct, vj_val, vj_pct) in enumerate(summary_rows): row = 3 + i ws.cell(row=row, column=1, value=label) ws.cell(row=row, column=3, value=val) if pct is not None: c = ws.cell(row=row, column=4, value=pct) c.number_format = PCT_FORMAT if vj_val: ws.cell(row=row, column=5, value=vj_val) if vj_pct is not None: c = ws.cell(row=row, column=6, value=vj_pct) c.number_format = PCT_FORMAT # --- Column headers (row 10) --- headers = ["KW", "Anzahl an Erstberatungen", "Unterlagen", "Ablehnung", "Keine R\u00fcckmeldung", "Gutachten"] for ci, h in enumerate(headers, start=1): ws.cell(row=10, column=ci, value=h) _apply_header_style(ws, 10, 1, 6) # --- Weekly data (rows 11-62) --- for kw in range(1, MAX_KW + 1): row = 10 + kw # row 11 = KW 1 w = weeks.get(kw, {}) ws.cell(row=row, column=1, value=kw) ws.cell(row=row, column=2, value=_safe(w.get("erstberatungen"))) ws.cell(row=row, column=3, value=_safe(w.get("unterlagen"))) ws.cell(row=row, column=4, value=_safe(w.get("ablehnungen"))) ws.cell(row=row, column=5, value=_safe(w.get("keine_rm"))) ws.cell(row=row, column=6, value=_safe(w.get("gutachten"))) # --- Summe row (row 63) --- summe_row = 10 + MAX_KW + 1 # 63 ws.cell(row=summe_row, column=1, value="Summe") ws.cell(row=summe_row, column=2, value=erst) ws.cell(row=summe_row, column=3, value=unt) ws.cell(row=summe_row, column=4, value=abl) ws.cell(row=summe_row, column=5, value=keine_rm) ws.cell(row=summe_row, column=6, value=gutachten) _apply_header_style(ws, summe_row, 1, 6) # --- Column widths --- ws.column_dimensions["A"].width = 10 ws.column_dimensions["B"].width = 24 ws.column_dimensions["C"].width = 12 ws.column_dimensions["D"].width = 12 ws.column_dimensions["E"].width = 20 ws.column_dimensions["F"].width = 12 # --------------------------------------------------------------------------- # Sheet 2: Auswertung nach Fachgebieten # --------------------------------------------------------------------------- def _write_sheet2_fachgebiete( wb: Workbook, data: dict[str, Any], jahr: int, ) -> None: """Write the 'Auswertung nach Fachgebieten' sheet. Layout: Row 1: A1 = "Uebersicht nach Fallgruppen" Row 3: Merged group headers (B3:D3, E3:G3, H3:J3, K3:M3, N3:P3) Row 4: Sub-headers: KW | Anzahl | Gutachten | Keine RM/Ablehnung (x5) Row 5-56: KW 1-52 data Row 57: Summe row """ ws = wb.create_sheet(title="Auswertung nach Fachgebieten") weeks = _weeks_lookup(data.get("weekly", [])) # --- Title --- ws["A1"] = "\u00dcbersicht nach Fallgruppen" ws["A1"].font = TITLE_FONT # --- Group headers (row 3) with merged cells --- group_start_cols = [2, 5, 8, 11, 14] # B, E, H, K, N for fg_key, start_col in zip(FALLGRUPPEN, group_start_cols): label = FALLGRUPPEN_LABELS[fg_key] cell = ws.cell(row=3, column=start_col, value=label) cell.fill = HEADER_FILL cell.font = HEADER_FONT cell.alignment = Alignment(horizontal="center") # Merge across 3 columns end_col = start_col + 2 ws.merge_cells( start_row=3, start_column=start_col, end_row=3, end_column=end_col, ) # Apply fill to merged cells too for c in range(start_col, end_col + 1): ws.cell(row=3, column=c).fill = HEADER_FILL # --- Sub-headers (row 4) --- ws.cell(row=4, column=1, value="KW") for start_col in group_start_cols: ws.cell(row=4, column=start_col, value="Anzahl") ws.cell(row=4, column=start_col + 1, value="Gutachten") ws.cell(row=4, column=start_col + 2, value="Keine R\u00fcckmeldung/Ablehnung") _apply_header_style(ws, 4, 1, 16) # --- Weekly data (rows 5-56) --- sums = {fg: {"anzahl": 0, "gutachten": 0, "keine_rm": 0} for fg in FALLGRUPPEN} for kw in range(1, MAX_KW + 1): row = 4 + kw # row 5 = KW 1 w = weeks.get(kw, {}) ws.cell(row=row, column=1, value=kw) for fg_key, start_col in zip(FALLGRUPPEN, group_start_cols): fg_data = w.get(fg_key, {}) anz = _safe(fg_data.get("anzahl")) ga = _safe(fg_data.get("gutachten")) keine = _safe(fg_data.get("keine_rm")) ws.cell(row=row, column=start_col, value=anz) ws.cell(row=row, column=start_col + 1, value=ga) ws.cell(row=row, column=start_col + 2, value=keine) sums[fg_key]["anzahl"] += anz sums[fg_key]["gutachten"] += ga sums[fg_key]["keine_rm"] += keine # --- Summe row (row 57) --- summe_row = 4 + MAX_KW + 1 # 57 for fg_key, start_col in zip(FALLGRUPPEN, group_start_cols): ws.cell(row=summe_row, column=start_col, value=sums[fg_key]["anzahl"]) ws.cell(row=summe_row, column=start_col + 1, value=sums[fg_key]["gutachten"]) ws.cell(row=summe_row, column=start_col + 2, value=sums[fg_key]["keine_rm"]) _apply_header_style(ws, summe_row, 1, 16) # --- Column widths --- ws.column_dimensions["A"].width = 6 for start_col in group_start_cols: ws.column_dimensions[get_column_letter(start_col)].width = 8 ws.column_dimensions[get_column_letter(start_col + 1)].width = 10 ws.column_dimensions[get_column_letter(start_col + 2)].width = 14 # --------------------------------------------------------------------------- # Sheet 3: Auswertung Gutachten # --------------------------------------------------------------------------- def _write_sheet3_gutachten( wb: Workbook, data: dict[str, Any], jahr: int, ) -> None: """Write the 'Auswertung Gutachten' sheet. Layout: Row 1: A1 = "Uebersicht nach Fallgruppen" Row 3: Group headers: Gesamt (B3) + 5 Fallgruppen Row 4: Sub-headers: KW | Gutachten | Alternative | Bestaetigung (x6) Row 5-56: KW 1-52 data Row 57: Summe row """ ws = wb.create_sheet(title="Auswertung Gutachten") weeks = _weeks_lookup(data.get("weekly", [])) # --- Title --- ws["A1"] = "\u00dcbersicht nach Fallgruppen" ws["A1"].font = TITLE_FONT # --- Group headers (row 3) --- # Gesamt: B3 (no merge since it's a single-column-start group header, # but in the reference the Gesamt label sits in B3 without a merge) cell = ws.cell(row=3, column=2, value="Gesamt") cell.fill = HEADER_FILL cell.font = HEADER_FONT cell.alignment = Alignment(horizontal="center") # Fallgruppen start at columns E, H, K, N, Q (each 3 cols wide) fg_start_cols = [5, 8, 11, 14, 17] for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols): label = FALLGRUPPEN_LABELS[fg_key] cell = ws.cell(row=3, column=start_col, value=label) cell.fill = HEADER_FILL cell.font = HEADER_FONT cell.alignment = Alignment(horizontal="center") # Merge: the reference merges first 2 of 3 columns (E3:F3, H3:I3, etc.) end_col = start_col + 1 ws.merge_cells( start_row=3, start_column=start_col, end_row=3, end_column=end_col, ) for c in range(start_col, end_col + 1): ws.cell(row=3, column=c).fill = HEADER_FILL # --- Sub-headers (row 4) --- ws.cell(row=4, column=1, value="KW") # Gesamt columns: B=Gutachten, C=Alternative, D=Bestaetigung ws.cell(row=4, column=2, value="Gutachten") ws.cell(row=4, column=3, value="Alternative") ws.cell(row=4, column=4, value="Best\u00e4tigung") for start_col in fg_start_cols: ws.cell(row=4, column=start_col, value="Gutachten") ws.cell(row=4, column=start_col + 1, value="Alternative") ws.cell(row=4, column=start_col + 2, value="Best\u00e4tigung") _apply_header_style(ws, 4, 1, 19) # --- Weekly data (rows 5-56) --- sums_gesamt = {"gutachten": 0, "alternative": 0, "bestaetigung": 0} sums_fg = {fg: {"gutachten": 0, "alternative": 0, "bestaetigung": 0} for fg in FALLGRUPPEN} for kw in range(1, MAX_KW + 1): row = 4 + kw w = weeks.get(kw, {}) ws.cell(row=row, column=1, value=kw) # Gesamt gesamt = w.get("gesamt", {}) g_ges = _safe(gesamt.get("gutachten")) g_alt = _safe(gesamt.get("alternative")) g_best = _safe(gesamt.get("bestaetigung")) ws.cell(row=row, column=2, value=g_ges) ws.cell(row=row, column=3, value=g_alt) ws.cell(row=row, column=4, value=g_best) sums_gesamt["gutachten"] += g_ges sums_gesamt["alternative"] += g_alt sums_gesamt["bestaetigung"] += g_best # Per Fallgruppe for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols): fg_data = w.get(fg_key, {}) ga = _safe(fg_data.get("gutachten")) alt = _safe(fg_data.get("alternative")) best = _safe(fg_data.get("bestaetigung")) ws.cell(row=row, column=start_col, value=ga) ws.cell(row=row, column=start_col + 1, value=alt) ws.cell(row=row, column=start_col + 2, value=best) sums_fg[fg_key]["gutachten"] += ga sums_fg[fg_key]["alternative"] += alt sums_fg[fg_key]["bestaetigung"] += best # --- Summe row (row 57) --- summe_row = 4 + MAX_KW + 1 ws.cell(row=summe_row, column=2, value=sums_gesamt["gutachten"]) ws.cell(row=summe_row, column=3, value=sums_gesamt["alternative"]) ws.cell(row=summe_row, column=4, value=sums_gesamt["bestaetigung"]) for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols): ws.cell(row=summe_row, column=start_col, value=sums_fg[fg_key]["gutachten"]) ws.cell(row=summe_row, column=start_col + 1, value=sums_fg[fg_key]["alternative"]) ws.cell(row=summe_row, column=start_col + 2, value=sums_fg[fg_key]["bestaetigung"]) _apply_header_style(ws, summe_row, 1, 19) # --- Column widths --- ws.column_dimensions["A"].width = 6 for col in range(2, 20): ws.column_dimensions[get_column_letter(col)].width = 12 # --------------------------------------------------------------------------- # Sheet 4: Auswertung ICD onko # --------------------------------------------------------------------------- def _write_sheet4_icd_onko( wb: Workbook, icd_data: list[dict[str, Any]], jahr: int, ) -> None: """Write the 'Auswertung ICD onko' sheet. Layout: Row 1: A1 = "ICD", B1 = "Anzahl von ICD" Row 2: A2 = "ICD", B2 = "AnzahlvonICD" (pivot-table artefact in originals) Row 3+: ICD code | count (sorted by count descending) """ ws = wb.create_sheet(title="Auswertung ICD onko") # Header row (matches reference format exactly) ws["A1"] = "ICD" ws["B1"] = "Anzahl von ICD" ws["A1"].font = HEADER_FONT ws["B1"].font = HEADER_FONT # Second "header" row (artefact from pivot table in originals) ws["A2"] = "ICD" ws["B2"] = "AnzahlvonICD" # Sort by count descending sorted_icd = sorted(icd_data, key=lambda x: _safe(x.get("count")), reverse=True) for i, entry in enumerate(sorted_icd): row = 3 + i ws.cell(row=row, column=1, value=entry.get("icd", "")) ws.cell(row=row, column=2, value=_safe(entry.get("count"))) # Column widths ws.column_dimensions["A"].width = 12 ws.column_dimensions["B"].width = 16