dak.c2s/backend/app/services/import_service.py
CCS Admin 78c2c682a4 feat: import service with duplicate detection and fall_id generation
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 07:49:15 +00:00

182 lines
5 KiB
Python

"""Import service for DAK Zweitmeinungs-Portal.
Handles:
- fall_id generation: YYYY-KW02d-fallgruppe-Nachname
- Duplicate detection: by fall_id or (nachname, fallgruppe, datum, vorname, geburtsdatum)
- Preview/confirm flow: preview_import() checks for duplicates, confirm_import() inserts
- Import logging: writes ImportLog entry on each confirmed import
"""
import logging
from sqlalchemy.orm import Session
from app.models.audit import ImportLog
from app.models.case import Case
from app.schemas.import_schemas import ImportPreview, ImportResult, ImportRow
from app.services.csv_parser import ParsedCase
logger = logging.getLogger(__name__)
def generate_fall_id(parsed: ParsedCase) -> str:
    """Build the canonical fall_id for a parsed case.

    Format: ``{jahr}-{kw:02d}-{fallgruppe}-{nachname}``, e.g.
    ``2026-06-onko-Tonn``, ``2026-12-kardio-Mueller``,
    ``2026-06-intensiv-Daum``.
    """
    # Zero-pad the calendar week so IDs sort lexicographically by week.
    components = (
        str(parsed.jahr),
        format(parsed.kw, "02d"),
        parsed.fallgruppe,
        parsed.nachname,
    )
    return "-".join(components)
def check_duplicate(db: Session, parsed: ParsedCase) -> bool:
    """Return True if *parsed* already exists in the database.

    A case counts as a duplicate when either:
      1. a row with the same generated fall_id exists, or
      2. a row matches on (nachname, fallgruppe, datum), narrowed further
         by vorname and geburtsdatum when those fields are present.
    """
    # Fast path: exact fall_id collision.
    by_id = db.query(Case).filter(Case.fall_id == generate_fall_id(parsed)).first()
    if by_id is not None:
        return True

    # Fallback: match on the personal-data combination. Optional fields
    # only narrow the match when the parsed row actually provides them.
    criteria = [
        Case.nachname == parsed.nachname,
        Case.fallgruppe == parsed.fallgruppe,
        Case.datum == parsed.datum,
    ]
    if parsed.vorname:
        criteria.append(Case.vorname == parsed.vorname)
    if parsed.geburtsdatum:
        criteria.append(Case.geburtsdatum == parsed.geburtsdatum)
    return db.query(Case).filter(*criteria).first() is not None
def preview_import(
    db: Session,
    parsed_cases: list[ParsedCase],
    filename: str,
) -> ImportPreview:
    """Check parsed cases against the DB for duplicates, return a preview.

    This is the first step of the two-phase import flow. It does NOT
    modify the database -- only reads to detect duplicates.

    Args:
        db: Open SQLAlchemy session (read-only use here).
        parsed_cases: Cases parsed from the uploaded CSV file.
        filename: Original upload filename (echoed back in the preview).

    Returns:
        ImportPreview with one ImportRow per parsed case (1-based
        row_number, duplicate flag, generated fall_id) and summary counts.
    """
    rows: list[ImportRow] = []
    duplicates = 0
    # fall_ids already seen in THIS upload: check_duplicate only queries the
    # DB, so without this a case appearing twice in one file would be shown
    # as two "new" rows even though only one can ever be inserted (fall_id
    # is unique). The second and later occurrences are flagged as duplicates.
    seen_in_file: set[str] = set()
    for i, pc in enumerate(parsed_cases, start=1):
        fall_id = generate_fall_id(pc)
        is_dup = fall_id in seen_in_file or check_duplicate(db, pc)
        seen_in_file.add(fall_id)
        if is_dup:
            duplicates += 1
        rows.append(
            ImportRow(
                row_number=i,
                nachname=pc.nachname,
                vorname=pc.vorname,
                geburtsdatum=pc.geburtsdatum,
                kvnr=pc.kvnr,
                fallgruppe=pc.fallgruppe,
                datum=pc.datum,
                is_duplicate=is_dup,
                fall_id=fall_id,
            )
        )
    logger.info(
        "Import preview for '%s': %d total, %d new, %d duplicates",
        filename,
        len(parsed_cases),
        len(parsed_cases) - duplicates,
        duplicates,
    )
    return ImportPreview(
        filename=filename,
        total_rows=len(parsed_cases),
        new_cases=len(parsed_cases) - duplicates,
        duplicates=duplicates,
        rows=rows,
    )
def confirm_import(
    db: Session,
    parsed_cases: list[ParsedCase],
    filename: str,
    user_id: int | None = None,
) -> ImportResult:
    """Insert non-duplicate cases into the database.

    This is the second step of the two-phase import flow. It:
      1. Re-checks each case for duplicates (in case of concurrent imports
         between preview and confirm)
      2. Creates Case rows for new cases
      3. Logs the import in ImportLog
      4. Commits the transaction

    Args:
        db: Open SQLAlchemy session; flushed and committed here.
        parsed_cases: Cases parsed from the uploaded CSV file.
        filename: Original upload filename, stored as each Case's
            import_source and in the ImportLog entry.
        user_id: Optional ID of the importing user, for the audit log.

    Returns:
        ImportResult with imported/skipped counts and per-row error strings.
    """
    imported = 0
    skipped = 0
    errors: list[str] = []
    # fall_ids added in this batch but not yet flushed: check_duplicate only
    # queries the DB, so a case appearing twice in one file would pass the
    # duplicate check both times and then blow up with a unique-constraint
    # violation at flush time (outside the per-row error handling). Skip
    # in-file repeats explicitly instead.
    pending: set[str] = set()
    for pc in parsed_cases:
        try:
            fall_id = generate_fall_id(pc)
            if fall_id in pending or check_duplicate(db, pc):
                skipped += 1
                continue
            case = Case(
                fall_id=fall_id,
                crm_ticket_id=pc.crm_ticket_id,
                jahr=pc.jahr,
                kw=pc.kw,
                datum=pc.datum,
                nachname=pc.nachname,
                vorname=pc.vorname,
                geburtsdatum=pc.geburtsdatum,
                kvnr=pc.kvnr,
                fallgruppe=pc.fallgruppe,
                import_source=filename,
            )
            db.add(case)
            pending.add(fall_id)
            imported += 1
        except Exception as e:
            errors.append(f"{pc.nachname}: {e}")
            logger.warning("Import error for case %s: %s", pc.nachname, e)
    # Flush to detect any DB-level constraint violations before logging
    db.flush()
    # Log the import
    log = ImportLog(
        filename=filename,
        import_type="csv_crm",
        cases_imported=imported,
        cases_skipped=skipped,
        cases_updated=0,
        errors="; ".join(errors) if errors else None,
        imported_by=user_id,
    )
    db.add(log)
    db.commit()
    logger.info(
        "Import confirmed for '%s': %d imported, %d skipped, %d errors",
        filename,
        imported,
        skipped,
        len(errors),
    )
    return ImportResult(
        imported=imported,
        skipped=skipped,
        updated=0,
        errors=errors,
    )