mirror of
https://github.com/complexcaresolutions/dak.c2s.git
synced 2026-03-17 18:23:42 +00:00
356 lines
12 KiB
Python
356 lines
12 KiB
Python
"""Tests for import service: fall_id generation and schema validation.
|
|
|
|
Unit tests that do NOT require a database connection.
|
|
DB-dependent tests (duplicate detection, preview, confirm) are marked
|
|
with pytest.mark.skip and documented for future integration testing.
|
|
"""
|
|
|
|
from datetime import date
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from app.schemas.import_schemas import ImportPreview, ImportResult, ImportRow
|
|
from app.services.csv_parser import ParsedCase
|
|
from app.services.import_service import (
|
|
check_duplicate,
|
|
confirm_import,
|
|
generate_fall_id,
|
|
generate_random_suffix,
|
|
preview_import,
|
|
)
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _make_parsed_case(
|
|
nachname: str = "Tonn",
|
|
vorname: str | None = "Regina",
|
|
geburtsdatum: date | None = date(1960, 4, 28),
|
|
kvnr: str | None = "D410126355",
|
|
fallgruppe: str = "kardio",
|
|
datum: date = date(2026, 2, 2),
|
|
jahr: int = 2026,
|
|
kw: int = 6,
|
|
crm_ticket_id: str | None = "103486",
|
|
thema: str = "Zweitmeinung",
|
|
) -> ParsedCase:
|
|
"""Create a ParsedCase with sensible defaults for testing."""
|
|
return ParsedCase(
|
|
nachname=nachname,
|
|
vorname=vorname,
|
|
geburtsdatum=geburtsdatum,
|
|
kvnr=kvnr,
|
|
thema=thema,
|
|
fallgruppe=fallgruppe,
|
|
datum=datum,
|
|
jahr=jahr,
|
|
kw=kw,
|
|
crm_ticket_id=crm_ticket_id,
|
|
)
|
|
|
|
|
|
# ── generate_fall_id tests ──────────────────────────────────────────────
|
|
|
|
|
|
class TestGenerateFallId:
|
|
def test_format(self):
|
|
"""fall_id matches YYYY-KW-fallgruppe-KVNR format."""
|
|
pc = _make_parsed_case(nachname="Tonn", kvnr="D410126355", fallgruppe="onko", jahr=2026, kw=6)
|
|
result = generate_fall_id(pc)
|
|
assert result == "2026-06-onko-D410126355"
|
|
|
|
def test_kw_padding_single_digit(self):
|
|
"""KW < 10 is zero-padded to 2 digits."""
|
|
pc = _make_parsed_case(kw=6)
|
|
result = generate_fall_id(pc)
|
|
assert "-06-" in result
|
|
|
|
def test_kw_padding_double_digit(self):
|
|
"""KW >= 10 stays as-is (no extra padding)."""
|
|
pc = _make_parsed_case(kw=12)
|
|
result = generate_fall_id(pc)
|
|
assert "-12-" in result
|
|
|
|
def test_kw_padding_kw1(self):
|
|
"""KW 1 is zero-padded to 01."""
|
|
pc = _make_parsed_case(kw=1)
|
|
result = generate_fall_id(pc)
|
|
assert "-01-" in result
|
|
|
|
def test_different_cases_produce_different_ids(self):
|
|
"""Different KVNRs/fallgruppen produce unique fall_ids."""
|
|
pc1 = _make_parsed_case(kvnr="A111111111", fallgruppe="onko")
|
|
pc2 = _make_parsed_case(kvnr="B222222222", fallgruppe="intensiv")
|
|
pc3 = _make_parsed_case(kvnr="A111111111", fallgruppe="kardio")
|
|
ids = {generate_fall_id(pc1), generate_fall_id(pc2), generate_fall_id(pc3)}
|
|
assert len(ids) == 3
|
|
|
|
def test_same_patient_same_week_same_fallgruppe(self):
|
|
"""Same KVNR in same week and fallgruppe produces same fall_id."""
|
|
pc1 = _make_parsed_case(kvnr="A111111111", fallgruppe="onko", kw=8)
|
|
pc2 = _make_parsed_case(kvnr="A111111111", fallgruppe="onko", kw=8)
|
|
assert generate_fall_id(pc1) == generate_fall_id(pc2)
|
|
|
|
def test_random_suffix_when_no_kvnr(self):
|
|
"""fall_id uses 6-char random suffix when KVNR is missing."""
|
|
pc = _make_parsed_case(kvnr=None, fallgruppe="onko", jahr=2026, kw=6)
|
|
result = generate_fall_id(pc)
|
|
parts = result.split("-")
|
|
assert parts[0] == "2026"
|
|
assert parts[1] == "06"
|
|
assert parts[2] == "onko"
|
|
suffix = parts[3]
|
|
assert len(suffix) == 6
|
|
assert suffix.isalnum()
|
|
assert suffix == suffix.upper()
|
|
|
|
def test_random_suffix_when_empty_kvnr(self):
|
|
"""fall_id uses random suffix when KVNR is empty string."""
|
|
pc = _make_parsed_case(kvnr="", fallgruppe="onko", jahr=2026, kw=6)
|
|
result = generate_fall_id(pc)
|
|
parts = result.split("-")
|
|
suffix = parts[3]
|
|
assert len(suffix) == 6
|
|
assert suffix.isalnum()
|
|
|
|
def test_all_fallgruppen(self):
|
|
"""fall_id works for all valid fallgruppen."""
|
|
for fg in ["onko", "kardio", "intensiv", "galle", "sd"]:
|
|
pc = _make_parsed_case(nachname="Test", fallgruppe=fg, kw=10)
|
|
result = generate_fall_id(pc)
|
|
assert f"-{fg}-" in result
|
|
|
|
def test_year_boundary(self):
|
|
"""fall_id uses ISO year (from ParsedCase.jahr), not calendar year."""
|
|
# ISO week 1 of 2027 might start in Dec 2026
|
|
pc = _make_parsed_case(jahr=2027, kw=1)
|
|
result = generate_fall_id(pc)
|
|
assert result.startswith("2027-")
|
|
|
|
|
|
# ── generate_random_suffix tests ───────────────────────────────────────
|
|
|
|
|
|
class TestGenerateRandomSuffix:
|
|
def test_length(self):
|
|
assert len(generate_random_suffix()) == 6
|
|
|
|
def test_charset(self):
|
|
"""Only uppercase letters and digits."""
|
|
import re
|
|
for _ in range(50):
|
|
s = generate_random_suffix()
|
|
assert re.match(r'^[A-Z0-9]{6}$', s)
|
|
|
|
def test_uniqueness(self):
|
|
"""Different calls produce different suffixes."""
|
|
suffixes = {generate_random_suffix() for _ in range(20)}
|
|
assert len(suffixes) >= 15
|
|
|
|
|
|
# ── ImportRow schema tests ──────────────────────────────────────────────
|
|
|
|
|
|
class TestImportRowSchema:
|
|
def test_minimal_row(self):
|
|
"""ImportRow can be created with only required fields."""
|
|
row = ImportRow(
|
|
row_number=1,
|
|
nachname="Tonn",
|
|
fallgruppe="onko",
|
|
datum=date(2026, 2, 2),
|
|
)
|
|
assert row.vorname is None
|
|
assert row.geburtsdatum is None
|
|
assert row.kvnr is None
|
|
assert row.is_duplicate is False
|
|
assert row.fall_id is None
|
|
|
|
def test_full_row(self):
|
|
"""ImportRow with all fields populated."""
|
|
row = ImportRow(
|
|
row_number=1,
|
|
nachname="Tonn",
|
|
vorname="Regina",
|
|
geburtsdatum=date(1960, 4, 28),
|
|
kvnr="D410126355",
|
|
fallgruppe="kardio",
|
|
datum=date(2026, 2, 2),
|
|
is_duplicate=True,
|
|
fall_id="2026-06-kardio-D410126355",
|
|
)
|
|
assert row.is_duplicate is True
|
|
assert row.fall_id == "2026-06-kardio-D410126355"
|
|
|
|
|
|
# ── ImportPreview schema tests ──────────────────────────────────────────
|
|
|
|
|
|
class TestImportPreviewSchema:
|
|
def test_empty_preview(self):
|
|
"""ImportPreview for empty CSV."""
|
|
preview = ImportPreview(
|
|
filename="test.csv",
|
|
total_rows=0,
|
|
new_cases=0,
|
|
duplicates=0,
|
|
)
|
|
assert preview.rows == []
|
|
assert preview.errors == []
|
|
|
|
def test_preview_counts(self):
|
|
"""ImportPreview counts are consistent."""
|
|
preview = ImportPreview(
|
|
filename="test.csv",
|
|
total_rows=10,
|
|
new_cases=7,
|
|
duplicates=3,
|
|
)
|
|
assert preview.total_rows == preview.new_cases + preview.duplicates
|
|
|
|
|
|
# ── ImportResult schema tests ──────────────────────────────────────────
|
|
|
|
|
|
class TestImportResultSchema:
|
|
def test_clean_result(self):
|
|
"""ImportResult with no errors."""
|
|
result = ImportResult(imported=5, skipped=2, updated=0)
|
|
assert result.errors == []
|
|
|
|
def test_result_with_errors(self):
|
|
"""ImportResult with error messages."""
|
|
result = ImportResult(
|
|
imported=3,
|
|
skipped=1,
|
|
updated=0,
|
|
errors=["Tonn: some error", "Daum: other error"],
|
|
)
|
|
assert len(result.errors) == 2
|
|
|
|
|
|
# ── check_duplicate with mocked DB ─────────────────────────────────────
|
|
|
|
|
|
class TestCheckDuplicateMocked:
|
|
def _mock_db_no_results(self):
|
|
"""Create a mock DB session that returns no results."""
|
|
db = MagicMock()
|
|
query = MagicMock()
|
|
db.query.return_value = query
|
|
query.filter.return_value = query
|
|
query.first.return_value = None
|
|
return db
|
|
|
|
def _mock_db_fall_id_match(self):
|
|
"""Create a mock DB session that finds a fall_id match."""
|
|
db = MagicMock()
|
|
query = MagicMock()
|
|
db.query.return_value = query
|
|
query.filter.return_value = query
|
|
# First call to .first() (fall_id check) returns a result
|
|
query.first.return_value = MagicMock()
|
|
return db
|
|
|
|
def test_no_duplicate(self):
|
|
"""Returns False when no matching case found."""
|
|
db = self._mock_db_no_results()
|
|
pc = _make_parsed_case()
|
|
assert check_duplicate(db, pc) is False
|
|
|
|
def test_fall_id_duplicate(self):
|
|
"""Returns True when fall_id already exists."""
|
|
db = self._mock_db_fall_id_match()
|
|
pc = _make_parsed_case()
|
|
assert check_duplicate(db, pc) is True
|
|
|
|
def test_duplicate_detected_by_personal_data_when_kvnr_missing(self):
|
|
"""Duplicate detected by personal data even when fall_id uses random suffix."""
|
|
db = MagicMock()
|
|
query = MagicMock()
|
|
db.query.return_value = query
|
|
query.filter.return_value = query
|
|
# First .first() (fall_id check) → no match, second .first() (personal data) → match
|
|
query.first.side_effect = [None, MagicMock()]
|
|
pc = _make_parsed_case(kvnr=None)
|
|
assert check_duplicate(db, pc) is True
|
|
|
|
|
|
# ── preview_import with mocked DB ──────────────────────────────────────
|
|
|
|
|
|
class TestPreviewImportMocked:
|
|
def test_preview_all_new(self):
|
|
"""Preview with no duplicates in DB."""
|
|
db = MagicMock()
|
|
query = MagicMock()
|
|
db.query.return_value = query
|
|
query.filter.return_value = query
|
|
query.first.return_value = None
|
|
|
|
cases = [
|
|
_make_parsed_case(nachname="Tonn"),
|
|
_make_parsed_case(nachname="Daum", fallgruppe="intensiv"),
|
|
]
|
|
result = preview_import(db, cases, "test.csv")
|
|
|
|
assert result.total_rows == 2
|
|
assert result.new_cases == 2
|
|
assert result.duplicates == 0
|
|
assert len(result.rows) == 2
|
|
assert result.rows[0].fall_id is not None
|
|
assert result.rows[1].fall_id is not None
|
|
assert not result.rows[0].is_duplicate
|
|
assert not result.rows[1].is_duplicate
|
|
|
|
def test_preview_with_duplicates(self):
|
|
"""Preview where some cases are duplicates."""
|
|
db = MagicMock()
|
|
query = MagicMock()
|
|
db.query.return_value = query
|
|
query.filter.return_value = query
|
|
|
|
# First case: fall_id match (duplicate), second case: no match
|
|
query.first.side_effect = [MagicMock(), None, None]
|
|
|
|
cases = [
|
|
_make_parsed_case(nachname="Tonn"),
|
|
_make_parsed_case(nachname="Daum", fallgruppe="intensiv"),
|
|
]
|
|
result = preview_import(db, cases, "test.csv")
|
|
|
|
assert result.total_rows == 2
|
|
assert result.new_cases == 1
|
|
assert result.duplicates == 1
|
|
assert result.rows[0].is_duplicate is True
|
|
assert result.rows[1].is_duplicate is False
|
|
|
|
def test_preview_empty(self):
|
|
"""Preview with empty list of cases."""
|
|
db = MagicMock()
|
|
result = preview_import(db, [], "empty.csv")
|
|
assert result.total_rows == 0
|
|
assert result.new_cases == 0
|
|
assert result.duplicates == 0
|
|
assert result.rows == []
|
|
|
|
def test_preview_row_numbers_start_at_one(self):
|
|
"""Row numbers in preview are 1-indexed."""
|
|
db = MagicMock()
|
|
query = MagicMock()
|
|
db.query.return_value = query
|
|
query.filter.return_value = query
|
|
query.first.return_value = None
|
|
|
|
cases = [
|
|
_make_parsed_case(nachname="A"),
|
|
_make_parsed_case(nachname="B"),
|
|
_make_parsed_case(nachname="C"),
|
|
]
|
|
result = preview_import(db, cases, "test.csv")
|
|
|
|
assert result.rows[0].row_number == 1
|
|
assert result.rows[1].row_number == 2
|
|
assert result.rows[2].row_number == 3
|