diff --git a/backend/app/schemas/import_schemas.py b/backend/app/schemas/import_schemas.py
new file mode 100644
index 0000000..3afa0d4
--- /dev/null
+++ b/backend/app/schemas/import_schemas.py
@@ -0,0 +1,40 @@
+"""Pydantic schemas for CSV import preview/confirm flow."""
+
+from datetime import date
+from typing import Optional
+
+from pydantic import BaseModel
+
+
+class ImportRow(BaseModel):
+    """Single row in import preview."""
+
+    row_number: int
+    nachname: str
+    vorname: Optional[str] = None
+    geburtsdatum: Optional[date] = None
+    kvnr: Optional[str] = None
+    fallgruppe: str
+    datum: date
+    is_duplicate: bool = False
+    fall_id: Optional[str] = None
+
+
+class ImportPreview(BaseModel):
+    """Preview of CSV import before confirmation."""
+
+    filename: str
+    total_rows: int
+    new_cases: int
+    duplicates: int
+    errors: list[str] = []
+    rows: list[ImportRow] = []
+
+
+class ImportResult(BaseModel):
+    """Result after confirming import."""
+
+    imported: int
+    skipped: int
+    updated: int
+    errors: list[str] = []
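Note for reviewers: `preview_import()` in the service below returns an `ImportPreview` that can be serialized straight to the frontend's review table. A minimal sketch of the payload shape, with invented values (the filename and rows are illustrative, not from this change):

```python
from datetime import date

from app.schemas.import_schemas import ImportPreview, ImportRow

# Illustrative preview: one new case, one duplicate (values invented).
preview = ImportPreview(
    filename="crm_export.csv",  # hypothetical filename
    total_rows=2,
    new_cases=1,
    duplicates=1,
    rows=[
        ImportRow(row_number=1, nachname="Tonn", fallgruppe="onko",
                  datum=date(2026, 2, 2), fall_id="2026-06-onko-Tonn"),
        ImportRow(row_number=2, nachname="Daum", fallgruppe="intensiv",
                  datum=date(2026, 2, 2), is_duplicate=True,
                  fall_id="2026-06-intensiv-Daum"),
    ],
)
print(preview.model_dump_json(indent=2))  # Pydantic v2; on v1 use .json()
```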
+ """ + rows: list[ImportRow] = [] + duplicates = 0 + + for i, pc in enumerate(parsed_cases, start=1): + is_dup = check_duplicate(db, pc) + if is_dup: + duplicates += 1 + rows.append( + ImportRow( + row_number=i, + nachname=pc.nachname, + vorname=pc.vorname, + geburtsdatum=pc.geburtsdatum, + kvnr=pc.kvnr, + fallgruppe=pc.fallgruppe, + datum=pc.datum, + is_duplicate=is_dup, + fall_id=generate_fall_id(pc), + ) + ) + + logger.info( + "Import preview for '%s': %d total, %d new, %d duplicates", + filename, + len(parsed_cases), + len(parsed_cases) - duplicates, + duplicates, + ) + + return ImportPreview( + filename=filename, + total_rows=len(parsed_cases), + new_cases=len(parsed_cases) - duplicates, + duplicates=duplicates, + rows=rows, + ) + + +def confirm_import( + db: Session, + parsed_cases: list[ParsedCase], + filename: str, + user_id: int | None = None, +) -> ImportResult: + """Insert non-duplicate cases into the database. + + This is the second step of the two-phase import flow. It: + 1. Re-checks each case for duplicates (in case of concurrent imports) + 2. Creates Case rows for new cases + 3. Logs the import in ImportLog + 4. Commits the transaction + """ + imported = 0 + skipped = 0 + errors: list[str] = [] + + for pc in parsed_cases: + try: + if check_duplicate(db, pc): + skipped += 1 + continue + + case = Case( + fall_id=generate_fall_id(pc), + crm_ticket_id=pc.crm_ticket_id, + jahr=pc.jahr, + kw=pc.kw, + datum=pc.datum, + nachname=pc.nachname, + vorname=pc.vorname, + geburtsdatum=pc.geburtsdatum, + kvnr=pc.kvnr, + fallgruppe=pc.fallgruppe, + import_source=filename, + ) + db.add(case) + imported += 1 + except Exception as e: + errors.append(f"{pc.nachname}: {e}") + logger.warning("Import error for case %s: %s", pc.nachname, e) + + # Flush to detect any DB-level constraint violations before logging + db.flush() + + # Log the import + log = ImportLog( + filename=filename, + import_type="csv_crm", + cases_imported=imported, + cases_skipped=skipped, + cases_updated=0, + errors="; ".join(errors) if errors else None, + imported_by=user_id, + ) + db.add(log) + db.commit() + + logger.info( + "Import confirmed for '%s': %d imported, %d skipped, %d errors", + filename, + imported, + skipped, + len(errors), + ) + + return ImportResult( + imported=imported, + skipped=skipped, + updated=0, + errors=errors, + ) diff --git a/backend/tests/test_import.py b/backend/tests/test_import.py new file mode 100644 index 0000000..753b739 --- /dev/null +++ b/backend/tests/test_import.py @@ -0,0 +1,315 @@ +"""Tests for import service: fall_id generation and schema validation. + +Unit tests that do NOT require a database connection. +DB-dependent tests (duplicate detection, preview, confirm) are marked +with pytest.mark.skip and documented for future integration testing. 
+""" + +from datetime import date +from unittest.mock import MagicMock, patch + +import pytest + +from app.schemas.import_schemas import ImportPreview, ImportResult, ImportRow +from app.services.csv_parser import ParsedCase +from app.services.import_service import ( + check_duplicate, + confirm_import, + generate_fall_id, + preview_import, +) + + +# ── Helpers ────────────────────────────────────────────────────────────── + + +def _make_parsed_case( + nachname: str = "Tonn", + vorname: str | None = "Regina", + geburtsdatum: date | None = date(1960, 4, 28), + kvnr: str | None = "D410126355", + fallgruppe: str = "kardio", + datum: date = date(2026, 2, 2), + jahr: int = 2026, + kw: int = 6, + crm_ticket_id: str | None = "103486", + thema: str = "Zweitmeinung", +) -> ParsedCase: + """Create a ParsedCase with sensible defaults for testing.""" + return ParsedCase( + nachname=nachname, + vorname=vorname, + geburtsdatum=geburtsdatum, + kvnr=kvnr, + thema=thema, + fallgruppe=fallgruppe, + datum=datum, + jahr=jahr, + kw=kw, + crm_ticket_id=crm_ticket_id, + ) + + +# ── generate_fall_id tests ────────────────────────────────────────────── + + +class TestGenerateFallId: + def test_format(self): + """fall_id matches YYYY-KW-fallgruppe-Nachname format.""" + pc = _make_parsed_case(nachname="Tonn", fallgruppe="onko", jahr=2026, kw=6) + result = generate_fall_id(pc) + assert result == "2026-06-onko-Tonn" + + def test_kw_padding_single_digit(self): + """KW < 10 is zero-padded to 2 digits.""" + pc = _make_parsed_case(kw=6) + result = generate_fall_id(pc) + assert "-06-" in result + + def test_kw_padding_double_digit(self): + """KW >= 10 stays as-is (no extra padding).""" + pc = _make_parsed_case(kw=12) + result = generate_fall_id(pc) + assert "-12-" in result + + def test_kw_padding_kw1(self): + """KW 1 is zero-padded to 01.""" + pc = _make_parsed_case(kw=1) + result = generate_fall_id(pc) + assert "-01-" in result + + def test_different_cases_produce_different_ids(self): + """Different patients/fallgruppen produce unique fall_ids.""" + pc1 = _make_parsed_case(nachname="Tonn", fallgruppe="onko") + pc2 = _make_parsed_case(nachname="Daum", fallgruppe="intensiv") + pc3 = _make_parsed_case(nachname="Tonn", fallgruppe="kardio") + + ids = {generate_fall_id(pc1), generate_fall_id(pc2), generate_fall_id(pc3)} + assert len(ids) == 3 + + def test_same_patient_same_week_same_fallgruppe(self): + """Same patient in same week and fallgruppe produces same fall_id.""" + pc1 = _make_parsed_case(nachname="Mueller", fallgruppe="onko", kw=8) + pc2 = _make_parsed_case(nachname="Mueller", fallgruppe="onko", kw=8) + assert generate_fall_id(pc1) == generate_fall_id(pc2) + + def test_umlauts_preserved(self): + """German umlauts in Nachname are preserved in fall_id.""" + pc = _make_parsed_case(nachname="Krölls", fallgruppe="onko") + result = generate_fall_id(pc) + assert "Krölls" in result + + def test_hyphenated_name(self): + """Hyphenated names are preserved in fall_id.""" + pc = _make_parsed_case(nachname="Hähle-Jakelski", fallgruppe="sd") + result = generate_fall_id(pc) + assert "Hähle-Jakelski" in result + + def test_all_fallgruppen(self): + """fall_id works for all valid fallgruppen.""" + for fg in ["onko", "kardio", "intensiv", "galle", "sd"]: + pc = _make_parsed_case(nachname="Test", fallgruppe=fg, kw=10) + result = generate_fall_id(pc) + assert f"-{fg}-" in result + + def test_year_boundary(self): + """fall_id uses ISO year (from ParsedCase.jahr), not calendar year.""" + # ISO week 1 of 2027 might start in Dec 2026 + pc = 
+
+
+# ── ImportRow schema tests ───────────────────────────────────────────────
+
+
+class TestImportRowSchema:
+    def test_minimal_row(self):
+        """ImportRow can be created with only required fields."""
+        row = ImportRow(
+            row_number=1,
+            nachname="Tonn",
+            fallgruppe="onko",
+            datum=date(2026, 2, 2),
+        )
+        assert row.vorname is None
+        assert row.geburtsdatum is None
+        assert row.kvnr is None
+        assert row.is_duplicate is False
+        assert row.fall_id is None
+
+    def test_full_row(self):
+        """ImportRow with all fields populated."""
+        row = ImportRow(
+            row_number=1,
+            nachname="Tonn",
+            vorname="Regina",
+            geburtsdatum=date(1960, 4, 28),
+            kvnr="D410126355",
+            fallgruppe="kardio",
+            datum=date(2026, 2, 2),
+            is_duplicate=True,
+            fall_id="2026-06-kardio-Tonn",
+        )
+        assert row.is_duplicate is True
+        assert row.fall_id == "2026-06-kardio-Tonn"
+
+
+# ── ImportPreview schema tests ───────────────────────────────────────────
+
+
+class TestImportPreviewSchema:
+    def test_empty_preview(self):
+        """ImportPreview for an empty CSV."""
+        preview = ImportPreview(
+            filename="test.csv",
+            total_rows=0,
+            new_cases=0,
+            duplicates=0,
+        )
+        assert preview.rows == []
+        assert preview.errors == []
+
+    def test_preview_counts(self):
+        """ImportPreview counts are consistent."""
+        preview = ImportPreview(
+            filename="test.csv",
+            total_rows=10,
+            new_cases=7,
+            duplicates=3,
+        )
+        assert preview.total_rows == preview.new_cases + preview.duplicates
+
+
+# ── ImportResult schema tests ────────────────────────────────────────────
+
+
+class TestImportResultSchema:
+    def test_clean_result(self):
+        """ImportResult with no errors."""
+        result = ImportResult(imported=5, skipped=2, updated=0)
+        assert result.errors == []
+
+    def test_result_with_errors(self):
+        """ImportResult with error messages."""
+        result = ImportResult(
+            imported=3,
+            skipped=1,
+            updated=0,
+            errors=["Tonn: some error", "Daum: other error"],
+        )
+        assert len(result.errors) == 2
+
+
+# ── check_duplicate with mocked DB ───────────────────────────────────────
+
+
+class TestCheckDuplicateMocked:
+    def _mock_db_no_results(self):
+        """Create a mock DB session that returns no results."""
+        db = MagicMock()
+        query = MagicMock()
+        db.query.return_value = query
+        query.filter.return_value = query
+        query.first.return_value = None
+        return db
+
+    def _mock_db_fall_id_match(self):
+        """Create a mock DB session that finds a fall_id match."""
+        db = MagicMock()
+        query = MagicMock()
+        db.query.return_value = query
+        query.filter.return_value = query
+        # The fall_id check calls .first() first and short-circuits on a
+        # hit, so a constant truthy return value simulates the match.
+        query.first.return_value = MagicMock()
+        return db
+
+    def test_no_duplicate(self):
+        """Returns False when no matching case found."""
+        db = self._mock_db_no_results()
+        pc = _make_parsed_case()
+        assert check_duplicate(db, pc) is False
+
+    def test_fall_id_duplicate(self):
+        """Returns True when fall_id already exists."""
+        db = self._mock_db_fall_id_match()
+        pc = _make_parsed_case()
+        assert check_duplicate(db, pc) is True
fallgruppe="intensiv"), + ] + result = preview_import(db, cases, "test.csv") + + assert result.total_rows == 2 + assert result.new_cases == 2 + assert result.duplicates == 0 + assert len(result.rows) == 2 + assert result.rows[0].fall_id is not None + assert result.rows[1].fall_id is not None + assert not result.rows[0].is_duplicate + assert not result.rows[1].is_duplicate + + def test_preview_with_duplicates(self): + """Preview where some cases are duplicates.""" + db = MagicMock() + query = MagicMock() + db.query.return_value = query + query.filter.return_value = query + + # First case: fall_id match (duplicate), second case: no match + query.first.side_effect = [MagicMock(), None, None] + + cases = [ + _make_parsed_case(nachname="Tonn"), + _make_parsed_case(nachname="Daum", fallgruppe="intensiv"), + ] + result = preview_import(db, cases, "test.csv") + + assert result.total_rows == 2 + assert result.new_cases == 1 + assert result.duplicates == 1 + assert result.rows[0].is_duplicate is True + assert result.rows[1].is_duplicate is False + + def test_preview_empty(self): + """Preview with empty list of cases.""" + db = MagicMock() + result = preview_import(db, [], "empty.csv") + assert result.total_rows == 0 + assert result.new_cases == 0 + assert result.duplicates == 0 + assert result.rows == [] + + def test_preview_row_numbers_start_at_one(self): + """Row numbers in preview are 1-indexed.""" + db = MagicMock() + query = MagicMock() + db.query.return_value = query + query.filter.return_value = query + query.first.return_value = None + + cases = [ + _make_parsed_case(nachname="A"), + _make_parsed_case(nachname="B"), + _make_parsed_case(nachname="C"), + ] + result = preview_import(db, cases, "test.csv") + + assert result.rows[0].row_number == 1 + assert result.rows[1].row_number == 2 + assert result.rows[2].row_number == 3