dak.c2s/backend/app/api/admin.py
CCS Admin fc609401c3 feat: add MFA disable (self-service + admin reset) endpoints
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 09:40:57 +00:00

300 lines
8.6 KiB
Python

"""Admin-only API endpoints: user CRUD, invitation management, audit log."""
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from app.core.dependencies import require_admin
from app.core.security import hash_password
from app.database import get_db
from app.models.audit import AuditLog
from app.models.user import InvitationLink, User
from app.schemas.user import UserCreate, UserResponse, UserUpdate
from app.services.audit_service import log_action
router = APIRouter()
# ---------------------------------------------------------------------------
# Invitation schemas (kept co-located with the admin router for simplicity)
# ---------------------------------------------------------------------------
class InvitationCreate(BaseModel):
"""Payload for creating a new invitation link."""
email: Optional[EmailStr] = None
role: str = "dak_mitarbeiter"
expires_in_days: int = 7
class InvitationResponse(BaseModel):
"""Public representation of an invitation link."""
id: int
token: str
email: Optional[str] = None
role: str
created_by: Optional[int] = None
expires_at: datetime
used_at: Optional[datetime] = None
used_by: Optional[int] = None
is_active: bool
created_at: datetime
model_config = {"from_attributes": True}
class AuditLogResponse(BaseModel):
"""Single audit-log entry returned by the admin endpoint."""
id: int
user_id: Optional[int] = None
action: str
entity_type: Optional[str] = None
entity_id: Optional[int] = None
old_values: Optional[dict] = None
new_values: Optional[dict] = None
ip_address: Optional[str] = None
user_agent: Optional[str] = None
created_at: datetime
model_config = {"from_attributes": True}
# ---------------------------------------------------------------------------
# Users CRUD
# ---------------------------------------------------------------------------
@router.get("/users", response_model=list[UserResponse])
def list_users(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
admin: User = Depends(require_admin),
db: Session = Depends(get_db),
):
"""Return a paginated list of all users."""
users = db.query(User).offset(skip).limit(limit).all()
return users
@router.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
payload: UserCreate,
request: Request,
admin: User = Depends(require_admin),
db: Session = Depends(get_db),
):
"""Create a new user directly (admin bypass -- no invitation required)."""
existing = (
db.query(User)
.filter((User.email == payload.email) | (User.username == payload.username))
.first()
)
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User with this email or username already exists",
)
user = User(
username=payload.username,
email=payload.email,
password_hash=hash_password(payload.password),
role=payload.role,
)
db.add(user)
db.commit()
db.refresh(user)
log_action(
db,
user_id=admin.id,
action="user_created",
entity_type="user",
entity_id=user.id,
new_values={"username": user.username, "email": user.email, "role": user.role},
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return user
@router.put("/users/{user_id}", response_model=UserResponse)
def update_user(
user_id: int,
payload: UserUpdate,
request: Request,
admin: User = Depends(require_admin),
db: Session = Depends(get_db),
):
"""Update an existing user's profile fields."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
old_values: dict = {}
new_values: dict = {}
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
current = getattr(user, field)
if current != value:
old_values[field] = current
new_values[field] = value
setattr(user, field, value)
if new_values:
db.commit()
db.refresh(user)
log_action(
db,
user_id=admin.id,
action="user_updated",
entity_type="user",
entity_id=user.id,
old_values=old_values,
new_values=new_values,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return user
@router.delete("/users/{user_id}/mfa")
def admin_reset_mfa(
user_id: int,
request: Request,
admin: User = Depends(require_admin),
db: Session = Depends(get_db),
):
"""Admin: reset MFA on a user's account."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.mfa_secret = None
user.mfa_enabled = False
db.commit()
log_action(
db,
user_id=admin.id,
action="mfa_reset",
entity_type="user",
entity_id=user.id,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return {"detail": "MFA reset"}
# ---------------------------------------------------------------------------
# Invitations
# ---------------------------------------------------------------------------
@router.post(
"/invitations",
response_model=InvitationResponse,
status_code=status.HTTP_201_CREATED,
)
def create_invitation(
payload: InvitationCreate,
request: Request,
admin: User = Depends(require_admin),
db: Session = Depends(get_db),
):
"""Generate a new invitation link (optionally bound to a specific email)."""
token = secrets.token_urlsafe(32)
expires_at = datetime.now(timezone.utc) + timedelta(days=payload.expires_in_days)
invitation = InvitationLink(
token=token,
email=payload.email,
role=payload.role,
created_by=admin.id,
expires_at=expires_at,
)
db.add(invitation)
db.commit()
db.refresh(invitation)
log_action(
db,
user_id=admin.id,
action="invitation_created",
entity_type="invitation",
entity_id=invitation.id,
new_values={
"email": payload.email,
"role": payload.role,
"expires_at": expires_at.isoformat(),
},
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return invitation
@router.get("/invitations", response_model=list[InvitationResponse])
def list_invitations(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
admin: User = Depends(require_admin),
db: Session = Depends(get_db),
):
"""Return a paginated list of all invitation links."""
invitations = (
db.query(InvitationLink)
.order_by(InvitationLink.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return invitations
# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------
@router.get("/audit-log", response_model=list[AuditLogResponse])
def get_audit_log(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
user_id: Optional[int] = Query(None),
action: Optional[str] = Query(None),
date_from: Optional[datetime] = Query(None),
date_to: Optional[datetime] = Query(None),
admin: User = Depends(require_admin),
db: Session = Depends(get_db),
):
"""Return a paginated, filterable audit log."""
query = db.query(AuditLog)
if user_id is not None:
query = query.filter(AuditLog.user_id == user_id)
if action is not None:
query = query.filter(AuditLog.action == action)
if date_from is not None:
query = query.filter(AuditLog.created_at >= date_from)
if date_to is not None:
query = query.filter(AuditLog.created_at <= date_to)
entries = (
query.order_by(AuditLog.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return entries