"""Bidirectional sync between the database and Abrechnung_DAK.xlsx files. This is a convenience feature for exporting case data to the DAK Abrechnung Excel format and re-importing changes made in the spreadsheet back into the DB. """ import logging from datetime import date, datetime from io import BytesIO from typing import Any from openpyxl import Workbook, load_workbook from sqlalchemy.orm import Session from app.config import get_settings from app.models.case import Case settings = get_settings() logger = logging.getLogger(__name__) # Columns in the Abrechnung export, matching the expected DAK format ABRECHNUNG_COLUMNS = [ ("fall_id", "Fall-ID"), ("datum", "Datum"), ("nachname", "Nachname"), ("vorname", "Vorname"), ("geburtsdatum", "Geburtsdatum"), ("kvnr", "KVNR"), ("fallgruppe", "Fallgruppe"), ("icd", "ICD"), ("gutachten", "Gutachten"), ("gutachten_typ", "Gutachten-Typ"), ("therapieaenderung", "Therapieaenderung"), ("abgerechnet", "Abgerechnet"), ("abrechnung_datum", "Abrechnungsdatum"), ] def sync_db_to_excel(db: Session, filepath: str | None = None) -> bytes: """Export DB cases to Abrechnung format xlsx. Cases are grouped by year into separate sheets. Each sheet contains the columns defined in ABRECHNUNG_COLUMNS. Args: db: Database session. filepath: Optional path to write the file to disk. If None, only returns bytes. Returns: The Excel file as bytes. """ cases = db.query(Case).filter(Case.versicherung == settings.VERSICHERUNG_FILTER).order_by(Case.jahr.desc(), Case.datum.desc()).all() # Group cases by year by_year: dict[int, list[Case]] = {} for case in cases: by_year.setdefault(case.jahr, []).append(case) wb = Workbook() # Remove default sheet wb.remove(wb.active) for jahr in sorted(by_year.keys(), reverse=True): ws = wb.create_sheet(title=str(jahr)) # Header row for col_idx, (_, header) in enumerate(ABRECHNUNG_COLUMNS, start=1): ws.cell(row=1, column=col_idx, value=header) # Data rows for row_idx, case in enumerate(by_year[jahr], start=2): for col_idx, (field, _) in enumerate(ABRECHNUNG_COLUMNS, start=1): value = getattr(case, field, None) # Convert date objects to strings for Excel if isinstance(value, (date, datetime)): value = value.strftime("%d.%m.%Y") elif isinstance(value, bool): value = "Ja" if value else "Nein" ws.cell(row=row_idx, column=col_idx, value=value) # Ensure at least one sheet exists if not wb.sheetnames: wb.create_sheet(title="Leer") output = BytesIO() wb.save(output) xlsx_bytes = output.getvalue() if filepath: with open(filepath, "wb") as f: f.write(xlsx_bytes) logger.info("Wrote Abrechnung export to %s", filepath) return xlsx_bytes def sync_excel_to_db( db: Session, content: bytes, user_id: int | None = None ) -> dict[str, Any]: """Import changes from edited Abrechnung xlsx back to DB. Compares the spreadsheet rows (matched by fall_id) against existing cases and updates any changed fields. Args: db: Database session. content: The Excel file content as bytes. user_id: ID of the user performing the import. Returns: Dict with ``updated``, ``skipped``, and ``errors`` counts. """ wb = load_workbook(BytesIO(content), read_only=True, data_only=True) # Build a header-to-column-index map from ABRECHNUNG_COLUMNS field_by_header: dict[str, str] = { header: field for field, header in ABRECHNUNG_COLUMNS } updated = 0 skipped = 0 errors: list[str] = [] for ws in wb.worksheets: rows = list(ws.iter_rows(values_only=True)) if not rows: continue # Map header row to field names header_row = rows[0] col_map: dict[int, str] = {} for col_idx, header_val in enumerate(header_row): if header_val and str(header_val).strip() in field_by_header: col_map[col_idx] = field_by_header[str(header_val).strip()] if "fall_id" not in col_map.values(): logger.warning("Sheet '%s' has no Fall-ID column, skipping", ws.title) continue # Find the fall_id column index fall_id_col = next( idx for idx, field in col_map.items() if field == "fall_id" ) for row_num, row in enumerate(rows[1:], start=2): try: fall_id = row[fall_id_col] if not fall_id: skipped += 1 continue case = ( db.query(Case) .filter(Case.fall_id == str(fall_id).strip()) .first() ) if not case: skipped += 1 continue changed = False for col_idx, field in col_map.items(): if field == "fall_id": continue new_val = row[col_idx] if col_idx < len(row) else None # Convert "Ja"/"Nein" strings to booleans for bool fields if field in ("gutachten", "abgerechnet"): if isinstance(new_val, str): new_val = new_val.strip().lower() in ("ja", "1", "true") elif new_val is None: continue current_val = getattr(case, field, None) # Rough comparison (skip type mismatches gracefully) if str(new_val) != str(current_val) and new_val is not None: setattr(case, field, new_val) changed = True if changed: if user_id: case.updated_by = user_id db.commit() updated += 1 else: skipped += 1 except Exception as exc: errors.append(f"Sheet '{ws.title}' row {row_num}: {exc}") logger.warning("Error in sync row: %s", exc) return {"updated": updated, "skipped": skipped, "errors": errors}