mirror of
https://github.com/complexcaresolutions/dak.c2s.git
synced 2026-03-17 19:33:41 +00:00
- Dashboard KPI cards are now clickable with role-based links
- New GutachtenStatistikPage placeholder at /gutachten-statistik
- New "Meine Freigaben" page for DAK-Mitarbeiter to view/revoke own disclosures
- Backend: GET /cases/my-disclosure-requests, PUT /cases/disclosure-requests/{id}/revoke
- Admin Disclosures page: full history with status tabs and revoke capability
- Backend: PUT /admin/disclosure-requests/{id}/revoke, default shows all statuses
- Sidebar: "Meine Freigaben" entry for dak_mitarbeiter role
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
164 lines
5.4 KiB
Python
164 lines
5.4 KiB
Python
"""Disclosure request service — create, review, check active grants."""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.audit import DisclosureRequest, Notification
|
|
from app.models.case import Case
|
|
from app.models.user import User
|
|
|
|
|
|
def has_active_disclosure(db: Session, case_id: int, user_id: int) -> tuple[bool, Optional[datetime]]:
|
|
"""Check if user has an active (approved, non-expired) disclosure for a case.
|
|
|
|
Returns (granted: bool, expires_at: datetime | None).
|
|
"""
|
|
dr = (
|
|
db.query(DisclosureRequest)
|
|
.filter(
|
|
DisclosureRequest.case_id == case_id,
|
|
DisclosureRequest.requester_id == user_id,
|
|
DisclosureRequest.status == "approved",
|
|
DisclosureRequest.expires_at > datetime.now(timezone.utc),
|
|
)
|
|
.first()
|
|
)
|
|
if dr:
|
|
return True, dr.expires_at
|
|
return False, None
|
|
|
|
|
|
def has_pending_request(db: Session, case_id: int, user_id: int) -> bool:
|
|
"""Check if user already has a pending disclosure request for a case."""
|
|
return (
|
|
db.query(DisclosureRequest)
|
|
.filter(
|
|
DisclosureRequest.case_id == case_id,
|
|
DisclosureRequest.requester_id == user_id,
|
|
DisclosureRequest.status == "pending",
|
|
)
|
|
.first()
|
|
) is not None
|
|
|
|
|
|
def create_disclosure_request(
|
|
db: Session, case_id: int, user_id: int, reason: str
|
|
) -> DisclosureRequest:
|
|
"""Create a new disclosure request and notify admins."""
|
|
dr = DisclosureRequest(
|
|
case_id=case_id,
|
|
requester_id=user_id,
|
|
reason=reason,
|
|
)
|
|
db.add(dr)
|
|
db.flush()
|
|
|
|
# Notify all active admins
|
|
case = db.query(Case).filter(Case.id == case_id).first()
|
|
fall_id = case.fall_id if case else str(case_id)
|
|
requester = db.query(User).filter(User.id == user_id).first()
|
|
requester_name = requester.username if requester else str(user_id)
|
|
|
|
admins = db.query(User).filter(User.role == "admin", User.is_active == True).all() # noqa: E712
|
|
for admin in admins:
|
|
db.add(Notification(
|
|
recipient_id=admin.id,
|
|
notification_type="disclosure_request",
|
|
title=f"Freigabe-Anfrage für Fall {fall_id}",
|
|
message=f"{requester_name} bittet um Einsicht in Personendaten. Begründung: {reason}",
|
|
related_entity_type="disclosure_request",
|
|
related_entity_id=dr.id,
|
|
))
|
|
|
|
db.commit()
|
|
db.refresh(dr)
|
|
return dr
|
|
|
|
|
|
def review_disclosure_request(
|
|
db: Session, request_id: int, admin_id: int, new_status: str
|
|
) -> DisclosureRequest:
|
|
"""Approve or reject a disclosure request."""
|
|
dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first()
|
|
if not dr:
|
|
raise ValueError("Disclosure request not found")
|
|
if dr.status != "pending":
|
|
raise ValueError("Request already reviewed")
|
|
|
|
dr.status = new_status
|
|
dr.reviewed_by = admin_id
|
|
dr.reviewed_at = datetime.now(timezone.utc)
|
|
|
|
if new_status == "approved":
|
|
dr.expires_at = datetime.now(timezone.utc) + timedelta(hours=24)
|
|
|
|
# Notify requester
|
|
case = db.query(Case).filter(Case.id == dr.case_id).first()
|
|
fall_id = case.fall_id if case else str(dr.case_id)
|
|
|
|
status_text = "genehmigt" if new_status == "approved" else "abgelehnt"
|
|
db.add(Notification(
|
|
recipient_id=dr.requester_id,
|
|
notification_type="disclosure_resolved",
|
|
title=f"Freigabe-Anfrage {status_text}",
|
|
message=f"Ihre Anfrage für Fall {fall_id} wurde {status_text}.",
|
|
related_entity_type="disclosure_request",
|
|
related_entity_id=dr.id,
|
|
))
|
|
|
|
db.commit()
|
|
db.refresh(dr)
|
|
return dr
|
|
|
|
|
|
def revoke_disclosure(db: Session, request_id: int, user_id: int, *, admin: bool = False) -> DisclosureRequest:
|
|
"""Revoke an active disclosure by setting expires_at to now.
|
|
|
|
If admin=False, only the original requester can revoke their own disclosure.
|
|
If admin=True, any admin can revoke any disclosure.
|
|
"""
|
|
dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first()
|
|
if not dr:
|
|
raise ValueError("Disclosure request not found")
|
|
if dr.status != "approved":
|
|
raise ValueError("Only approved disclosures can be revoked")
|
|
if dr.expires_at and dr.expires_at <= datetime.now(timezone.utc):
|
|
raise ValueError("Disclosure already expired")
|
|
if not admin and dr.requester_id != user_id:
|
|
raise ValueError("Not authorized to revoke this disclosure")
|
|
|
|
dr.expires_at = datetime.now(timezone.utc)
|
|
db.commit()
|
|
db.refresh(dr)
|
|
return dr
|
|
|
|
|
|
def get_my_disclosure_requests(db: Session, user_id: int) -> list[DisclosureRequest]:
|
|
"""Return all disclosure requests made by a specific user."""
|
|
return (
|
|
db.query(DisclosureRequest)
|
|
.filter(DisclosureRequest.requester_id == user_id)
|
|
.order_by(DisclosureRequest.created_at.desc())
|
|
.all()
|
|
)
|
|
|
|
|
|
def get_pending_requests(db: Session) -> list[DisclosureRequest]:
|
|
"""Return all pending disclosure requests."""
|
|
return (
|
|
db.query(DisclosureRequest)
|
|
.filter(DisclosureRequest.status == "pending")
|
|
.order_by(DisclosureRequest.created_at.desc())
|
|
.all()
|
|
)
|
|
|
|
|
|
def get_pending_count(db: Session) -> int:
|
|
"""Return count of pending disclosure requests."""
|
|
return (
|
|
db.query(DisclosureRequest)
|
|
.filter(DisclosureRequest.status == "pending")
|
|
.count()
|
|
)
|