mirror of
https://github.com/complexcaresolutions/dak.c2s.git
synced 2026-03-17 18:23:42 +00:00
feat: ICD service — normalize, split, validate, coding template
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
df26b51e14
commit
498cb7048d
2 changed files with 271 additions and 0 deletions
175
backend/app/services/icd_service.py
Normal file
175
backend/app/services/icd_service.py
Normal file
|
|
@ -0,0 +1,175 @@
|
|||
"""ICD service — normalize, split, validate, save, and generate coding templates."""
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from io import BytesIO
|
||||
from typing import Optional
|
||||
|
||||
from openpyxl import Workbook, load_workbook
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.models.case import Case, CaseICDCode
|
||||
from app.utils.validators import normalize_icd_hauptgruppe, split_icd_codes, validate_icd
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def normalize_and_validate_icd(raw: str) -> list[tuple[str, str]]:
|
||||
"""Split, validate, and normalize ICD codes from a raw string.
|
||||
|
||||
Returns list of (icd_code, hauptgruppe) tuples.
|
||||
Raises ValueError for any invalid code.
|
||||
"""
|
||||
codes = split_icd_codes(raw)
|
||||
result = []
|
||||
for code in codes:
|
||||
validated = validate_icd(code)
|
||||
hauptgruppe = normalize_icd_hauptgruppe(validated)
|
||||
result.append((validated, hauptgruppe))
|
||||
return result
|
||||
|
||||
|
||||
def save_icd_for_case(
|
||||
db: Session,
|
||||
case_id: int,
|
||||
icd_raw: str,
|
||||
user_id: int,
|
||||
) -> Case:
|
||||
"""Set ICD codes for a case. Replaces existing ICD codes."""
|
||||
case = db.query(Case).filter(Case.id == case_id).first()
|
||||
if not case:
|
||||
from app.core.exceptions import CaseNotFoundError
|
||||
|
||||
raise CaseNotFoundError()
|
||||
|
||||
# Validate all codes first
|
||||
icd_pairs = normalize_and_validate_icd(icd_raw)
|
||||
|
||||
# Delete existing ICD codes for this case
|
||||
db.query(CaseICDCode).filter(CaseICDCode.case_id == case_id).delete()
|
||||
|
||||
# Store raw ICD string on case
|
||||
case.icd = ", ".join(code for code, _ in icd_pairs)
|
||||
case.icd_entered_by = user_id
|
||||
case.icd_entered_at = datetime.now(timezone.utc)
|
||||
|
||||
# Create individual ICD code entries
|
||||
for code, hauptgruppe in icd_pairs:
|
||||
db.add(
|
||||
CaseICDCode(
|
||||
case_id=case_id,
|
||||
icd_code=code,
|
||||
icd_hauptgruppe=hauptgruppe,
|
||||
)
|
||||
)
|
||||
|
||||
db.commit()
|
||||
db.refresh(case)
|
||||
return case
|
||||
|
||||
|
||||
def get_pending_icd_cases(
|
||||
db: Session,
|
||||
jahr: Optional[int] = None,
|
||||
fallgruppe: Optional[str] = None,
|
||||
page: int = 1,
|
||||
per_page: int = 50,
|
||||
) -> tuple[list[Case], int]:
|
||||
"""Get cases without ICD codes."""
|
||||
query = db.query(Case).filter(Case.icd == None) # noqa: E711
|
||||
|
||||
if jahr:
|
||||
query = query.filter(Case.jahr == jahr)
|
||||
if fallgruppe:
|
||||
query = query.filter(Case.fallgruppe == fallgruppe)
|
||||
|
||||
total = query.count()
|
||||
cases = (
|
||||
query.order_by(Case.datum.desc())
|
||||
.offset((page - 1) * per_page)
|
||||
.limit(per_page)
|
||||
.all()
|
||||
)
|
||||
return cases, total
|
||||
|
||||
|
||||
def generate_coding_template(
|
||||
db: Session,
|
||||
jahr: Optional[int] = None,
|
||||
fallgruppe: Optional[str] = None,
|
||||
) -> bytes:
|
||||
"""Generate an Excel template for ICD coding.
|
||||
|
||||
Returns .xlsx bytes with columns:
|
||||
Case_ID, Fall_ID, Nachname, Vorname, Fallgruppe, Datum, ICD (empty)
|
||||
"""
|
||||
cases, _ = get_pending_icd_cases(
|
||||
db, jahr=jahr, fallgruppe=fallgruppe, page=1, per_page=10000
|
||||
)
|
||||
|
||||
wb = Workbook()
|
||||
ws = wb.active
|
||||
ws.title = "ICD Coding"
|
||||
|
||||
# Header
|
||||
headers = ["Case_ID", "Fall_ID", "Nachname", "Vorname", "Fallgruppe", "Datum", "ICD"]
|
||||
for col, header in enumerate(headers, start=1):
|
||||
ws.cell(row=1, column=col, value=header)
|
||||
|
||||
# Data
|
||||
for i, case in enumerate(cases, start=2):
|
||||
ws.cell(row=i, column=1, value=case.id)
|
||||
ws.cell(row=i, column=2, value=case.fall_id)
|
||||
ws.cell(row=i, column=3, value=case.nachname)
|
||||
ws.cell(row=i, column=4, value=case.vorname)
|
||||
ws.cell(row=i, column=5, value=case.fallgruppe)
|
||||
ws.cell(row=i, column=6, value=case.datum.isoformat() if case.datum else "")
|
||||
# Column 7 (ICD) left empty for DAK to fill in
|
||||
|
||||
# Auto-width
|
||||
for col in ws.columns:
|
||||
max_length = max(len(str(cell.value or "")) for cell in col)
|
||||
ws.column_dimensions[col[0].column_letter].width = min(max_length + 2, 30)
|
||||
|
||||
buffer = BytesIO()
|
||||
wb.save(buffer)
|
||||
return buffer.getvalue()
|
||||
|
||||
|
||||
def import_icd_from_xlsx(db: Session, content: bytes, user_id: int) -> dict:
|
||||
"""Import ICD codes from a filled-in coding template Excel file.
|
||||
|
||||
Expects columns: Case_ID (col 1), ICD (col 7 or last col)
|
||||
Returns: {"updated": int, "errors": list[str]}
|
||||
"""
|
||||
wb = load_workbook(BytesIO(content), read_only=True)
|
||||
ws = wb.active
|
||||
|
||||
updated = 0
|
||||
errors: list[str] = []
|
||||
|
||||
for row in ws.iter_rows(min_row=2, values_only=False):
|
||||
case_id_cell = row[0].value
|
||||
if not case_id_cell:
|
||||
continue
|
||||
|
||||
try:
|
||||
case_id = int(case_id_cell)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
|
||||
# Find ICD column (column 7)
|
||||
icd_value = None
|
||||
if len(row) >= 7 and row[6].value:
|
||||
icd_value = str(row[6].value).strip()
|
||||
|
||||
if not icd_value:
|
||||
continue
|
||||
|
||||
try:
|
||||
save_icd_for_case(db, case_id, icd_value, user_id)
|
||||
updated += 1
|
||||
except Exception as e:
|
||||
errors.append(f"Case {case_id}: {e}")
|
||||
|
||||
return {"updated": updated, "errors": errors}
|
||||
96
backend/tests/test_icd_service.py
Normal file
96
backend/tests/test_icd_service.py
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
"""Tests for the ICD service — normalize, validate, coding template."""
|
||||
|
||||
from io import BytesIO
|
||||
from unittest.mock import MagicMock, PropertyMock
|
||||
|
||||
import pytest
|
||||
from openpyxl import load_workbook
|
||||
|
||||
from app.services.icd_service import generate_coding_template, normalize_and_validate_icd
|
||||
|
||||
|
||||
# ── normalize_and_validate_icd ─────────────────────────────────────
|
||||
|
||||
|
||||
class TestNormalizeAndValidateICD:
|
||||
def test_normalize_and_validate_single(self):
|
||||
result = normalize_and_validate_icd("C50.1")
|
||||
assert result == [("C50.1", "C50")]
|
||||
|
||||
def test_normalize_and_validate_multiple(self):
|
||||
result = normalize_and_validate_icd("C50.1, C79.5")
|
||||
assert result == [("C50.1", "C50"), ("C79.5", "C79")]
|
||||
|
||||
def test_normalize_and_validate_semicolon(self):
|
||||
result = normalize_and_validate_icd("c50;D12.3")
|
||||
assert result == [("C50", "C50"), ("D12.3", "D12")]
|
||||
|
||||
def test_normalize_and_validate_invalid(self):
|
||||
with pytest.raises(ValueError, match="Invalid ICD code format"):
|
||||
normalize_and_validate_icd("XYZ")
|
||||
|
||||
def test_normalize_and_validate_empty(self):
|
||||
result = normalize_and_validate_icd("")
|
||||
assert result == []
|
||||
|
||||
|
||||
# ── generate_coding_template ──────────────────────────────────────
|
||||
|
||||
|
||||
class TestGenerateCodingTemplate:
|
||||
def test_generate_coding_template_returns_bytes(self):
|
||||
"""Mock the DB session and verify the template is valid xlsx bytes."""
|
||||
from datetime import date
|
||||
|
||||
# Create mock cases
|
||||
mock_case = MagicMock()
|
||||
mock_case.id = 1
|
||||
mock_case.fall_id = "FALL-001"
|
||||
mock_case.nachname = "Mustermann"
|
||||
mock_case.vorname = "Max"
|
||||
mock_case.fallgruppe = "onko"
|
||||
mock_case.datum = date(2026, 2, 24)
|
||||
|
||||
# Build a mock query chain
|
||||
mock_db = MagicMock(spec=["query"])
|
||||
mock_query = MagicMock()
|
||||
mock_db.query.return_value = mock_query
|
||||
mock_query.filter.return_value = mock_query
|
||||
mock_query.count.return_value = 1
|
||||
mock_query.order_by.return_value = mock_query
|
||||
mock_query.offset.return_value = mock_query
|
||||
mock_query.limit.return_value = mock_query
|
||||
mock_query.all.return_value = [mock_case]
|
||||
|
||||
result = generate_coding_template(mock_db, jahr=2026, fallgruppe="onko")
|
||||
|
||||
# Verify it is non-empty bytes
|
||||
assert isinstance(result, bytes)
|
||||
assert len(result) > 0
|
||||
|
||||
# Verify it is valid xlsx
|
||||
wb = load_workbook(BytesIO(result))
|
||||
ws = wb.active
|
||||
assert ws.title == "ICD Coding"
|
||||
|
||||
# Verify header row
|
||||
headers = [cell.value for cell in ws[1]]
|
||||
assert headers == [
|
||||
"Case_ID",
|
||||
"Fall_ID",
|
||||
"Nachname",
|
||||
"Vorname",
|
||||
"Fallgruppe",
|
||||
"Datum",
|
||||
"ICD",
|
||||
]
|
||||
|
||||
# Verify data row
|
||||
row2 = [cell.value for cell in ws[2]]
|
||||
assert row2[0] == 1
|
||||
assert row2[1] == "FALL-001"
|
||||
assert row2[2] == "Mustermann"
|
||||
assert row2[3] == "Max"
|
||||
assert row2[4] == "onko"
|
||||
assert row2[5] == "2026-02-24"
|
||||
assert row2[6] is None # ICD column should be empty
|
||||
Loading…
Reference in a new issue