dak.c2s/backend/app/api/cases.py
CCS Admin 9825489781 feat: add case masking for dak_mitarbeiter and disclosure endpoints
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 16:07:30 +00:00

534 lines
17 KiB
Python

"""Cases API routes — list, detail, update, ICD entry, coding, template download."""
import logging
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from fastapi.responses import StreamingResponse
from sqlalchemy import or_
from sqlalchemy.orm import Session
from app.core.dependencies import get_current_user, require_admin
from app.database import get_db
from app.models.case import Case
from app.models.user import User
from app.schemas.case import (
CaseListResponse,
CaseResponse,
CaseUpdate,
CodingUpdate,
ICDUpdate,
mask_case_for_mitarbeiter,
)
from app.schemas.disclosure import DisclosureRequestCreate, DisclosureRequestResponse
from app.config import get_settings
from app.services.audit_service import log_action
from app.services.disclosure_service import (
create_disclosure_request, has_active_disclosure, has_pending_request,
)
from app.services.icd_service import generate_coding_template, get_pending_icd_cases, save_icd_for_case
settings = get_settings()
logger = logging.getLogger(__name__)
router = APIRouter()
def _mask_case_response(case: Case, user: User, db: Session) -> dict:
"""Serialize a single case, masking sensitive fields for dak_mitarbeiter."""
case_dict = CaseResponse.model_validate(case).model_dump()
if user.role != "admin":
granted, expires_at = has_active_disclosure(db, case.id, user.id)
case_dict = mask_case_for_mitarbeiter(case_dict, disclosure_granted=granted)
if expires_at:
case_dict["disclosure_expires_at"] = expires_at.isoformat()
return case_dict
# ---------------------------------------------------------------------------
# Static routes MUST be defined BEFORE the parameterised /{case_id} routes
# to avoid FastAPI interpreting "pending-icd" etc. as a case_id.
# ---------------------------------------------------------------------------
@router.get("/pending-icd", response_model=CaseListResponse)
def list_pending_icd(
page: int = Query(1, ge=1),
per_page: int = Query(50, ge=1, le=200),
jahr: Optional[int] = Query(None),
fallgruppe: Optional[str] = Query(None),
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Return cases that still need ICD codes (icd IS NULL).
Accessible to both admin and dak_mitarbeiter users.
"""
cases, total = get_pending_icd_cases(
db, jahr=jahr, fallgruppe=fallgruppe, page=page, per_page=per_page
)
if user.role == "admin":
return CaseListResponse(items=cases, total=total, page=page, per_page=per_page)
masked_items = [_mask_case_response(c, user, db) for c in cases]
return {"items": masked_items, "total": total, "page": page, "per_page": per_page}
@router.get("/pending-coding", response_model=CaseListResponse)
def list_pending_coding(
page: int = Query(1, ge=1),
per_page: int = Query(50, ge=1, le=200),
jahr: Optional[int] = Query(None),
fallgruppe: Optional[str] = Query(None),
db: Session = Depends(get_db),
user: User = Depends(require_admin),
):
"""Return cases with gutachten=True but no gutachten_typ yet (admin only).
These cases have received a Gutachten but still need classification
(Bestätigung/Alternative) and therapy-change coding.
"""
query = db.query(Case).filter(
Case.versicherung == settings.VERSICHERUNG_FILTER,
Case.gutachten == True, # noqa: E712
Case.gutachten_typ == None, # noqa: E711
)
if jahr:
query = query.filter(Case.jahr == jahr)
if fallgruppe:
query = query.filter(Case.fallgruppe == fallgruppe)
total = query.count()
cases = (
query.order_by(Case.datum.desc())
.offset((page - 1) * per_page)
.limit(per_page)
.all()
)
return CaseListResponse(
items=cases,
total=total,
page=page,
per_page=per_page,
)
@router.get("/coding-template")
def download_coding_template(
request: Request,
token: Optional[str] = Query(None),
jahr: Optional[int] = Query(None),
fallgruppe: Optional[str] = Query(None),
db: Session = Depends(get_db),
):
"""Download an Excel template for ICD coding.
Supports both ``Authorization: Bearer`` header and ``?token=`` query
parameter for direct browser downloads.
"""
from app.core.security import decode_access_token
from jose import JWTError
raw_token = token
if not raw_token:
auth = request.headers.get("authorization", "")
if auth.lower().startswith("bearer "):
raw_token = auth[7:]
if not raw_token:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Not authenticated")
try:
payload = decode_access_token(raw_token)
user_id = int(payload["sub"])
except (JWTError, KeyError, ValueError):
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")
user = db.query(User).filter(User.id == user_id, User.is_active == True).first() # noqa: E712
if not user:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found")
xlsx_bytes = generate_coding_template(db, jahr=jahr, fallgruppe=fallgruppe)
from io import BytesIO
return StreamingResponse(
BytesIO(xlsx_bytes),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={
"Content-Disposition": 'attachment; filename="ICD_Coding_Template.xlsx"'
},
)
# ---------------------------------------------------------------------------
# Disclosure request
# ---------------------------------------------------------------------------
@router.post("/{case_id}/disclosure-request", response_model=DisclosureRequestResponse)
def request_disclosure(
case_id: int,
payload: DisclosureRequestCreate,
request: Request,
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Request disclosure of personal data for a case."""
case = db.query(Case).filter(Case.id == case_id).first()
if not case:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Case not found")
if has_pending_request(db, case_id, user.id):
raise HTTPException(status.HTTP_409_CONFLICT, "Anfrage bereits ausstehend")
granted, _ = has_active_disclosure(db, case_id, user.id)
if granted:
raise HTTPException(status.HTTP_409_CONFLICT, "Freigabe bereits aktiv")
dr = create_disclosure_request(db, case_id, user.id, payload.reason)
log_action(
db, user_id=user.id, action="disclosure_requested",
entity_type="case", entity_id=case_id,
new_values={"reason": payload.reason},
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return DisclosureRequestResponse(
id=dr.id, case_id=dr.case_id, requester_id=dr.requester_id,
reason=dr.reason, status=dr.status, created_at=dr.created_at,
)
# ---------------------------------------------------------------------------
# Paginated case list
# ---------------------------------------------------------------------------
@router.get("/", response_model=CaseListResponse)
def list_cases(
page: int = Query(1, ge=1),
per_page: int = Query(50, ge=1, le=200),
jahr: Optional[int] = Query(None),
kw: Optional[int] = Query(None),
fallgruppe: Optional[str] = Query(None),
has_icd: Optional[bool] = Query(None),
has_coding: Optional[bool] = Query(None),
search: Optional[str] = Query(None, min_length=1, max_length=200),
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Return a paginated, filterable list of cases.
Accessible to both admin and dak_mitarbeiter users.
Filters:
- ``jahr``: calendar year
- ``kw``: calendar week
- ``fallgruppe``: one of onko, kardio, intensiv, galle, sd
- ``has_icd``: True = only cases WITH icd; False = only cases WITHOUT
- ``has_coding``: True = only with gutachten_typ; False = only without
- ``search``: free-text search across nachname, vorname, fall_id, kvnr
"""
query = db.query(Case).filter(Case.versicherung == settings.VERSICHERUNG_FILTER)
if jahr is not None:
query = query.filter(Case.jahr == jahr)
if kw is not None:
query = query.filter(Case.kw == kw)
if fallgruppe is not None:
query = query.filter(Case.fallgruppe == fallgruppe)
if has_icd is True:
query = query.filter(Case.icd != None) # noqa: E711
elif has_icd is False:
query = query.filter(Case.icd == None) # noqa: E711
if has_coding is True:
query = query.filter(Case.gutachten_typ != None) # noqa: E711
elif has_coding is False:
query = query.filter(Case.gutachten_typ == None) # noqa: E711
if search:
like_pattern = f"%{search}%"
if user.role == "admin":
query = query.filter(
or_(
Case.nachname.ilike(like_pattern),
Case.vorname.ilike(like_pattern),
Case.fall_id.ilike(like_pattern),
Case.kvnr.ilike(like_pattern),
)
)
else:
query = query.filter(
or_(
Case.fall_id.ilike(like_pattern),
Case.kvnr.ilike(like_pattern),
)
)
total = query.count()
cases = (
query.order_by(Case.datum.desc(), Case.id.desc())
.offset((page - 1) * per_page)
.limit(per_page)
.all()
)
if user.role == "admin":
return CaseListResponse(items=cases, total=total, page=page, per_page=per_page)
masked_items = [_mask_case_response(c, user, db) for c in cases]
return {"items": masked_items, "total": total, "page": page, "per_page": per_page}
# ---------------------------------------------------------------------------
# Single case detail
# ---------------------------------------------------------------------------
@router.get("/{case_id}", response_model=CaseResponse)
def get_case(
case_id: int,
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Return a single case by its database ID."""
case = db.query(Case).filter(Case.id == case_id, Case.versicherung == settings.VERSICHERUNG_FILTER).first()
if not case:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Case not found",
)
if user.role == "admin":
return case
return _mask_case_response(case, user, db)
# ---------------------------------------------------------------------------
# Update case (admin only)
# ---------------------------------------------------------------------------
@router.put("/{case_id}", response_model=CaseResponse)
def update_case(
case_id: int,
payload: CaseUpdate,
request: Request,
db: Session = Depends(get_db),
user: User = Depends(require_admin),
):
"""Update arbitrary case fields (admin only).
Only fields included in the request body are applied. Changes are
recorded in the audit log.
"""
case = db.query(Case).filter(Case.id == case_id).first()
if not case:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Case not found",
)
update_data = payload.model_dump(exclude_unset=True)
if not update_data:
return case # Nothing to update
old_values: dict = {}
new_values: dict = {}
for field, value in update_data.items():
current = getattr(case, field)
# Convert date objects to strings for JSON-safe audit log
current_cmp = current
value_cmp = value
if current_cmp != value_cmp:
old_values[field] = str(current) if current is not None else None
new_values[field] = str(value) if value is not None else None
setattr(case, field, value)
if new_values:
case.updated_by = user.id
db.commit()
db.refresh(case)
log_action(
db,
user_id=user.id,
action="case_updated",
entity_type="case",
entity_id=case.id,
old_values=old_values,
new_values=new_values,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return case
# ---------------------------------------------------------------------------
# ICD entry
# ---------------------------------------------------------------------------
@router.put("/{case_id}/icd", response_model=CaseResponse)
def set_case_icd(
case_id: int,
payload: ICDUpdate,
request: Request,
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Set ICD codes for a case. Accessible to both admin and dak_mitarbeiter.
Replaces any existing ICD codes. The raw ICD string is validated,
normalised, and split into individual ``CaseICDCode`` entries.
"""
try:
case = save_icd_for_case(db, case_id, payload.icd, user.id)
except HTTPException:
raise
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(exc),
)
log_action(
db,
user_id=user.id,
action="icd_entered",
entity_type="case",
entity_id=case.id,
new_values={"icd": payload.icd},
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return case
# ---------------------------------------------------------------------------
# KVNR update
# ---------------------------------------------------------------------------
@router.put("/{case_id}/kvnr", response_model=CaseResponse)
def set_case_kvnr(
case_id: int,
payload: dict,
request: Request,
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Update the KVNR for a case. Accessible to both admin and dak_mitarbeiter."""
case = db.query(Case).filter(Case.id == case_id).first()
if not case:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Case not found",
)
old_kvnr = case.kvnr
new_kvnr = payload.get("kvnr")
case.kvnr = new_kvnr
case.updated_by = user.id
db.commit()
db.refresh(case)
log_action(
db,
user_id=user.id,
action="kvnr_updated",
entity_type="case",
entity_id=case.id,
old_values={"kvnr": old_kvnr},
new_values={"kvnr": new_kvnr},
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return case
# ---------------------------------------------------------------------------
# Coding (Gutachten classification)
# ---------------------------------------------------------------------------
@router.put("/{case_id}/coding", response_model=CaseResponse)
def set_case_coding(
case_id: int,
payload: CodingUpdate,
request: Request,
db: Session = Depends(get_db),
user: User = Depends(require_admin),
):
"""Set coding fields on a case (admin only).
Records the Gutachten classification (Bestätigung/Alternative) and
therapy-change flags.
"""
case = db.query(Case).filter(Case.id == case_id).first()
if not case:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Case not found",
)
# Validate gutachten_typ
if payload.gutachten_typ not in ("Bestätigung", "Alternative"):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="gutachten_typ must be 'Bestätigung' or 'Alternative'",
)
# Validate therapieaenderung
if payload.therapieaenderung not in ("Ja", "Nein"):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="therapieaenderung must be 'Ja' or 'Nein'",
)
old_values = {
"gutachten_typ": case.gutachten_typ,
"therapieaenderung": case.therapieaenderung,
"ta_diagnosekorrektur": case.ta_diagnosekorrektur,
"ta_unterversorgung": case.ta_unterversorgung,
"ta_uebertherapie": case.ta_uebertherapie,
}
case.gutachten_typ = payload.gutachten_typ
case.therapieaenderung = payload.therapieaenderung
case.ta_diagnosekorrektur = payload.ta_diagnosekorrektur
case.ta_unterversorgung = payload.ta_unterversorgung
case.ta_uebertherapie = payload.ta_uebertherapie
case.coding_completed_by = user.id
case.coding_completed_at = datetime.now(timezone.utc)
case.updated_by = user.id
db.commit()
db.refresh(case)
new_values = {
"gutachten_typ": case.gutachten_typ,
"therapieaenderung": case.therapieaenderung,
"ta_diagnosekorrektur": case.ta_diagnosekorrektur,
"ta_unterversorgung": case.ta_unterversorgung,
"ta_uebertherapie": case.ta_uebertherapie,
}
log_action(
db,
user_id=user.id,
action="coding_completed",
entity_type="case",
entity_id=case.id,
old_values=old_values,
new_values=new_values,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return case