dak.c2s/backend/app/services/disclosure_service.py
CCS Admin 1bedbcf243 fix: resolve TypeError in disclosure revoke due to naive/aware datetime comparison
PyMySQL returns naive datetimes from MySQL DATETIME columns, but
revoke_disclosure() compared them with timezone-aware datetime.now(timezone.utc),
causing an unhandled TypeError (not caught by except ValueError) and a 500 error.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 07:57:59 +00:00

169 lines
5.6 KiB
Python

"""Disclosure request service — create, review, check active grants."""
from datetime import datetime, timedelta, timezone
from typing import Optional
from sqlalchemy.orm import Session
from app.models.audit import DisclosureRequest, Notification
from app.models.case import Case
from app.models.user import User
def has_active_disclosure(db: Session, case_id: int, user_id: int) -> tuple[bool, Optional[datetime]]:
    """Check if a user has an active (approved, non-expired) disclosure for a case.

    Args:
        db: SQLAlchemy session used for the lookup.
        case_id: ID of the case the disclosure refers to.
        user_id: ID of the user who requested the disclosure.

    Returns:
        ``(True, expires_at)`` for a matching request (no ordering is applied
        when several match), or ``(False, None)`` when no approved request
        with a future ``expires_at`` exists.
    """
    # Use the same naive-UTC timestamp that revoke_disclosure() compares and
    # stores, so expires_at is never checked against a timezone-aware value.
    now = _utcnow_naive()
    dr = (
        db.query(DisclosureRequest)
        .filter(
            DisclosureRequest.case_id == case_id,
            DisclosureRequest.requester_id == user_id,
            DisclosureRequest.status == "approved",
            DisclosureRequest.expires_at > now,
        )
        .first()
    )
    if dr:
        return True, dr.expires_at
    return False, None
def has_pending_request(db: Session, case_id: int, user_id: int) -> bool:
    """Tell whether the user already has a pending disclosure request for a case.

    Args:
        db: SQLAlchemy session used for the lookup.
        case_id: ID of the case to check.
        user_id: ID of the requesting user.

    Returns:
        True if at least one request with status ``"pending"`` exists for this
        case/user pair, otherwise False.
    """
    pending_query = db.query(DisclosureRequest).filter(
        DisclosureRequest.case_id == case_id,
        DisclosureRequest.requester_id == user_id,
        DisclosureRequest.status == "pending",
    )
    first_match = pending_query.first()
    return first_match is not None
def create_disclosure_request(
    db: Session, case_id: int, user_id: int, reason: str
) -> DisclosureRequest:
    """Store a new disclosure request and notify every active admin.

    Args:
        db: SQLAlchemy session; the request and notifications are committed here.
        case_id: ID of the case whose personal data should be disclosed.
        user_id: ID of the user asking for the disclosure.
        reason: Justification text, stored on the request and quoted in the
            admin notification.

    Returns:
        The committed and refreshed disclosure request.
    """
    request = DisclosureRequest(case_id=case_id, requester_id=user_id, reason=reason)
    db.add(request)
    # Flush before building notifications, which read request.id below.
    db.flush()

    # Fall back to the raw IDs when the case or the requester cannot be loaded.
    case = db.query(Case).filter(Case.id == case_id).first()
    if case:
        fall_id = case.fall_id
    else:
        fall_id = str(case_id)

    requester = db.query(User).filter(User.id == user_id).first()
    if requester:
        requester_name = requester.username
    else:
        requester_name = str(user_id)

    # One notification per active admin.
    active_admins = db.query(User).filter(User.role == "admin", User.is_active == True).all()  # noqa: E712
    for admin in active_admins:
        notification = Notification(
            recipient_id=admin.id,
            notification_type="disclosure_request",
            title=f"Freigabe-Anfrage für Fall {fall_id}",
            message=f"{requester_name} bittet um Einsicht in Personendaten. Begründung: {reason}",
            related_entity_type="disclosure_request",
            related_entity_id=request.id,
        )
        db.add(notification)

    db.commit()
    db.refresh(request)
    return request
def review_disclosure_request(
    db: Session, request_id: int, admin_id: int, new_status: str
) -> DisclosureRequest:
    """Approve or reject a pending disclosure request and notify the requester.

    Args:
        db: SQLAlchemy session; the review and notification are committed here.
        request_id: ID of the disclosure request to review.
        admin_id: ID of the reviewing admin, stored as ``reviewed_by``.
        new_status: Status to store on the request. ``"approved"`` also sets
            ``expires_at`` to 24 hours after the review; any other value is
            reported to the requester as rejected.

    Returns:
        The committed and refreshed disclosure request.

    Raises:
        ValueError: If the request does not exist or is no longer pending.
    """
    dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first()
    if not dr:
        raise ValueError("Disclosure request not found")
    if dr.status != "pending":
        raise ValueError("Request already reviewed")

    # Read the clock once so expires_at is exactly 24 hours after reviewed_at,
    # and use the naive-UTC form that revoke_disclosure() also writes.
    now = _utcnow_naive()
    dr.status = new_status
    dr.reviewed_by = admin_id
    dr.reviewed_at = now
    if new_status == "approved":
        dr.expires_at = now + timedelta(hours=24)

    # Notify requester; fall back to the numeric case ID if the case is gone.
    case = db.query(Case).filter(Case.id == dr.case_id).first()
    fall_id = case.fall_id if case else str(dr.case_id)
    status_text = "genehmigt" if new_status == "approved" else "abgelehnt"
    db.add(Notification(
        recipient_id=dr.requester_id,
        notification_type="disclosure_resolved",
        title=f"Freigabe-Anfrage {status_text}",
        message=f"Ihre Anfrage für Fall {fall_id} wurde {status_text}.",
        related_entity_type="disclosure_request",
        related_entity_id=dr.id,
    ))
    db.commit()
    db.refresh(dr)
    return dr
def _utcnow_naive() -> datetime:
    """Return the current UTC time with its tzinfo stripped.

    The result is a naive datetime, intended to be comparable with the naive
    values read back from MySQL DATETIME columns.
    """
    aware_now = datetime.now(tz=timezone.utc)
    return aware_now.replace(tzinfo=None)
def revoke_disclosure(db: Session, request_id: int, user_id: int, *, admin: bool = False) -> DisclosureRequest:
    """Revoke an active disclosure by setting ``expires_at`` to now.

    Args:
        db: SQLAlchemy session; the revocation is committed here.
        request_id: ID of the disclosure request to revoke.
        user_id: ID of the user performing the revocation.
        admin: If False, only the original requester may revoke the
            disclosure. If True, the ownership check is skipped.

    Returns:
        The committed and refreshed disclosure request.

    Raises:
        ValueError: If the request does not exist, the caller is not allowed
            to revoke it, it is not approved, or it has already expired.
    """
    dr = db.query(DisclosureRequest).filter(DisclosureRequest.id == request_id).first()
    if not dr:
        raise ValueError("Disclosure request not found")

    # Authorize before inspecting state, so a non-owner cannot learn from the
    # error message whether someone else's request is approved or expired.
    if not admin and dr.requester_id != user_id:
        raise ValueError("Not authorized to revoke this disclosure")

    if dr.status != "approved":
        raise ValueError("Only approved disclosures can be revoked")

    # Read the clock once: the same instant decides "already expired" and
    # becomes the stored revocation time. Naive UTC matches what the DB returns.
    now = _utcnow_naive()
    if dr.expires_at is not None and dr.expires_at <= now:
        raise ValueError("Disclosure already expired")

    dr.expires_at = now
    db.commit()
    db.refresh(dr)
    return dr
def get_my_disclosure_requests(db: Session, user_id: int) -> list[DisclosureRequest]:
    """List every disclosure request made by one user, newest first.

    Args:
        db: SQLAlchemy session used for the lookup.
        user_id: ID of the requesting user.

    Returns:
        The user's requests ordered by ``created_at`` descending.
    """
    own_requests = db.query(DisclosureRequest).filter(
        DisclosureRequest.requester_id == user_id
    )
    newest_first = own_requests.order_by(DisclosureRequest.created_at.desc())
    return newest_first.all()
def get_pending_requests(db: Session) -> list[DisclosureRequest]:
    """List all disclosure requests that are still pending, newest first.

    Args:
        db: SQLAlchemy session used for the lookup.

    Returns:
        Requests with status ``"pending"`` ordered by ``created_at`` descending.
    """
    pending = db.query(DisclosureRequest).filter(
        DisclosureRequest.status == "pending"
    )
    newest_first = pending.order_by(DisclosureRequest.created_at.desc())
    return newest_first.all()
def get_pending_count(db: Session) -> int:
    """Count the disclosure requests that are still pending.

    Args:
        db: SQLAlchemy session used for the lookup.

    Returns:
        Number of requests with status ``"pending"``.
    """
    pending = db.query(DisclosureRequest).filter(
        DisclosureRequest.status == "pending"
    )
    return pending.count()