"""Disclosure request service — create, review, check active grants.""" from datetime import datetime, timedelta, timezone from typing import Optional from sqlalchemy.orm import Session from app.models.audit import DisclosureRequest from app.models.case import Case from app.models.user import User from app.services.notification_service import create_notification, notify_all_users_with_role def _utcnow_naive() -> datetime: """Return current UTC time as a naive datetime, matching MySQL DATETIME columns.""" return datetime.now(timezone.utc).replace(tzinfo=None) def has_active_disclosure(db: Session, case_id: int, user_id: int) -> tuple[bool, Optional[datetime]]: """Check if user has an active (approved, non-expired) disclosure for a case. Returns (granted: bool, expires_at: datetime | None). """ dr = ( db.query(DisclosureRequest) .filter( DisclosureRequest.case_id == case_id, DisclosureRequest.requester_id == user_id, DisclosureRequest.status == "approved", DisclosureRequest.expires_at > _utcnow_naive(), ) .first() ) if dr: return True, dr.expires_at return False, None def has_pending_request(db: Session, case_id: int, user_id: int) -> bool: """Check if user already has a pending disclosure request for a case.""" return ( db.query(DisclosureRequest) .filter( DisclosureRequest.case_id == case_id, DisclosureRequest.requester_id == user_id, DisclosureRequest.status == "pending", ) .first() ) is not None def create_disclosure_request( db: Session, case_id: int, user_id: int, reason: str ) -> DisclosureRequest: """Create a new disclosure request and notify admins.""" dr = DisclosureRequest( case_id=case_id, requester_id=user_id, reason=reason, ) db.add(dr) db.flush() # Notify all active admins (in-app + email) case = db.query(Case).filter(Case.id == case_id).first() fall_id = case.fall_id if case else str(case_id) requester = db.query(User).filter(User.id == user_id).first() requester_name = requester.username if requester else str(user_id) notify_all_users_with_role( db, "admin", "disclosure_request", title=f"Neue Freigabe-Anfrage für Fall {fall_id}", message=( f"{requester_name} hat eine Freigabe für Personendaten beantragt.\n" f"Begründung: {reason}\n\n" "Bitte prüfen Sie die Anfrage im DAK Portal." ), related_entity_type="disclosure_request", related_entity_id=dr.id, ) db.refresh(dr) return dr def review_disclosure_request( db: Session, request_id: int, admin_id: int, new_status: str ) -> DisclosureRequest: """Approve or reject a disclosure request.""" dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first() if not dr: raise ValueError("Disclosure request not found") if dr.status != "pending": raise ValueError("Request already reviewed") dr.status = new_status dr.reviewed_by = admin_id dr.reviewed_at = _utcnow_naive() if new_status == "approved": dr.expires_at = _utcnow_naive() + timedelta(hours=24) # Notify requester (in-app + email) case = db.query(Case).filter(Case.id == dr.case_id).first() fall_id = case.fall_id if case else str(dr.case_id) status_text = "genehmigt" if new_status == "approved" else "abgelehnt" if new_status == "approved": expires_str = dr.expires_at.strftime("%d.%m.%Y %H:%M Uhr") if dr.expires_at else "24h" message = ( f"Ihre Freigabe-Anfrage für Fall {fall_id} wurde genehmigt.\n" f"Die Personendaten sind bis {expires_str} sichtbar." ) else: message = f"Ihre Freigabe-Anfrage für Fall {fall_id} wurde abgelehnt." create_notification( db, recipient_id=dr.requester_id, notification_type="disclosure_resolved", title=f"Freigabe-Anfrage {status_text} — Fall {fall_id}", message=message, related_entity_type="disclosure_request", related_entity_id=dr.id, ) db.refresh(dr) return dr def revoke_disclosure(db: Session, request_id: int, user_id: int, *, admin: bool = False) -> DisclosureRequest: """Revoke an active disclosure by setting expires_at to now. If admin=False, only the original requester can revoke their own disclosure. If admin=True, any admin can revoke any disclosure. """ dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first() if not dr: raise ValueError("Disclosure request not found") if dr.status != "approved": raise ValueError("Only approved disclosures can be revoked") if dr.expires_at and dr.expires_at <= _utcnow_naive(): raise ValueError("Disclosure already expired") if not admin and dr.requester_id != user_id: raise ValueError("Not authorized to revoke this disclosure") dr.expires_at = _utcnow_naive() db.commit() db.refresh(dr) return dr def reactivate_disclosure(db: Session, request_id: int) -> DisclosureRequest: """Reactivate an expired, revoked, or rejected disclosure with a new 24h window.""" dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first() if not dr: raise ValueError("Disclosure request not found") if dr.status == "pending": raise ValueError("Pending requests must be approved first") if dr.status == "approved" and dr.expires_at and dr.expires_at > _utcnow_naive(): raise ValueError("Disclosure is still active") dr.status = "approved" dr.expires_at = _utcnow_naive() + timedelta(hours=24) db.commit() db.refresh(dr) return dr def admin_delete_disclosure(db: Session, request_id: int) -> None: """Admin: permanently delete any non-active disclosure request.""" dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first() if not dr: raise ValueError("Disclosure request not found") if dr.status == "pending": raise ValueError("Pending requests cannot be deleted") if dr.status == "approved" and dr.expires_at and dr.expires_at > _utcnow_naive(): raise ValueError("Active disclosures cannot be deleted") db.delete(dr) db.commit() def delete_disclosure_request(db: Session, request_id: int, user_id: int) -> None: """Permanently delete a non-active disclosure request owned by the user.""" dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first() if not dr: raise ValueError("Disclosure request not found") if dr.requester_id != user_id: raise ValueError("Not authorized to delete this disclosure") # Only allow deletion of inactive requests (rejected, expired, revoked) if dr.status == "pending": raise ValueError("Pending requests cannot be deleted") if dr.status == "approved" and dr.expires_at and dr.expires_at > _utcnow_naive(): raise ValueError("Active disclosures cannot be deleted") db.delete(dr) db.commit() def get_my_disclosure_requests(db: Session, user_id: int) -> list[DisclosureRequest]: """Return all disclosure requests made by a specific user.""" return ( db.query(DisclosureRequest) .filter(DisclosureRequest.requester_id == user_id) .order_by(DisclosureRequest.created_at.desc()) .all() ) def get_pending_requests(db: Session) -> list[DisclosureRequest]: """Return all pending disclosure requests.""" return ( db.query(DisclosureRequest) .filter(DisclosureRequest.status == "pending") .order_by(DisclosureRequest.created_at.desc()) .all() ) def get_pending_count(db: Session) -> int: """Return count of pending disclosure requests.""" return ( db.query(DisclosureRequest) .filter(DisclosureRequest.status == "pending") .count() )