From f4afea7f8509d7832840b821cb3d56b2ac9e0b85 Mon Sep 17 00:00:00 2001
From: CCS Admin
Date: Tue, 24 Feb 2026 07:58:04 +0000
Subject: [PATCH] feat: historical Excel import (Abrechnung_DAK.xlsx)

Add service and standalone script to import all cases from the master
Excel workbook into the database. Handles 5 year-sheets (2020-2022, 2023,
2024, 2025, 2026) with dynamic column mapping, fallgruppe normalization,
boolean/date parsing, phone number formatting, and duplicate detection.
Supports dry-run mode and per-sheet import.

Co-Authored-By: Claude Opus 4.6
---
 backend/app/services/excel_import.py | 645 +++++++++++++++++++++++++++
 backend/scripts/import_historical.py | 190 ++++++++
 2 files changed, 835 insertions(+)
 create mode 100644 backend/app/services/excel_import.py
 create mode 100644 backend/scripts/import_historical.py

diff --git a/backend/app/services/excel_import.py b/backend/app/services/excel_import.py
new file mode 100644
index 0000000..f674758
--- /dev/null
+++ b/backend/app/services/excel_import.py
@@ -0,0 +1,645 @@
+"""Historical Excel import for Abrechnung_DAK.xlsx.
+
+Imports case data from the master Excel workbook into the database.
+Each year-sheet (2026, 2025, 2024, 2023, 2020-2022) is imported independently.
+Sheets like 'Gutachten', 'Ubersicht', 'BKK Salzgitter', '_2023', and 'Tabelle1'
+are skipped.
+
+Column mapping is dynamic -- headers are read from row 1 and matched by name,
+so column order changes between sheets are handled automatically. The '2020-2022'
+sheet has an extra 'Jahr' column; for single-year sheets the year is derived
+from the sheet name.
+
+Fallgruppe normalization maps the messy Excel values to the 5 valid DB codes:
+onko, kardio, intensiv, galle, sd.
+"""
+
+from __future__ import annotations
+
+import datetime as dt
+import logging
+import os
+from typing import Any
+
+from openpyxl import load_workbook
+from sqlalchemy.orm import Session
+
+from app.models.audit import ImportLog
+from app.models.case import Case
+
+logger = logging.getLogger(__name__)
+
+# ---------------------------------------------------------------------------
+# Fallgruppe normalization
+# ---------------------------------------------------------------------------
+# The Excel contains a messy mix of values that must map to the 5 valid DB
+# codes enforced by CHECK constraint: onko, kardio, intensiv, galle, sd
+
+_FALLGRUPPE_MAP: dict[str, str] = {
+    # Standard codes
+    "onko": "onko",
+    "kardio": "kardio",
+    "intensiv": "intensiv",
+    "galle": "galle",
+    "sd": "sd",
+    "schild": "sd",  # Schilddruese -> sd
+    # Typos observed in data
+    "intei": "intensiv",
+    "intsiv": "intensiv",
+    # Non-standard categories mapped to closest valid code
+    "medi": "onko",  # Medizin-Onko cases
+    "radio": "onko",  # Radiologie-Onko cases
+    "knie": "intensiv",  # Orthopedic, mapped to intensiv
+    "schmerz": "intensiv",  # Pain cases, mapped to intensiv
+    "wunde": "intensiv",  # Wound cases, mapped to intensiv
+    "orthopaedie": "intensiv",
+    "orthopadie": "intensiv",
+}
+
+# Keyword-based fallback for compound values like "onko brust", "onko Lymph"
+_FALLGRUPPE_KEYWORDS: list[tuple[str, str]] = [
+    ("onko", "onko"),
+    ("kardio", "kardio"),
+    ("intensiv", "intensiv"),
+    ("galle", "galle"),
+    ("schild", "sd"),
+    ("sd", "sd"),
+]
+
+
+def _normalize_fallgruppe(raw: str | None) -> str | None:
+    """Map a raw Fallgruppe string to a valid DB code.
+
+    Returns None if the value cannot be mapped (caller decides how to handle).
+    """
+    if not raw:
+        return None
+
+    cleaned = raw.strip().lower()
+    # Remove accents for orthopaedie matching
+    cleaned_ascii = cleaned.replace("\u00e4", "ae").replace("\u00f6", "oe").replace("\u00fc", "ue")
+
+    # Direct match
+    if cleaned in _FALLGRUPPE_MAP:
+        return _FALLGRUPPE_MAP[cleaned]
+    if cleaned_ascii in _FALLGRUPPE_MAP:
+        return _FALLGRUPPE_MAP[cleaned_ascii]
+
+    # Keyword match (for compound values like "onko brust")
+    for keyword, code in _FALLGRUPPE_KEYWORDS:
+        if keyword in cleaned:
+            return code
+
+    return None
+
+
+# ---------------------------------------------------------------------------
+# Boolean parsing
+# ---------------------------------------------------------------------------
+
+def _parse_bool(value: Any) -> bool:
+    """Parse a cell value to boolean.
+
+    Handles: True, False, None, 'Ja', 'Nein', 'nix', ' ', 1, 0, etc.
+    """
+    if value is None:
+        return False
+    if isinstance(value, bool):
+        return value
+    if isinstance(value, (int, float)):
+        return bool(value)
+    if isinstance(value, str):
+        v = value.strip().lower()
+        if v in ("ja", "yes", "1", "true", "x"):
+            return True
+        # Everything else (nein, no, nix, empty, space) -> False
+        return False
+    return False
+
+
+# ---------------------------------------------------------------------------
+# Date parsing
+# ---------------------------------------------------------------------------
+
+def _parse_date(value: Any) -> dt.date | None:
+    """Parse a cell value to a date.
+
+    openpyxl typically returns datetime objects for date-formatted cells.
+    Strings in DD.MM.YYYY format are also handled.
+    """
+    # bool is a subclass of int; without this guard a TRUE/FALSE cell would
+    # be interpreted as an Excel serial number (1899-12-31 / 1899-12-30).
+    if value is None or isinstance(value, bool):
+        return None
+    if isinstance(value, dt.datetime):
+        return value.date()
+    if isinstance(value, dt.date):
+        return value
+    if isinstance(value, str):
+        value = value.strip()
+        if not value:
+            return None
+        # Try DD.MM.YYYY
+        parts = value.split(".")
+        if len(parts) == 3:
+            try:
+                day, month, year = int(parts[0]), int(parts[1]), int(parts[2])
+                if year < 100:
+                    year = 2000 + year if year < 50 else 1900 + year
+                return dt.date(year, month, day)
+            except (ValueError, TypeError):
+                pass
+        logger.warning("Cannot parse date value: %r", value)
+        return None
+    if isinstance(value, (int, float)):
+        # Excel serial date number -- openpyxl normally converts these,
+        # but just in case, handle it.
+        try:
+            # Excel epoch is 1899-12-30 (with the Lotus 1-2-3 bug)
+            return (dt.datetime(1899, 12, 30) + dt.timedelta(days=int(value))).date()
+        except (ValueError, OverflowError):
+            logger.warning("Cannot parse date from number: %r", value)
+            return None
+    return None
+
+
+# ---------------------------------------------------------------------------
+# String helper
+# ---------------------------------------------------------------------------
+
+def _str_or_none(value: Any, max_len: int | None = None) -> str | None:
+    """Convert cell value to stripped string or None."""
+    if value is None:
+        return None
+    s = str(value).strip()
+    if not s:
+        return None
+    if max_len and len(s) > max_len:
+        s = s[:max_len]
+    return s
+
+
+def _phone_str(value: Any) -> str | None:
+    """Convert phone number cell to string.
+
+    Phone numbers in Excel are often stored as integers (e.g. 4915121659287).
+    We need to convert them to strings, optionally adding a '+' prefix.
+    """
+    if value is None:
+        return None
+    if isinstance(value, (int, float)):
+        s = str(int(value))
+        # If it starts with 49 and is long enough, add + prefix
+        if len(s) >= 10 and s.startswith("49"):
+            return "+" + s
+        return s
+    s = str(value).strip()
+    return s if s else None
+
+
+# ---------------------------------------------------------------------------
+# Fall-ID generation (matches import_service.generate_fall_id format)
+# ---------------------------------------------------------------------------
+
+def _generate_fall_id(jahr: int, kw: int, fallgruppe: str, nachname: str) -> str:
+    """Generate fall_id: YYYY-KW02d-fallgruppe-Nachname."""
+    return f"{jahr}-{kw:02d}-{fallgruppe}-{nachname}"
+
+
+# ---------------------------------------------------------------------------
+# Sheet import
+# ---------------------------------------------------------------------------
+
+# Standard header names (case-insensitive matching)
+_HEADER_MAP = {
+    "id": "id",
+    "jahr": "jahr",
+    "kw": "kw",
+    "datum": "datum",
+    "anrede": "anrede",
+    "vorname": "vorname",
+    "nachname": "nachname",
+    "geburtsdatum": "geburtsdatum",
+    "kvnr": "kvnr",
+    "versicherung": "versicherung",
+    "icd": "icd",
+    "fallgruppe": "fallgruppe",
+    "strasse": "strasse",
+    "strasze": "strasse",
+    "stra\u00dfe": "strasse",  # Strasse with eszett
+    "plz": "plz",
+    "ort": "ort",
+    "e-mail": "email",
+    "email": "email",
+    "ansprechpartner": "ansprechpartner",
+    "telefonnummer": "telefonnummer",
+    "mobiltelefon": "mobiltelefon",
+    "unterlagen": "unterlagen",
+    "unterlagen verschickt": "unterlagen_verschickt",
+    "erhalten": "erhalten",
+    "unterlagen erhalten": "unterlagen_erhalten",
+    "unterlagen an gutachter": "unterlagen_an_gutachter",
+    "gutachten": "gutachten",
+    "gutachter": "gutachter",
+    "gutachten erstellt": "gutachten_erstellt",
+    "gutachten versendet": "gutachten_versendet",
+    "schweigepflicht": "schweigepflicht",
+    "ablehnung": "ablehnung",
+    "abbruch": "abbruch",
+    "abbruch_datum": "abbruch_datum",
+    "kurzbeschreibung": "kurzbeschreibung",
+    "fragestellung": "fragestellung",
+    "kommentar": "kommentar",
+    "e-mail2": "email2",
+    "email2": "email2",
+    "telefon2": "telefon2",
+    "sonstiges": "sonstiges",
+    "abgerechnet": "abgerechnet",
+    "abrechnung_datum": "abrechnung_datum",
+}
+
+# Sheets to import (in order)
+YEAR_SHEETS = ["2020-2022", "2023", "2024", "2025", "2026"]
+
+# Sheets to skip
+SKIP_SHEETS = {"Gutachten", "\u00dcbersicht", "Ubersicht", "BKK Salzgitter",
+               "_2023", "Tabelle1"}
+
+
+def _build_col_map(header_row: tuple) -> dict[str, int]:
+    """Build mapping from canonical field name -> column index (0-based).
+
+    Reads header row and matches each cell against _HEADER_MAP.
+    """
+    col_map: dict[str, int] = {}
+    for idx, cell_value in enumerate(header_row):
+        if cell_value is None:
+            continue
+        key = str(cell_value).strip().lower()
+        canonical = _HEADER_MAP.get(key)
+        if canonical and canonical not in col_map:
+            col_map[canonical] = idx
+    return col_map
+
+
+def _get(row: tuple, col_map: dict[str, int], field: str) -> Any:
+    """Get a value from a row by canonical field name."""
+    idx = col_map.get(field)
+    if idx is None:
+        return None
+    if idx >= len(row):
+        return None
+    return row[idx]
+
+
+def import_abrechnung_sheet(
+    db: Session,
+    ws,  # openpyxl worksheet (read-only)
+    sheet_name: str,
+    default_year: int | None = None,
+    user_id: int | None = None,
+) -> dict:
+    """Import a single sheet from Abrechnung_DAK.xlsx.
+
+    Rows are flushed one at a time inside a SAVEPOINT, so a row rejected by
+    the database is reported in ``errors`` without affecting other rows.
+    Nothing is committed here; the caller commits or rolls back.
+
+    Args:
+        db: SQLAlchemy session.
+        ws: openpyxl worksheet object.
+        sheet_name: Name of the sheet (for logging and import_source).
+        default_year: Year to use if not available per-row (derived from sheet name).
+        user_id: Accepted for signature parity with import_full_abrechnung;
+            not read by this function, which writes no ImportLog row itself.
+
+    Returns:
+        {"imported": int, "skipped": int, "errors": list[str]}
+    """
+    imported = 0
+    skipped = 0
+    errors: list[str] = []
+
+    # Read header row
+    rows_iter = ws.iter_rows(values_only=True)
+    try:
+        header_row = next(rows_iter)
+    except StopIteration:
+        return {"imported": 0, "skipped": 0, "errors": ["Empty sheet"]}
+
+    col_map = _build_col_map(header_row)
+
+    # Verify essential columns exist
+    required = {"nachname", "fallgruppe", "datum"}
+    missing = required - col_map.keys()
+    if missing:
+        return {
+            "imported": 0,
+            "skipped": 0,
+            "errors": [f"Missing required columns: {missing}"],
+        }
+
+    has_jahr_col = "jahr" in col_map
+
+    for row_num, row in enumerate(rows_iter, start=2):
+        try:
+            nachname_raw = _get(row, col_map, "nachname")
+            if not nachname_raw or (isinstance(nachname_raw, str) and not nachname_raw.strip()):
+                skipped += 1
+                continue
+
+            nachname = str(nachname_raw).strip()
+
+            # Parse datum
+            datum = _parse_date(_get(row, col_map, "datum"))
+            if datum is None:
+                errors.append(f"Row {row_num}: Missing/invalid Datum for {nachname}")
+                skipped += 1
+                continue
+
+            # Determine year
+            if has_jahr_col:
+                jahr_val = _get(row, col_map, "jahr")
+                if jahr_val is not None:
+                    jahr = int(jahr_val)
+                else:
+                    jahr = default_year or datum.year
+            else:
+                jahr = default_year or datum.year
+
+            # Parse KW
+            kw_val = _get(row, col_map, "kw")
+            if kw_val is not None:
+                try:
+                    kw = int(kw_val)
+                except (ValueError, TypeError):
+                    kw = datum.isocalendar()[1]
+            else:
+                kw = datum.isocalendar()[1]
+
+            # Normalize Fallgruppe
+            fallgruppe_raw = _str_or_none(_get(row, col_map, "fallgruppe"))
+            fallgruppe = _normalize_fallgruppe(fallgruppe_raw)
+            if fallgruppe is None:
+                errors.append(
+                    f"Row {row_num}: Cannot map Fallgruppe '{fallgruppe_raw}' "
+                    f"for {nachname}"
+                )
+                skipped += 1
+                continue
+
+            # Generate fall_id
+            fall_id = _generate_fall_id(jahr, kw, fallgruppe, nachname)
+
+            # Check for duplicate by fall_id
+            existing = db.query(Case.id).filter(Case.fall_id == fall_id).first()
+            if existing:
+                skipped += 1
+                continue
+
+            # Parse all other fields
+            vorname = _str_or_none(_get(row, col_map, "vorname"), max_len=100)
+            geburtsdatum = _parse_date(_get(row, col_map, "geburtsdatum"))
+
+            # Also check for duplicate by personal data
+            dup_query = db.query(Case.id).filter(
+                Case.nachname == nachname,
+                Case.fallgruppe == fallgruppe,
+                Case.datum == datum,
+            )
+            if vorname:
+                dup_query = dup_query.filter(Case.vorname == vorname)
+            if geburtsdatum:
+                dup_query = dup_query.filter(Case.geburtsdatum == geburtsdatum)
+            if dup_query.first():
+                skipped += 1
+                continue
+
+            anrede = _str_or_none(_get(row, col_map, "anrede"), max_len=20)
+            kvnr = _str_or_none(_get(row, col_map, "kvnr"), max_len=20)
+            versicherung = _str_or_none(_get(row, col_map, "versicherung"), max_len=50) or "DAK"
+            icd = _str_or_none(_get(row, col_map, "icd"))
+            strasse = _str_or_none(_get(row, col_map, "strasse"), max_len=255)
+            plz_raw = _get(row, col_map, "plz")
+            plz = str(int(plz_raw)).zfill(5) if isinstance(plz_raw, (int, float)) else _str_or_none(plz_raw, max_len=10)
+            ort = _str_or_none(_get(row, col_map, "ort"), max_len=100)
+            email = _str_or_none(_get(row, col_map, "email"), max_len=255)
+            ansprechpartner = _str_or_none(_get(row, col_map, "ansprechpartner"), max_len=200)
+            telefonnummer = _phone_str(_get(row, col_map, "telefonnummer"))
+            if telefonnummer and len(telefonnummer) > 50:
+                telefonnummer = telefonnummer[:50]
+            mobiltelefon = _phone_str(_get(row, col_map, "mobiltelefon"))
+            if mobiltelefon and len(mobiltelefon) > 50:
+                mobiltelefon = mobiltelefon[:50]
+            email2 = _str_or_none(_get(row, col_map, "email2"), max_len=255)
+            telefon2 = _phone_str(_get(row, col_map, "telefon2"))
+            if telefon2 and len(telefon2) > 50:
+                telefon2 = telefon2[:50]
+
+            unterlagen = _parse_bool(_get(row, col_map, "unterlagen"))
+            unterlagen_verschickt = _parse_date(_get(row, col_map, "unterlagen_verschickt"))
+            # "erhalten" is Optional[bool] -- None means unknown
+            erhalten_raw = _get(row, col_map, "erhalten")
+            erhalten = None if erhalten_raw is None else _parse_bool(erhalten_raw)
+            unterlagen_erhalten = _parse_date(_get(row, col_map, "unterlagen_erhalten"))
+            unterlagen_an_gutachter = _parse_date(_get(row, col_map, "unterlagen_an_gutachter"))
+            gutachten_bool = _parse_bool(_get(row, col_map, "gutachten"))
+            gutachter = _str_or_none(_get(row, col_map, "gutachter"), max_len=100)
+            gutachten_erstellt = _parse_date(_get(row, col_map, "gutachten_erstellt"))
+            gutachten_versendet = _parse_date(_get(row, col_map, "gutachten_versendet"))
+            schweigepflicht = _parse_bool(_get(row, col_map, "schweigepflicht"))
+            ablehnung = _parse_bool(_get(row, col_map, "ablehnung"))
+            abbruch = _parse_bool(_get(row, col_map, "abbruch"))
+            abbruch_datum = _parse_date(_get(row, col_map, "abbruch_datum"))
+            kurzbeschreibung = _str_or_none(_get(row, col_map, "kurzbeschreibung"))
+            fragestellung = _str_or_none(_get(row, col_map, "fragestellung"))
+            kommentar = _str_or_none(_get(row, col_map, "kommentar"))
+            sonstiges = _str_or_none(_get(row, col_map, "sonstiges"))
+            abgerechnet = _parse_bool(_get(row, col_map, "abgerechnet"))
+            abrechnung_datum = _parse_date(_get(row, col_map, "abrechnung_datum"))
+
+            case = Case(
+                fall_id=fall_id,
+                jahr=jahr,
+                kw=kw,
+                datum=datum,
+                anrede=anrede,
+                vorname=vorname,
+                nachname=nachname,
+                geburtsdatum=geburtsdatum,
+                kvnr=kvnr,
+                versicherung=versicherung,
+                icd=icd,
+                fallgruppe=fallgruppe,
+                strasse=strasse,
+                plz=plz,
+                ort=ort,
+                email=email,
+                ansprechpartner=ansprechpartner,
+                telefonnummer=telefonnummer,
+                mobiltelefon=mobiltelefon,
+                email2=email2,
+                telefon2=telefon2,
+                unterlagen=unterlagen,
+                unterlagen_verschickt=unterlagen_verschickt,
+                erhalten=erhalten,
+                unterlagen_erhalten=unterlagen_erhalten,
+                unterlagen_an_gutachter=unterlagen_an_gutachter,
+                gutachten=gutachten_bool,
+                gutachter=gutachter,
+                gutachten_erstellt=gutachten_erstellt,
+                gutachten_versendet=gutachten_versendet,
+                schweigepflicht=schweigepflicht,
+                ablehnung=ablehnung,
+                abbruch=abbruch,
+                abbruch_datum=abbruch_datum,
+                kurzbeschreibung=kurzbeschreibung,
+                fragestellung=fragestellung,
+                kommentar=kommentar,
+                sonstiges=sonstiges,
+                abgerechnet=abgerechnet,
+                abrechnung_datum=abrechnung_datum,
+                import_source=f"Abrechnung_DAK.xlsx:{sheet_name}",
+            )
+            # Insert the row inside its own SAVEPOINT.  If the database
+            # rejects it (e.g. a constraint violation), only this row is
+            # rolled back; without the savepoint a failed flush would
+            # leave the session unusable for every remaining row.
+            with db.begin_nested():
+                db.add(case)
+                db.flush()
+            imported += 1
+
+        except Exception as e:
+            nachname_display = _str_or_none(_get(row, col_map, "nachname")) or "?"
+            errors.append(f"Row {row_num} ({nachname_display}): {e}")
+            logger.warning(
+                "Import error in sheet '%s' row %d: %s",
+                sheet_name, row_num, e,
+            )
+
+    # Final flush
+    if imported > 0:
+        db.flush()
+
+    logger.info(
+        "Sheet '%s': %d imported, %d skipped, %d errors",
+        sheet_name, imported, skipped, len(errors),
+    )
+
+    return {"imported": imported, "skipped": skipped, "errors": errors}
+
+
+def import_full_abrechnung(
+    db: Session,
+    filepath: str,
+    user_id: int | None = None,
+) -> dict:
+    """Import all relevant sheets from Abrechnung_DAK.xlsx.
+
+    Opens the workbook in read-only mode, iterates through year sheets,
+    commits all changes at the end, and logs the import.
+
+    Args:
+        db: SQLAlchemy session.
+        filepath: Path to the Excel file.
+        user_id: User ID for import logging.
+
+    Returns:
+        Dict mapping sheet name -> {"imported": int, "skipped": int, "errors": list}
+    """
+    filename = os.path.basename(filepath)
+    wb = load_workbook(filepath, read_only=True, data_only=True)
+    results: dict[str, dict] = {}
+    total_imported = 0
+    total_skipped = 0
+    all_errors: list[str] = []
+
+    try:
+        available_sheets = set(wb.sheetnames)
+
+        for sheet_name in YEAR_SHEETS:
+            if sheet_name not in available_sheets:
+                logger.info("Sheet '%s' not found, skipping", sheet_name)
+                continue
+
+            # Determine default year from sheet name
+            if sheet_name == "2020-2022":
+                default_year = None  # Will use per-row "Jahr" column
+            else:
+                try:
+                    default_year = int(sheet_name)
+                except ValueError:
+                    default_year = None
+
+            ws = wb[sheet_name]
+            logger.info("Importing sheet '%s'...", sheet_name)
+
+            result = import_abrechnung_sheet(
+                db=db,
+                ws=ws,
+                sheet_name=sheet_name,
+                default_year=default_year,
+                user_id=user_id,
+            )
+            results[sheet_name] = result
+            total_imported += result["imported"]
+            total_skipped += result["skipped"]
+            if result["errors"]:
+                all_errors.extend(
+                    [f"[{sheet_name}] {e}" for e in result["errors"]]
+                )
+
+        # Also check for any additional year-like sheets not in our list
+        for sn in wb.sheetnames:
+            if sn in SKIP_SHEETS or sn in results:
+                continue
+            # Check if it looks like a year sheet (has standard headers)
+            ws = wb[sn]
+            try:
+                header = next(ws.iter_rows(max_row=1, values_only=True))
+                header_names = {str(h).strip().lower() for h in header if h}
+                if "nachname" in header_names and "fallgruppe" in header_names:
+                    logger.info(
+                        "Found additional data sheet '%s', skipping "
+                        "(not in YEAR_SHEETS list). Add it manually if needed.",
+                        sn,
+                    )
+            except StopIteration:
+                pass
+
+        # Flush pending rows; the commit happens after the ImportLog is added
+        db.flush()
+
+        # Log the import
+        log = ImportLog(
+            filename=filename,
+            import_type="historical_excel",
+            cases_imported=total_imported,
+            cases_skipped=total_skipped,
+            cases_updated=0,
+            errors="; ".join(all_errors[:50]) if all_errors else None,
+            details={
+                "sheets": {
+                    sn: {"imported": r["imported"], "skipped": r["skipped"],
+                         "error_count": len(r["errors"])}
+                    for sn, r in results.items()
+                }
+            },
+            imported_by=user_id,
+        )
+        db.add(log)
+        db.commit()
+
+    except Exception:
+        db.rollback()
+        raise
+    finally:
+        wb.close()
+
+    logger.info(
+        "Full import complete: %d imported, %d skipped, %d errors across %d sheets",
+        total_imported, total_skipped, len(all_errors), len(results),
+    )
+
+    return results
diff --git a/backend/scripts/import_historical.py b/backend/scripts/import_historical.py
new file mode 100644
index 0000000..8f2fe76
--- /dev/null
+++ b/backend/scripts/import_historical.py
@@ -0,0 +1,190 @@
+"""One-time script: Import all cases from Abrechnung_DAK.xlsx into DB.
+
+Usage:
+    cd /home/frontend/dak_c2s/backend
+    source venv/bin/activate
+    python -m scripts.import_historical [path_to_xlsx]
+
+    Default path: ../data/Abrechnung_DAK.xlsx
+
+Options:
+    --dry-run    Parse and validate without writing to DB
+    --sheet NAME Import only the named sheet (can repeat)
+    --verbose    Show per-row errors in output
+"""
+
+import argparse
+import logging
+import os
+import sys
+
+# Ensure the backend package is importable
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from app.database import SessionLocal
+from app.services.excel_import import (
+    YEAR_SHEETS,
+    import_abrechnung_sheet,
+    import_full_abrechnung,
+)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description="Import historical cases from Abrechnung_DAK.xlsx"
+    )
+    parser.add_argument(
+        "filepath",
+        nargs="?",
+        default=os.path.join(
+            os.path.dirname(  # dak_c2s/
+                os.path.dirname(  # dak_c2s/backend/
+                    os.path.dirname(os.path.abspath(__file__))  # dak_c2s/backend/scripts/
+                )
+            ),
+            "data",
+            "Abrechnung_DAK.xlsx",
+        ),
+        help="Path to the Excel file (default: ../../data/Abrechnung_DAK.xlsx)",
+    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Parse and validate without writing to DB",
+    )
+    parser.add_argument(
+        "--sheet",
+        action="append",
+        dest="sheets",
+        help="Import only specific sheet(s); can be repeated",
+    )
+    parser.add_argument(
+        "--verbose", "-v",
+        action="store_true",
+        help="Show per-row errors in output",
+    )
+    args = parser.parse_args()
+
+    # Configure logging
+    log_level = logging.DEBUG if args.verbose else logging.INFO
+    logging.basicConfig(
+        level=log_level,
+        format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
+        datefmt="%H:%M:%S",
+    )
+
+    filepath = args.filepath
+    if not os.path.exists(filepath):
+        print(f"ERROR: File not found: {filepath}")
+        sys.exit(1)
+
+    print(f"Importing from: {filepath}")
+    if args.dry_run:
+        print("*** DRY RUN -- no changes will be committed ***")
+    print()
+
+    db = SessionLocal()
+    try:
+        if args.sheets:
+            # Import specific sheets only
+            from openpyxl import load_workbook
+
+            wb = load_workbook(filepath, read_only=True, data_only=True)
+            try:
+                for sheet_name in args.sheets:
+                    if sheet_name not in wb.sheetnames:
+                        print(f" WARNING: Sheet '{sheet_name}' not found, skipping")
+                        continue
+
+                    # Determine default year
+                    if sheet_name == "2020-2022":
+                        default_year = None
+                    else:
+                        try:
+                            default_year = int(sheet_name)
+                        except ValueError:
+                            default_year = None
+
+                    ws = wb[sheet_name]
+                    result = import_abrechnung_sheet(
+                        db=db,
+                        ws=ws,
+                        sheet_name=sheet_name,
+                        default_year=default_year,
+                    )
+                    _print_result(sheet_name, result, args.verbose)
+
+                if args.dry_run:
+                    db.rollback()
+                    print("\nDry run complete -- rolled back all changes.")
+                else:
+                    db.commit()
+                    print("\nImport committed to database.")
+            finally:
+                wb.close()
+        else:
+            # Import all year sheets
+            if args.dry_run:
+                # For dry run, we do the same import but rollback at the end
+                from openpyxl import load_workbook
+
+                wb = load_workbook(filepath, read_only=True, data_only=True)
+                try:
+                    for sheet_name in YEAR_SHEETS:
+                        if sheet_name not in wb.sheetnames:
+                            print(f" Sheet '{sheet_name}' not found, skipping")
+                            continue
+
+                        if sheet_name == "2020-2022":
+                            default_year = None
+                        else:
+                            try:
+                                default_year = int(sheet_name)
+                            except ValueError:
+                                default_year = None
+
+                        ws = wb[sheet_name]
+                        result = import_abrechnung_sheet(
+                            db=db,
+                            ws=ws,
+                            sheet_name=sheet_name,
+                            default_year=default_year,
+                        )
+                        _print_result(sheet_name, result, args.verbose)
+                finally:
+                    wb.close()
+
+                db.rollback()
+                print("\nDry run complete -- rolled back all changes.")
+            else:
+                result = import_full_abrechnung(db, filepath)
+                print("Import results:")
+                for sheet_name, stats in result.items():
+                    _print_result(sheet_name, stats, args.verbose)
+                print("\nImport committed to database.")
+
+    except Exception as e:
+        db.rollback()
+        print(f"\nERROR: Import failed: {e}")
+        logging.exception("Import failed")
+        sys.exit(1)
+    finally:
+        db.close()
+
+
+def _print_result(sheet_name: str, result: dict, verbose: bool) -> None:
+    """Print import result for a single sheet."""
+    imported = result["imported"]
+    skipped = result["skipped"]
+    error_count = len(result["errors"])
+
+    status = "OK" if error_count == 0 else f"{error_count} errors"
+    print(f" {sheet_name:12s}: {imported:4d} imported, {skipped:4d} skipped [{status}]")
+
+    if verbose and result["errors"]:
+        for err in result["errors"]:
+            print(f" - {err}")
+
+
+if __name__ == "__main__":
+    main()