"""Admin-only API endpoints: user CRUD, invitation management, audit log.

Every route registered on ``router`` in this module declares a
``Depends(require_admin)`` dependency.
"""

import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session

from app.core.dependencies import require_admin
from app.core.security import hash_password
from app.database import get_db
from app.models.audit import AuditLog
from app.models.user import InvitationLink, User
from app.schemas.user import UserCreate, UserResponse, UserUpdate
from app.services.audit_service import log_action

router = APIRouter()

# ---------------------------------------------------------------------------
# Invitation schemas (kept co-located with the admin router for simplicity)
# ---------------------------------------------------------------------------


class InvitationCreate(BaseModel):
    """Request body accepted by ``POST /invitations``."""

    # Optional address to bind the invitation to; None leaves it unbound.
    email: Optional[EmailStr] = None
    # Role stored on the invitation link; copied verbatim by the endpoint.
    role: str = "dak_mitarbeiter"
    # Lifetime of the link in days, counted from creation time.
    # NOTE(review): no bounds are declared on this field at schema level.
    expires_in_days: int = 7


class InvitationResponse(BaseModel):
    """Serialized form of an ``InvitationLink`` row returned to admins.

    Note that the raw ``token`` is part of the response.
    """

    id: int
    token: str
    email: Optional[str] = None
    role: str
    # Set by the create endpoint to the id of the admin issuing the link.
    created_by: Optional[int] = None
    expires_at: datetime
    # NOTE(review): used_at / used_by are presumably filled in when the link
    # is redeemed; that code is not in this module -- confirm.
    used_at: Optional[datetime] = None
    used_by: Optional[int] = None
    is_active: bool
    created_at: datetime

    # Lets Pydantic populate the model from ORM object attributes.
    model_config = {"from_attributes": True}


class AuditLogResponse(BaseModel):
    """Serialized form of a single ``AuditLog`` row."""

    id: int
    # Id of the acting user; the endpoints in this module pass the admin's id.
    user_id: Optional[int] = None
    # Action name, e.g. "user_created", "user_updated", "invitation_created".
    action: str
    # Kind and id of the affected record, e.g. "user" / "invitation".
    entity_type: Optional[str] = None
    entity_id: Optional[int] = None
    # Field values before and after the change, keyed by field name.
    old_values: Optional[dict] = None
    new_values: Optional[dict] = None
    # Request metadata captured at the time of the action.
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    created_at: datetime

    # Lets Pydantic populate the model from ORM object attributes.
    model_config = {"from_attributes": True}


# ---------------------------------------------------------------------------
# Users CRUD
# ---------------------------------------------------------------------------
def _client_meta(request: Request) -> dict:
    """Return the caller's IP and User-Agent as ``log_action`` keyword arguments."""
    return {
        # request.client can be None (e.g. some test clients / proxies).
        "ip_address": request.client.host if request.client else None,
        "user_agent": request.headers.get("user-agent"),
    }


def _commit_or_conflict(db: Session, detail: str) -> None:
    """Commit the session, turning a constraint violation into HTTP 409.

    Raises:
        HTTPException: 409 with *detail* when the commit raises
            ``IntegrityError``; the session is rolled back first.
    """
    # Local import: keeps this block self-contained without touching the
    # module-level import list. SQLAlchemy is already a dependency of the file.
    from sqlalchemy.exc import IntegrityError

    try:
        db.commit()
    except IntegrityError:
        # The session is unusable until rolled back.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=detail,
        ) from None


@router.get("/users", response_model=list[UserResponse])
def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Return one page of users, ordered by id.

    Args:
        skip: Number of rows to skip (>= 0).
        limit: Maximum rows to return (1..200).
        admin: Authenticated admin; present only to enforce the dependency.
        db: Database session.

    Returns:
        The ``User`` rows of the requested page.
    """
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could overlap or skip rows. Pin the order explicitly.
    return db.query(User).order_by(User.id).offset(skip).limit(limit).all()


@router.post("/users", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
    payload: UserCreate,
    request: Request,
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Create a new user directly (admin bypass -- no invitation required).

    Args:
        payload: Username, email, password and role for the new account.
        request: Incoming request; source of audit metadata.
        admin: Authenticated admin performing the action.
        db: Database session.

    Returns:
        The newly created ``User``.

    Raises:
        HTTPException: 409 if the email or username is already taken.
    """
    conflict_detail = "User with this email or username already exists"

    existing = (
        db.query(User)
        .filter((User.email == payload.email) | (User.username == payload.username))
        .first()
    )
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=conflict_detail,
        )

    user = User(
        username=payload.username,
        email=payload.email,
        password_hash=hash_password(payload.password),
        role=payload.role,
    )
    db.add(user)
    # The pre-check above cannot rule out a concurrent insert of the same
    # email/username; report that race as 409 instead of an unhandled 500.
    _commit_or_conflict(db, conflict_detail)
    db.refresh(user)

    log_action(
        db,
        user_id=admin.id,
        action="user_created",
        entity_type="user",
        entity_id=user.id,
        new_values={"username": user.username, "email": user.email, "role": user.role},
        **_client_meta(request),
    )
    return user


@router.put("/users/{user_id}", response_model=UserResponse)
def update_user(
    user_id: int,
    payload: UserUpdate,
    request: Request,
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Update an existing user's profile fields.

    Only fields explicitly present in the request body are considered, and
    only those whose value actually differs are written and audited.

    Args:
        user_id: Primary key of the user to modify.
        payload: Partial update; unset fields are ignored.
        request: Incoming request; source of audit metadata.
        admin: Authenticated admin performing the action.
        db: Database session.

    Returns:
        The (possibly unchanged) ``User``.

    Raises:
        HTTPException: 404 if the user does not exist; 409 if the change
            violates a database constraint (e.g. duplicate email/username).
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    old_values: dict = {}
    new_values: dict = {}

    # NOTE(review): fields are applied verbatim via setattr and echoed into
    # the audit log. If UserUpdate ever exposes a password field it must be
    # hashed and redacted here -- verify against the schema.
    for field, value in payload.model_dump(exclude_unset=True).items():
        current = getattr(user, field)
        if current != value:
            old_values[field] = current
            new_values[field] = value
            setattr(user, field, value)

    if new_values:
        # Changing email/username to one already in use would otherwise
        # surface as an unhandled IntegrityError (HTTP 500).
        _commit_or_conflict(db, "User with this email or username already exists")
        db.refresh(user)
        log_action(
            db,
            user_id=admin.id,
            action="user_updated",
            entity_type="user",
            entity_id=user.id,
            old_values=old_values,
            new_values=new_values,
            **_client_meta(request),
        )

    return user


# ---------------------------------------------------------------------------
# Invitations
# ---------------------------------------------------------------------------


@router.post(
    "/invitations",
    response_model=InvitationResponse,
    status_code=status.HTTP_201_CREATED,
)
def create_invitation(
    payload: InvitationCreate,
    request: Request,
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Generate a new invitation link (optionally bound to a specific email).

    Args:
        payload: Optional email, role and lifetime (in days) of the link.
        request: Incoming request; source of audit metadata.
        admin: Authenticated admin issuing the invitation.
        db: Database session.

    Returns:
        The persisted ``InvitationLink``.

    Raises:
        HTTPException: 400 if ``expires_in_days`` is below 1 or so large that
            the expiry date cannot be represented.
    """
    # A zero/negative lifetime would create a link that is expired on arrival.
    if payload.expires_in_days < 1:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="expires_in_days must be at least 1",
        )
    try:
        expires_at = datetime.now(timezone.utc) + timedelta(days=payload.expires_in_days)
    except OverflowError:
        # timedelta / datetime arithmetic overflows for absurdly large values.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="expires_in_days is too large",
        ) from None

    invitation = InvitationLink(
        token=secrets.token_urlsafe(32),
        email=payload.email,
        role=payload.role,
        created_by=admin.id,
        expires_at=expires_at,
    )
    db.add(invitation)
    db.commit()
    db.refresh(invitation)

    log_action(
        db,
        user_id=admin.id,
        action="invitation_created",
        entity_type="invitation",
        entity_id=invitation.id,
        new_values={
            "email": payload.email,
            "role": payload.role,
            "expires_at": expires_at.isoformat(),
        },
        **_client_meta(request),
    )
    return invitation


@router.get("/invitations", response_model=list[InvitationResponse])
def list_invitations(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Return one page of invitation links, newest first.

    Args:
        skip: Number of rows to skip (>= 0).
        limit: Maximum rows to return (1..200).
        admin: Authenticated admin; present only to enforce the dependency.
        db: Database session.

    Returns:
        The ``InvitationLink`` rows of the requested page.
    """
    return (
        db.query(InvitationLink)
        # id breaks ties between links sharing a created_at value so that
        # pagination stays stable.
        .order_by(InvitationLink.created_at.desc(), InvitationLink.id.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )


# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------


@router.get("/audit-log", response_model=list[AuditLogResponse])
def get_audit_log(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    user_id: Optional[int] = Query(None),
    action: Optional[str] = Query(None),
    date_from: Optional[datetime] = Query(None),
    date_to: Optional[datetime] = Query(None),
    admin: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Return a paginated, filterable audit log, newest entries first.

    All filters are optional and combined with AND.

    Args:
        skip: Number of rows to skip (>= 0).
        limit: Maximum rows to return (1..200).
        user_id: Keep only entries recorded for this user id.
        action: Keep only entries whose action equals this string exactly.
        date_from: Keep entries with ``created_at >= date_from`` (inclusive).
        date_to: Keep entries with ``created_at <= date_to`` (inclusive).
        admin: Authenticated admin; present only to enforce the dependency.
        db: Database session.

    Returns:
        The ``AuditLog`` rows of the requested page.
    """
    # NOTE(review): date bounds are compared as received; whether naive vs.
    # timezone-aware values match the column's storage should be confirmed.
    query = db.query(AuditLog)
    if user_id is not None:
        query = query.filter(AuditLog.user_id == user_id)
    if action is not None:
        query = query.filter(AuditLog.action == action)
    if date_from is not None:
        query = query.filter(AuditLog.created_at >= date_from)
    if date_to is not None:
        query = query.filter(AuditLog.created_at <= date_to)

    return (
        # Entries can share a created_at value; without a tie-breaker their
        # relative order is undefined and rows may repeat or vanish between
        # pages. id makes the ordering total.
        query.order_by(AuditLog.created_at.desc(), AuditLog.id.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )