From 1748379253da36e61f35f193ce6978aced467b0b Mon Sep 17 00:00:00 2001
From: CCS Admin
Date: Tue, 24 Feb 2026 08:07:28 +0000
Subject: [PATCH] feat: Excel export in Berichtswesen format + historical import

Co-Authored-By: Claude Opus 4.6
---
 backend/app/services/excel_export.py    | 491 +++++++++++++++++++++++
 backend/scripts/import_berichtswesen.py | 524 ++++++++++++++++++++++++
 2 files changed, 1015 insertions(+)
 create mode 100644 backend/app/services/excel_export.py
 create mode 100644 backend/scripts/import_berichtswesen.py

diff --git a/backend/app/services/excel_export.py b/backend/app/services/excel_export.py
new file mode 100644
index 0000000..c4658c8
--- /dev/null
+++ b/backend/app/services/excel_export.py
@@ -0,0 +1,491 @@
+"""Excel export service for Berichtswesen files.
+
+Generates .xlsx files in the exact format used by the DAK Zweitmeinungs-Portal
+Berichtswesen, matching the structure of historical files (2023-2026).
+
+The workbook contains 4 sheets:
+  1. Auswertung KW gesamt - Overall weekly summary + year comparison
+  2. Auswertung nach Fachgebieten - Weekly breakdown by Fallgruppe
+  3. Auswertung Gutachten - Gutachten details (Alternative/Bestaetigung)
+  4. Auswertung ICD onko - ICD code frequency listing
+"""
+
+from __future__ import annotations
+
+from io import BytesIO
+from typing import Any
+
+from openpyxl import Workbook
+from openpyxl.styles import Alignment, Font, PatternFill
+from openpyxl.utils import get_column_letter
+
+# ---------------------------------------------------------------------------
+# Styles matching the reference Berichtswesen files
+# ---------------------------------------------------------------------------
+
+# Header row fill: theme colour 0 with tint -0.15 (light grey).
+# openpyxl cannot replicate theme-based tints portably, so we use a concrete
+# grey that closely matches the rendered colour in the original files.
+HEADER_FILL = PatternFill(start_color="FFD9D9D9", end_color="FFD9D9D9", fill_type="solid")
+BOLD_FONT = Font(bold=True)
+HEADER_FONT = Font(bold=True, name="Calibri", size=11)
+TITLE_FONT = Font(bold=True, name="Calibri", size=12)
+PCT_FORMAT = "0%"
+
+# Fallgruppe keys in canonical order
+FALLGRUPPEN = ["onko", "kardio", "intensiv", "galle", "sd"]
+FALLGRUPPEN_LABELS = {
+    "onko": "Fallgruppe onko",
+    "kardio": "Fallgruppe kardio",
+    "intensiv": "Fallgruppe intensiv",
+    "galle": "Fallgruppe Gallenblase",
+    "sd": "Fallgruppe Schilddr\u00fcse",
+}
+
+MAX_KW = 52  # always output 52 rows
+
+
+def generate_berichtswesen_xlsx(
+    report_data: dict[str, Any],
+    jahr: int,
+    vorjahr_data: dict[str, Any] | None = None,
+) -> bytes:
+    """Generate a Berichtswesen Excel file.
+
+    Args:
+        report_data: Output from ``report_service.generate_full_report()``.
+            Expected keys: ``sheet1``, ``sheet2``, ``sheet3``, ``icd_codes``.
+        jahr: The report year.
+        vorjahr_data: Previous-year summary data for the year-over-year
+            comparison block on Sheet 1. Structure same as *report_data*
+            ``sheet1`` (i.e. containing ``totals`` and ``weeks``).
+
+    Returns:
+        The ``.xlsx`` file contents as *bytes*.
+    """
+    wb = Workbook()
+
+    _write_sheet1_kw_gesamt(wb, report_data.get("sheet1", {}), jahr, vorjahr_data)
+    _write_sheet2_fachgebiete(wb, report_data.get("sheet2", {}), jahr)
+    _write_sheet3_gutachten(wb, report_data.get("sheet3", {}), jahr)
+    _write_sheet4_icd_onko(wb, report_data.get("icd_codes", []), jahr)
+
+    # Remove the default empty sheet created by Workbook()
+    if "Sheet" in wb.sheetnames:
+        del wb["Sheet"]
+
+    buf = BytesIO()
+    wb.save(buf)
+    return buf.getvalue()
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+def _safe(val: Any, default: int = 0) -> int:
+    """Return *val* as int, falling back to *default*."""
+    if val is None:
+        return default
+    try:
+        return int(val)
+    except (TypeError, ValueError, OverflowError):
+        return default
+
+
+def _pct(numerator: int, denominator: int) -> float | None:
+    """Return fraction or *None* if denominator is zero."""
+    if denominator == 0:
+        return None
+    return numerator / denominator
+
+
+def _apply_header_style(ws, row: int, min_col: int, max_col: int) -> None:
+    """Apply header fill + bold font to a row range."""
+    for col in range(min_col, max_col + 1):
+        cell = ws.cell(row=row, column=col)
+        cell.fill = HEADER_FILL
+        cell.font = HEADER_FONT
+
+
+def _auto_col_width(ws, min_col: int = 1, max_col: int | None = None, min_width: float = 8.0) -> None:
+    """Set column widths based on content length, with a sensible minimum."""
+    if max_col is None:
+        max_col = ws.max_column or 1
+    for col in range(min_col, max_col + 1):
+        max_len = min_width
+        col_letter = get_column_letter(col)
+        for row in range(1, (ws.max_row or 1) + 1):
+            val = ws.cell(row=row, column=col).value
+            if val is not None:
+                cell_len = len(str(val)) + 2
+                if cell_len > max_len:
+                    max_len = cell_len
+        # Cap at reasonable width
+        ws.column_dimensions[col_letter].width = min(max_len, 35)
+
+
+def _weeks_lookup(weeks: list[dict]) -> dict[int, dict]:
+    """Convert a list of week dicts to ``{kw: dict}`` for fast lookup."""
+    return {w["kw"]: w for w in weeks} if weeks else {}
+
+
+# ---------------------------------------------------------------------------
+# Sheet 1: Auswertung KW gesamt
+# ---------------------------------------------------------------------------
+
+def _write_sheet1_kw_gesamt(
+    wb: Workbook,
+    data: dict[str, Any],
+    jahr: int,
+    vorjahr_data: dict[str, Any] | None,
+) -> None:
+    """Write the 'Auswertung KW gesamt' sheet.
+
+    Layout (matching reference files):
+        Row 1: A1 = "Gesamtuebersicht"
+        Row 2: C2 = jahr, E2 = jahr - 1
+        Row 3: A3 = "Gesamtzahl an Erstberatungen", C3 = count, E3 = vorjahr
+        Row 4: A4 = "Anzahl Ablehnungen", C4 = count, D4 = %, E4 = vorjahr, F4 = %
+        Row 5: A5 = "Anzahl versendeter Unterlagen", C5 = count, D5 = %, E5 = vorjahr, F5 = %
+        Row 6: A6 = "Anzahl keine Rueckmeldung", C6 = count, D6 = %, E6 = vorjahr, F6 = %
+        Row 7: A7 = "Anzahl erstellter Gutachten", C7 = count, D7 = %, E7 = vorjahr, F7 = %
+        Row 8-9: (empty)
+        Row 10: Column headers (KW | Erstberatungen | Unterlagen | Ablehnung | Keine Rueckmeldung | Gutachten)
+        Row 11-62: KW 1-52 data
+        Row 63: Summe row
+
+    Previous-year counts (column E) are written only when *vorjahr_data*
+    carries totals; zero counts then appear as ``0`` instead of a blank cell.
+    """
+    ws = wb.create_sheet(title="Auswertung KW gesamt")
+
+    totals = data.get("totals", {})
+    weeks = _weeks_lookup(data.get("weeks", []))
+
+    erst = _safe(totals.get("erstberatungen"))
+    abl = _safe(totals.get("ablehnungen"))
+    unt = _safe(totals.get("unterlagen"))
+    keine_rm = _safe(totals.get("keine_rueckmeldung"))
+    gutachten = _safe(totals.get("gutachten_gesamt"))
+
+    # Vorjahr totals
+    vj = vorjahr_data or {}
+    vj_totals = (vj.get("totals") or {}) if isinstance(vj, dict) else {}
+    # Fill column E only if previous-year totals exist; then zeros count too.
+    has_vorjahr = bool(vj_totals)
+    vj_erst = _safe(vj_totals.get("erstberatungen"))
+    vj_abl = _safe(vj_totals.get("ablehnungen"))
+    vj_unt = _safe(vj_totals.get("unterlagen"))
+    vj_keine = _safe(vj_totals.get("keine_rueckmeldung"))
+    vj_ga = _safe(vj_totals.get("gutachten_gesamt"))
+
+    # --- Title ---
+    ws["A1"] = "Gesamt\u00fcbersicht"
+    ws["A1"].font = TITLE_FONT
+
+    # --- Year headers ---
+    ws["C2"] = jahr
+    ws["E2"] = jahr - 1
+
+    # --- Summary rows ---
+    summary_rows = [
+        ("Gesamtzahl an Erstberatungen", erst, None, vj_erst, None),
+        ("Anzahl Ablehnungen", abl, _pct(abl, erst), vj_abl, _pct(vj_abl, vj_erst)),
+        ("Anzahl versendeter Unterlagen", unt, _pct(unt, erst), vj_unt, _pct(vj_unt, vj_erst)),
+        ("Anzahl keine R\u00fcckmeldung", keine_rm, _pct(keine_rm, erst), vj_keine, _pct(vj_keine, vj_erst)),
+        ("Anzahl erstellter Gutachten", gutachten, _pct(gutachten, erst), vj_ga, _pct(vj_ga, vj_erst)),
+    ]
+    for i, (label, val, pct, vj_val, vj_pct) in enumerate(summary_rows):
+        row = 3 + i
+        ws.cell(row=row, column=1, value=label)
+        ws.cell(row=row, column=3, value=val)
+        if pct is not None:
+            c = ws.cell(row=row, column=4, value=pct)
+            c.number_format = PCT_FORMAT
+        if has_vorjahr:
+            ws.cell(row=row, column=5, value=vj_val)
+        if vj_pct is not None:
+            c = ws.cell(row=row, column=6, value=vj_pct)
+            c.number_format = PCT_FORMAT
+
+    # --- Column headers (row 10) ---
+    headers = ["KW", "Anzahl an Erstberatungen", "Unterlagen", "Ablehnung", "Keine R\u00fcckmeldung", "Gutachten"]
+    for ci, h in enumerate(headers, start=1):
+        ws.cell(row=10, column=ci, value=h)
+    _apply_header_style(ws, 10, 1, 6)
+
+    # --- Weekly data (rows 11-62) ---
+    for kw in range(1, MAX_KW + 1):
+        row = 10 + kw  # row 11 = KW 1
+        w = weeks.get(kw, {})
+        ws.cell(row=row, column=1, value=kw)
+        ws.cell(row=row, column=2, value=_safe(w.get("erstberatungen")))
+        ws.cell(row=row, column=3, value=_safe(w.get("unterlagen")))
+        ws.cell(row=row, column=4, value=_safe(w.get("ablehnungen")))
+        ws.cell(row=row, column=5, value=_safe(w.get("keine_rueckmeldung")))
+        ws.cell(row=row, column=6, value=_safe(w.get("gutachten_gesamt")))
+
+    # --- Summe row (row 63) ---
+    summe_row = 10 + MAX_KW + 1  # 63
+    ws.cell(row=summe_row, column=1, value="Summe")
+    ws.cell(row=summe_row, column=2, value=erst)
+    ws.cell(row=summe_row, column=3, value=unt)
+    ws.cell(row=summe_row, column=4, value=abl)
+    ws.cell(row=summe_row, column=5, value=keine_rm)
+    ws.cell(row=summe_row, column=6, value=gutachten)
+    _apply_header_style(ws, summe_row, 1, 6)
+
+    # --- Column widths ---
+    ws.column_dimensions["A"].width = 10
+    ws.column_dimensions["B"].width = 24
+    ws.column_dimensions["C"].width = 12
+    ws.column_dimensions["D"].width = 12
+    ws.column_dimensions["E"].width = 20
+    ws.column_dimensions["F"].width = 12
+
+
+# ---------------------------------------------------------------------------
+# Sheet 2: Auswertung nach Fachgebieten
+# ---------------------------------------------------------------------------
+
+def _write_sheet2_fachgebiete(
+    wb: Workbook,
+    data: dict[str, Any],
+    jahr: int,
+) -> None:
+    """Write the 'Auswertung nach Fachgebieten' sheet.
+
+    Layout:
+        Row 1: A1 = "Uebersicht nach Fallgruppen"
+        Row 3: Merged group headers (B3:D3, E3:G3, H3:J3, K3:M3, N3:P3)
+        Row 4: Sub-headers: KW | Anzahl | Gutachten | Keine RM/Ablehnung (x5)
+        Row 5-56: KW 1-52 data
+        Row 57: Summe row
+    """
+    ws = wb.create_sheet(title="Auswertung nach Fachgebieten")
+
+    weeks = _weeks_lookup(data.get("weeks", []))
+
+    # --- Title ---
+    ws["A1"] = "\u00dcbersicht nach Fallgruppen"
+    ws["A1"].font = TITLE_FONT
+
+    # --- Group headers (row 3) with merged cells ---
+    group_start_cols = [2, 5, 8, 11, 14]  # B, E, H, K, N
+    for fg_key, start_col in zip(FALLGRUPPEN, group_start_cols):
+        label = FALLGRUPPEN_LABELS[fg_key]
+        cell = ws.cell(row=3, column=start_col, value=label)
+        cell.fill = HEADER_FILL
+        cell.font = HEADER_FONT
+        cell.alignment = Alignment(horizontal="center")
+        # Merge across 3 columns
+        end_col = start_col + 2
+        ws.merge_cells(
+            start_row=3, start_column=start_col,
+            end_row=3, end_column=end_col,
+        )
+        # Apply fill to merged cells too
+        for c in range(start_col, end_col + 1):
+            ws.cell(row=3, column=c).fill = HEADER_FILL
+
+    # --- Sub-headers (row 4) ---
+    ws.cell(row=4, column=1, value="KW")
+    for start_col in group_start_cols:
+        ws.cell(row=4, column=start_col, value="Anzahl")
+        ws.cell(row=4, column=start_col + 1, value="Gutachten")
+        ws.cell(row=4, column=start_col + 2, value="Keine R\u00fcckmeldung/Ablehnung")
+    _apply_header_style(ws, 4, 1, 16)
+
+    # --- Weekly data (rows 5-56) ---
+    sums = {fg: {"anzahl": 0, "gutachten": 0, "keine_rm": 0} for fg in FALLGRUPPEN}
+
+    for kw in range(1, MAX_KW + 1):
+        row = 4 + kw  # row 5 = KW 1
+        w = weeks.get(kw, {})
+        ws.cell(row=row, column=1, value=kw)
+
+        for fg_key, start_col in zip(FALLGRUPPEN, group_start_cols):
+            fg_data = w.get(fg_key, {})
+            anz = _safe(fg_data.get("anzahl"))
+            ga = _safe(fg_data.get("gutachten"))
+            keine = _safe(fg_data.get("keine_rm"))
+
+            ws.cell(row=row, column=start_col, value=anz)
+            ws.cell(row=row, column=start_col + 1, value=ga)
+            ws.cell(row=row, column=start_col + 2, value=keine)
+
+            sums[fg_key]["anzahl"] += anz
+            sums[fg_key]["gutachten"] += ga
+            sums[fg_key]["keine_rm"] += keine
+
+    # --- Summe row (row 57) ---
+    summe_row = 4 + MAX_KW + 1  # 57
+    for fg_key, start_col in zip(FALLGRUPPEN, group_start_cols):
+        ws.cell(row=summe_row, column=start_col, value=sums[fg_key]["anzahl"])
+        ws.cell(row=summe_row, column=start_col + 1, value=sums[fg_key]["gutachten"])
+        ws.cell(row=summe_row, column=start_col + 2, value=sums[fg_key]["keine_rm"])
+    _apply_header_style(ws, summe_row, 1, 16)
+
+    # --- Column widths ---
+    ws.column_dimensions["A"].width = 6
+    for start_col in group_start_cols:
+        ws.column_dimensions[get_column_letter(start_col)].width = 8
+        ws.column_dimensions[get_column_letter(start_col + 1)].width = 10
+        ws.column_dimensions[get_column_letter(start_col + 2)].width = 14
+
+
+# ---------------------------------------------------------------------------
+# Sheet 3: Auswertung Gutachten
+# ---------------------------------------------------------------------------
+
+def _write_sheet3_gutachten(
+    wb: Workbook,
+    data: dict[str, Any],
+    jahr: int,
+) -> None:
+    """Write the 'Auswertung Gutachten' sheet.
+
+    Layout:
+        Row 1: A1 = "Uebersicht nach Fallgruppen"
+        Row 3: Group headers: Gesamt (B3) + 5 Fallgruppen
+        Row 4: Sub-headers: KW | Gutachten | Alternative | Bestaetigung (x6)
+        Row 5-56: KW 1-52 data
+        Row 57: Summe row
+    """
+    ws = wb.create_sheet(title="Auswertung Gutachten")
+
+    weeks = _weeks_lookup(data.get("weeks", []))
+
+    # --- Title ---
+    ws["A1"] = "\u00dcbersicht nach Fallgruppen"
+    ws["A1"].font = TITLE_FONT
+
+    # --- Group headers (row 3) ---
+    # Gesamt: B3 (no merge since it's a single-column-start group header,
+    # but in the reference the Gesamt label sits in B3 without a merge)
+    cell = ws.cell(row=3, column=2, value="Gesamt")
+    cell.fill = HEADER_FILL
+    cell.font = HEADER_FONT
+    cell.alignment = Alignment(horizontal="center")
+
+    # Fallgruppen start at columns E, H, K, N, Q (each 3 cols wide)
+    fg_start_cols = [5, 8, 11, 14, 17]
+    for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols):
+        label = FALLGRUPPEN_LABELS[fg_key]
+        cell = ws.cell(row=3, column=start_col, value=label)
+        cell.fill = HEADER_FILL
+        cell.font = HEADER_FONT
+        cell.alignment = Alignment(horizontal="center")
+        # Merge: the reference merges first 2 of 3 columns (E3:F3, H3:I3, etc.)
+        end_col = start_col + 1
+        ws.merge_cells(
+            start_row=3, start_column=start_col,
+            end_row=3, end_column=end_col,
+        )
+        for c in range(start_col, end_col + 1):
+            ws.cell(row=3, column=c).fill = HEADER_FILL
+
+    # --- Sub-headers (row 4) ---
+    ws.cell(row=4, column=1, value="KW")
+
+    # Gesamt columns: B=Gutachten, C=Alternative, D=Bestaetigung
+    ws.cell(row=4, column=2, value="Gutachten")
+    ws.cell(row=4, column=3, value="Alternative")
+    ws.cell(row=4, column=4, value="Best\u00e4tigung")
+
+    for start_col in fg_start_cols:
+        ws.cell(row=4, column=start_col, value="Gutachten")
+        ws.cell(row=4, column=start_col + 1, value="Alternative")
+        ws.cell(row=4, column=start_col + 2, value="Best\u00e4tigung")
+    _apply_header_style(ws, 4, 1, 19)
+
+    # --- Weekly data (rows 5-56) ---
+    sums_gesamt = {"gutachten": 0, "alternative": 0, "bestaetigung": 0}
+    sums_fg = {fg: {"gutachten": 0, "alternative": 0, "bestaetigung": 0} for fg in FALLGRUPPEN}
+
+    for kw in range(1, MAX_KW + 1):
+        row = 4 + kw
+        w = weeks.get(kw, {})
+        ws.cell(row=row, column=1, value=kw)
+
+        # Gesamt
+        g_ges = _safe(w.get("gutachten_gesamt"))
+        g_alt = _safe(w.get("gutachten_alternative"))
+        g_best = _safe(w.get("gutachten_bestaetigung"))
+        ws.cell(row=row, column=2, value=g_ges)
+        ws.cell(row=row, column=3, value=g_alt)
+        ws.cell(row=row, column=4, value=g_best)
+        sums_gesamt["gutachten"] += g_ges
+        sums_gesamt["alternative"] += g_alt
+        sums_gesamt["bestaetigung"] += g_best
+
+        # Per Fallgruppe
+        for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols):
+            fg_data = w.get(fg_key, {})
+            ga = _safe(fg_data.get("gutachten"))
+            alt = _safe(fg_data.get("alternative"))
+            best = _safe(fg_data.get("bestaetigung"))
+            ws.cell(row=row, column=start_col, value=ga)
+            ws.cell(row=row, column=start_col + 1, value=alt)
+            ws.cell(row=row, column=start_col + 2, value=best)
+            sums_fg[fg_key]["gutachten"] += ga
+            sums_fg[fg_key]["alternative"] += alt
+            sums_fg[fg_key]["bestaetigung"] += best
+
+    # --- Summe row (row 57) ---
+    summe_row = 4 + MAX_KW + 1
+    ws.cell(row=summe_row, column=2, value=sums_gesamt["gutachten"])
+    ws.cell(row=summe_row, column=3, value=sums_gesamt["alternative"])
+    ws.cell(row=summe_row, column=4, value=sums_gesamt["bestaetigung"])
+    for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols):
+        ws.cell(row=summe_row, column=start_col, value=sums_fg[fg_key]["gutachten"])
+        ws.cell(row=summe_row, column=start_col + 1, value=sums_fg[fg_key]["alternative"])
+        ws.cell(row=summe_row, column=start_col + 2, value=sums_fg[fg_key]["bestaetigung"])
+    _apply_header_style(ws, summe_row, 1, 19)
+
+    # --- Column widths ---
+    ws.column_dimensions["A"].width = 6
+    for col in range(2, 20):
+        ws.column_dimensions[get_column_letter(col)].width = 12
+
+
+# ---------------------------------------------------------------------------
+# Sheet 4: Auswertung ICD onko
+# ---------------------------------------------------------------------------
+
+def _write_sheet4_icd_onko(
+    wb: Workbook,
+    icd_data: list[dict[str, Any]],
+    jahr: int,
+) -> None:
+    """Write the 'Auswertung ICD onko' sheet.
+
+    Layout:
+        Row 1: A1 = "ICD", B1 = "Anzahl von ICD"
+        Row 2: A2 = "ICD", B2 = "AnzahlvonICD" (pivot-table artefact in originals)
+        Row 3+: ICD code | count (sorted by count descending)
+    """
+    ws = wb.create_sheet(title="Auswertung ICD onko")
+
+    # Header row (matches reference format exactly)
+    ws["A1"] = "ICD"
+    ws["B1"] = "Anzahl von ICD"
+    ws["A1"].font = HEADER_FONT
+    ws["B1"].font = HEADER_FONT
+
+    # Second "header" row (artefact from pivot table in originals)
+    ws["A2"] = "ICD"
+    ws["B2"] = "AnzahlvonICD"
+
+    # Sort by count descending
+    sorted_icd = sorted(icd_data, key=lambda x: _safe(x.get("count")), reverse=True)
+
+    for i, entry in enumerate(sorted_icd):
+        row = 3 + i
+        ws.cell(row=row, column=1, value=entry.get("icd", ""))
+        ws.cell(row=row, column=2, value=_safe(entry.get("count")))
+
+    # Column widths
+    ws.column_dimensions["A"].width = 12
+    ws.column_dimensions["B"].width = 16
diff --git a/backend/scripts/import_berichtswesen.py b/backend/scripts/import_berichtswesen.py
new file mode 100644
index 0000000..9830c35
--- /dev/null
+++ b/backend/scripts/import_berichtswesen.py
@@ -0,0 +1,524 @@
+"""Import historical Berichtswesen .xlsx files into yearly_summary table.
+
+Reads Sheet 1 (KW gesamt), Sheet 2 (Fachgebiete), Sheet 3 (Gutachten) and
+stores the aggregated per-week data in the yearly_summary table for
+year-over-year comparison in future reports.
+
+Supports two file formats:
+  - 2023+ format: 4 sheets (KW gesamt, Fachgebiete, Gutachten, ICD onko)
+  - 2022 format: 9 sheets (separate per-Fallgruppe sheets)
+
+Usage:
+    cd /home/frontend/dak_c2s/backend
+    source venv/bin/activate
+
+    # Import a single file:
+    python -m scripts.import_berichtswesen /path/to/Berichtswesen_2025.xlsx 2025
+
+    # Import all Berichtswesen files from data/ directory:
+    python -m scripts.import_berichtswesen
+
+    # Dry run (parse only, no DB writes):
+    python -m scripts.import_berichtswesen --dry-run
+"""
+
+from __future__ import annotations
+
+import argparse
+import glob
+import logging
+import os
+import re
+import sys
+
+# Ensure the backend package is importable
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from openpyxl import load_workbook
+
+from app.database import SessionLocal
+from app.models.report import YearlySummary
+
+logger = logging.getLogger(__name__)
+
+# Fallgruppe keys in canonical order
+FALLGRUPPEN = ["onko", "kardio", "intensiv", "galle", "sd"]
+
+# Mapping from 2022-style sheet suffixes to Fallgruppe keys
+FALLGRUPPE_2022_MAP = {
+    "O": "onko",
+    "K": "kardio",
+    "I": "intensiv",
+}
+
+
+# ---------------------------------------------------------------------------
+# Parsing helpers
+# ---------------------------------------------------------------------------
+
+def _parse_kw(value) -> int | None:
+    """Extract KW number from a cell value.
+
+    Handles both formats:
+      - Integer: 1, 2, ... 53
+      - String: "KW01", "KW02", etc.
+
+    Returns ``None`` for anything else, including booleans, non-integral or
+    non-finite floats, and week numbers outside 1-53.
+    """
+    if isinstance(value, bool):
+        # bool is an int subclass; TRUE/FALSE cells are never week numbers.
+        return None
+    kw: int | None = None
+    if isinstance(value, int):
+        kw = value
+    elif isinstance(value, float):
+        # is_integer() is False for NaN/inf, so int() cannot raise here.
+        if value.is_integer():
+            kw = int(value)
+    elif isinstance(value, str):
+        m = re.match(r"KW\s*(\d+)", value.strip(), re.IGNORECASE)
+        if m:
+            kw = int(m.group(1))
+    # One range check for every input type (the string form used to skip it).
+    if kw is not None and 1 <= kw <= 53:
+        return kw
+    return None
+
+
+def _safe_int(value, default: int = 0) -> int:
+    """Convert a cell value to int, defaulting to *default*."""
+    if value is None:
+        return default
+    try:
+        return int(value)
+    except (TypeError, ValueError, OverflowError):
+        return default
+
+
+def _detect_year_from_filename(filename: str) -> int | None:
+    """Try to extract the year from a Berichtswesen filename.
+
+    Examples:
+        Berichtswesen_2025_31122025_final.xlsx -> 2025
+        Berichtswesen_2022.xlsx -> 2022
+    """
+    m = re.search(r"Berichtswesen_(\d{4})", os.path.basename(filename))
+    if m:
+        return int(m.group(1))
+    return None
+
+
+def _is_2022_format(wb) -> bool:
+    """Detect whether this is the older 2022 format (separate per-Fallgruppe sheets)."""
+    return "Auswertung KW-O" in wb.sheetnames
+
+
+def _week_row(rows_by_kw: dict[int, dict], year: int, kw: int) -> dict:
+    """Return the accumulator dict for *kw*, creating it on first use."""
+    return rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw})
+
+
+def _iter_kw_rows(ws, min_row: int, max_col: int, max_row: int | None = None):
+    """Yield ``(kw, values)`` for each row of *ws* whose column A holds a KW.
+
+    The workbook is opened with ``read_only=True``; there, per-cell access via
+    ``ws.cell()`` re-reads the sheet on every call and ``ws.max_row`` is
+    ``None`` when the file carries no dimension record.  Streaming the rows
+    with ``iter_rows`` avoids both problems.
+
+    ``values`` holds the cell values of columns 1..*max_col*, so the value of
+    1-based column ``c`` is ``values[c - 1]``.  With *max_row* omitted the
+    sheet is read to its end.
+    """
+    for values in ws.iter_rows(
+        min_row=min_row,
+        max_row=max_row,
+        min_col=1,
+        max_col=max_col,
+        values_only=True,
+    ):
+        # Defensive padding in case a short or empty row comes back.
+        values = tuple(values) + (None,) * (max_col - len(values))
+        kw = _parse_kw(values[0])
+        if kw is not None:
+            yield kw, values
+
+
+# ---------------------------------------------------------------------------
+# 2023+ format parser
+# ---------------------------------------------------------------------------
+
+def _parse_modern_format(wb, year: int) -> list[dict]:
+    """Parse Berichtswesen files in the 2023+ format.
+
+    Only the fixed KW 1-52 row windows of each sheet are read, so a "Summe"
+    row or anything below the table is never mistaken for a week.
+
+    Returns a list of dicts, one per KW, with all fields matching YearlySummary columns.
+    """
+    rows_by_kw: dict[int, dict] = {}
+
+    # --- Sheet 1: Auswertung KW gesamt (columns A-F) ---
+    if "Auswertung KW gesamt" in wb.sheetnames:
+        ws = wb["Auswertung KW gesamt"]
+        # Rows 11-62 = KW 1-52
+        for kw, v in _iter_kw_rows(ws, min_row=11, max_row=62, max_col=6):
+            d = _week_row(rows_by_kw, year, kw)
+            d["erstberatungen"] = _safe_int(v[1])
+            d["unterlagen"] = _safe_int(v[2])
+            d["ablehnungen"] = _safe_int(v[3])
+            d["keine_rueckmeldung"] = _safe_int(v[4])
+            d["gutachten_gesamt"] = _safe_int(v[5])
+
+    # --- Sheet 2: Auswertung nach Fachgebieten (columns A-P) ---
+    if "Auswertung nach Fachgebieten" in wb.sheetnames:
+        ws = wb["Auswertung nach Fachgebieten"]
+        # Fallgruppe start columns: B=onko, E=kardio, H=intensiv, K=galle, N=sd
+        fg_start_cols = [2, 5, 8, 11, 14]
+
+        # Rows 5-56 = KW 1-52
+        for kw, v in _iter_kw_rows(ws, min_row=5, max_row=56, max_col=16):
+            d = _week_row(rows_by_kw, year, kw)
+            for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols):
+                # v is 0-based: 1-based column c lives at v[c - 1].
+                d[f"{fg_key}_anzahl"] = _safe_int(v[start_col - 1])
+                d[f"{fg_key}_gutachten"] = _safe_int(v[start_col])
+                d[f"{fg_key}_keine_rm"] = _safe_int(v[start_col + 1])
+
+    # --- Sheet 3: Auswertung Gutachten (columns A-S) ---
+    if "Auswertung Gutachten" in wb.sheetnames:
+        ws = wb["Auswertung Gutachten"]
+        # Gesamt: B=Gutachten, C=Alternative, D=Bestaetigung
+        # Fallgruppen: E/H/K/N/Q (each 3 cols: Gutachten, Alternative, Bestaetigung)
+        fg_gut_start_cols = [5, 8, 11, 14, 17]
+
+        # Rows 5-56 = KW 1-52
+        for kw, v in _iter_kw_rows(ws, min_row=5, max_row=56, max_col=19):
+            d = _week_row(rows_by_kw, year, kw)
+            d["gutachten_alternative"] = _safe_int(v[2])
+            d["gutachten_bestaetigung"] = _safe_int(v[3])
+            for fg_key, start_col in zip(FALLGRUPPEN, fg_gut_start_cols):
+                # Alternative/Bestaetigung are the 2nd/3rd column of each group.
+                d[f"{fg_key}_alternative"] = _safe_int(v[start_col])
+                d[f"{fg_key}_bestaetigung"] = _safe_int(v[start_col + 1])
+
+    return list(rows_by_kw.values())
+
+
+# ---------------------------------------------------------------------------
+# 2022 format parser
+# ---------------------------------------------------------------------------
+
+def _parse_2022_format(wb, year: int) -> list[dict]:
+    """Parse the older 2022 Berichtswesen format.
+
+    The 2022 format has:
+      - 'Auswertung KW gesamt': KW | Summe von EG | Summe von Gutachten | Summe von Keine RM
+      - 'Auswertung KW-O/K/I': Per-Fallgruppe weekly counts
+      - 'Auswertung Gutachten gesamt': KW | Gutachten | Alternative | Bestaetigung
+      - 'Auswertung Gutachten O/K/I': Per-Fallgruppe gutachten detail
+
+    Note: 2022 only has 3 Fallgruppen (onko, kardio, intensiv).
+    No Unterlagen/Ablehnungen columns, no Gallenblase/Schilddruese.
+
+    Every sheet is read from row 2 to its end, using columns A-D.
+
+    Returns a list of dicts, one per KW.
+    """
+    rows_by_kw: dict[int, dict] = {}
+
+    # --- KW gesamt ---
+    if "Auswertung KW gesamt" in wb.sheetnames:
+        ws = wb["Auswertung KW gesamt"]
+        for kw, v in _iter_kw_rows(ws, min_row=2, max_col=4):
+            d = _week_row(rows_by_kw, year, kw)
+            d["erstberatungen"] = _safe_int(v[1])
+            d["gutachten_gesamt"] = _safe_int(v[2])
+            d["keine_rueckmeldung"] = _safe_int(v[3])
+            # 2022 doesn't separate Unterlagen/Ablehnungen
+            d["unterlagen"] = 0
+            d["ablehnungen"] = 0
+
+    # --- Per-Fallgruppe KW sheets ---
+    for suffix, fg_key in FALLGRUPPE_2022_MAP.items():
+        sheet_name = f"Auswertung KW-{suffix}"
+        if sheet_name not in wb.sheetnames:
+            continue
+        ws = wb[sheet_name]
+        for kw, v in _iter_kw_rows(ws, min_row=2, max_col=4):
+            d = _week_row(rows_by_kw, year, kw)
+            d[f"{fg_key}_anzahl"] = _safe_int(v[1])
+            d[f"{fg_key}_gutachten"] = _safe_int(v[2])
+            d[f"{fg_key}_keine_rm"] = _safe_int(v[3])
+
+    # --- Gutachten gesamt ---
+    if "Auswertung Gutachten gesamt" in wb.sheetnames:
+        ws = wb["Auswertung Gutachten gesamt"]
+        for kw, v in _iter_kw_rows(ws, min_row=2, max_col=4):
+            d = _week_row(rows_by_kw, year, kw)
+            d["gutachten_alternative"] = _safe_int(v[2])
+            d["gutachten_bestaetigung"] = _safe_int(v[3])
+
+    # --- Per-Fallgruppe Gutachten sheets ---
+    for suffix, fg_key in FALLGRUPPE_2022_MAP.items():
+        sheet_name = f"Auswertung Gutachten {suffix}"
+        if sheet_name not in wb.sheetnames:
+            continue
+        ws = wb[sheet_name]
+        for kw, v in _iter_kw_rows(ws, min_row=2, max_col=4):
+            d = _week_row(rows_by_kw, year, kw)
+            d[f"{fg_key}_alternative"] = _safe_int(v[2])
+            d[f"{fg_key}_bestaetigung"] = _safe_int(v[3])
+
+    return list(rows_by_kw.values())
+
+
+# ---------------------------------------------------------------------------
+# DB upsert
+# ---------------------------------------------------------------------------
+
+# Columns on YearlySummary that we populate
+_YEARLY_SUMMARY_FIELDS = [
+    "erstberatungen", "ablehnungen", "unterlagen", "keine_rueckmeldung",
+    "gutachten_gesamt", "gutachten_alternative", "gutachten_bestaetigung",
+    "onko_anzahl", "onko_gutachten", "onko_keine_rm",
+    "onko_alternative", "onko_bestaetigung",
+    "kardio_anzahl", "kardio_gutachten", "kardio_keine_rm",
+    "kardio_alternative", "kardio_bestaetigung",
+    "intensiv_anzahl", "intensiv_gutachten", "intensiv_keine_rm",
+    "intensiv_alternative", "intensiv_bestaetigung",
+    "galle_anzahl", "galle_gutachten", "galle_keine_rm",
+    "galle_alternative", "galle_bestaetigung",
+    "sd_anzahl", "sd_gutachten", "sd_keine_rm",
+    "sd_alternative", "sd_bestaetigung",
+]
+
+
+def _upsert_yearly_summary(db, rows: list[dict], dry_run: bool = False) -> dict:
+    """Insert or update yearly_summary rows.
+
+    Args:
+        db: SQLAlchemy session.
+        rows: Parsed week dicts; each needs ``jahr`` and ``kw``.
+        dry_run: If True, only count what would be inserted/updated.  The
+            session is left untouched, so a later commit by the caller cannot
+            persist anything from this call.
+
+    Returns:
+        Dict with ``inserted`` and ``updated`` counts and an ``errors`` list.
+    """
+    inserted = 0
+    updated = 0
+    errors = []
+
+    for row_data in rows:
+        jahr = row_data.get("jahr")
+        kw = row_data.get("kw")
+        if jahr is None or kw is None:
+            errors.append(f"Missing jahr/kw in row: {row_data}")
+            continue
+
+        values = {f: row_data[f] for f in _YEARLY_SUMMARY_FIELDS if f in row_data}
+
+        try:
+            existing = (
+                db.query(YearlySummary)
+                .filter(YearlySummary.jahr == jahr, YearlySummary.kw == kw)
+                .first()
+            )
+
+            if existing:
+                # Update existing record
+                if not dry_run:
+                    for field, value in values.items():
+                        setattr(existing, field, value)
+                updated += 1
+            else:
+                # Create new record
+                if not dry_run:
+                    obj = YearlySummary(jahr=jahr, kw=kw)
+                    for field, value in values.items():
+                        setattr(obj, field, value)
+                    db.add(obj)
+                inserted += 1
+
+        except Exception as e:
+            errors.append(f"Error for KW {kw}/{jahr}: {e}")
+
+    if not dry_run:
+        db.flush()
+
+    return {"inserted": inserted, "updated": updated, "errors": errors}
+
+
+# ---------------------------------------------------------------------------
+# Public import function
+# ---------------------------------------------------------------------------
+
+def import_berichtswesen_file(
+    db,
+    filepath: str,
+    year: int | None = None,
+    dry_run: bool = False,
+) -> dict:
+    """Import a single Berichtswesen file into yearly_summary.
+
+    Args:
+        db: SQLAlchemy session.
+        filepath: Path to the .xlsx file.
+        year: Report year (auto-detected from filename if None).
+        dry_run: If True, parse but don't write to DB.
+
+    Returns:
+        Dict with 'year', 'weeks_parsed', 'inserted', 'updated', 'errors'.
+    """
+    if year is None:
+        year = _detect_year_from_filename(filepath)
+        if year is None:
+            raise ValueError(
+                f"Cannot detect year from filename: {os.path.basename(filepath)}. "
+                "Please specify the year explicitly."
+            )
+
+    wb = load_workbook(filepath, read_only=True, data_only=True)
+    try:
+        if _is_2022_format(wb):
+            rows = _parse_2022_format(wb, year)
+        else:
+            rows = _parse_modern_format(wb, year)
+    finally:
+        wb.close()
+
+    result = _upsert_yearly_summary(db, rows, dry_run=dry_run)
+    result["year"] = year
+    result["weeks_parsed"] = len(rows)
+    return result
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+
+def main() -> None:
+    """Command-line entry point: parse arguments and import the selected files."""
+    parser = argparse.ArgumentParser(
+        description="Import historical Berichtswesen .xlsx files into yearly_summary table"
+    )
+    parser.add_argument(
+        "filepath",
+        nargs="?",
+        default=None,
+        help="Path to a single Berichtswesen .xlsx file. "
+             "If omitted, imports all Berichtswesen_*.xlsx from ../data/",
+    )
+    parser.add_argument(
+        "year",
+        nargs="?",
+        type=int,
+        default=None,
+        help="Report year (auto-detected from filename if omitted)",
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Parse and validate without writing to DB",
+    )
+    parser.add_argument(
+        "--verbose", "-v",
+        action="store_true",
+        help="Show detailed per-row info",
+    )
+    args = parser.parse_args()
+
+    # Configure logging
+    log_level = logging.DEBUG if args.verbose else logging.INFO
+    logging.basicConfig(
+        level=log_level,
+        format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
+        datefmt="%H:%M:%S",
+    )
+
+    # Determine files to import
+    if args.filepath:
+        files = [(args.filepath, args.year)]
+    else:
+        data_dir = os.path.join(
+            os.path.dirname(  # dak_c2s/
+                os.path.dirname(  # dak_c2s/backend/
+                    os.path.dirname(os.path.abspath(__file__))  # dak_c2s/backend/scripts/
+                )
+            ),
+            "data",
+        )
+        pattern = os.path.join(data_dir, "Berichtswesen_*.xlsx")
+        found = sorted(glob.glob(pattern))
+        if not found:
+            print(f"ERROR: No Berichtswesen files found in {data_dir}")
+            sys.exit(1)
+        files = [(f, None) for f in found]
+
+    print(f"Importing {len(files)} Berichtswesen file(s)")
+    if args.dry_run:
+        print("*** DRY RUN -- no changes will be committed ***")
+    print()
+
+    db = SessionLocal()
+    total_inserted = 0
+    total_updated = 0
+    total_errors = 0
+
+    try:
+        for filepath, year in files:
+            if not os.path.exists(filepath):
+                print(f"  ERROR: File not found: {filepath}")
+                total_errors += 1
+                continue
+
+            basename = os.path.basename(filepath)
+            try:
+                result = import_berichtswesen_file(db, filepath, year=year, dry_run=args.dry_run)
+
+                status = "OK" if not result["errors"] else f"{len(result['errors'])} errors"
+                print(
+                    f"  {basename:45s} year={result['year']} "
+                    f"weeks={result['weeks_parsed']:2d} "
+                    f"ins={result['inserted']:3d} "
+                    f"upd={result['updated']:3d} [{status}]"
+                )
+
+                if args.verbose and result["errors"]:
+                    for err in result["errors"]:
+                        print(f"    - {err}")
+
+                total_inserted += result["inserted"]
+                total_updated += result["updated"]
+                total_errors += len(result["errors"])
+
+            except Exception as e:
+                print(f"  {basename:45s} ERROR: {e}")
+                total_errors += 1
+
+        if args.dry_run:
+            db.rollback()
+            print(f"\nDry run complete -- rolled back. "
+                  f"Would have inserted {total_inserted}, updated {total_updated}.")
+        else:
+            db.commit()
+            print(f"\nImport committed. "
+                  f"Inserted {total_inserted}, updated {total_updated}.")
+
+        if total_errors:
+            print(f"Total errors: {total_errors}")
+
+    except Exception as e:
+        db.rollback()
+        print(f"\nERROR: Import failed: {e}")
+        logger.exception("Import failed")
+        sys.exit(1)
+    finally:
+        db.close()
+
+
+if __name__ == "__main__":
+    main()