dak.c2s/backend/app/api/admin.py
CCS Admin 32cee4d30d feat: add KPI links, My Disclosures page, and extend Admin Disclosures
- Dashboard KPI cards are now clickable with role-based links
- New GutachtenStatistikPage placeholder at /gutachten-statistik
- New "Meine Freigaben" page for DAK-Mitarbeiter to view/revoke own disclosures
- Backend: GET /cases/my-disclosure-requests, PUT /cases/disclosure-requests/{id}/revoke
- Admin Disclosures page: full history with status tabs and revoke capability
- Backend: PUT /admin/disclosure-requests/{id}/revoke, default shows all statuses
- Sidebar: "Meine Freigaben" entry for dak_mitarbeiter role

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 22:19:46 +00:00

413 lines
13 KiB
Python

"""Admin-only API endpoints: user CRUD, invitation management, audit log."""
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from app.core.dependencies import require_admin
from app.core.security import hash_password
from app.database import get_db
from app.models.audit import AuditLog, DisclosureRequest
from app.models.user import InvitationLink, User
from app.schemas.disclosure import (
DisclosureCountResponse, DisclosureRequestResponse, DisclosureRequestUpdate,
)
from app.schemas.user import UserCreate, UserResponse, UserUpdate
from app.services.audit_service import log_action
from app.services.disclosure_service import (
get_pending_count, review_disclosure_request, revoke_disclosure,
)
# Router collecting the admin endpoints declared in this module; each handler
# below guards itself with the ``require_admin`` dependency.
# NOTE(review): presumably mounted under an ``/admin`` prefix by the app — confirm.
router = APIRouter()
# ---------------------------------------------------------------------------
# Invitation schemas (kept co-located with the admin router for simplicity)
# ---------------------------------------------------------------------------
class InvitationCreate(BaseModel):
    """Request body accepted when an admin creates an invitation link.

    Every field has a default, so an empty JSON object is a valid payload.
    """

    # Address the invitation is bound to; ``None`` leaves it unbound.
    email: Optional[EmailStr] = None
    # Role stored on the invitation link.
    role: str = "dak_mitarbeiter"
    # Lifetime of the link in days, added to the creation time.
    expires_in_days: int = 7
class InvitationResponse(BaseModel):
    """Serialized form of an invitation link as returned by the admin API."""

    id: int
    # Random URL-safe token identifying the invitation.
    token: str
    email: Optional[str] = None
    role: str
    # Id of the admin who generated the link.
    created_by: Optional[int] = None
    expires_at: datetime
    # NOTE(review): presumably filled in once the link is redeemed — confirm.
    used_at: Optional[datetime] = None
    used_by: Optional[int] = None
    is_active: bool
    created_at: datetime

    # Lets the model be populated from attribute-bearing objects (ORM rows).
    model_config = dict(from_attributes=True)
class AuditLogResponse(BaseModel):
    """One audit-log row in the shape returned by ``GET /audit-log``."""

    id: int
    # Id of the acting user, when one is recorded.
    user_id: Optional[int] = None
    # Action name, e.g. ``"user_created"`` or ``"mfa_reset"``.
    action: str
    # Kind and id of the object the action touched, when recorded.
    entity_type: Optional[str] = None
    entity_id: Optional[int] = None
    # Optional before/after value dictionaries attached to the entry.
    old_values: Optional[dict] = None
    new_values: Optional[dict] = None
    # Request metadata captured with the entry.
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    created_at: datetime

    # Lets the model be populated from attribute-bearing objects (ORM rows).
    model_config = dict(from_attributes=True)
# ---------------------------------------------------------------------------
# Users CRUD
# ---------------------------------------------------------------------------
@router.get("/users", response_model=list[UserResponse])
def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Return one page of users, ordered by ``User.id``.

    Args:
        skip: Number of rows to skip (>= 0).
        limit: Maximum number of rows to return (1-200, default 50).
        admin: Authenticated admin resolved by ``require_admin``; only used
            for access control.
        db: Database session.

    Returns:
        The selected ``User`` rows, serialized through ``UserResponse``.
    """
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could repeat or skip users. An explicit ordering
    # keeps pagination stable.
    return (
        db.query(User)
        .order_by(User.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
@router.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
    payload: UserCreate,
    request: Request,
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Create a user account directly, without going through an invitation.

    Responds with 409 when a user with the same email or username already
    exists. On success the new row is committed and a ``user_created`` entry
    is recorded via ``log_action``.
    """
    # Either identifier colliding with an existing account blocks creation.
    same_identity = (User.email == payload.email) | (User.username == payload.username)
    clash = db.query(User).filter(same_identity).first()
    if clash:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User with this email or username already exists",
        )

    new_user = User(
        username=payload.username,
        email=payload.email,
        # Only the hash is stored, never the plain-text password.
        password_hash=hash_password(payload.password),
        role=payload.role,
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)

    client = request.client
    log_action(
        db,
        user_id=admin.id,
        action="user_created",
        entity_type="user",
        entity_id=new_user.id,
        new_values={
            "username": new_user.username,
            "email": new_user.email,
            "role": new_user.role,
        },
        ip_address=client.host if client else None,
        user_agent=request.headers.get("user-agent"),
    )
    return new_user
@router.put("/users/{user_id}", response_model=UserResponse)
def update_user(
    user_id: int,
    payload: UserUpdate,
    request: Request,
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Apply the fields sent in ``payload`` to an existing user.

    Only fields that were explicitly set in the request and whose value
    differs from the stored one are written. Responds with 404 when the user
    does not exist. When nothing changes, no commit or audit entry is made.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Collect a before/after diff for the audit trail while applying changes.
    before: dict = {}
    after: dict = {}
    for field, value in payload.model_dump(exclude_unset=True).items():
        current = getattr(target, field)
        if current != value:
            before[field] = current
            after[field] = value
            setattr(target, field, value)

    if not after:
        # Nothing differed from the stored values; skip commit and audit.
        return target

    db.commit()
    db.refresh(target)

    client = request.client
    log_action(
        db,
        user_id=admin.id,
        action="user_updated",
        entity_type="user",
        entity_id=target.id,
        old_values=before,
        new_values=after,
        ip_address=client.host if client else None,
        user_agent=request.headers.get("user-agent"),
    )
    return target
@router.delete("/users/{user_id}/mfa")
def admin_reset_mfa(
    user_id: int,
    request: Request,
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Clear a user's MFA secret and switch MFA off for that account.

    Responds with 404 when the user does not exist. A ``mfa_reset`` audit
    entry is recorded via ``log_action`` after the change is committed.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    target.mfa_secret = None
    target.mfa_enabled = False
    db.commit()

    client = request.client
    log_action(
        db,
        user_id=admin.id,
        action="mfa_reset",
        entity_type="user",
        entity_id=target.id,
        ip_address=client.host if client else None,
        user_agent=request.headers.get("user-agent"),
    )
    return {"detail": "MFA reset"}
# ---------------------------------------------------------------------------
# Invitations
# ---------------------------------------------------------------------------
@router.post(
    "/invitations",
    response_model=InvitationResponse,
    status_code=status.HTTP_201_CREATED,
)
def create_invitation(
    payload: InvitationCreate,
    request: Request,
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Generate a new invitation link (optionally bound to a specific email).

    Args:
        payload: Invitation parameters (email, role, validity in days).
        request: Incoming request; supplies client IP and user agent for the
            audit entry.
        admin: Authenticated admin resolved by ``require_admin``; stored as
            the invitation's creator.
        db: Database session.

    Returns:
        The persisted ``InvitationLink``, serialized via ``InvitationResponse``.

    Raises:
        HTTPException: 422 when ``expires_in_days`` is below 1 or so large
            that the expiry timestamp cannot be represented.
    """
    # A zero or negative lifetime would create a link that is already expired.
    if payload.expires_in_days < 1:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="expires_in_days must be at least 1",
        )
    try:
        expires_at = datetime.now(timezone.utc) + timedelta(days=payload.expires_in_days)
    except OverflowError as exc:
        # timedelta / datetime arithmetic overflows for absurdly large values;
        # report it as a client error instead of an unhandled 500.
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="expires_in_days is too large",
        ) from exc

    token = secrets.token_urlsafe(32)
    invitation = InvitationLink(
        token=token,
        email=payload.email,
        role=payload.role,
        created_by=admin.id,
        expires_at=expires_at,
    )
    db.add(invitation)
    db.commit()
    db.refresh(invitation)

    log_action(
        db,
        user_id=admin.id,
        action="invitation_created",
        entity_type="invitation",
        entity_id=invitation.id,
        new_values={
            "email": payload.email,
            "role": payload.role,
            "expires_at": expires_at.isoformat(),
        },
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("user-agent"),
    )
    return invitation
@router.get("/invitations", response_model=list[InvitationResponse])
def list_invitations(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Return one page of invitation links, newest first.

    ``skip`` and ``limit`` select the page; ``limit`` is capped at 200.
    """
    newest_first = db.query(InvitationLink).order_by(InvitationLink.created_at.desc())
    page = newest_first.offset(skip).limit(limit)
    return page.all()
# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------
@router.get("/audit-log", response_model=list[AuditLogResponse])
def get_audit_log(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    user_id: Optional[int] = Query(None),
    action: Optional[str] = Query(None),
    date_from: Optional[datetime] = Query(None),
    date_to: Optional[datetime] = Query(None),
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Return one page of audit-log entries, newest first.

    Each optional query parameter narrows the result when supplied:
    ``user_id`` and ``action`` match exactly, while ``date_from`` and
    ``date_to`` bound ``created_at`` inclusively.
    """
    # Gather only the filters the caller actually provided.
    criteria = []
    if user_id is not None:
        criteria.append(AuditLog.user_id == user_id)
    if action is not None:
        criteria.append(AuditLog.action == action)
    if date_from is not None:
        criteria.append(AuditLog.created_at >= date_from)
    if date_to is not None:
        criteria.append(AuditLog.created_at <= date_to)

    # Multiple criteria passed to filter() are combined with AND.
    return (
        db.query(AuditLog)
        .filter(*criteria)
        .order_by(AuditLog.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
# ---------------------------------------------------------------------------
# Disclosure requests
# ---------------------------------------------------------------------------
@router.get("/disclosure-requests", response_model=list[DisclosureRequestResponse])
def list_disclosure_requests(
    status_filter: Optional[str] = Query(None, alias="status"),
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """List disclosure requests, newest first.

    The ``status`` query parameter restricts the list to that status; when it
    is missing or empty, requests of every status are returned.
    """
    query = db.query(DisclosureRequest)
    if status_filter:
        query = query.filter(DisclosureRequest.status == status_filter)
    newest_first = query.order_by(DisclosureRequest.created_at.desc())

    # Flatten each row, pulling the requester's username and the case's
    # fall_id from the related objects when they are present.
    return [
        DisclosureRequestResponse(
            id=item.id,
            case_id=item.case_id,
            requester_id=item.requester_id,
            requester_username=item.requester.username if item.requester else None,
            fall_id=item.case.fall_id if item.case else None,
            reason=item.reason,
            status=item.status,
            reviewed_by=item.reviewed_by,
            reviewed_at=item.reviewed_at,
            expires_at=item.expires_at,
            created_at=item.created_at,
        )
        for item in newest_first.all()
    ]
@router.get("/disclosure-requests/count", response_model=DisclosureCountResponse)
def disclosure_request_count(
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Report how many disclosure requests are pending.

    The number comes from ``get_pending_count`` and is wrapped in a
    ``DisclosureCountResponse``.
    """
    pending = get_pending_count(db)
    return DisclosureCountResponse(pending_count=pending)
@router.put("/disclosure-requests/{request_id}", response_model=DisclosureRequestResponse)
def review_disclosure(
    request_id: int,
    payload: DisclosureRequestUpdate,
    request: Request,
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Approve or reject a disclosure request.

    Args:
        request_id: Id of the disclosure request under review.
        payload: Carries the target status, ``"approved"`` or ``"rejected"``.
        request: Incoming request; supplies client IP and user agent for the
            audit entry.
        admin: Authenticated admin resolved by ``require_admin``; recorded as
            the reviewer.
        db: Database session.

    Returns:
        The reviewed request as a ``DisclosureRequestResponse``.

    Raises:
        HTTPException: 422 when ``payload.status`` is neither ``"approved"``
            nor ``"rejected"``; 400 when ``review_disclosure_request`` raises
            ``ValueError`` (its message becomes the response detail).
    """
    if payload.status not in ("approved", "rejected"):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Status must be 'approved' or 'rejected'",
        )

    try:
        dr = review_disclosure_request(db, request_id, admin.id, payload.status)
    except ValueError as exc:
        # Chain the service error so tracebacks show it as the direct cause
        # rather than as an unrelated failure inside the handler.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc

    log_action(
        db,
        user_id=admin.id,
        action=f"disclosure_{payload.status}",
        entity_type="disclosure_request",
        entity_id=request_id,
        new_values={"status": payload.status, "case_id": dr.case_id},
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("user-agent"),
    )
    return DisclosureRequestResponse(
        id=dr.id,
        case_id=dr.case_id,
        requester_id=dr.requester_id,
        requester_username=dr.requester.username if dr.requester else None,
        fall_id=dr.case.fall_id if dr.case else None,
        reason=dr.reason,
        status=dr.status,
        reviewed_by=dr.reviewed_by,
        reviewed_at=dr.reviewed_at,
        expires_at=dr.expires_at,
        created_at=dr.created_at,
    )
@router.put("/disclosure-requests/{request_id}/revoke", response_model=DisclosureRequestResponse)
def admin_revoke_disclosure(
    request_id: int,
    request: Request,
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Admin: revoke an active disclosure (sets expires_at to now).

    Args:
        request_id: Id of the disclosure request to revoke.
        request: Incoming request; supplies client IP and user agent for the
            audit entry.
        admin: Authenticated admin resolved by ``require_admin``.
        db: Database session.

    Returns:
        The revoked request as a ``DisclosureRequestResponse``.

    Raises:
        HTTPException: 400 when ``revoke_disclosure`` raises ``ValueError``
            (its message becomes the response detail).
    """
    try:
        dr = revoke_disclosure(db, request_id, admin.id, admin=True)
    except ValueError as exc:
        # Chain the service error so tracebacks show it as the direct cause
        # rather than as an unrelated failure inside the handler.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc

    log_action(
        db,
        user_id=admin.id,
        action="disclosure_admin_revoked",
        entity_type="disclosure_request",
        entity_id=request_id,
        new_values={"expires_at": dr.expires_at.isoformat() if dr.expires_at else None},
        ip_address=request.client.host if request.client else None,
        user_agent=request.headers.get("user-agent"),
    )
    return DisclosureRequestResponse(
        id=dr.id,
        case_id=dr.case_id,
        requester_id=dr.requester_id,
        requester_username=dr.requester.username if dr.requester else None,
        fall_id=dr.case.fall_id if dr.case else None,
        reason=dr.reason,
        status=dr.status,
        reviewed_by=dr.reviewed_by,
        reviewed_at=dr.reviewed_at,
        expires_at=dr.expires_at,
        created_at=dr.created_at,
    )