From 498cb7048d67f2acc242a2fcd7c09c26f4cf3545 Mon Sep 17 00:00:00 2001
From: CCS Admin
Date: Tue, 24 Feb 2026 07:49:05 +0000
Subject: [PATCH] =?UTF-8?q?feat:=20ICD=20service=20=E2=80=94=20normalize,?=
 =?UTF-8?q?=20split,=20validate,=20coding=20template?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-Authored-By: Claude Opus 4.6
---
Note: this patch was edited by hand after export (line breaks restored,
review fixes applied). Hunk line counts were updated to match; the blob ids
on the "index" lines are stale.

 backend/app/services/icd_service.py | 230 ++++++++++++++++++++++++++++
 backend/tests/test_icd_service.py   | 135 ++++++++++++++++
 2 files changed, 365 insertions(+)
 create mode 100644 backend/app/services/icd_service.py
 create mode 100644 backend/tests/test_icd_service.py

diff --git a/backend/app/services/icd_service.py b/backend/app/services/icd_service.py
new file mode 100644
index 0000000..c665c01
--- /dev/null
+++ b/backend/app/services/icd_service.py
@@ -0,0 +1,230 @@
+"""ICD service — normalize, split, validate, save, and generate coding templates."""
+
+import logging
+from datetime import datetime, timezone
+from io import BytesIO
+from typing import Optional
+
+from openpyxl import Workbook, load_workbook
+from sqlalchemy.orm import Session
+
+from app.models.case import Case, CaseICDCode
+from app.utils.validators import normalize_icd_hauptgruppe, split_icd_codes, validate_icd
+
+logger = logging.getLogger(__name__)
+
+# Upper bound on the number of pending cases written into one coding template.
+TEMPLATE_MAX_ROWS = 10000
+
+# 1-based position of the ICD column in the coding template.
+ICD_COLUMN = 7
+
+
+def normalize_and_validate_icd(raw: str) -> list[tuple[str, str]]:
+    """Split, validate, and normalize ICD codes from a raw string.
+
+    Args:
+        raw: Raw ICD input; it is split with ``split_icd_codes``.
+
+    Returns:
+        List of (icd_code, hauptgruppe) tuples, one per split code.
+
+    Raises:
+        ValueError: For any invalid code.
+    """
+    result = []
+    for code in split_icd_codes(raw):
+        validated = validate_icd(code)
+        result.append((validated, normalize_icd_hauptgruppe(validated)))
+    return result
+
+
+def save_icd_for_case(
+    db: Session,
+    case_id: int,
+    icd_raw: str,
+    user_id: int,
+) -> Case:
+    """Set ICD codes for a case. Replaces existing ICD codes.
+
+    Args:
+        db: Database session; committed on success.
+        case_id: ID of the case to update.
+        icd_raw: Raw ICD input, validated before anything is written.
+        user_id: Stored on the case as ``icd_entered_by``.
+
+    Returns:
+        The refreshed case.
+
+    Raises:
+        CaseNotFoundError: If no case with ``case_id`` exists.
+        ValueError: If ``icd_raw`` contains an invalid code.
+    """
+    case = db.query(Case).filter(Case.id == case_id).first()
+    if not case:
+        from app.core.exceptions import CaseNotFoundError
+
+        raise CaseNotFoundError()
+
+    # Validate all codes first so an invalid code leaves existing rows untouched
+    icd_pairs = normalize_and_validate_icd(icd_raw)
+
+    # Delete existing ICD codes for this case
+    db.query(CaseICDCode).filter(CaseICDCode.case_id == case_id).delete()
+
+    # Store the normalized, comma-separated ICD string on the case
+    case.icd = ", ".join(code for code, _ in icd_pairs)
+    case.icd_entered_by = user_id
+    case.icd_entered_at = datetime.now(timezone.utc)
+
+    # Create individual ICD code entries
+    for code, hauptgruppe in icd_pairs:
+        db.add(
+            CaseICDCode(
+                case_id=case_id,
+                icd_code=code,
+                icd_hauptgruppe=hauptgruppe,
+            )
+        )
+
+    db.commit()
+    db.refresh(case)
+    return case
+
+
+def get_pending_icd_cases(
+    db: Session,
+    jahr: Optional[int] = None,
+    fallgruppe: Optional[str] = None,
+    page: int = 1,
+    per_page: int = 50,
+) -> tuple[list[Case], int]:
+    """Get cases without ICD codes, ordered by ``datum`` descending.
+
+    Args:
+        db: Database session.
+        jahr: If given, restrict to cases with this ``jahr``.
+        fallgruppe: If given, restrict to cases with this ``fallgruppe``.
+        page: 1-based page number; values below 1 are treated as 1.
+        per_page: Maximum number of cases to return.
+
+    Returns:
+        (cases, total) where total counts all matches before pagination.
+    """
+    query = db.query(Case).filter(Case.icd.is_(None))
+
+    if jahr is not None:
+        query = query.filter(Case.jahr == jahr)
+    if fallgruppe:
+        query = query.filter(Case.fallgruppe == fallgruppe)
+
+    total = query.count()
+    cases = (
+        query.order_by(Case.datum.desc())
+        .offset((max(page, 1) - 1) * per_page)
+        .limit(per_page)
+        .all()
+    )
+    return cases, total
+
+
+def generate_coding_template(
+    db: Session,
+    jahr: Optional[int] = None,
+    fallgruppe: Optional[str] = None,
+) -> bytes:
+    """Generate an Excel template for ICD coding.
+
+    Returns .xlsx bytes with columns:
+    Case_ID, Fall_ID, Nachname, Vorname, Fallgruppe, Datum, ICD (empty)
+
+    At most TEMPLATE_MAX_ROWS pending cases are included; a warning is logged
+    when more cases match, so the truncation is not silent.
+    """
+    cases, total = get_pending_icd_cases(
+        db, jahr=jahr, fallgruppe=fallgruppe, page=1, per_page=TEMPLATE_MAX_ROWS
+    )
+    if total > len(cases):
+        logger.warning(
+            "ICD coding template truncated: %d of %d pending cases included",
+            len(cases),
+            total,
+        )
+
+    wb = Workbook()
+    ws = wb.active
+    ws.title = "ICD Coding"
+
+    # Header
+    headers = ["Case_ID", "Fall_ID", "Nachname", "Vorname", "Fallgruppe", "Datum", "ICD"]
+    for col, header in enumerate(headers, start=1):
+        ws.cell(row=1, column=col, value=header)
+
+    # Data
+    for i, case in enumerate(cases, start=2):
+        ws.cell(row=i, column=1, value=case.id)
+        ws.cell(row=i, column=2, value=case.fall_id)
+        ws.cell(row=i, column=3, value=case.nachname)
+        ws.cell(row=i, column=4, value=case.vorname)
+        ws.cell(row=i, column=5, value=case.fallgruppe)
+        ws.cell(row=i, column=6, value=case.datum.isoformat() if case.datum else "")
+        # Column 7 (ICD) left empty for DAK to fill in
+
+    # Auto-width
+    for col in ws.columns:
+        max_length = max(len(str(cell.value or "")) for cell in col)
+        ws.column_dimensions[col[0].column_letter].width = min(max_length + 2, 30)
+
+    buffer = BytesIO()
+    wb.save(buffer)
+    return buffer.getvalue()
+
+
+def import_icd_from_xlsx(db: Session, content: bytes, user_id: int) -> dict:
+    """Import ICD codes from a filled-in coding template Excel file.
+
+    Expects columns: Case_ID (col 1), ICD (col 7). Rows with an empty Case_ID
+    or an empty ICD cell are skipped. A row that fails does not abort the
+    import; it is reported in "errors" and the session is rolled back.
+
+    Returns: {"updated": int, "errors": list[str]}
+    """
+    updated = 0
+    errors: list[str] = []
+
+    wb = load_workbook(BytesIO(content), read_only=True)
+    try:
+        rows = wb.active.iter_rows(min_row=2, values_only=True)
+        for row_number, row in enumerate(rows, start=2):
+            if not row or not row[0]:
+                continue
+
+            icd_cell = row[ICD_COLUMN - 1] if len(row) >= ICD_COLUMN else None
+            icd_value = str(icd_cell).strip() if icd_cell else ""
+            if not icd_value:
+                continue
+
+            try:
+                case_id = int(row[0])
+            except (ValueError, TypeError):
+                # Report the row instead of silently dropping its ICD value
+                errors.append(f"Row {row_number}: invalid Case_ID {row[0]!r}")
+                continue
+
+            try:
+                save_icd_for_case(db, case_id, icd_value, user_id)
+            except Exception as e:
+                # Discard the failed row's pending changes so the session
+                # can be used for the remaining rows
+                db.rollback()
+                logger.warning(
+                    "ICD import failed for case %s (%s)", case_id, type(e).__name__
+                )
+                errors.append(f"Case {case_id}: {e}")
+            else:
+                updated += 1
+    finally:
+        # A read-only workbook must be closed explicitly
+        wb.close()
+
+    return {"updated": updated, "errors": errors}
diff --git a/backend/tests/test_icd_service.py b/backend/tests/test_icd_service.py
new file mode 100644
index 0000000..9d22ee1
--- /dev/null
+++ b/backend/tests/test_icd_service.py
@@ -0,0 +1,135 @@
+"""Tests for the ICD service — normalize, validate, coding template, import."""
+
+from io import BytesIO
+from unittest.mock import MagicMock, PropertyMock, patch
+
+import pytest
+from openpyxl import Workbook, load_workbook
+
+from app.services.icd_service import (
+    generate_coding_template,
+    import_icd_from_xlsx,
+    normalize_and_validate_icd,
+)
+
+
+# ── normalize_and_validate_icd ─────────────────────────────────────
+
+
+class TestNormalizeAndValidateICD:
+    def test_normalize_and_validate_single(self):
+        result = normalize_and_validate_icd("C50.1")
+        assert result == [("C50.1", "C50")]
+
+    def test_normalize_and_validate_multiple(self):
+        result = normalize_and_validate_icd("C50.1, C79.5")
+        assert result == [("C50.1", "C50"), ("C79.5", "C79")]
+
+    def test_normalize_and_validate_semicolon(self):
+        result = normalize_and_validate_icd("c50;D12.3")
+        assert result == [("C50", "C50"), ("D12.3", "D12")]
+
+    def test_normalize_and_validate_invalid(self):
+        with pytest.raises(ValueError, match="Invalid ICD code format"):
+            normalize_and_validate_icd("XYZ")
+
+    def test_normalize_and_validate_empty(self):
+        result = normalize_and_validate_icd("")
+        assert result == []
+
+
+# ── generate_coding_template ──────────────────────────────────────
+
+
+class TestGenerateCodingTemplate:
+    def test_generate_coding_template_returns_bytes(self):
+        """Mock the DB session and verify the template is valid xlsx bytes."""
+        from datetime import date
+
+        # Create mock cases
+        mock_case = MagicMock()
+        mock_case.id = 1
+        mock_case.fall_id = "FALL-001"
+        mock_case.nachname = "Mustermann"
+        mock_case.vorname = "Max"
+        mock_case.fallgruppe = "onko"
+        mock_case.datum = date(2026, 2, 24)
+
+        # Build a mock query chain
+        mock_db = MagicMock(spec=["query"])
+        mock_query = MagicMock()
+        mock_db.query.return_value = mock_query
+        mock_query.filter.return_value = mock_query
+        mock_query.count.return_value = 1
+        mock_query.order_by.return_value = mock_query
+        mock_query.offset.return_value = mock_query
+        mock_query.limit.return_value = mock_query
+        mock_query.all.return_value = [mock_case]
+
+        result = generate_coding_template(mock_db, jahr=2026, fallgruppe="onko")
+
+        # Verify it is non-empty bytes
+        assert isinstance(result, bytes)
+        assert len(result) > 0
+
+        # Verify it is valid xlsx
+        wb = load_workbook(BytesIO(result))
+        ws = wb.active
+        assert ws.title == "ICD Coding"
+
+        # Verify header row
+        headers = [cell.value for cell in ws[1]]
+        assert headers == [
+            "Case_ID",
+            "Fall_ID",
+            "Nachname",
+            "Vorname",
+            "Fallgruppe",
+            "Datum",
+            "ICD",
+        ]
+
+        # Verify data row
+        row2 = [cell.value for cell in ws[2]]
+        assert row2[0] == 1
+        assert row2[1] == "FALL-001"
+        assert row2[2] == "Mustermann"
+        assert row2[3] == "Max"
+        assert row2[4] == "onko"
+        assert row2[5] == "2026-02-24"
+        assert row2[6] is None  # ICD column should be empty
+
+
+# ── import_icd_from_xlsx ──────────────────────────────────────────
+
+
+class TestImportICDFromXlsx:
+    def test_import_reports_failed_rows_and_continues(self):
+        """A failing row is reported and rolled back; other rows still import."""
+        wb = Workbook()
+        ws = wb.active
+        ws.append(["Case_ID", "Fall_ID", "Nachname", "Vorname", "Fallgruppe", "Datum", "ICD"])
+        ws.append([1, "FALL-001", "Mustermann", "Max", "onko", "2026-02-24", "C50.1"])
+        ws.append([2, "FALL-002", "Mustermann", "Erika", "onko", "2026-02-24", "XYZ"])
+        ws.append([3, "FALL-003", "Mustermann", "Hans", "onko", "2026-02-24", None])
+        ws.append(["n/a", "FALL-004", "Mustermann", "Anna", "onko", "2026-02-24", "C79.5"])
+        buffer = BytesIO()
+        wb.save(buffer)
+
+        def fake_save(db, case_id, icd_raw, user_id):
+            if icd_raw == "XYZ":
+                raise ValueError("Invalid ICD code format: XYZ")
+
+        mock_db = MagicMock()
+        with patch(
+            "app.services.icd_service.save_icd_for_case", side_effect=fake_save
+        ) as mock_save:
+            result = import_icd_from_xlsx(mock_db, buffer.getvalue(), user_id=7)
+
+        assert result["updated"] == 1
+        assert result["errors"] == [
+            "Case 2: Invalid ICD code format: XYZ",
+            "Row 5: invalid Case_ID 'n/a'",
+        ]
+        assert mock_save.call_count == 2
+        mock_db.rollback.assert_called_once()