dak.c2s/backend/app/services/excel_sync.py
CCS Admin 32127c30c3 fix: filter all case queries to DAK insurance only
Add VERSICHERUNG_FILTER="DAK" to config and apply it to all case
queries: list, detail, pending-icd, pending-coding, coding queue,
dashboard KPIs, all 5 report sheets, and excel sync export.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 10:20:33 +00:00

195 lines
6.3 KiB
Python

"""Bidirectional sync between the database and Abrechnung_DAK.xlsx files.
This is a convenience feature for exporting case data to the DAK Abrechnung
Excel format and re-importing changes made in the spreadsheet back into the DB.
"""
import logging
from datetime import date, datetime
from io import BytesIO
from typing import Any
from openpyxl import Workbook, load_workbook
from sqlalchemy.orm import Session
from app.config import get_settings
from app.models.case import Case
settings = get_settings()
logger = logging.getLogger(__name__)
# Columns in the Abrechnung export, matching the expected DAK format.
# Each entry is a (model_field, spreadsheet_header) pair; the order here
# defines the column order in the exported sheet, and the headers are the
# match keys when re-importing an edited file.
ABRECHNUNG_COLUMNS: list[tuple[str, str]] = [
("fall_id", "Fall-ID"),
("datum", "Datum"),
("nachname", "Nachname"),
("vorname", "Vorname"),
("geburtsdatum", "Geburtsdatum"),
("kvnr", "KVNR"),
("fallgruppe", "Fallgruppe"),
("icd", "ICD"),
("gutachten", "Gutachten"),
("gutachten_typ", "Gutachten-Typ"),
("therapieaenderung", "Therapieaenderung"),
("abgerechnet", "Abgerechnet"),
("abrechnung_datum", "Abrechnungsdatum"),
]
def sync_db_to_excel(db: Session, filepath: str | None = None) -> bytes:
    """Export DB cases to the Abrechnung xlsx format.

    Only cases matching the configured insurance filter are exported.
    One worksheet is created per year (newest year first); each sheet
    carries the columns defined in ABRECHNUNG_COLUMNS.

    Args:
        db: Database session.
        filepath: Optional path to also write the file to disk. If None,
            only the bytes are returned.

    Returns:
        The Excel file as bytes.
    """
    records = (
        db.query(Case)
        .filter(Case.versicherung == settings.VERSICHERUNG_FILTER)
        .order_by(Case.jahr.desc(), Case.datum.desc())
        .all()
    )
    # Bucket the cases by year; each bucket becomes one worksheet.
    grouped: dict[int, list[Case]] = {}
    for rec in records:
        grouped.setdefault(rec.jahr, []).append(rec)
    workbook = Workbook()
    # Drop the sheet openpyxl auto-creates so only year sheets remain.
    workbook.remove(workbook.active)
    header_row = [header for _, header in ABRECHNUNG_COLUMNS]
    for year in sorted(grouped, reverse=True):
        sheet = workbook.create_sheet(title=str(year))
        sheet.append(header_row)
        for rec in grouped[year]:
            row_values = []
            for field, _ in ABRECHNUNG_COLUMNS:
                cell = getattr(rec, field, None)
                # Dates become display strings, booleans become Ja/Nein.
                if isinstance(cell, (date, datetime)):
                    cell = cell.strftime("%d.%m.%Y")
                elif isinstance(cell, bool):
                    cell = "Ja" if cell else "Nein"
                row_values.append(cell)
            sheet.append(row_values)
    # A workbook must contain at least one sheet to be saved.
    if not workbook.sheetnames:
        workbook.create_sheet(title="Leer")
    buffer = BytesIO()
    workbook.save(buffer)
    payload = buffer.getvalue()
    if filepath:
        with open(filepath, "wb") as fh:
            fh.write(payload)
        logger.info("Wrote Abrechnung export to %s", filepath)
    return payload
# Fields that are booleans / dates on the Case model; cells in these
# columns need to be coerced back from their spreadsheet representation.
_BOOL_FIELDS = ("gutachten", "abgerechnet")
_DATE_FIELDS = ("datum", "geburtsdatum", "abrechnung_datum")


def _coerce_cell(field: str, raw: Any) -> Any:
    """Convert a raw spreadsheet cell back to the DB-side representation.

    Boolean columns accept "Ja"/"Nein" (plus "1"/"true") strings; date
    columns accept the export's "%d.%m.%Y" string format as well as
    datetime cells, so a re-imported export does not register spurious
    changes. Unrecognised values pass through unchanged.
    """
    if field in _BOOL_FIELDS:
        if isinstance(raw, str):
            return raw.strip().lower() in ("ja", "1", "true")
        return raw
    if field in _DATE_FIELDS:
        if isinstance(raw, datetime):
            # openpyxl returns date cells as datetime; the model stores dates.
            return raw.date()
        if isinstance(raw, str):
            try:
                return datetime.strptime(raw.strip(), "%d.%m.%Y").date()
            except ValueError:
                return raw
    return raw


def sync_excel_to_db(
    db: Session, content: bytes, user_id: int | None = None
) -> dict[str, Any]:
    """Import changes from an edited Abrechnung xlsx back to the DB.

    Compares the spreadsheet rows (matched by fall_id) against existing
    cases and updates any changed fields. Date strings in the export
    format and "Ja"/"Nein" booleans are converted back to their model
    types before comparison, so round-tripping an unedited export is a
    no-op.

    Args:
        db: Database session.
        content: The Excel file content as bytes.
        user_id: ID of the user performing the import (recorded on
            ``updated_by`` for changed cases).

    Returns:
        Dict with ``updated``, ``skipped``, and ``errors`` counts.
    """
    wb = load_workbook(BytesIO(content), read_only=True, data_only=True)
    # Build a header-to-field map from ABRECHNUNG_COLUMNS
    field_by_header: dict[str, str] = {
        header: field for field, header in ABRECHNUNG_COLUMNS
    }
    updated = 0
    skipped = 0
    errors: list[str] = []
    try:
        for ws in wb.worksheets:
            rows = list(ws.iter_rows(values_only=True))
            if not rows:
                continue
            # Map column indices to field names using the header row.
            header_row = rows[0]
            col_map: dict[int, str] = {}
            for col_idx, header_val in enumerate(header_row):
                if header_val and str(header_val).strip() in field_by_header:
                    col_map[col_idx] = field_by_header[str(header_val).strip()]
            if "fall_id" not in col_map.values():
                logger.warning("Sheet '%s' has no Fall-ID column, skipping", ws.title)
                continue
            # Find the fall_id column index
            fall_id_col = next(
                idx for idx, field in col_map.items() if field == "fall_id"
            )
            for row_num, row in enumerate(rows[1:], start=2):
                try:
                    fall_id = row[fall_id_col]
                    if not fall_id:
                        skipped += 1
                        continue
                    case = (
                        db.query(Case)
                        .filter(Case.fall_id == str(fall_id).strip())
                        .first()
                    )
                    if not case:
                        skipped += 1
                        continue
                    changed = False
                    for col_idx, field in col_map.items():
                        if field == "fall_id":
                            continue
                        raw = row[col_idx] if col_idx < len(row) else None
                        new_val = _coerce_cell(field, raw)
                        # Empty cells never overwrite existing values.
                        if new_val is None:
                            continue
                        current_val = getattr(case, field, None)
                        # Rough string comparison tolerates remaining type
                        # mismatches gracefully.
                        if str(new_val) != str(current_val):
                            setattr(case, field, new_val)
                            changed = True
                    if changed:
                        if user_id:
                            case.updated_by = user_id
                        # Commit per case so earlier rows survive a later
                        # row failing.
                        db.commit()
                        updated += 1
                    else:
                        skipped += 1
                except Exception as exc:
                    errors.append(f"Sheet '{ws.title}' row {row_num}: {exc}")
                    logger.warning("Error in sync row: %s", exc)
    finally:
        # read_only workbooks keep the underlying archive open until closed.
        wb.close()
    return {"updated": updated, "skipped": skipped, "errors": errors}