dak.c2s/backend/tests/test_import.py
CCS Admin a436580b03 feat: use KVNR instead of Nachname in fall_id generation
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 17:04:48 +00:00

345 lines
12 KiB
Python

"""Tests for the import service: fall_id generation and schema validation.
Unit tests that do NOT require a database connection; duplicate detection
and preview are exercised against a mocked session (MagicMock).
DB-dependent tests that need a real database (e.g. confirm) are to be marked
with pytest.mark.skip and documented for future integration testing.
"""
from datetime import date
from unittest.mock import MagicMock, patch
import pytest
from app.schemas.import_schemas import ImportPreview, ImportResult, ImportRow
from app.services.csv_parser import ParsedCase
from app.services.import_service import (
check_duplicate,
confirm_import,
generate_fall_id,
generate_random_suffix,
preview_import,
)
# ── Helpers ──────────────────────────────────────────────────────────────
def _make_parsed_case(
    nachname: str = "Tonn",
    vorname: str | None = "Regina",
    geburtsdatum: date | None = date(1960, 4, 28),
    kvnr: str | None = "D410126355",
    fallgruppe: str = "kardio",
    datum: date = date(2026, 2, 2),
    jahr: int = 2026,
    kw: int = 6,
    crm_ticket_id: str | None = "103486",
    thema: str = "Zweitmeinung",
) -> ParsedCase:
    """Build a ParsedCase for tests.

    Every field has a default, so a test only needs to pass the fields it
    actually cares about.
    """
    # Fields describing the patient.
    patient_fields = {
        "nachname": nachname,
        "vorname": vorname,
        "geburtsdatum": geburtsdatum,
        "kvnr": kvnr,
    }
    # Fields describing the case itself.
    case_fields = {
        "thema": thema,
        "fallgruppe": fallgruppe,
        "datum": datum,
        "jahr": jahr,
        "kw": kw,
        "crm_ticket_id": crm_ticket_id,
    }
    return ParsedCase(**patient_fields, **case_fields)
# ── generate_fall_id tests ──────────────────────────────────────────────
class TestGenerateFallId:
    """Unit tests for generate_fall_id (expected shape: YYYY-KW-fallgruppe-KVNR)."""

    @staticmethod
    def _assert_random_suffix_id(fall_id: str) -> None:
        """Assert fall_id is ``2026-06-onko-`` plus a 6-char uppercase alphanumeric suffix.

        Shared by the missing-KVNR and empty-KVNR tests so both cases are held
        to the same standard.
        """
        parts = fall_id.split("-")
        # Exactly four segments: year, week, fallgruppe, suffix.
        assert len(parts) == 4
        year, week, fallgruppe, suffix = parts
        assert year == "2026"
        assert week == "06"
        assert fallgruppe == "onko"
        assert len(suffix) == 6
        assert suffix.isalnum()
        assert suffix == suffix.upper()

    def test_format(self):
        """fall_id matches YYYY-KW-fallgruppe-KVNR format."""
        pc = _make_parsed_case(nachname="Tonn", kvnr="D410126355", fallgruppe="onko", jahr=2026, kw=6)
        result = generate_fall_id(pc)
        assert result == "2026-06-onko-D410126355"

    def test_kw_padding_single_digit(self):
        """KW < 10 is zero-padded to 2 digits."""
        pc = _make_parsed_case(kw=6)
        result = generate_fall_id(pc)
        assert "-06-" in result

    def test_kw_padding_double_digit(self):
        """KW >= 10 stays as-is (no extra padding)."""
        pc = _make_parsed_case(kw=12)
        result = generate_fall_id(pc)
        assert "-12-" in result

    def test_kw_padding_kw1(self):
        """KW 1 is zero-padded to 01."""
        pc = _make_parsed_case(kw=1)
        result = generate_fall_id(pc)
        assert "-01-" in result

    def test_different_cases_produce_different_ids(self):
        """Different KVNRs/fallgruppen produce unique fall_ids."""
        pc1 = _make_parsed_case(kvnr="A111111111", fallgruppe="onko")
        pc2 = _make_parsed_case(kvnr="B222222222", fallgruppe="intensiv")
        pc3 = _make_parsed_case(kvnr="A111111111", fallgruppe="kardio")
        ids = {generate_fall_id(pc1), generate_fall_id(pc2), generate_fall_id(pc3)}
        assert len(ids) == 3

    def test_same_patient_same_week_same_fallgruppe(self):
        """Same KVNR in same week and fallgruppe produces same fall_id."""
        pc1 = _make_parsed_case(kvnr="A111111111", fallgruppe="onko", kw=8)
        pc2 = _make_parsed_case(kvnr="A111111111", fallgruppe="onko", kw=8)
        assert generate_fall_id(pc1) == generate_fall_id(pc2)

    def test_random_suffix_when_no_kvnr(self):
        """fall_id uses 6-char random suffix when KVNR is missing."""
        pc = _make_parsed_case(kvnr=None, fallgruppe="onko", jahr=2026, kw=6)
        self._assert_random_suffix_id(generate_fall_id(pc))

    def test_random_suffix_when_empty_kvnr(self):
        """fall_id uses random suffix when KVNR is empty string."""
        pc = _make_parsed_case(kvnr="", fallgruppe="onko", jahr=2026, kw=6)
        # Same checks as the None case: prefix, length, charset and casing.
        self._assert_random_suffix_id(generate_fall_id(pc))

    def test_all_fallgruppen(self):
        """fall_id works for all valid fallgruppen."""
        for fg in ["onko", "kardio", "intensiv", "galle", "sd"]:
            pc = _make_parsed_case(nachname="Test", fallgruppe=fg, kw=10)
            result = generate_fall_id(pc)
            assert f"-{fg}-" in result

    def test_year_boundary(self):
        """fall_id uses ISO year (from ParsedCase.jahr), not calendar year."""
        # ISO week 1 of 2027 might start in Dec 2026
        pc = _make_parsed_case(jahr=2027, kw=1)
        result = generate_fall_id(pc)
        assert result.startswith("2027-")
# ── generate_random_suffix tests ───────────────────────────────────────
class TestGenerateRandomSuffix:
    """Unit tests for generate_random_suffix."""

    def test_length(self):
        """Suffix is exactly 6 characters long."""
        assert len(generate_random_suffix()) == 6

    def test_charset(self):
        """Only uppercase letters and digits."""
        import re

        # fullmatch instead of match with '^...$': '$' also matches just before
        # a trailing newline, so a value like "ABCDEF\n" would slip through.
        pattern = re.compile(r"[A-Z0-9]{6}")
        for _ in range(50):
            s = generate_random_suffix()
            assert pattern.fullmatch(s)

    def test_uniqueness(self):
        """Different calls produce different suffixes."""
        suffixes = {generate_random_suffix() for _ in range(20)}
        # Lenient threshold: tolerates a few collisions among the 20 draws.
        assert len(suffixes) >= 15
# ── ImportRow schema tests ──────────────────────────────────────────────
class TestImportRowSchema:
    """Construction tests for the ImportRow schema."""

    def test_minimal_row(self):
        """Only the required fields are given; the rest fall back to defaults."""
        required = {
            "row_number": 1,
            "nachname": "Tonn",
            "fallgruppe": "onko",
            "datum": date(2026, 2, 2),
        }
        row = ImportRow(**required)
        # Optional fields that were not passed default to None.
        for optional_field in ("vorname", "geburtsdatum", "kvnr", "fall_id"):
            assert getattr(row, optional_field) is None
        assert row.is_duplicate is False

    def test_full_row(self):
        """Every field is given explicitly and read back unchanged."""
        expected_fall_id = "2026-06-kardio-D410126355"
        row = ImportRow(
            row_number=1,
            nachname="Tonn",
            vorname="Regina",
            geburtsdatum=date(1960, 4, 28),
            kvnr="D410126355",
            fallgruppe="kardio",
            datum=date(2026, 2, 2),
            is_duplicate=True,
            fall_id=expected_fall_id,
        )
        assert row.is_duplicate is True
        assert row.fall_id == expected_fall_id
# ── ImportPreview schema tests ──────────────────────────────────────────
class TestImportPreviewSchema:
    """Construction tests for the ImportPreview schema."""

    def test_empty_preview(self):
        """A preview with zero rows defaults rows and errors to empty lists."""
        counts = {"total_rows": 0, "new_cases": 0, "duplicates": 0}
        preview = ImportPreview(filename="test.csv", **counts)
        assert preview.rows == []
        assert preview.errors == []

    def test_preview_counts(self):
        """total_rows equals new_cases plus duplicates for the values given."""
        counts = {"total_rows": 10, "new_cases": 7, "duplicates": 3}
        preview = ImportPreview(filename="test.csv", **counts)
        accounted_for = preview.new_cases + preview.duplicates
        assert preview.total_rows == accounted_for
# ── ImportResult schema tests ──────────────────────────────────────────
class TestImportResultSchema:
    """Construction tests for the ImportResult schema."""

    def test_clean_result(self):
        """Without an explicit errors argument, errors is an empty list."""
        outcome = ImportResult(imported=5, skipped=2, updated=0)
        assert outcome.errors == []

    def test_result_with_errors(self):
        """Error messages passed in are kept on the result."""
        messages = ["Tonn: some error", "Daum: other error"]
        outcome = ImportResult(imported=3, skipped=1, updated=0, errors=messages)
        assert len(outcome.errors) == 2
# ── check_duplicate with mocked DB ─────────────────────────────────────
class TestCheckDuplicateMocked:
    """check_duplicate tests that run against a MagicMock session, not a database."""

    @staticmethod
    def _session_returning(first_result):
        """Return a mock session whose query(...).filter(...).first() yields first_result."""
        session = MagicMock()
        chain = MagicMock()
        session.query.return_value = chain
        # filter() hands back the same mock, so chained filters keep working.
        chain.filter.return_value = chain
        chain.first.return_value = first_result
        return session

    def _mock_db_no_results(self):
        """Create a mock DB session that returns no results."""
        return self._session_returning(None)

    def _mock_db_fall_id_match(self):
        """Create a mock DB session that finds a fall_id match."""
        # Every .first() call returns this object, not only the first one.
        return self._session_returning(MagicMock())

    def test_no_duplicate(self):
        """Returns False when no matching case found."""
        session = self._mock_db_no_results()
        parsed = _make_parsed_case()
        assert check_duplicate(session, parsed) is False

    def test_fall_id_duplicate(self):
        """Returns True when fall_id already exists."""
        session = self._mock_db_fall_id_match()
        parsed = _make_parsed_case()
        assert check_duplicate(session, parsed) is True
# ── preview_import with mocked DB ──────────────────────────────────────
class TestPreviewImportMocked:
    """preview_import tests that run against a MagicMock session, not a database."""

    @staticmethod
    def _mock_session():
        """Return (session, query) where query(...).filter(...) resolves to query.

        The caller configures ``query.first`` to control lookup results.
        """
        session = MagicMock()
        query = MagicMock()
        session.query.return_value = query
        query.filter.return_value = query
        return session, query

    def test_preview_all_new(self):
        """Preview with no duplicates in DB."""
        session, query = self._mock_session()
        query.first.return_value = None
        parsed_cases = [
            _make_parsed_case(nachname="Tonn"),
            _make_parsed_case(nachname="Daum", fallgruppe="intensiv"),
        ]
        preview = preview_import(session, parsed_cases, "test.csv")
        assert (preview.total_rows, preview.new_cases, preview.duplicates) == (2, 2, 0)
        assert len(preview.rows) == 2
        # Length is pinned to 2 above, so this covers exactly rows[0] and rows[1].
        for row in preview.rows:
            assert row.fall_id is not None
            assert not row.is_duplicate

    def test_preview_with_duplicates(self):
        """Preview where some cases are duplicates."""
        session, query = self._mock_session()
        # First case: fall_id match (duplicate), second case: no match
        query.first.side_effect = [MagicMock(), None, None]
        parsed_cases = [
            _make_parsed_case(nachname="Tonn"),
            _make_parsed_case(nachname="Daum", fallgruppe="intensiv"),
        ]
        preview = preview_import(session, parsed_cases, "test.csv")
        assert (preview.total_rows, preview.new_cases, preview.duplicates) == (2, 1, 1)
        first_row, second_row = preview.rows[0], preview.rows[1]
        assert first_row.is_duplicate is True
        assert second_row.is_duplicate is False

    def test_preview_empty(self):
        """Preview with empty list of cases."""
        session = MagicMock()
        preview = preview_import(session, [], "empty.csv")
        assert (preview.total_rows, preview.new_cases, preview.duplicates) == (0, 0, 0)
        assert preview.rows == []

    def test_preview_row_numbers_start_at_one(self):
        """Row numbers in preview are 1-indexed."""
        session, query = self._mock_session()
        query.first.return_value = None
        parsed_cases = [_make_parsed_case(nachname=name) for name in ("A", "B", "C")]
        preview = preview_import(session, parsed_cases, "test.csv")
        # Only the first three rows are inspected.
        leading_numbers = [row.row_number for row in preview.rows[:3]]
        assert leading_numbers == [1, 2, 3]