# dak.c2s/backend/scripts/import_berichtswesen.py
# CCS Admin 1748379253 feat: Excel export in Berichtswesen format + historical import
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# 2026-02-24 08:07:28 +00:00
#
# 503 lines
# 17 KiB
# Python

"""Import historical Berichtswesen .xlsx files into the yearly_summary table.

Reads Sheet 1 (KW gesamt), Sheet 2 (Fachgebiete) and Sheet 3 (Gutachten) and
stores the aggregated per-week data in the yearly_summary table for
year-over-year comparison in future reports.

Supports two file formats:
  - 2023+ format: 4 sheets (KW gesamt, Fachgebiete, Gutachten, ICD onko)
  - 2022 format: 9 sheets (separate per-Fallgruppe sheets)

Usage:
    cd /home/frontend/dak_c2s/backend
    source venv/bin/activate

    # Import a single file (the year may be omitted if the file name contains it):
    python -m scripts.import_berichtswesen /path/to/Berichtswesen_2025.xlsx 2025

    # Import all Berichtswesen_*.xlsx files from the data/ directory next to backend/:
    python -m scripts.import_berichtswesen

    # Dry run (parse only, nothing is committed):
    python -m scripts.import_berichtswesen --dry-run
"""
from __future__ import annotations
import argparse
import glob
import logging
import os
import re
import sys
# Ensure the backend package is importable
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from openpyxl import load_workbook
from app.database import SessionLocal
from app.models.report import YearlySummary
logger = logging.getLogger(__name__)
# Fallgruppe (case group) keys in canonical order. The modern-format parser
# zips this list with its per-sheet start columns, so the order here must match
# the left-to-right order of the column blocks in the workbook.
FALLGRUPPEN = ["onko", "kardio", "intensiv", "galle", "sd"]
# Mapping from 2022-style sheet-name suffixes (as in "Auswertung KW-O" or
# "Auswertung Gutachten K") to Fallgruppe keys. The 2022 format only has
# sheets for these three groups.
FALLGRUPPE_2022_MAP = {
"O": "onko",
"K": "kardio",
"I": "intensiv",
}
# ---------------------------------------------------------------------------
# Parsing helpers
# ---------------------------------------------------------------------------
def _parse_kw(value) -> int | None:
"""Extract KW number from a cell value.
Handles both formats:
- Integer: 1, 2, ... 52
- String: "KW01", "KW02", etc.
"""
if isinstance(value, (int, float)):
kw = int(value)
if 1 <= kw <= 53:
return kw
return None
if isinstance(value, str):
m = re.match(r"KW\s*(\d+)", value, re.IGNORECASE)
if m:
return int(m.group(1))
return None
def _safe_int(value, default: int = 0) -> int:
"""Convert a cell value to int, defaulting to *default*."""
if value is None:
return default
try:
return int(value)
except (TypeError, ValueError):
return default
def _detect_year_from_filename(filename: str) -> int | None:
"""Try to extract the year from a Berichtswesen filename.
Examples:
Berichtswesen_2025_31122025_final.xlsx -> 2025
Berichtswesen_2022.xlsx -> 2022
"""
m = re.search(r"Berichtswesen_(\d{4})", os.path.basename(filename))
if m:
return int(m.group(1))
return None
def _is_2022_format(wb) -> bool:
"""Detect whether this is the older 2022 format (separate per-Fallgruppe sheets)."""
return "Auswertung KW-O" in wb.sheetnames
# ---------------------------------------------------------------------------
# 2023+ format parser
# ---------------------------------------------------------------------------
def _parse_modern_format(wb, year: int) -> list[dict]:
    """Parse Berichtswesen files in the 2023+ format.

    Reads up to three sheets; a sheet that is missing is skipped:
      - 'Auswertung KW gesamt' (rows 11-62): overall weekly counts
      - 'Auswertung nach Fachgebieten' (rows 5-56): per-Fallgruppe counts
      - 'Auswertung Gutachten' (rows 5-56): Alternative/Bestaetigung split

    Rows whose first column does not hold a valid KW are ignored.

    Args:
        wb: openpyxl workbook.
        year: Report year, stored as ``jahr`` in every record.

    Returns:
        A list of dicts, one per KW in order of first appearance, with keys
        matching YearlySummary columns (plus ``jahr`` and ``kw``).
    """
    # NOTE(review): each window covers 52 data rows; confirm whether files for
    # 53-week years carry an additional row that should be included.
    rows_by_kw: dict[int, dict] = {}

    def kw_rows(sheet_name: str, first_row: int, last_row: int, width: int):
        """Yield (record, values) for every row in the window with a valid KW.

        ``values`` is the row's first *width* cell values; 1-based column c is
        found at index c - 1.
        """
        if sheet_name not in wb.sheetnames:
            return
        # Stream the window once with iter_rows(): the importer opens the
        # workbook in read-only mode, where random access through ws.cell()
        # is expensive.
        for values in wb[sheet_name].iter_rows(
            min_row=first_row, max_row=last_row, max_col=width, values_only=True
        ):
            # Defensive padding so short rows behave like empty cells.
            values = tuple(values) + (None,) * (width - len(values))
            kw = _parse_kw(values[0])
            if kw is None:
                continue
            yield rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw}), values

    # --- Sheet 1: Auswertung KW gesamt (columns B-F) ---
    for d, v in kw_rows("Auswertung KW gesamt", 11, 62, 6):
        d["erstberatungen"] = _safe_int(v[1])
        d["unterlagen"] = _safe_int(v[2])
        d["ablehnungen"] = _safe_int(v[3])
        d["keine_rueckmeldung"] = _safe_int(v[4])
        d["gutachten_gesamt"] = _safe_int(v[5])

    # --- Sheet 2: Auswertung nach Fachgebieten ---
    # Fallgruppe start columns: B=onko, E=kardio, H=intensiv, K=galle, N=sd
    # (each block: Anzahl, Gutachten, Keine RM).
    fg_start_cols = [2, 5, 8, 11, 14]
    for d, v in kw_rows(
        "Auswertung nach Fachgebieten", 5, 56, fg_start_cols[-1] + 2
    ):
        for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols):
            i = start_col - 1  # 1-based column -> 0-based tuple index
            d[f"{fg_key}_anzahl"] = _safe_int(v[i])
            d[f"{fg_key}_gutachten"] = _safe_int(v[i + 1])
            d[f"{fg_key}_keine_rm"] = _safe_int(v[i + 2])

    # --- Sheet 3: Auswertung Gutachten ---
    # Gesamt: B=Gutachten, C=Alternative, D=Bestaetigung
    # Fallgruppen: E/H/K/N/Q (each 3 cols: Gutachten, Alternative, Bestaetigung)
    fg_gut_start_cols = [5, 8, 11, 14, 17]
    for d, v in kw_rows("Auswertung Gutachten", 5, 56, fg_gut_start_cols[-1] + 2):
        d["gutachten_alternative"] = _safe_int(v[2])
        d["gutachten_bestaetigung"] = _safe_int(v[3])
        for fg_key, start_col in zip(FALLGRUPPEN, fg_gut_start_cols):
            # Columns start_col + 1 / start_col + 2 in 1-based terms.
            d[f"{fg_key}_alternative"] = _safe_int(v[start_col])
            d[f"{fg_key}_bestaetigung"] = _safe_int(v[start_col + 1])

    return list(rows_by_kw.values())
# ---------------------------------------------------------------------------
# 2022 format parser
# ---------------------------------------------------------------------------
def _parse_2022_format(wb, year: int) -> list[dict]:
    """Parse the older 2022 Berichtswesen format.

    The 2022 format has:
      - 'Auswertung KW gesamt': KW | Summe von EG | Summe von Gutachten | Summe von Keine RM
      - 'Auswertung KW-O/K/I': Per-Fallgruppe weekly counts
      - 'Auswertung Gutachten gesamt': KW | Gutachten | Alternative | Bestaetigung
      - 'Auswertung Gutachten O/K/I': Per-Fallgruppe gutachten detail

    Note: 2022 only has 3 Fallgruppen (onko, kardio, intensiv).
    No Unterlagen/Ablehnungen columns, no Gallenblase/Schilddruese.

    Every sheet is read from row 2 to the end of its data; rows whose first
    column is not a valid KW are ignored, and missing sheets are skipped.

    Args:
        wb: openpyxl workbook.
        year: Report year, stored as ``jahr`` in every record.

    Returns:
        A list of dicts, one per KW in order of first appearance.
    """
    rows_by_kw: dict[int, dict] = {}
    width = 4  # all 2022 sheets are read from columns A-D only

    def kw_rows(sheet_name: str):
        """Yield (record, values) for every data row of *sheet_name* with a valid KW."""
        if sheet_name not in wb.sheetnames:
            return
        # iter_rows() without max_row reads to the end of the sheet. This
        # avoids ``ws.max_row + 1``, which fails when a read-only worksheet
        # carries no dimension information (max_row is None), and avoids
        # per-cell random access, which is expensive in read-only mode.
        for values in wb[sheet_name].iter_rows(
            min_row=2, max_col=width, values_only=True
        ):
            # Defensive padding so short rows behave like empty cells.
            values = tuple(values) + (None,) * (width - len(values))
            kw = _parse_kw(values[0])
            if kw is None:
                continue
            yield rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw}), values

    # --- KW gesamt ---
    for d, v in kw_rows("Auswertung KW gesamt"):
        d["erstberatungen"] = _safe_int(v[1])
        d["gutachten_gesamt"] = _safe_int(v[2])
        d["keine_rueckmeldung"] = _safe_int(v[3])
        # 2022 doesn't separate Unterlagen/Ablehnungen
        d["unterlagen"] = 0
        d["ablehnungen"] = 0

    # --- Per-Fallgruppe KW sheets ---
    for suffix, fg_key in FALLGRUPPE_2022_MAP.items():
        for d, v in kw_rows(f"Auswertung KW-{suffix}"):
            d[f"{fg_key}_anzahl"] = _safe_int(v[1])
            d[f"{fg_key}_gutachten"] = _safe_int(v[2])
            d[f"{fg_key}_keine_rm"] = _safe_int(v[3])

    # --- Gutachten gesamt (C=Alternative, D=Bestaetigung) ---
    for d, v in kw_rows("Auswertung Gutachten gesamt"):
        d["gutachten_alternative"] = _safe_int(v[2])
        d["gutachten_bestaetigung"] = _safe_int(v[3])

    # --- Per-Fallgruppe Gutachten sheets ---
    for suffix, fg_key in FALLGRUPPE_2022_MAP.items():
        for d, v in kw_rows(f"Auswertung Gutachten {suffix}"):
            d[f"{fg_key}_alternative"] = _safe_int(v[2])
            d[f"{fg_key}_bestaetigung"] = _safe_int(v[3])

    return list(rows_by_kw.values())
# ---------------------------------------------------------------------------
# DB upsert
# ---------------------------------------------------------------------------
# YearlySummary attribute names the importer may set. The upsert copies a
# value only when the parsed row actually contains the key, so fields that a
# file format does not provide (e.g. galle_*/sd_* for 2022 files) are not
# touched on existing records.
_YEARLY_SUMMARY_FIELDS = [
"erstberatungen", "ablehnungen", "unterlagen", "keine_rueckmeldung",
"gutachten_gesamt", "gutachten_alternative", "gutachten_bestaetigung",
"onko_anzahl", "onko_gutachten", "onko_keine_rm",
"onko_alternative", "onko_bestaetigung",
"kardio_anzahl", "kardio_gutachten", "kardio_keine_rm",
"kardio_alternative", "kardio_bestaetigung",
"intensiv_anzahl", "intensiv_gutachten", "intensiv_keine_rm",
"intensiv_alternative", "intensiv_bestaetigung",
"galle_anzahl", "galle_gutachten", "galle_keine_rm",
"galle_alternative", "galle_bestaetigung",
"sd_anzahl", "sd_gutachten", "sd_keine_rm",
"sd_alternative", "sd_bestaetigung",
]
def _upsert_yearly_summary(db, rows: list[dict], dry_run: bool = False) -> dict:
    """Insert or update yearly_summary rows.

    For every row the record identified by (jahr, kw) is looked up; if it
    exists, the fields present in the row are overwritten, otherwise a new
    record is created. Only keys listed in ``_YEARLY_SUMMARY_FIELDS`` are
    copied.

    Args:
        db: SQLAlchemy session.
        rows: Parsed per-week dicts; each must contain ``jahr`` and ``kw``.
        dry_run: If True, only look up existing records and count what would
            happen. The session is not modified (no add, no attribute
            changes, no flush).

    Returns:
        Dict with ``inserted`` and ``updated`` counts and a list of ``errors``
        (human-readable strings, one per skipped row).
    """
    inserted = 0
    updated = 0
    errors = []
    # Keys a dry run would have inserted, so that a repeated (jahr, kw) within
    # the same call is counted as an update, as it would be in a real run.
    planned: set[tuple] = set()

    for row_data in rows:
        jahr = row_data.get("jahr")
        kw = row_data.get("kw")
        if jahr is None or kw is None:
            errors.append(f"Missing jahr/kw in row: {row_data}")
            continue
        try:
            existing = (
                db.query(YearlySummary)
                .filter(YearlySummary.jahr == jahr, YearlySummary.kw == kw)
                .first()
            )
            is_new = existing is None and (jahr, kw) not in planned

            if dry_run:
                planned.add((jahr, kw))
            else:
                target = existing if existing is not None else YearlySummary(jahr=jahr, kw=kw)
                for field in _YEARLY_SUMMARY_FIELDS:
                    if field in row_data:
                        setattr(target, field, row_data[field])
                # Register a new record only after it has been populated.
                if existing is None:
                    db.add(target)

            if is_new:
                inserted += 1
            else:
                updated += 1
        except Exception as e:
            # Best effort: record the failure and continue with the next row.
            errors.append(f"Error for KW {kw}/{jahr}: {e}")

    if not dry_run:
        db.flush()
    return {"inserted": inserted, "updated": updated, "errors": errors}
# ---------------------------------------------------------------------------
# Public import function
# ---------------------------------------------------------------------------
def import_berichtswesen_file(
    db,
    filepath: str,
    year: int | None = None,
    dry_run: bool = False,
) -> dict:
    """Import one Berichtswesen workbook into yearly_summary.

    The workbook is opened read-only with cached formula values, parsed with
    the parser matching its layout (2022 vs. 2023+), closed again, and the
    parsed weeks are handed to the upsert step.

    Args:
        db: SQLAlchemy session.
        filepath: Path to the .xlsx file.
        year: Report year (auto-detected from filename if None).
        dry_run: Forwarded to the upsert step; if True, parse but don't
            write to DB.

    Returns:
        Dict with 'year', 'weeks_parsed', 'inserted', 'updated', 'errors'.

    Raises:
        ValueError: If no year was given and none can be derived from the
            file name.
    """
    report_year = year if year is not None else _detect_year_from_filename(filepath)
    if report_year is None:
        raise ValueError(
            f"Cannot detect year from filename: {os.path.basename(filepath)}. "
            "Please specify the year explicitly."
        )

    workbook = load_workbook(filepath, read_only=True, data_only=True)
    try:
        # Pick the parser by layout, then run it while the file is still open.
        parse = _parse_2022_format if _is_2022_format(workbook) else _parse_modern_format
        rows = parse(workbook, report_year)
    finally:
        workbook.close()

    summary = _upsert_yearly_summary(db, rows, dry_run=dry_run)
    summary.update(year=report_year, weeks_parsed=len(rows))
    return summary
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser (optional file path and year, plus flags)."""
    parser = argparse.ArgumentParser(
        description="Import historical Berichtswesen .xlsx files into yearly_summary table"
    )
    parser.add_argument(
        "filepath",
        nargs="?",
        default=None,
        help="Path to a single Berichtswesen .xlsx file. "
             "If omitted, imports all Berichtswesen_*.xlsx from ../data/",
    )
    parser.add_argument(
        "year",
        nargs="?",
        type=int,
        default=None,
        help="Report year (auto-detected from filename if omitted)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Parse and validate without writing to DB",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show detailed per-row info",
    )
    return parser


def main() -> None:
    """CLI entry point.

    Imports either the single file given on the command line or every
    ``Berichtswesen_*.xlsx`` in the project-level ``data`` directory, prints
    one status line per file, then commits (or rolls back for --dry-run).

    Per-file failures, including files that do not exist, are reported and
    counted but do not abort the run. An unexpected failure of the run as a
    whole rolls back and exits with status 1, as does an empty data directory.
    """
    args = _build_arg_parser().parse_args()

    # Configure logging
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
        datefmt="%H:%M:%S",
    )

    # Determine files to import
    if args.filepath:
        files = [(args.filepath, args.year)]
    else:
        # <project>/backend/scripts/<this file>  ->  <project>/data
        backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        data_dir = os.path.join(os.path.dirname(backend_dir), "data")
        found = sorted(glob.glob(os.path.join(data_dir, "Berichtswesen_*.xlsx")))
        if not found:
            print(f"ERROR: No Berichtswesen files found in {data_dir}")
            sys.exit(1)
        files = [(f, None) for f in found]

    print(f"Importing {len(files)} Berichtswesen file(s)")
    if args.dry_run:
        print("*** DRY RUN -- no changes will be committed ***")
    print()

    db = SessionLocal()
    total_inserted = 0
    total_updated = 0
    total_errors = 0
    try:
        for filepath, year in files:
            if not os.path.exists(filepath):
                print(f" ERROR: File not found: {filepath}")
                # A missing file is a failed import; include it in the summary.
                total_errors += 1
                continue
            basename = os.path.basename(filepath)
            try:
                result = import_berichtswesen_file(db, filepath, year=year, dry_run=args.dry_run)
            except Exception as e:
                print(f" {basename:45s} ERROR: {e}")
                # Full traceback only at debug level (--verbose).
                logger.debug("Import of %s failed", filepath, exc_info=True)
                total_errors += 1
                continue

            status = "OK" if not result["errors"] else f"{len(result['errors'])} errors"
            print(
                f" {basename:45s} year={result['year']} "
                f"weeks={result['weeks_parsed']:2d} "
                f"ins={result['inserted']:3d} "
                f"upd={result['updated']:3d} [{status}]"
            )
            if args.verbose and result["errors"]:
                for err in result["errors"]:
                    print(f" - {err}")
            total_inserted += result["inserted"]
            total_updated += result["updated"]
            total_errors += len(result["errors"])

        if args.dry_run:
            db.rollback()
            print(f"\nDry run complete -- rolled back. "
                  f"Would have inserted {total_inserted}, updated {total_updated}.")
        else:
            db.commit()
            print(f"\nImport committed. "
                  f"Inserted {total_inserted}, updated {total_updated}.")
        if total_errors:
            print(f"Total errors: {total_errors}")
    except Exception as e:
        db.rollback()
        print(f"\nERROR: Import failed: {e}")
        # Use the module logger so the record carries this module's name.
        logger.exception("Import failed")
        sys.exit(1)
    finally:
        db.close()


if __name__ == "__main__":
    main()