feat: import service with duplicate detection and fall_id generation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
CCS Admin 2026-02-24 07:49:15 +00:00
parent 498cb7048d
commit 78c2c682a4
3 changed files with 537 additions and 0 deletions

View file

@@ -0,0 +1,40 @@
"""Pydantic schemas for CSV import preview/confirm flow."""
from datetime import date
from typing import Optional
from pydantic import BaseModel
class ImportRow(BaseModel):
    """Single row in import preview.

    Mirrors one parsed CSV record plus the duplicate-detection result
    computed by the import service for the preview table.
    """
    row_number: int  # 1-indexed position of the row in the uploaded CSV
    nachname: str  # patient last name (required)
    vorname: Optional[str] = None  # patient first name, when present in the CSV
    geburtsdatum: Optional[date] = None  # date of birth, when present
    kvnr: Optional[str] = None  # health-insurance number, when present
    fallgruppe: str  # case group, e.g. "onko", "kardio"
    datum: date  # case date
    is_duplicate: bool = False  # True when a matching case already exists in the DB
    fall_id: Optional[str] = None  # generated id in the form YYYY-KW-fallgruppe-Nachname
class ImportPreview(BaseModel):
    """Preview of CSV import before confirmation.

    Produced by the read-only first phase of the two-phase import flow;
    nothing has been written to the database at this point.
    """
    filename: str  # name of the uploaded CSV file
    total_rows: int  # number of parsed rows (new_cases + duplicates)
    new_cases: int  # rows that would be inserted on confirm
    duplicates: int  # rows already present in the database
    errors: list[str] = []  # validation messages (pydantic copies mutable defaults per instance)
    rows: list[ImportRow] = []  # per-row detail for the preview table
class ImportResult(BaseModel):
    """Result after confirming import."""
    imported: int  # cases newly inserted
    skipped: int  # cases skipped as duplicates
    updated: int  # reserved for update support; 0 in the current flow
    errors: list[str] = []  # per-case failure messages ("Nachname: <error>")

View file

@@ -0,0 +1,182 @@
"""Import service for DAK Zweitmeinungs-Portal.
Handles:
- fall_id generation: YYYY-KW-fallgruppe-Nachname (KW zero-padded to two digits)
- Duplicate detection: by fall_id or (nachname, fallgruppe, datum, vorname, geburtsdatum)
- Preview/confirm flow: preview_import() checks for duplicates, confirm_import() inserts
- Import logging: writes ImportLog entry on each confirmed import
"""
import logging
from sqlalchemy.orm import Session
from app.models.audit import ImportLog
from app.models.case import Case
from app.schemas.import_schemas import ImportPreview, ImportResult, ImportRow
from app.services.csv_parser import ParsedCase
logger = logging.getLogger(__name__)
def generate_fall_id(parsed: ParsedCase) -> str:
    """Build the unique fall_id: YYYY-KW-fallgruppe-Nachname.

    The calendar week (KW) is zero-padded to two digits.

    Examples:
        - 2026-06-onko-Tonn
        - 2026-12-kardio-Mueller
        - 2026-06-intensiv-Daum
    """
    parts = (
        str(parsed.jahr),
        format(parsed.kw, "02d"),
        parsed.fallgruppe,
        parsed.nachname,
    )
    return "-".join(parts)
def check_duplicate(db: Session, parsed: ParsedCase) -> bool:
    """Return True when *parsed* already exists in the database.

    A case counts as a duplicate when either:
    1. a Case with the same generated fall_id exists, or
    2. a Case matches on (nachname, fallgruppe, datum), further narrowed
       by vorname and geburtsdatum when those are present on the input.
    """
    # Fast path: exact fall_id match.
    if db.query(Case).filter(Case.fall_id == generate_fall_id(parsed)).first():
        return True

    # Fallback: match on the personal-data combination.
    criteria = [
        Case.nachname == parsed.nachname,
        Case.fallgruppe == parsed.fallgruppe,
        Case.datum == parsed.datum,
    ]
    if parsed.vorname:
        criteria.append(Case.vorname == parsed.vorname)
    if parsed.geburtsdatum:
        criteria.append(Case.geburtsdatum == parsed.geburtsdatum)
    return db.query(Case).filter(*criteria).first() is not None
def preview_import(
    db: Session,
    parsed_cases: list[ParsedCase],
    filename: str,
) -> ImportPreview:
    """Check parsed cases against the DB for duplicates and build a preview.

    First step of the two-phase import flow. Strictly read-only: the
    database is queried for duplicates but never modified.
    """
    preview_rows: list[ImportRow] = []
    duplicate_count = 0
    for row_no, case in enumerate(parsed_cases, start=1):
        duplicate = check_duplicate(db, case)
        if duplicate:
            duplicate_count += 1
        preview_rows.append(
            ImportRow(
                row_number=row_no,
                nachname=case.nachname,
                vorname=case.vorname,
                geburtsdatum=case.geburtsdatum,
                kvnr=case.kvnr,
                fallgruppe=case.fallgruppe,
                datum=case.datum,
                is_duplicate=duplicate,
                fall_id=generate_fall_id(case),
            )
        )
    new_count = len(parsed_cases) - duplicate_count
    logger.info(
        "Import preview for '%s': %d total, %d new, %d duplicates",
        filename,
        len(parsed_cases),
        new_count,
        duplicate_count,
    )
    return ImportPreview(
        filename=filename,
        total_rows=len(parsed_cases),
        new_cases=new_count,
        duplicates=duplicate_count,
        rows=preview_rows,
    )
def confirm_import(
    db: Session,
    parsed_cases: list[ParsedCase],
    filename: str,
    user_id: int | None = None,
) -> ImportResult:
    """Insert non-duplicate cases into the database.

    Second step of the two-phase import flow. It:
    1. Re-checks each case for duplicates (in case of concurrent imports)
    2. Creates Case rows for new cases
    3. Logs the import in ImportLog
    4. Commits the transaction, rolling back on failure

    Args:
        db: Active SQLAlchemy session; committed on success.
        parsed_cases: Cases from the CSV parser to import.
        filename: Source file name, stored as import_source on each Case.
        user_id: Optional id of the importing user, stored in ImportLog.

    Returns:
        ImportResult with imported/skipped counts and per-case errors.

    Raises:
        Exception: Re-raises any flush/commit failure after rolling the
            session back, so callers never receive a dirty session.
    """
    imported = 0
    skipped = 0
    errors: list[str] = []
    for pc in parsed_cases:
        try:
            if check_duplicate(db, pc):
                skipped += 1
                continue
            case = Case(
                fall_id=generate_fall_id(pc),
                crm_ticket_id=pc.crm_ticket_id,
                jahr=pc.jahr,
                kw=pc.kw,
                datum=pc.datum,
                nachname=pc.nachname,
                vorname=pc.vorname,
                geburtsdatum=pc.geburtsdatum,
                kvnr=pc.kvnr,
                fallgruppe=pc.fallgruppe,
                import_source=filename,
            )
            db.add(case)
            imported += 1
        except Exception as e:
            errors.append(f"{pc.nachname}: {e}")
            logger.warning("Import error for case %s: %s", pc.nachname, e)
    # Log the import as part of the same transaction.
    log = ImportLog(
        filename=filename,
        import_type="csv_crm",
        cases_imported=imported,
        cases_skipped=skipped,
        cases_updated=0,
        errors="; ".join(errors) if errors else None,
        imported_by=user_id,
    )
    db.add(log)
    try:
        # Flush first to surface DB-level constraint violations (e.g. a
        # unique fall_id inserted concurrently) before committing.
        db.flush()
        db.commit()
    except Exception:
        # FIX: previously a flush/commit failure propagated without
        # rollback, leaving the session in a failed state for the caller.
        db.rollback()
        logger.exception("Import of '%s' failed; transaction rolled back", filename)
        raise
    logger.info(
        "Import confirmed for '%s': %d imported, %d skipped, %d errors",
        filename,
        imported,
        skipped,
        len(errors),
    )
    return ImportResult(
        imported=imported,
        skipped=skipped,
        updated=0,
        errors=errors,
    )

View file

@@ -0,0 +1,315 @@
"""Tests for import service: fall_id generation and schema validation.
Unit tests that do NOT require a database connection.
DB-dependent tests (duplicate detection, preview, confirm) are marked
with pytest.mark.skip and documented for future integration testing.
"""
from datetime import date
from unittest.mock import MagicMock, patch
import pytest
from app.schemas.import_schemas import ImportPreview, ImportResult, ImportRow
from app.services.csv_parser import ParsedCase
from app.services.import_service import (
check_duplicate,
confirm_import,
generate_fall_id,
preview_import,
)
# ── Helpers ──────────────────────────────────────────────────────────────
def _make_parsed_case(
    nachname: str = "Tonn",
    vorname: str | None = "Regina",
    geburtsdatum: date | None = date(1960, 4, 28),
    kvnr: str | None = "D410126355",
    fallgruppe: str = "kardio",
    datum: date = date(2026, 2, 2),
    jahr: int = 2026,
    kw: int = 6,
    crm_ticket_id: str | None = "103486",
    thema: str = "Zweitmeinung",
) -> ParsedCase:
    """Build a ParsedCase fixture; every field has a sensible default."""
    fields = {
        "nachname": nachname,
        "vorname": vorname,
        "geburtsdatum": geburtsdatum,
        "kvnr": kvnr,
        "thema": thema,
        "fallgruppe": fallgruppe,
        "datum": datum,
        "jahr": jahr,
        "kw": kw,
        "crm_ticket_id": crm_ticket_id,
    }
    return ParsedCase(**fields)
# ── generate_fall_id tests ──────────────────────────────────────────────
class TestGenerateFallId:
    """Unit tests for generate_fall_id (no database needed)."""

    def test_format(self):
        """fall_id matches YYYY-KW-fallgruppe-Nachname format."""
        case = _make_parsed_case(nachname="Tonn", fallgruppe="onko", jahr=2026, kw=6)
        assert generate_fall_id(case) == "2026-06-onko-Tonn"

    def test_kw_padding_single_digit(self):
        """KW < 10 is zero-padded to 2 digits."""
        assert "-06-" in generate_fall_id(_make_parsed_case(kw=6))

    def test_kw_padding_double_digit(self):
        """KW >= 10 stays as-is (no extra padding)."""
        assert "-12-" in generate_fall_id(_make_parsed_case(kw=12))

    def test_kw_padding_kw1(self):
        """KW 1 is zero-padded to 01."""
        assert "-01-" in generate_fall_id(_make_parsed_case(kw=1))

    def test_different_cases_produce_different_ids(self):
        """Different patients/fallgruppen produce unique fall_ids."""
        cases = [
            _make_parsed_case(nachname="Tonn", fallgruppe="onko"),
            _make_parsed_case(nachname="Daum", fallgruppe="intensiv"),
            _make_parsed_case(nachname="Tonn", fallgruppe="kardio"),
        ]
        generated = {generate_fall_id(c) for c in cases}
        assert len(generated) == 3

    def test_same_patient_same_week_same_fallgruppe(self):
        """Same patient in same week and fallgruppe produces same fall_id."""
        first = _make_parsed_case(nachname="Mueller", fallgruppe="onko", kw=8)
        second = _make_parsed_case(nachname="Mueller", fallgruppe="onko", kw=8)
        assert generate_fall_id(first) == generate_fall_id(second)

    def test_umlauts_preserved(self):
        """German umlauts in Nachname are preserved in fall_id."""
        fall_id = generate_fall_id(_make_parsed_case(nachname="Krölls", fallgruppe="onko"))
        assert "Krölls" in fall_id

    def test_hyphenated_name(self):
        """Hyphenated names are preserved in fall_id."""
        fall_id = generate_fall_id(_make_parsed_case(nachname="Hähle-Jakelski", fallgruppe="sd"))
        assert "Hähle-Jakelski" in fall_id

    def test_all_fallgruppen(self):
        """fall_id works for all valid fallgruppen."""
        for gruppe in ("onko", "kardio", "intensiv", "galle", "sd"):
            fall_id = generate_fall_id(_make_parsed_case(nachname="Test", fallgruppe=gruppe, kw=10))
            assert f"-{gruppe}-" in fall_id

    def test_year_boundary(self):
        """fall_id uses ISO year (from ParsedCase.jahr), not calendar year."""
        # ISO week 1 of 2027 can begin in late December 2026.
        assert generate_fall_id(_make_parsed_case(jahr=2027, kw=1)).startswith("2027-")
# ── ImportRow schema tests ──────────────────────────────────────────────
class TestImportRowSchema:
    """Schema-level tests for ImportRow defaults and full population."""

    def test_minimal_row(self):
        """ImportRow can be created with only required fields."""
        row = ImportRow(
            row_number=1,
            nachname="Tonn",
            fallgruppe="onko",
            datum=date(2026, 2, 2),
        )
        # Every optional field falls back to its declared default.
        assert (row.vorname, row.geburtsdatum, row.kvnr) == (None, None, None)
        assert row.is_duplicate is False
        assert row.fall_id is None

    def test_full_row(self):
        """ImportRow with all fields populated."""
        payload = {
            "row_number": 1,
            "nachname": "Tonn",
            "vorname": "Regina",
            "geburtsdatum": date(1960, 4, 28),
            "kvnr": "D410126355",
            "fallgruppe": "kardio",
            "datum": date(2026, 2, 2),
            "is_duplicate": True,
            "fall_id": "2026-06-kardio-Tonn",
        }
        row = ImportRow(**payload)
        assert row.is_duplicate is True
        assert row.fall_id == "2026-06-kardio-Tonn"
# ── ImportPreview schema tests ──────────────────────────────────────────
class TestImportPreviewSchema:
    """Schema-level tests for ImportPreview."""

    def test_empty_preview(self):
        """ImportPreview for empty CSV."""
        preview = ImportPreview(filename="test.csv", total_rows=0, new_cases=0, duplicates=0)
        assert preview.rows == []
        assert preview.errors == []

    def test_preview_counts(self):
        """ImportPreview counts are consistent."""
        preview = ImportPreview(
            filename="test.csv",
            total_rows=10,
            new_cases=7,
            duplicates=3,
        )
        # new + duplicate rows must account for every parsed row.
        assert preview.new_cases + preview.duplicates == preview.total_rows
# ── ImportResult schema tests ──────────────────────────────────────────
class TestImportResultSchema:
    """Schema-level tests for ImportResult."""

    def test_clean_result(self):
        """ImportResult with no errors."""
        assert ImportResult(imported=5, skipped=2, updated=0).errors == []

    def test_result_with_errors(self):
        """ImportResult with error messages."""
        messages = ["Tonn: some error", "Daum: other error"]
        result = ImportResult(imported=3, skipped=1, updated=0, errors=messages)
        assert len(result.errors) == 2
# ── check_duplicate with mocked DB ─────────────────────────────────────
class TestCheckDuplicateMocked:
    """check_duplicate() against a MagicMock session (no real database)."""

    @staticmethod
    def _build_session(first_result):
        """Mock Session whose queries always yield *first_result* from .first()."""
        session = MagicMock()
        query = MagicMock()
        session.query.return_value = query
        query.filter.return_value = query
        query.first.return_value = first_result
        return session

    def test_no_duplicate(self):
        """Returns False when no matching case found."""
        session = self._build_session(None)
        assert check_duplicate(session, _make_parsed_case()) is False

    def test_fall_id_duplicate(self):
        """Returns True when fall_id already exists."""
        # Any truthy result from the fall_id lookup marks the case a duplicate.
        session = self._build_session(MagicMock())
        assert check_duplicate(session, _make_parsed_case()) is True
# ── preview_import with mocked DB ──────────────────────────────────────
class TestPreviewImportMocked:
    """preview_import() against a MagicMock session (no real database)."""

    def test_preview_all_new(self):
        """Preview with no duplicates in DB."""
        db = MagicMock()
        query = MagicMock()
        db.query.return_value = query
        query.filter.return_value = query
        # Every .first() returns None, so each duplicate check misses.
        query.first.return_value = None
        cases = [
            _make_parsed_case(nachname="Tonn"),
            _make_parsed_case(nachname="Daum", fallgruppe="intensiv"),
        ]
        result = preview_import(db, cases, "test.csv")
        assert result.total_rows == 2
        assert result.new_cases == 2
        assert result.duplicates == 0
        assert len(result.rows) == 2
        # fall_id is generated for every row regardless of duplicate status.
        assert result.rows[0].fall_id is not None
        assert result.rows[1].fall_id is not None
        assert not result.rows[0].is_duplicate
        assert not result.rows[1].is_duplicate

    def test_preview_with_duplicates(self):
        """Preview where some cases are duplicates."""
        db = MagicMock()
        query = MagicMock()
        db.query.return_value = query
        query.filter.return_value = query
        # First case: fall_id match (duplicate), second case: no match
        # Three .first() calls total: case 1 hits on the fall_id lookup
        # (one call); case 2 misses both the fall_id lookup and the
        # personal-data lookup (two calls).
        query.first.side_effect = [MagicMock(), None, None]
        cases = [
            _make_parsed_case(nachname="Tonn"),
            _make_parsed_case(nachname="Daum", fallgruppe="intensiv"),
        ]
        result = preview_import(db, cases, "test.csv")
        assert result.total_rows == 2
        assert result.new_cases == 1
        assert result.duplicates == 1
        assert result.rows[0].is_duplicate is True
        assert result.rows[1].is_duplicate is False

    def test_preview_empty(self):
        """Preview with empty list of cases."""
        db = MagicMock()
        result = preview_import(db, [], "empty.csv")
        assert result.total_rows == 0
        assert result.new_cases == 0
        assert result.duplicates == 0
        assert result.rows == []

    def test_preview_row_numbers_start_at_one(self):
        """Row numbers in preview are 1-indexed."""
        db = MagicMock()
        query = MagicMock()
        db.query.return_value = query
        query.filter.return_value = query
        query.first.return_value = None
        cases = [
            _make_parsed_case(nachname="A"),
            _make_parsed_case(nachname="B"),
            _make_parsed_case(nachname="C"),
        ]
        result = preview_import(db, cases, "test.csv")
        assert result.rows[0].row_number == 1
        assert result.rows[1].row_number == 2
        assert result.rows[2].row_number == 3