"""Import historical Berichtswesen .xlsx files into yearly_summary table. Reads Sheet 1 (KW gesamt), Sheet 2 (Fachgebiete), Sheet 3 (Gutachten) and stores the aggregated per-week data in the yearly_summary table for year-over-year comparison in future reports. Supports two file formats: - 2023+ format: 4 sheets (KW gesamt, Fachgebiete, Gutachten, ICD onko) - 2022 format: 9 sheets (separate per-Fallgruppe sheets) Usage: cd /home/frontend/dak_c2s/backend source venv/bin/activate # Import a single file: python -m scripts.import_berichtswesen /path/to/Berichtswesen_2025.xlsx 2025 # Import all Berichtswesen files from data/ directory: python -m scripts.import_berichtswesen # Dry run (parse only, no DB writes): python -m scripts.import_berichtswesen --dry-run """ from __future__ import annotations import argparse import glob import logging import os import re import sys # Ensure the backend package is importable sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from openpyxl import load_workbook from app.database import SessionLocal from app.models.report import YearlySummary logger = logging.getLogger(__name__) # Fallgruppe keys in canonical order FALLGRUPPEN = ["onko", "kardio", "intensiv", "galle", "sd"] # Mapping from 2022-style sheet suffixes to Fallgruppe keys FALLGRUPPE_2022_MAP = { "O": "onko", "K": "kardio", "I": "intensiv", } # --------------------------------------------------------------------------- # Parsing helpers # --------------------------------------------------------------------------- def _parse_kw(value) -> int | None: """Extract KW number from a cell value. Handles both formats: - Integer: 1, 2, ... 52 - String: "KW01", "KW02", etc. """ if isinstance(value, (int, float)): kw = int(value) if 1 <= kw <= 53: return kw return None if isinstance(value, str): m = re.match(r"KW\s*(\d+)", value, re.IGNORECASE) if m: return int(m.group(1)) return None def _safe_int(value, default: int = 0) -> int: """Convert a cell value to int, defaulting to *default*.""" if value is None: return default try: return int(value) except (TypeError, ValueError): return default def _detect_year_from_filename(filename: str) -> int | None: """Try to extract the year from a Berichtswesen filename. Examples: Berichtswesen_2025_31122025_final.xlsx -> 2025 Berichtswesen_2022.xlsx -> 2022 """ m = re.search(r"Berichtswesen_(\d{4})", os.path.basename(filename)) if m: return int(m.group(1)) return None def _is_2022_format(wb) -> bool: """Detect whether this is the older 2022 format (separate per-Fallgruppe sheets).""" return "Auswertung KW-O" in wb.sheetnames # --------------------------------------------------------------------------- # 2023+ format parser # --------------------------------------------------------------------------- def _parse_modern_format(wb, year: int) -> list[dict]: """Parse Berichtswesen files in the 2023+ format. Returns a list of dicts, one per KW, with all fields matching YearlySummary columns. """ rows_by_kw: dict[int, dict] = {} # --- Sheet 1: Auswertung KW gesamt --- if "Auswertung KW gesamt" in wb.sheetnames: ws = wb["Auswertung KW gesamt"] for row_idx in range(11, 63): # Rows 11-62 = KW 1-52 kw_val = ws.cell(row=row_idx, column=1).value kw = _parse_kw(kw_val) if kw is None: continue rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw}) d = rows_by_kw[kw] d["erstberatungen"] = _safe_int(ws.cell(row=row_idx, column=2).value) d["unterlagen"] = _safe_int(ws.cell(row=row_idx, column=3).value) d["ablehnungen"] = _safe_int(ws.cell(row=row_idx, column=4).value) d["keine_rueckmeldung"] = _safe_int(ws.cell(row=row_idx, column=5).value) d["gutachten_gesamt"] = _safe_int(ws.cell(row=row_idx, column=6).value) # --- Sheet 2: Auswertung nach Fachgebieten --- if "Auswertung nach Fachgebieten" in wb.sheetnames: ws = wb["Auswertung nach Fachgebieten"] # Fallgruppe start columns: B=onko, E=kardio, H=intensiv, K=galle, N=sd fg_start_cols = [2, 5, 8, 11, 14] for row_idx in range(5, 57): # Rows 5-56 = KW 1-52 kw_val = ws.cell(row=row_idx, column=1).value kw = _parse_kw(kw_val) if kw is None: continue rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw}) d = rows_by_kw[kw] for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols): d[f"{fg_key}_anzahl"] = _safe_int(ws.cell(row=row_idx, column=start_col).value) d[f"{fg_key}_gutachten"] = _safe_int(ws.cell(row=row_idx, column=start_col + 1).value) d[f"{fg_key}_keine_rm"] = _safe_int(ws.cell(row=row_idx, column=start_col + 2).value) # --- Sheet 3: Auswertung Gutachten --- if "Auswertung Gutachten" in wb.sheetnames: ws = wb["Auswertung Gutachten"] # Gesamt: B=Gutachten, C=Alternative, D=Bestaetigung # Fallgruppen: E/H/K/N/Q (each 3 cols: Gutachten, Alternative, Bestaetigung) fg_gut_start_cols = [5, 8, 11, 14, 17] for row_idx in range(5, 57): # Rows 5-56 = KW 1-52 kw_val = ws.cell(row=row_idx, column=1).value kw = _parse_kw(kw_val) if kw is None: continue rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw}) d = rows_by_kw[kw] d["gutachten_alternative"] = _safe_int(ws.cell(row=row_idx, column=3).value) d["gutachten_bestaetigung"] = _safe_int(ws.cell(row=row_idx, column=4).value) for fg_key, start_col in zip(FALLGRUPPEN, fg_gut_start_cols): d[f"{fg_key}_alternative"] = _safe_int(ws.cell(row=row_idx, column=start_col + 1).value) d[f"{fg_key}_bestaetigung"] = _safe_int(ws.cell(row=row_idx, column=start_col + 2).value) return list(rows_by_kw.values()) # --------------------------------------------------------------------------- # 2022 format parser # --------------------------------------------------------------------------- def _parse_2022_format(wb, year: int) -> list[dict]: """Parse the older 2022 Berichtswesen format. The 2022 format has: - 'Auswertung KW gesamt': KW | Summe von EG | Summe von Gutachten | Summe von Keine RM - 'Auswertung KW-O/K/I': Per-Fallgruppe weekly counts - 'Auswertung Gutachten gesamt': KW | Gutachten | Alternative | Bestaetigung - 'Auswertung Gutachten O/K/I': Per-Fallgruppe gutachten detail Note: 2022 only has 3 Fallgruppen (onko, kardio, intensiv). No Unterlagen/Ablehnungen columns, no Gallenblase/Schilddruese. """ rows_by_kw: dict[int, dict] = {} # --- KW gesamt --- if "Auswertung KW gesamt" in wb.sheetnames: ws = wb["Auswertung KW gesamt"] for row_idx in range(2, ws.max_row + 1): kw_val = ws.cell(row=row_idx, column=1).value kw = _parse_kw(kw_val) if kw is None: continue rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw}) d = rows_by_kw[kw] d["erstberatungen"] = _safe_int(ws.cell(row=row_idx, column=2).value) d["gutachten_gesamt"] = _safe_int(ws.cell(row=row_idx, column=3).value) d["keine_rueckmeldung"] = _safe_int(ws.cell(row=row_idx, column=4).value) # 2022 doesn't separate Unterlagen/Ablehnungen d["unterlagen"] = 0 d["ablehnungen"] = 0 # --- Per-Fallgruppe KW sheets --- for suffix, fg_key in FALLGRUPPE_2022_MAP.items(): sheet_name = f"Auswertung KW-{suffix}" if sheet_name not in wb.sheetnames: continue ws = wb[sheet_name] for row_idx in range(2, ws.max_row + 1): kw_val = ws.cell(row=row_idx, column=1).value kw = _parse_kw(kw_val) if kw is None: continue rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw}) d = rows_by_kw[kw] d[f"{fg_key}_anzahl"] = _safe_int(ws.cell(row=row_idx, column=2).value) d[f"{fg_key}_gutachten"] = _safe_int(ws.cell(row=row_idx, column=3).value) d[f"{fg_key}_keine_rm"] = _safe_int(ws.cell(row=row_idx, column=4).value) # --- Gutachten gesamt --- if "Auswertung Gutachten gesamt" in wb.sheetnames: ws = wb["Auswertung Gutachten gesamt"] for row_idx in range(2, ws.max_row + 1): kw_val = ws.cell(row=row_idx, column=1).value kw = _parse_kw(kw_val) if kw is None: continue rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw}) d = rows_by_kw[kw] d["gutachten_alternative"] = _safe_int(ws.cell(row=row_idx, column=3).value) d["gutachten_bestaetigung"] = _safe_int(ws.cell(row=row_idx, column=4).value) # --- Per-Fallgruppe Gutachten sheets --- for suffix, fg_key in FALLGRUPPE_2022_MAP.items(): sheet_name = f"Auswertung Gutachten {suffix}" if sheet_name not in wb.sheetnames: continue ws = wb[sheet_name] for row_idx in range(2, ws.max_row + 1): kw_val = ws.cell(row=row_idx, column=1).value kw = _parse_kw(kw_val) if kw is None: continue rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw}) d = rows_by_kw[kw] d[f"{fg_key}_alternative"] = _safe_int(ws.cell(row=row_idx, column=3).value) d[f"{fg_key}_bestaetigung"] = _safe_int(ws.cell(row=row_idx, column=4).value) return list(rows_by_kw.values()) # --------------------------------------------------------------------------- # DB upsert # --------------------------------------------------------------------------- # Columns on YearlySummary that we populate _YEARLY_SUMMARY_FIELDS = [ "erstberatungen", "ablehnungen", "unterlagen", "keine_rueckmeldung", "gutachten_gesamt", "gutachten_alternative", "gutachten_bestaetigung", "onko_anzahl", "onko_gutachten", "onko_keine_rm", "onko_alternative", "onko_bestaetigung", "kardio_anzahl", "kardio_gutachten", "kardio_keine_rm", "kardio_alternative", "kardio_bestaetigung", "intensiv_anzahl", "intensiv_gutachten", "intensiv_keine_rm", "intensiv_alternative", "intensiv_bestaetigung", "galle_anzahl", "galle_gutachten", "galle_keine_rm", "galle_alternative", "galle_bestaetigung", "sd_anzahl", "sd_gutachten", "sd_keine_rm", "sd_alternative", "sd_bestaetigung", ] def _upsert_yearly_summary(db, rows: list[dict], dry_run: bool = False) -> dict: """Insert or update yearly_summary rows. Returns summary dict with counts. """ inserted = 0 updated = 0 errors = [] for row_data in rows: jahr = row_data.get("jahr") kw = row_data.get("kw") if jahr is None or kw is None: errors.append(f"Missing jahr/kw in row: {row_data}") continue try: existing = ( db.query(YearlySummary) .filter(YearlySummary.jahr == jahr, YearlySummary.kw == kw) .first() ) if existing: # Update existing record for field in _YEARLY_SUMMARY_FIELDS: if field in row_data: setattr(existing, field, row_data[field]) updated += 1 else: # Create new record obj = YearlySummary(jahr=jahr, kw=kw) for field in _YEARLY_SUMMARY_FIELDS: if field in row_data: setattr(obj, field, row_data[field]) db.add(obj) inserted += 1 except Exception as e: errors.append(f"Error for KW {kw}/{jahr}: {e}") if not dry_run: db.flush() return {"inserted": inserted, "updated": updated, "errors": errors} # --------------------------------------------------------------------------- # Public import function # --------------------------------------------------------------------------- def import_berichtswesen_file( db, filepath: str, year: int | None = None, dry_run: bool = False, ) -> dict: """Import a single Berichtswesen file into yearly_summary. Args: db: SQLAlchemy session. filepath: Path to the .xlsx file. year: Report year (auto-detected from filename if None). dry_run: If True, parse but don't write to DB. Returns: Dict with 'year', 'weeks_parsed', 'inserted', 'updated', 'errors'. """ if year is None: year = _detect_year_from_filename(filepath) if year is None: raise ValueError( f"Cannot detect year from filename: {os.path.basename(filepath)}. " "Please specify the year explicitly." ) wb = load_workbook(filepath, read_only=True, data_only=True) try: if _is_2022_format(wb): rows = _parse_2022_format(wb, year) else: rows = _parse_modern_format(wb, year) finally: wb.close() result = _upsert_yearly_summary(db, rows, dry_run=dry_run) result["year"] = year result["weeks_parsed"] = len(rows) return result # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser( description="Import historical Berichtswesen .xlsx files into yearly_summary table" ) parser.add_argument( "filepath", nargs="?", default=None, help="Path to a single Berichtswesen .xlsx file. " "If omitted, imports all Berichtswesen_*.xlsx from ../data/", ) parser.add_argument( "year", nargs="?", type=int, default=None, help="Report year (auto-detected from filename if omitted)", ) parser.add_argument( "--dry-run", action="store_true", help="Parse and validate without writing to DB", ) parser.add_argument( "--verbose", "-v", action="store_true", help="Show detailed per-row info", ) args = parser.parse_args() # Configure logging log_level = logging.DEBUG if args.verbose else logging.INFO logging.basicConfig( level=log_level, format="%(asctime)s %(levelname)-8s %(name)s: %(message)s", datefmt="%H:%M:%S", ) # Determine files to import if args.filepath: files = [(args.filepath, args.year)] else: data_dir = os.path.join( os.path.dirname( # dak_c2s/ os.path.dirname( # dak_c2s/backend/ os.path.dirname(os.path.abspath(__file__)) # dak_c2s/backend/scripts/ ) ), "data", ) pattern = os.path.join(data_dir, "Berichtswesen_*.xlsx") found = sorted(glob.glob(pattern)) if not found: print(f"ERROR: No Berichtswesen files found in {data_dir}") sys.exit(1) files = [(f, None) for f in found] print(f"Importing {len(files)} Berichtswesen file(s)") if args.dry_run: print("*** DRY RUN -- no changes will be committed ***") print() db = SessionLocal() total_inserted = 0 total_updated = 0 total_errors = 0 try: for filepath, year in files: if not os.path.exists(filepath): print(f" ERROR: File not found: {filepath}") continue basename = os.path.basename(filepath) try: result = import_berichtswesen_file(db, filepath, year=year, dry_run=args.dry_run) status = "OK" if not result["errors"] else f"{len(result['errors'])} errors" print( f" {basename:45s} year={result['year']} " f"weeks={result['weeks_parsed']:2d} " f"ins={result['inserted']:3d} " f"upd={result['updated']:3d} [{status}]" ) if args.verbose and result["errors"]: for err in result["errors"]: print(f" - {err}") total_inserted += result["inserted"] total_updated += result["updated"] total_errors += len(result["errors"]) except Exception as e: print(f" {basename:45s} ERROR: {e}") total_errors += 1 if args.dry_run: db.rollback() print(f"\nDry run complete -- rolled back. " f"Would have inserted {total_inserted}, updated {total_updated}.") else: db.commit() print(f"\nImport committed. " f"Inserted {total_inserted}, updated {total_updated}.") if total_errors: print(f"Total errors: {total_errors}") except Exception as e: db.rollback() print(f"\nERROR: Import failed: {e}") logging.exception("Import failed") sys.exit(1) finally: db.close() if __name__ == "__main__": main()