mirror of
https://github.com/complexcaresolutions/dak.c2s.git
synced 2026-03-17 21:53:41 +00:00
When an admin creates an invitation with an email address, an email is now automatically sent containing the registration link and an attached PDF guide (3 pages: registration steps, feature overview, contact info). - Add fpdf2 for PDF generation with Unicode font support - Add PDF guide generator (backend/app/services/pdf_guide.py) - Extend send_email() to support file attachments - Fire-and-forget email in create_invitation endpoint Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
535 lines
18 KiB
Python
535 lines
18 KiB
Python
"""Admin-only API endpoints: user CRUD, invitation management, audit log."""
|
|
|
|
import logging
|
|
import secrets
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
|
|
from pydantic import BaseModel, EmailStr
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.dependencies import require_admin
|
|
from app.core.security import hash_password
|
|
from app.database import get_db
|
|
from app.models.audit import AuditLog, DisclosureRequest
|
|
from app.models.user import InvitationLink, User
|
|
from app.schemas.disclosure import (
|
|
DisclosureCountResponse, DisclosureRequestResponse, DisclosureRequestUpdate,
|
|
)
|
|
from app.schemas.user import UserCreate, UserResponse, UserUpdate
|
|
from app.config import get_settings
|
|
from app.services.audit_service import log_action
|
|
from app.services.disclosure_service import (
|
|
admin_delete_disclosure, get_pending_count, reactivate_disclosure,
|
|
review_disclosure_request, revoke_disclosure,
|
|
)
|
|
from app.services.notification_service import send_email
|
|
from app.services.pdf_guide import generate_guide_pdf
|
|
|
|
router = APIRouter()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_settings = get_settings()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Invitation email helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _send_invitation_email(
|
|
email: str, token: str, expires_at: datetime
|
|
) -> None:
|
|
"""Send an invitation email with a PDF onboarding guide attached."""
|
|
try:
|
|
invite_url = f"{_settings.FRONTEND_BASE_URL}/register?token={token}"
|
|
pdf_bytes = generate_guide_pdf(invite_url)
|
|
|
|
expires_str = expires_at.strftime("%d.%m.%Y")
|
|
|
|
plain_body = (
|
|
"Guten Tag,\n\n"
|
|
"Sie wurden zum DAK Zweitmeinungs-Portal eingeladen.\n\n"
|
|
"Bitte registrieren Sie sich unter folgendem Link:\n"
|
|
f"{invite_url}\n\n"
|
|
f"Der Link ist gueltig bis zum {expires_str}.\n\n"
|
|
"Im Anhang finden Sie eine Anleitung (PDF) mit allen "
|
|
"wichtigen Informationen zur Registrierung und Nutzung "
|
|
"des Portals.\n\n"
|
|
"Mit freundlichen Gruessen\n"
|
|
"Complex Care Solutions GmbH"
|
|
)
|
|
|
|
html_body = (
|
|
"<div style='font-family: Arial, sans-serif; max-width: 600px;'>"
|
|
"<h2 style='color: #005293;'>DAK Zweitmeinungs-Portal</h2>"
|
|
"<p>Guten Tag,</p>"
|
|
"<p>Sie wurden zum <strong>DAK Zweitmeinungs-Portal</strong> eingeladen.</p>"
|
|
"<p>Bitte registrieren Sie sich unter folgendem Link:</p>"
|
|
f"<p><a href='{invite_url}' style='color: #005293; "
|
|
f"font-weight: bold;'>{invite_url}</a></p>"
|
|
f"<p>Der Link ist gültig bis zum <strong>{expires_str}</strong>.</p>"
|
|
"<p>Im Anhang finden Sie eine <strong>Anleitung (PDF)</strong> mit allen "
|
|
"wichtigen Informationen zur Registrierung und Nutzung des Portals.</p>"
|
|
"<hr style='border: none; border-top: 1px solid #dee2e6; margin: 20px 0;'>"
|
|
"<p style='color: #6c757d; font-size: 12px;'>"
|
|
"Complex Care Solutions GmbH<br>"
|
|
"https://dak.complexcaresolutions.de</p>"
|
|
"</div>"
|
|
)
|
|
|
|
send_email(
|
|
to=email,
|
|
subject="[DAK Portal] Ihre Einladung zum Zweitmeinungs-Portal",
|
|
body=plain_body,
|
|
html_body=html_body,
|
|
attachments=[
|
|
("DAK-Portal-Anleitung.pdf", pdf_bytes, "application/pdf")
|
|
],
|
|
)
|
|
except Exception:
|
|
logger.exception("Failed to send invitation email to %s", email)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Invitation schemas (kept co-located with the admin router for simplicity)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class InvitationCreate(BaseModel):
|
|
"""Payload for creating a new invitation link."""
|
|
|
|
email: Optional[EmailStr] = None
|
|
role: str = "dak_mitarbeiter"
|
|
expires_in_days: int = 7
|
|
|
|
|
|
class InvitationResponse(BaseModel):
|
|
"""Public representation of an invitation link."""
|
|
|
|
id: int
|
|
token: str
|
|
email: Optional[str] = None
|
|
role: str
|
|
created_by: Optional[int] = None
|
|
expires_at: datetime
|
|
used_at: Optional[datetime] = None
|
|
used_by: Optional[int] = None
|
|
is_active: bool
|
|
created_at: datetime
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class AuditLogResponse(BaseModel):
|
|
"""Single audit-log entry returned by the admin endpoint."""
|
|
|
|
id: int
|
|
user_id: Optional[int] = None
|
|
action: str
|
|
entity_type: Optional[str] = None
|
|
entity_id: Optional[int] = None
|
|
old_values: Optional[dict] = None
|
|
new_values: Optional[dict] = None
|
|
ip_address: Optional[str] = None
|
|
user_agent: Optional[str] = None
|
|
created_at: datetime
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Users CRUD
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/users", response_model=list[UserResponse])
|
|
def list_users(
|
|
skip: int = Query(0, ge=0),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Return a paginated list of all users."""
|
|
users = db.query(User).offset(skip).limit(limit).all()
|
|
return users
|
|
|
|
|
|
@router.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
|
|
def create_user(
|
|
payload: UserCreate,
|
|
request: Request,
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Create a new user directly (admin bypass -- no invitation required)."""
|
|
existing = (
|
|
db.query(User)
|
|
.filter((User.email == payload.email) | (User.username == payload.username))
|
|
.first()
|
|
)
|
|
if existing:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail="User with this email or username already exists",
|
|
)
|
|
|
|
user = User(
|
|
username=payload.username,
|
|
email=payload.email,
|
|
password_hash=hash_password(payload.password),
|
|
role=payload.role,
|
|
)
|
|
db.add(user)
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=admin.id,
|
|
action="user_created",
|
|
entity_type="user",
|
|
entity_id=user.id,
|
|
new_values={"username": user.username, "email": user.email, "role": user.role},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|
|
|
|
return user
|
|
|
|
|
|
@router.put("/users/{user_id}", response_model=UserResponse)
|
|
def update_user(
|
|
user_id: int,
|
|
payload: UserUpdate,
|
|
request: Request,
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Update an existing user's profile fields."""
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found",
|
|
)
|
|
|
|
old_values: dict = {}
|
|
new_values: dict = {}
|
|
|
|
update_data = payload.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
current = getattr(user, field)
|
|
if current != value:
|
|
old_values[field] = current
|
|
new_values[field] = value
|
|
setattr(user, field, value)
|
|
|
|
if new_values:
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=admin.id,
|
|
action="user_updated",
|
|
entity_type="user",
|
|
entity_id=user.id,
|
|
old_values=old_values,
|
|
new_values=new_values,
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|
|
|
|
return user
|
|
|
|
|
|
@router.delete("/users/{user_id}/mfa")
|
|
def admin_reset_mfa(
|
|
user_id: int,
|
|
request: Request,
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Admin: reset MFA on a user's account."""
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
user.mfa_secret = None
|
|
user.mfa_enabled = False
|
|
db.commit()
|
|
|
|
log_action(
|
|
db,
|
|
user_id=admin.id,
|
|
action="mfa_reset",
|
|
entity_type="user",
|
|
entity_id=user.id,
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|
|
return {"detail": "MFA reset"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Invitations
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post(
|
|
"/invitations",
|
|
response_model=InvitationResponse,
|
|
status_code=status.HTTP_201_CREATED,
|
|
)
|
|
def create_invitation(
|
|
payload: InvitationCreate,
|
|
request: Request,
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Generate a new invitation link (optionally bound to a specific email)."""
|
|
token = secrets.token_urlsafe(32)
|
|
expires_at = datetime.now(timezone.utc) + timedelta(days=payload.expires_in_days)
|
|
|
|
invitation = InvitationLink(
|
|
token=token,
|
|
email=payload.email,
|
|
role=payload.role,
|
|
created_by=admin.id,
|
|
expires_at=expires_at,
|
|
)
|
|
db.add(invitation)
|
|
db.commit()
|
|
db.refresh(invitation)
|
|
|
|
log_action(
|
|
db,
|
|
user_id=admin.id,
|
|
action="invitation_created",
|
|
entity_type="invitation",
|
|
entity_id=invitation.id,
|
|
new_values={
|
|
"email": payload.email,
|
|
"role": payload.role,
|
|
"expires_at": expires_at.isoformat(),
|
|
},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|
|
|
|
# Send invitation email with PDF guide (fire-and-forget)
|
|
if payload.email:
|
|
_send_invitation_email(payload.email, token, expires_at)
|
|
|
|
return invitation
|
|
|
|
|
|
@router.get("/invitations", response_model=list[InvitationResponse])
|
|
def list_invitations(
|
|
skip: int = Query(0, ge=0),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Return a paginated list of all invitation links."""
|
|
invitations = (
|
|
db.query(InvitationLink)
|
|
.order_by(InvitationLink.created_at.desc())
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
return invitations
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Audit log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/audit-log", response_model=list[AuditLogResponse])
|
|
def get_audit_log(
|
|
skip: int = Query(0, ge=0),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
user_id: Optional[int] = Query(None),
|
|
action: Optional[str] = Query(None),
|
|
date_from: Optional[datetime] = Query(None),
|
|
date_to: Optional[datetime] = Query(None),
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Return a paginated, filterable audit log."""
|
|
query = db.query(AuditLog)
|
|
|
|
if user_id is not None:
|
|
query = query.filter(AuditLog.user_id == user_id)
|
|
if action is not None:
|
|
query = query.filter(AuditLog.action == action)
|
|
if date_from is not None:
|
|
query = query.filter(AuditLog.created_at >= date_from)
|
|
if date_to is not None:
|
|
query = query.filter(AuditLog.created_at <= date_to)
|
|
|
|
entries = (
|
|
query.order_by(AuditLog.created_at.desc())
|
|
.offset(skip)
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
return entries
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Disclosure requests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/disclosure-requests", response_model=list[DisclosureRequestResponse])
|
|
def list_disclosure_requests(
|
|
status_filter: Optional[str] = Query(None, alias="status"),
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""List disclosure requests. Without status filter, returns all."""
|
|
query = db.query(DisclosureRequest)
|
|
if status_filter:
|
|
query = query.filter(DisclosureRequest.status == status_filter)
|
|
|
|
requests = query.order_by(DisclosureRequest.created_at.desc()).all()
|
|
|
|
result = []
|
|
for dr in requests:
|
|
result.append(DisclosureRequestResponse(
|
|
id=dr.id, case_id=dr.case_id, requester_id=dr.requester_id,
|
|
requester_username=dr.requester.username if dr.requester else None,
|
|
fall_id=dr.case.fall_id if dr.case else None,
|
|
reason=dr.reason, status=dr.status,
|
|
reviewed_by=dr.reviewed_by, reviewed_at=dr.reviewed_at,
|
|
expires_at=dr.expires_at, created_at=dr.created_at,
|
|
))
|
|
return result
|
|
|
|
|
|
@router.get("/disclosure-requests/count", response_model=DisclosureCountResponse)
|
|
def disclosure_request_count(
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Return count of pending disclosure requests."""
|
|
return DisclosureCountResponse(pending_count=get_pending_count(db))
|
|
|
|
|
|
@router.put("/disclosure-requests/{request_id}", response_model=DisclosureRequestResponse)
|
|
def review_disclosure(
|
|
request_id: int,
|
|
payload: DisclosureRequestUpdate,
|
|
request: Request,
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Approve or reject a disclosure request."""
|
|
if payload.status not in ("approved", "rejected"):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail="Status must be 'approved' or 'rejected'",
|
|
)
|
|
try:
|
|
dr = review_disclosure_request(db, request_id, admin.id, payload.status)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
|
|
|
|
log_action(
|
|
db, user_id=admin.id, action=f"disclosure_{payload.status}",
|
|
entity_type="disclosure_request", entity_id=request_id,
|
|
new_values={"status": payload.status, "case_id": dr.case_id},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|
|
|
|
return DisclosureRequestResponse(
|
|
id=dr.id, case_id=dr.case_id, requester_id=dr.requester_id,
|
|
requester_username=dr.requester.username if dr.requester else None,
|
|
fall_id=dr.case.fall_id if dr.case else None,
|
|
reason=dr.reason, status=dr.status,
|
|
reviewed_by=dr.reviewed_by, reviewed_at=dr.reviewed_at,
|
|
expires_at=dr.expires_at, created_at=dr.created_at,
|
|
)
|
|
|
|
|
|
@router.put("/disclosure-requests/{request_id}/revoke", response_model=DisclosureRequestResponse)
|
|
def admin_revoke_disclosure(
|
|
request_id: int,
|
|
request: Request,
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Admin: revoke an active disclosure (sets expires_at to now)."""
|
|
try:
|
|
dr = revoke_disclosure(db, request_id, admin.id, admin=True)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
|
|
|
|
log_action(
|
|
db, user_id=admin.id, action="disclosure_admin_revoked",
|
|
entity_type="disclosure_request", entity_id=request_id,
|
|
new_values={"expires_at": dr.expires_at.isoformat() if dr.expires_at else None},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|
|
|
|
return DisclosureRequestResponse(
|
|
id=dr.id, case_id=dr.case_id, requester_id=dr.requester_id,
|
|
requester_username=dr.requester.username if dr.requester else None,
|
|
fall_id=dr.case.fall_id if dr.case else None,
|
|
reason=dr.reason, status=dr.status,
|
|
reviewed_by=dr.reviewed_by, reviewed_at=dr.reviewed_at,
|
|
expires_at=dr.expires_at, created_at=dr.created_at,
|
|
)
|
|
|
|
|
|
@router.put("/disclosure-requests/{request_id}/reactivate", response_model=DisclosureRequestResponse)
|
|
def admin_reactivate_disclosure(
|
|
request_id: int,
|
|
request: Request,
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Admin: reactivate an expired/revoked disclosure (new 24h window)."""
|
|
try:
|
|
dr = reactivate_disclosure(db, request_id)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
|
|
|
|
log_action(
|
|
db, user_id=admin.id, action="disclosure_reactivated",
|
|
entity_type="disclosure_request", entity_id=request_id,
|
|
new_values={"expires_at": dr.expires_at.isoformat() if dr.expires_at else None},
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|
|
|
|
return DisclosureRequestResponse(
|
|
id=dr.id, case_id=dr.case_id, requester_id=dr.requester_id,
|
|
requester_username=dr.requester.username if dr.requester else None,
|
|
fall_id=dr.case.fall_id if dr.case else None,
|
|
reason=dr.reason, status=dr.status,
|
|
reviewed_by=dr.reviewed_by, reviewed_at=dr.reviewed_at,
|
|
expires_at=dr.expires_at, created_at=dr.created_at,
|
|
)
|
|
|
|
|
|
@router.delete("/disclosure-requests/{request_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
def admin_delete_disclosure_request(
|
|
request_id: int,
|
|
request: Request,
|
|
admin: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Admin: permanently delete a non-active disclosure request."""
|
|
try:
|
|
admin_delete_disclosure(db, request_id)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
|
|
|
|
log_action(
|
|
db, user_id=admin.id, action="disclosure_admin_deleted",
|
|
entity_type="disclosure_request", entity_id=request_id,
|
|
ip_address=request.client.host if request.client else None,
|
|
user_agent=request.headers.get("user-agent"),
|
|
)
|