dak.c2s/backend/app/services/excel_import.py
CCS Admin f4afea7f85 feat: historical Excel import (Abrechnung_DAK.xlsx)
Add service and standalone script to import all cases from the master
Excel workbook into the database. Handles 5 year-sheets (2020-2022,
2023, 2024, 2025, 2026) with dynamic column mapping, fallgruppe
normalization, boolean/date parsing, phone number formatting, and
duplicate detection. Supports dry-run mode and per-sheet import.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 07:58:04 +00:00

636 lines
22 KiB
Python

"""Historical Excel import for Abrechnung_DAK.xlsx.
Imports case data from the master Excel workbook into the database.
Each year-sheet (2026, 2025, 2024, 2023, 2020-2022) is imported independently.
Sheets like 'Gutachten', 'Ubersicht', 'BKK Salzgitter', '_2023', and 'Tabelle1'
are skipped.
Column mapping is dynamic -- headers are read from row 1 and matched by name,
so column order changes between sheets are handled automatically. The '2020-2022'
sheet has an extra 'Jahr' column; for single-year sheets the year is derived
from the sheet name.
Fallgruppe normalization maps the messy Excel values to the 5 valid DB codes:
onko, kardio, intensiv, galle, sd.
"""
from __future__ import annotations
import datetime as dt
import logging
import os
from typing import Any
from openpyxl import load_workbook
from sqlalchemy.orm import Session
from app.models.audit import ImportLog
from app.models.case import Case
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Fallgruppe normalization
# ---------------------------------------------------------------------------
# The Excel contains a messy mix of values that must map to the 5 valid DB
# codes enforced by CHECK constraint: onko, kardio, intensiv, galle, sd
_FALLGRUPPE_MAP: dict[str, str] = {
# Standard codes
"onko": "onko",
"kardio": "kardio",
"intensiv": "intensiv",
"galle": "galle",
"sd": "sd",
"schild": "sd", # Schilddruese -> sd
# Typos observed in data
"intei": "intensiv",
"intsiv": "intensiv",
# Non-standard categories mapped to closest valid code
"medi": "onko", # Medizin-Onko cases
"radio": "onko", # Radiologie-Onko cases
"knie": "intensiv", # Orthopedic, mapped to intensiv
"schmerz": "intensiv", # Pain cases, mapped to intensiv
"wunde": "intensiv", # Wound cases, mapped to intensiv
"orthopaedie": "intensiv",
"orthopadie": "intensiv",
}
# Keyword-based fallback for compound values like "onko brust", "onko Lymph"
_FALLGRUPPE_KEYWORDS: list[tuple[str, str]] = [
("onko", "onko"),
("kardio", "kardio"),
("intensiv", "intensiv"),
("galle", "galle"),
("schild", "sd"),
("sd", "sd"),
]
def _normalize_fallgruppe(raw: str | None) -> str | None:
"""Map a raw Fallgruppe string to a valid DB code.
Returns None if the value cannot be mapped (caller decides how to handle).
"""
if not raw:
return None
cleaned = raw.strip().lower()
# Remove accents for orthopaedie matching
cleaned_ascii = cleaned.replace("\u00e4", "ae").replace("\u00f6", "oe").replace("\u00fc", "ue")
# Direct match
if cleaned in _FALLGRUPPE_MAP:
return _FALLGRUPPE_MAP[cleaned]
if cleaned_ascii in _FALLGRUPPE_MAP:
return _FALLGRUPPE_MAP[cleaned_ascii]
# Keyword match (for compound values like "onko brust")
for keyword, code in _FALLGRUPPE_KEYWORDS:
if keyword in cleaned:
return code
return None
# ---------------------------------------------------------------------------
# Boolean parsing
# ---------------------------------------------------------------------------
def _parse_bool(value: Any) -> bool:
"""Parse a cell value to boolean.
Handles: True, False, None, 'Ja', 'Nein', 'nix', ' ', 1, 0, etc.
"""
if value is None:
return False
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
v = value.strip().lower()
if v in ("ja", "yes", "1", "true", "x"):
return True
# Everything else (nein, no, nix, empty, space) -> False
return False
return False
# ---------------------------------------------------------------------------
# Date parsing
# ---------------------------------------------------------------------------
def _parse_date(value: Any) -> dt.date | None:
"""Parse a cell value to a date.
openpyxl typically returns datetime objects for date-formatted cells.
Strings in DD.MM.YYYY format are also handled.
"""
if value is None:
return None
if isinstance(value, dt.datetime):
return value.date()
if isinstance(value, dt.date):
return value
if isinstance(value, str):
value = value.strip()
if not value:
return None
# Try DD.MM.YYYY
parts = value.split(".")
if len(parts) == 3:
try:
day, month, year = int(parts[0]), int(parts[1]), int(parts[2])
if year < 100:
year = 2000 + year if year < 50 else 1900 + year
return dt.date(year, month, day)
except (ValueError, TypeError):
pass
logger.warning("Cannot parse date value: %r", value)
return None
if isinstance(value, (int, float)):
# Excel serial date number -- openpyxl normally converts these,
# but just in case, handle it.
try:
# Excel epoch is 1899-12-30 (with the Lotus 1-2-3 bug)
return (dt.datetime(1899, 12, 30) + dt.timedelta(days=int(value))).date()
except (ValueError, OverflowError):
logger.warning("Cannot parse date from number: %r", value)
return None
return None
# ---------------------------------------------------------------------------
# String helper
# ---------------------------------------------------------------------------
def _str_or_none(value: Any, max_len: int | None = None) -> str | None:
"""Convert cell value to stripped string or None."""
if value is None:
return None
s = str(value).strip()
if not s:
return None
if max_len and len(s) > max_len:
s = s[:max_len]
return s
def _phone_str(value: Any) -> str | None:
"""Convert phone number cell to string.
Phone numbers in Excel are often stored as integers (e.g. 4915121659287).
We need to convert them to strings, optionally adding a '+' prefix.
"""
if value is None:
return None
if isinstance(value, (int, float)):
s = str(int(value))
# If it starts with 49 and is long enough, add + prefix
if len(s) >= 10 and s.startswith("49"):
return "+" + s
return s
s = str(value).strip()
return s if s else None
# ---------------------------------------------------------------------------
# Fall-ID generation (matches import_service.generate_fall_id format)
# ---------------------------------------------------------------------------
def _generate_fall_id(jahr: int, kw: int, fallgruppe: str, nachname: str) -> str:
"""Generate fall_id: YYYY-KW02d-fallgruppe-Nachname."""
return f"{jahr}-{kw:02d}-{fallgruppe}-{nachname}"
# ---------------------------------------------------------------------------
# Sheet import
# ---------------------------------------------------------------------------
# Standard header names (case-insensitive matching)
_HEADER_MAP = {
"id": "id",
"jahr": "jahr",
"kw": "kw",
"datum": "datum",
"anrede": "anrede",
"vorname": "vorname",
"nachname": "nachname",
"geburtsdatum": "geburtsdatum",
"kvnr": "kvnr",
"versicherung": "versicherung",
"icd": "icd",
"fallgruppe": "fallgruppe",
"strasse": "strasse",
"strasze": "strasse",
"stra\u00dfe": "strasse", # Strasse with eszett
"plz": "plz",
"ort": "ort",
"e-mail": "email",
"email": "email",
"ansprechpartner": "ansprechpartner",
"telefonnummer": "telefonnummer",
"mobiltelefon": "mobiltelefon",
"unterlagen": "unterlagen",
"unterlagen verschickt": "unterlagen_verschickt",
"erhalten": "erhalten",
"unterlagen erhalten": "unterlagen_erhalten",
"unterlagen an gutachter": "unterlagen_an_gutachter",
"gutachten": "gutachten",
"gutachter": "gutachter",
"gutachten erstellt": "gutachten_erstellt",
"gutachten versendet": "gutachten_versendet",
"schweigepflicht": "schweigepflicht",
"ablehnung": "ablehnung",
"abbruch": "abbruch",
"abbruch_datum": "abbruch_datum",
"kurzbeschreibung": "kurzbeschreibung",
"fragestellung": "fragestellung",
"kommentar": "kommentar",
"e-mail2": "email2",
"email2": "email2",
"telefon2": "telefon2",
"sonstiges": "sonstiges",
"abgerechnet": "abgerechnet",
"abrechnung_datum": "abrechnung_datum",
}
# Sheets to import (in order)
YEAR_SHEETS = ["2020-2022", "2023", "2024", "2025", "2026"]
# Sheets to skip
SKIP_SHEETS = {"Gutachten", "\u00dcbersicht", "Ubersicht", "BKK Salzgitter",
"_2023", "Tabelle1"}
def _build_col_map(header_row: tuple) -> dict[str, int]:
"""Build mapping from canonical field name -> column index (0-based).
Reads header row and matches each cell against _HEADER_MAP.
"""
col_map: dict[str, int] = {}
for idx, cell_value in enumerate(header_row):
if cell_value is None:
continue
key = str(cell_value).strip().lower()
canonical = _HEADER_MAP.get(key)
if canonical and canonical not in col_map:
col_map[canonical] = idx
return col_map
def _get(row: tuple, col_map: dict[str, int], field: str) -> Any:
"""Get a value from a row by canonical field name."""
idx = col_map.get(field)
if idx is None:
return None
if idx >= len(row):
return None
return row[idx]
def import_abrechnung_sheet(
    db: Session,
    ws,  # openpyxl worksheet (read-only)
    sheet_name: str,
    default_year: int | None = None,
    user_id: int | None = None,
) -> dict:
    """Import a single sheet from Abrechnung_DAK.xlsx.

    Rows are processed independently: a failure in one row is recorded in
    the errors list and does not abort the sheet.  Rows are skipped (not
    treated as errors) when they have no Nachname, duplicate an existing
    case, or (treated as errors) have an unparseable Datum or an
    unmappable Fallgruppe.  Changes are flushed but NOT committed -- the
    caller owns the transaction.

    Args:
        db: SQLAlchemy session.
        ws: openpyxl worksheet object.
        sheet_name: Name of the sheet (for logging and import_source).
        default_year: Year to use if not available per-row (derived from sheet name).
        user_id: User ID for import logging.

    Returns:
        {"imported": int, "skipped": int, "errors": list[str]}
    """
    imported = 0
    skipped = 0
    errors: list[str] = []
    # Read header row (row 1).  Column order varies between sheets, so the
    # field -> column mapping is rebuilt per sheet.
    rows_iter = ws.iter_rows(values_only=True)
    try:
        header_row = next(rows_iter)
    except StopIteration:
        # Sheet has no rows at all.
        return {"imported": 0, "skipped": 0, "errors": ["Empty sheet"]}
    col_map = _build_col_map(header_row)
    # Verify essential columns exist
    required = {"nachname", "fallgruppe", "datum"}
    missing = required - col_map.keys()
    if missing:
        return {
            "imported": 0,
            "skipped": 0,
            "errors": [f"Missing required columns: {missing}"],
        }
    has_jahr_col = "jahr" in col_map
    # Data starts at row 2 (row 1 is the header).
    for row_num, row in enumerate(rows_iter, start=2):
        try:
            nachname_raw = _get(row, col_map, "nachname")
            # A missing/blank Nachname marks an empty or placeholder row.
            if not nachname_raw or (isinstance(nachname_raw, str) and not nachname_raw.strip()):
                skipped += 1
                continue
            nachname = str(nachname_raw).strip()
            # Parse datum -- required, since it drives year/KW fallbacks
            # and the duplicate check.
            datum = _parse_date(_get(row, col_map, "datum"))
            if datum is None:
                errors.append(f"Row {row_num}: Missing/invalid Datum for {nachname}")
                skipped += 1
                continue
            # Determine year: per-row "Jahr" column if present (only the
            # 2020-2022 sheet has one), else the sheet-derived default,
            # else the Datum year.
            if has_jahr_col:
                jahr_val = _get(row, col_map, "jahr")
                if jahr_val is not None:
                    jahr = int(jahr_val)
                else:
                    jahr = default_year or datum.year
            else:
                jahr = default_year or datum.year
            # Parse KW; fall back to the ISO calendar week of the Datum.
            kw_val = _get(row, col_map, "kw")
            if kw_val is not None:
                try:
                    kw = int(kw_val)
                except (ValueError, TypeError):
                    kw = datum.isocalendar()[1]
            else:
                kw = datum.isocalendar()[1]
            # Normalize Fallgruppe to one of the 5 valid DB codes.
            fallgruppe_raw = _str_or_none(_get(row, col_map, "fallgruppe"))
            fallgruppe = _normalize_fallgruppe(fallgruppe_raw)
            if fallgruppe is None:
                errors.append(
                    f"Row {row_num}: Cannot map Fallgruppe '{fallgruppe_raw}' "
                    f"for {nachname}"
                )
                skipped += 1
                continue
            # Generate fall_id
            fall_id = _generate_fall_id(jahr, kw, fallgruppe, nachname)
            # Check for duplicate by fall_id
            existing = db.query(Case.id).filter(Case.fall_id == fall_id).first()
            if existing:
                skipped += 1
                continue
            # Parse all other fields
            vorname = _str_or_none(_get(row, col_map, "vorname"), max_len=100)
            geburtsdatum = _parse_date(_get(row, col_map, "geburtsdatum"))
            # Also check for duplicate by personal data (catches re-imports
            # where KW/Jahr differ and so the fall_id does not match).
            # Vorname/Geburtsdatum only narrow the query when present.
            dup_query = db.query(Case.id).filter(
                Case.nachname == nachname,
                Case.fallgruppe == fallgruppe,
                Case.datum == datum,
            )
            if vorname:
                dup_query = dup_query.filter(Case.vorname == vorname)
            if geburtsdatum:
                dup_query = dup_query.filter(Case.geburtsdatum == geburtsdatum)
            if dup_query.first():
                skipped += 1
                continue
            anrede = _str_or_none(_get(row, col_map, "anrede"), max_len=20)
            kvnr = _str_or_none(_get(row, col_map, "kvnr"), max_len=20)
            # Missing Versicherung defaults to "DAK" (this is the DAK workbook).
            versicherung = _str_or_none(_get(row, col_map, "versicherung"), max_len=50) or "DAK"
            icd = _str_or_none(_get(row, col_map, "icd"))
            strasse = _str_or_none(_get(row, col_map, "strasse"), max_len=255)
            # PLZ may arrive numeric; zero-pad to restore leading zeros
            # that Excel dropped.
            plz_raw = _get(row, col_map, "plz")
            plz = str(int(plz_raw)).zfill(5) if isinstance(plz_raw, (int, float)) else _str_or_none(plz_raw, max_len=10)
            ort = _str_or_none(_get(row, col_map, "ort"), max_len=100)
            email = _str_or_none(_get(row, col_map, "email"), max_len=255)
            ansprechpartner = _str_or_none(_get(row, col_map, "ansprechpartner"), max_len=200)
            # Phone fields: numeric cells are stringified by _phone_str,
            # then truncated to the 50-char column width.
            telefonnummer = _phone_str(_get(row, col_map, "telefonnummer"))
            if telefonnummer and len(telefonnummer) > 50:
                telefonnummer = telefonnummer[:50]
            mobiltelefon = _phone_str(_get(row, col_map, "mobiltelefon"))
            if mobiltelefon and len(mobiltelefon) > 50:
                mobiltelefon = mobiltelefon[:50]
            email2 = _str_or_none(_get(row, col_map, "email2"), max_len=255)
            telefon2 = _phone_str(_get(row, col_map, "telefon2"))
            if telefon2 and len(telefon2) > 50:
                telefon2 = telefon2[:50]
            unterlagen = _parse_bool(_get(row, col_map, "unterlagen"))
            unterlagen_verschickt = _parse_date(_get(row, col_map, "unterlagen_verschickt"))
            # "erhalten" is Optional[bool] -- None means unknown
            erhalten_raw = _get(row, col_map, "erhalten")
            erhalten = None if erhalten_raw is None else _parse_bool(erhalten_raw)
            unterlagen_erhalten = _parse_date(_get(row, col_map, "unterlagen_erhalten"))
            unterlagen_an_gutachter = _parse_date(_get(row, col_map, "unterlagen_an_gutachter"))
            gutachten_bool = _parse_bool(_get(row, col_map, "gutachten"))
            gutachter = _str_or_none(_get(row, col_map, "gutachter"), max_len=100)
            gutachten_erstellt = _parse_date(_get(row, col_map, "gutachten_erstellt"))
            gutachten_versendet = _parse_date(_get(row, col_map, "gutachten_versendet"))
            schweigepflicht = _parse_bool(_get(row, col_map, "schweigepflicht"))
            ablehnung = _parse_bool(_get(row, col_map, "ablehnung"))
            abbruch = _parse_bool(_get(row, col_map, "abbruch"))
            abbruch_datum = _parse_date(_get(row, col_map, "abbruch_datum"))
            kurzbeschreibung = _str_or_none(_get(row, col_map, "kurzbeschreibung"))
            fragestellung = _str_or_none(_get(row, col_map, "fragestellung"))
            kommentar = _str_or_none(_get(row, col_map, "kommentar"))
            sonstiges = _str_or_none(_get(row, col_map, "sonstiges"))
            abgerechnet = _parse_bool(_get(row, col_map, "abgerechnet"))
            abrechnung_datum = _parse_date(_get(row, col_map, "abrechnung_datum"))
            case = Case(
                fall_id=fall_id,
                jahr=jahr,
                kw=kw,
                datum=datum,
                anrede=anrede,
                vorname=vorname,
                nachname=nachname,
                geburtsdatum=geburtsdatum,
                kvnr=kvnr,
                versicherung=versicherung,
                icd=icd,
                fallgruppe=fallgruppe,
                strasse=strasse,
                plz=plz,
                ort=ort,
                email=email,
                ansprechpartner=ansprechpartner,
                telefonnummer=telefonnummer,
                mobiltelefon=mobiltelefon,
                email2=email2,
                telefon2=telefon2,
                unterlagen=unterlagen,
                unterlagen_verschickt=unterlagen_verschickt,
                erhalten=erhalten,
                unterlagen_erhalten=unterlagen_erhalten,
                unterlagen_an_gutachter=unterlagen_an_gutachter,
                gutachten=gutachten_bool,
                gutachter=gutachter,
                gutachten_erstellt=gutachten_erstellt,
                gutachten_versendet=gutachten_versendet,
                schweigepflicht=schweigepflicht,
                ablehnung=ablehnung,
                abbruch=abbruch,
                abbruch_datum=abbruch_datum,
                kurzbeschreibung=kurzbeschreibung,
                fragestellung=fragestellung,
                kommentar=kommentar,
                sonstiges=sonstiges,
                abgerechnet=abgerechnet,
                abrechnung_datum=abrechnung_datum,
                import_source=f"Abrechnung_DAK.xlsx:{sheet_name}",
            )
            db.add(case)
            imported += 1
            # Flush in batches of 100 to catch constraint violations early
            if imported % 100 == 0:
                db.flush()
        except Exception as e:
            # Per-row catch-all: record the error and keep importing the
            # remaining rows rather than failing the whole sheet.
            nachname_display = _str_or_none(_get(row, col_map, "nachname")) or "?"
            errors.append(f"Row {row_num} ({nachname_display}): {e}")
            logger.warning(
                "Import error in sheet '%s' row %d: %s",
                sheet_name, row_num, e,
            )
    # Final flush
    if imported > 0:
        db.flush()
    logger.info(
        "Sheet '%s': %d imported, %d skipped, %d errors",
        sheet_name, imported, skipped, len(errors),
    )
    return {"imported": imported, "skipped": skipped, "errors": errors}
def import_full_abrechnung(
    db: Session,
    filepath: str,
    user_id: int | None = None,
) -> dict:
    """Import all relevant sheets from Abrechnung_DAK.xlsx.

    Opens the workbook in read-only mode, iterates through year sheets,
    commits all changes at the end, and logs the import.  The whole run is
    one transaction: any unexpected failure rolls everything back.

    Args:
        db: SQLAlchemy session.
        filepath: Path to the Excel file.
        user_id: User ID for import logging.

    Returns:
        Dict mapping sheet name -> {"imported": int, "skipped": int, "errors": list}
    """
    filename = os.path.basename(filepath)
    # data_only=True reads cached formula results instead of formula text.
    wb = load_workbook(filepath, read_only=True, data_only=True)
    results: dict[str, dict] = {}
    total_imported = 0
    total_skipped = 0
    all_errors: list[str] = []
    try:
        available_sheets = set(wb.sheetnames)
        for sheet_name in YEAR_SHEETS:
            if sheet_name not in available_sheets:
                logger.info("Sheet '%s' not found, skipping", sheet_name)
                continue
            # Determine default year from sheet name
            if sheet_name == "2020-2022":
                default_year = None  # Will use per-row "Jahr" column
            else:
                try:
                    default_year = int(sheet_name)
                except ValueError:
                    default_year = None
            ws = wb[sheet_name]
            logger.info("Importing sheet '%s'...", sheet_name)
            result = import_abrechnung_sheet(
                db=db,
                ws=ws,
                sheet_name=sheet_name,
                default_year=default_year,
                user_id=user_id,
            )
            results[sheet_name] = result
            total_imported += result["imported"]
            total_skipped += result["skipped"]
            if result["errors"]:
                # Prefix errors with the sheet name for the combined log.
                all_errors.extend(
                    [f"[{sheet_name}] {e}" for e in result["errors"]]
                )
        # Also check for any additional year-like sheets not in our list.
        # These are only reported, never imported automatically.
        for sn in wb.sheetnames:
            if sn in SKIP_SHEETS or sn in results:
                continue
            # Check if it looks like a year sheet (has standard headers)
            ws = wb[sn]
            try:
                header = next(ws.iter_rows(max_row=1, values_only=True))
                header_names = {str(h).strip().lower() for h in header if h}
                if "nachname" in header_names and "fallgruppe" in header_names:
                    logger.info(
                        "Found additional data sheet '%s', skipping "
                        "(not in YEAR_SHEETS list). Add it manually if needed.",
                        sn,
                    )
            except StopIteration:
                # Empty sheet -- nothing to inspect.
                pass
        # Commit everything
        db.flush()
        # Log the import (errors are capped at 50 entries to bound the
        # size of the stored string).
        log = ImportLog(
            filename=filename,
            import_type="historical_excel",
            cases_imported=total_imported,
            cases_skipped=total_skipped,
            cases_updated=0,
            errors="; ".join(all_errors[:50]) if all_errors else None,
            details={
                "sheets": {
                    sn: {"imported": r["imported"], "skipped": r["skipped"],
                         "error_count": len(r["errors"])}
                    for sn, r in results.items()
                }
            },
            imported_by=user_id,
        )
        db.add(log)
        db.commit()
    except Exception:
        # Roll back the whole import on any unexpected failure; per-row
        # errors are already handled inside import_abrechnung_sheet.
        db.rollback()
        raise
    finally:
        wb.close()
    logger.info(
        "Full import complete: %d imported, %d skipped, %d errors across %d sheets",
        total_imported, total_skipped, len(all_errors), len(results),
    )
    return results