mirror of
https://github.com/complexcaresolutions/dak.c2s.git
synced 2026-03-17 21:53:41 +00:00
- New Excel export service for weekly DAK summary sheets (c2s / c2s_g_s variants) - New API endpoint GET /reports/wochenuebersicht (admin-only) - ICD import auto-detects format (coding template vs. Wochenübersicht KVNR-based) - New admin frontend page with download form - Route + sidebar navigation entry Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
278 lines
8.1 KiB
Python
278 lines
8.1 KiB
Python
"""ICD service — normalize, split, validate, save, and generate coding templates."""
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from io import BytesIO
|
|
from typing import Optional
|
|
|
|
from openpyxl import Workbook, load_workbook
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.config import get_settings
|
|
from app.models.case import Case, CaseICDCode
|
|
from app.utils.validators import normalize_icd_hauptgruppe, split_icd_codes, validate_icd
|
|
|
|
settings = get_settings()
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def normalize_and_validate_icd(raw: str) -> list[tuple[str, str]]:
    """Turn a raw ICD string into ``(icd_code, hauptgruppe)`` pairs.

    The raw string is split with ``split_icd_codes``. Each piece is then
    validated with ``validate_icd`` and the validated code is mapped to its
    Hauptgruppe with ``normalize_icd_hauptgruppe``.

    Args:
        raw: Raw string that may contain several ICD codes.

    Returns:
        One ``(icd_code, hauptgruppe)`` tuple per code, in the order produced
        by ``split_icd_codes``.

    Raises:
        ValueError: For any invalid code (propagated from the validators).
    """
    pairs = []
    # Validate and normalize code by code so the first offending code aborts
    # the whole call before later codes are looked at.
    for candidate in split_icd_codes(raw):
        icd_code = validate_icd(candidate)
        pairs.append((icd_code, normalize_icd_hauptgruppe(icd_code)))
    return pairs
|
|
|
|
|
|
def save_icd_for_case(
    db: Session,
    case_id: int,
    icd_raw: str,
    user_id: int,
) -> Case:
    """Set the ICD codes for a case, replacing any existing ones.

    The raw input is validated completely before anything is written. The
    case's ``icd`` field receives the validated codes joined with ``", "``,
    and one ``CaseICDCode`` row is created per code. The change is committed
    by this function.

    Args:
        db: SQLAlchemy session used for the lookup and the write.
        case_id: Primary key of the case to update.
        icd_raw: Raw ICD string as entered (may contain several codes).
        user_id: ID stored in ``icd_entered_by``.

    Returns:
        The refreshed ``Case`` instance.

    Raises:
        CaseNotFoundError: If no case with ``case_id`` exists.
        ValueError: If ``icd_raw`` contains an invalid code (raised by
            ``normalize_and_validate_icd``).
        Exception: Any error raised while writing is re-raised after the
            session has been rolled back.
    """
    case = db.query(Case).filter(Case.id == case_id).first()
    if case is None:
        # Imported lazily, as in the original code.
        from app.core.exceptions import CaseNotFoundError

        raise CaseNotFoundError()

    # Validate all codes first so invalid input never touches the database.
    icd_pairs = normalize_and_validate_icd(icd_raw)

    try:
        # Replace semantics: drop the existing per-code rows for this case.
        db.query(CaseICDCode).filter(CaseICDCode.case_id == case_id).delete()

        # Store the normalized ICD string plus audit fields on the case.
        case.icd = ", ".join(code for code, _ in icd_pairs)
        case.icd_entered_by = user_id
        case.icd_entered_at = datetime.now(timezone.utc)

        # Create the individual ICD code entries.
        for code, hauptgruppe in icd_pairs:
            db.add(
                CaseICDCode(
                    case_id=case_id,
                    icd_code=code,
                    icd_hauptgruppe=hauptgruppe,
                )
            )

        db.commit()
    except Exception:
        # A failed flush/commit leaves the session in a state where further
        # work raises until rollback() is called. Callers such as the bulk
        # importers keep using the same session after catching the error,
        # so undo the partial write here before propagating.
        db.rollback()
        raise

    db.refresh(case)
    return case
|
|
|
|
|
|
def get_pending_icd_cases(
    db: Session,
    jahr: Optional[int] = None,
    fallgruppe: Optional[str] = None,
    page: int = 1,
    per_page: int = 50,
) -> tuple[list[Case], int]:
    """Return one page of cases that have no ICD code yet.

    Only cases whose ``versicherung`` equals ``settings.VERSICHERUNG_FILTER``
    and whose ``icd`` column is NULL are considered.

    Args:
        db: SQLAlchemy session.
        jahr: Optional filter on ``Case.jahr``; ignored when falsy.
        fallgruppe: Optional filter on ``Case.fallgruppe``; ignored when falsy.
        page: 1-based page number.
        per_page: Page size.

    Returns:
        ``(cases, total)`` where ``cases`` is the requested page ordered by
        date (newest first) and ``total`` is the number of matching cases
        across all pages.
    """
    query = db.query(Case).filter(
        Case.versicherung == settings.VERSICHERUNG_FILTER,
        Case.icd == None,  # noqa: E711
    )

    if jahr:
        query = query.filter(Case.jahr == jahr)
    if fallgruppe:
        query = query.filter(Case.fallgruppe == fallgruppe)

    total = query.count()
    cases = (
        # ``datum`` alone is not unique. Without a tie-breaker the database
        # may return rows with the same date in a different order per query,
        # so OFFSET/LIMIT pages could repeat or skip cases. The primary key
        # makes the ordering total and the pagination stable.
        query.order_by(Case.datum.desc(), Case.id.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
        .all()
    )
    return cases, total
|
|
|
|
|
|
def generate_coding_template(
    db: Session,
    jahr: Optional[int] = None,
    fallgruppe: Optional[str] = None,
) -> bytes:
    """Generate an Excel template for ICD coding.

    The sheet lists the cases returned by ``get_pending_icd_cases`` with the
    columns ``Case_ID, Fall_ID, Fallgruppe, Datum, ICD``. The ICD column is
    left empty for the admin to fill in. Patient names are excluded for data
    privacy (DSGVO).

    Args:
        db: SQLAlchemy session.
        jahr: Optional year filter passed through to the pending-case query.
        fallgruppe: Optional Fallgruppe filter passed through likewise.

    Returns:
        The ``.xlsx`` file content as bytes.
    """
    # Upper bound on exported rows; at most this many cases are requested.
    max_rows = 10000
    cases, total = get_pending_icd_cases(
        db, jahr=jahr, fallgruppe=fallgruppe, page=1, per_page=max_rows
    )
    if total > len(cases):
        # Do not truncate silently: the admin would otherwise assume the
        # template covers every pending case.
        logger.warning(
            "ICD coding template truncated: exported %d of %d pending cases",
            len(cases),
            total,
        )

    wb = Workbook()
    ws = wb.active
    ws.title = "ICD Coding"

    # Header row.
    headers = ["Case_ID", "Fall_ID", "Fallgruppe", "Datum", "ICD"]
    for col, header in enumerate(headers, start=1):
        ws.cell(row=1, column=col, value=header)

    # Data rows start below the header; column 5 (ICD) stays empty.
    for row_idx, case in enumerate(cases, start=2):
        ws.cell(row=row_idx, column=1, value=case.id)
        ws.cell(row=row_idx, column=2, value=case.fall_id)
        ws.cell(row=row_idx, column=3, value=case.fallgruppe)
        ws.cell(
            row=row_idx,
            column=4,
            value=case.datum.isoformat() if case.datum else "",
        )

    # Auto-width: fit each column to its longest value, capped at 30.
    for column_cells in ws.columns:
        longest = max(len(str(cell.value or "")) for cell in column_cells)
        letter = column_cells[0].column_letter
        ws.column_dimensions[letter].width = min(longest + 2, 30)

    buffer = BytesIO()
    wb.save(buffer)
    return buffer.getvalue()
|
|
|
|
|
|
def import_icd_from_xlsx(db: Session, content: bytes, user_id: int) -> dict:
    """Import ICD codes from an Excel file.

    The format of the active worksheet is auto-detected via
    ``_detect_import_format``:

    - Coding template: column A contains numeric Case_IDs.
    - Wochenübersicht: column A contains KVNR strings (letter prefix).

    Args:
        db: SQLAlchemy session passed on to the format-specific importer.
        content: Raw bytes of the uploaded ``.xlsx`` file.
        user_id: ID of the user performing the import.

    Returns:
        ``{"updated": int, "errors": list[str]}`` as produced by the
        format-specific importer.
    """
    wb = load_workbook(BytesIO(content), read_only=True)
    try:
        ws = wb.active

        # Detect the format by scanning for the first data row.
        if _detect_import_format(ws) == "wochenuebersicht":
            return _import_icd_wochenuebersicht(db, ws, user_id)

        # Default: coding template (Case_ID-based).
        return _import_icd_coding_template(db, ws, user_id)
    finally:
        # Read-only workbooks load lazily and keep the underlying archive
        # open until closed explicitly, so release it on every exit path.
        wb.close()
|
|
|
|
|
|
def _detect_import_format(ws) -> str:
|
|
"""Detect whether the worksheet is a coding template or Wochenübersicht."""
|
|
for row in ws.iter_rows(min_row=1, max_row=20, values_only=True):
|
|
if not row or not row[0]:
|
|
continue
|
|
val = str(row[0]).strip()
|
|
# Skip header rows
|
|
if val.lower() in ("case_id", "kvnr", "kalenderwoche:"):
|
|
continue
|
|
# If it's a pure integer, it's a coding template (Case_ID)
|
|
try:
|
|
int(val)
|
|
return "coding_template"
|
|
except (ValueError, TypeError):
|
|
pass
|
|
# If it starts with a letter, it's likely a KVNR → Wochenübersicht
|
|
if val and val[0].isalpha():
|
|
return "wochenuebersicht"
|
|
return "coding_template"
|
|
|
|
|
|
def _import_icd_coding_template(db: Session, ws, user_id: int) -> dict:
|
|
"""Import ICD codes from a coding template (Case_ID in column A)."""
|
|
updated = 0
|
|
errors: list[str] = []
|
|
|
|
for row in ws.iter_rows(min_row=2, values_only=False):
|
|
case_id_cell = row[0].value
|
|
if not case_id_cell:
|
|
continue
|
|
|
|
try:
|
|
case_id = int(case_id_cell)
|
|
except (ValueError, TypeError):
|
|
continue
|
|
|
|
# Find ICD column: last column (col 5 in new template, col 7 in legacy)
|
|
icd_value = None
|
|
last_idx = len(row) - 1
|
|
if last_idx >= 0 and row[last_idx].value:
|
|
icd_value = str(row[last_idx].value).strip()
|
|
|
|
if not icd_value:
|
|
continue
|
|
|
|
try:
|
|
save_icd_for_case(db, case_id, icd_value, user_id)
|
|
updated += 1
|
|
except Exception as e:
|
|
errors.append(f"Case {case_id}: {e}")
|
|
|
|
return {"updated": updated, "errors": errors}
|
|
|
|
|
|
def _import_icd_wochenuebersicht(db: Session, ws, user_id: int) -> dict:
|
|
"""Import ICD codes from a Wochenübersicht file (KVNR in col A, ICD in col I)."""
|
|
updated = 0
|
|
errors: list[str] = []
|
|
|
|
for row in ws.iter_rows(values_only=False):
|
|
# Skip rows without enough columns
|
|
if len(row) < 9:
|
|
continue
|
|
|
|
col_a = row[0].value
|
|
if not col_a:
|
|
continue
|
|
|
|
col_a_str = str(col_a).strip()
|
|
|
|
# Skip header/label rows
|
|
if col_a_str.lower() in ("kvnr", ""):
|
|
continue
|
|
# Skip non-data rows (e.g. "Kalenderwoche:", instruction text)
|
|
if not col_a_str[0].isalpha() and not col_a_str[0].isdigit():
|
|
continue
|
|
# Skip rows where col A looks like a header label
|
|
if col_a_str.lower().startswith(("bitte", "kalender")):
|
|
continue
|
|
|
|
kvnr = col_a_str
|
|
|
|
# ICD is in column I (index 8)
|
|
icd_cell = row[8].value
|
|
if not icd_cell:
|
|
continue
|
|
icd_value = str(icd_cell).strip()
|
|
if not icd_value or icd_value.lower() == "icd-10":
|
|
continue
|
|
|
|
# Find case by KVNR — take the most recent one
|
|
case = (
|
|
db.query(Case)
|
|
.filter(
|
|
Case.kvnr == kvnr,
|
|
Case.versicherung == settings.VERSICHERUNG_FILTER,
|
|
)
|
|
.order_by(Case.datum.desc())
|
|
.first()
|
|
)
|
|
|
|
if not case:
|
|
errors.append(f"KVNR {kvnr}: Kein Fall gefunden")
|
|
continue
|
|
|
|
try:
|
|
save_icd_for_case(db, case.id, icd_value, user_id)
|
|
updated += 1
|
|
except Exception as e:
|
|
errors.append(f"KVNR {kvnr} (Case {case.id}): {e}")
|
|
|
|
return {"updated": updated, "errors": errors}
|