"""Disclosure request service — create, review, check active grants.""" from datetime import datetime, timedelta, timezone from typing import Optional from sqlalchemy.orm import Session from app.models.audit import DisclosureRequest, Notification from app.models.case import Case from app.models.user import User def has_active_disclosure(db: Session, case_id: int, user_id: int) -> tuple[bool, Optional[datetime]]: """Check if user has an active (approved, non-expired) disclosure for a case. Returns (granted: bool, expires_at: datetime | None). """ dr = ( db.query(DisclosureRequest) .filter( DisclosureRequest.case_id == case_id, DisclosureRequest.requester_id == user_id, DisclosureRequest.status == "approved", DisclosureRequest.expires_at > datetime.now(timezone.utc), ) .first() ) if dr: return True, dr.expires_at return False, None def has_pending_request(db: Session, case_id: int, user_id: int) -> bool: """Check if user already has a pending disclosure request for a case.""" return ( db.query(DisclosureRequest) .filter( DisclosureRequest.case_id == case_id, DisclosureRequest.requester_id == user_id, DisclosureRequest.status == "pending", ) .first() ) is not None def create_disclosure_request( db: Session, case_id: int, user_id: int, reason: str ) -> DisclosureRequest: """Create a new disclosure request and notify admins.""" dr = DisclosureRequest( case_id=case_id, requester_id=user_id, reason=reason, ) db.add(dr) db.flush() # Notify all active admins case = db.query(Case).filter(Case.id == case_id).first() fall_id = case.fall_id if case else str(case_id) requester = db.query(User).filter(User.id == user_id).first() requester_name = requester.username if requester else str(user_id) admins = db.query(User).filter(User.role == "admin", User.is_active == True).all() # noqa: E712 for admin in admins: db.add(Notification( recipient_id=admin.id, notification_type="disclosure_request", title=f"Freigabe-Anfrage für Fall {fall_id}", message=f"{requester_name} bittet um Einsicht in Personendaten. Begründung: {reason}", related_entity_type="disclosure_request", related_entity_id=dr.id, )) db.commit() db.refresh(dr) return dr def review_disclosure_request( db: Session, request_id: int, admin_id: int, new_status: str ) -> DisclosureRequest: """Approve or reject a disclosure request.""" dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first() if not dr: raise ValueError("Disclosure request not found") if dr.status != "pending": raise ValueError("Request already reviewed") dr.status = new_status dr.reviewed_by = admin_id dr.reviewed_at = datetime.now(timezone.utc) if new_status == "approved": dr.expires_at = datetime.now(timezone.utc) + timedelta(hours=24) # Notify requester case = db.query(Case).filter(Case.id == dr.case_id).first() fall_id = case.fall_id if case else str(dr.case_id) status_text = "genehmigt" if new_status == "approved" else "abgelehnt" db.add(Notification( recipient_id=dr.requester_id, notification_type="disclosure_resolved", title=f"Freigabe-Anfrage {status_text}", message=f"Ihre Anfrage für Fall {fall_id} wurde {status_text}.", related_entity_type="disclosure_request", related_entity_id=dr.id, )) db.commit() db.refresh(dr) return dr def get_pending_requests(db: Session) -> list[DisclosureRequest]: """Return all pending disclosure requests.""" return ( db.query(DisclosureRequest) .filter(DisclosureRequest.status == "pending") .order_by(DisclosureRequest.created_at.desc()) .all() ) def get_pending_count(db: Session) -> int: """Return count of pending disclosure requests.""" return ( db.query(DisclosureRequest) .filter(DisclosureRequest.status == "pending") .count() )