dak.c2s/backend/app/api/auth.py
CCS Admin 1fa5d20d40 feat: add PUT /api/auth/profile endpoint for self-service profile update
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 09:37:17 +00:00

165 lines
5.2 KiB
Python

"""Auth API router: login, register, refresh, logout, MFA, password change."""
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from app.core.dependencies import get_current_user
from app.database import get_db
from app.services.audit_service import log_action
from app.models.user import User
from app.schemas.auth import (
ChangePasswordRequest,
LoginRequest,
MFASetupResponse,
MFAVerifyRequest,
RefreshRequest,
RegisterRequest,
TokenResponse,
)
from app.schemas.user import ProfileUpdate, UserResponse
from app.services.auth_service import (
activate_mfa,
authenticate_user,
change_password,
create_tokens,
refresh_access_token,
register_user,
revoke_refresh_token,
setup_mfa,
update_profile,
)
# Router instance that every endpoint in this module registers itself on via
# the ``@router.<method>(...)`` decorators below.
router = APIRouter()
# ---------------------------------------------------------------------------
# Public endpoints (no auth required)
# ---------------------------------------------------------------------------
@router.post("/login", response_model=TokenResponse)
def login(data: LoginRequest, request: Request, db: Session = Depends(get_db)):
    """Log a user in and hand back a fresh token pair.

    The e-mail, password and MFA code carried by *data* are passed to
    ``authenticate_user``; tokens for the resulting user are obtained from
    ``create_tokens``. A ``"login"`` audit entry (including client IP and
    user agent, when available) is written through ``log_action`` before the
    response is assembled.

    Returns:
        A ``TokenResponse`` holding the access token, the refresh token and
        the serialised user.
    """
    account = authenticate_user(db, data.email, data.password, data.mfa_code)
    access_token, refresh_token = create_tokens(db, account)

    # ``request.client`` may be missing; record no IP address in that case.
    remote_ip = None
    if request.client:
        remote_ip = request.client.host
    agent = request.headers.get("user-agent")

    log_action(
        db,
        user_id=account.id,
        action="login",
        entity_type="user",
        entity_id=account.id,
        ip_address=remote_ip,
        user_agent=agent,
    )

    serialised_user = UserResponse.model_validate(account)
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        user=serialised_user,
    )
@router.post("/register", response_model=UserResponse)
def register(data: RegisterRequest, db: Session = Depends(get_db)):
    """Create a new account from a self-service registration request.

    Username, e-mail, password and invitation token are forwarded to
    ``register_user``, which owns the actual registration rules (described
    by the service as invitation token or whitelisted domain).

    Returns:
        The newly registered user serialised as ``UserResponse``.
    """
    registration_fields = {
        "username": data.username,
        "email": data.email,
        "password": data.password,
        "invitation_token": data.invitation_token,
    }
    created = register_user(db, **registration_fields)
    return UserResponse.model_validate(created)
@router.post("/refresh", response_model=TokenResponse)
def refresh(data: RefreshRequest, db: Session = Depends(get_db)):
    """Trade a refresh token for a new access token.

    ``refresh_access_token`` supplies the new access token together with the
    user it belongs to. The refresh token sent by the client is echoed back
    unchanged in the response.

    Returns:
        A ``TokenResponse`` with the new access token, the original refresh
        token and the serialised user.
    """
    presented_token = data.refresh_token
    new_access, owner = refresh_access_token(db, presented_token)
    return TokenResponse(
        access_token=new_access,
        refresh_token=presented_token,
        user=UserResponse.model_validate(owner),
    )
# ---------------------------------------------------------------------------
# Authenticated endpoints
# ---------------------------------------------------------------------------
@router.post("/logout")
def logout(
    data: RefreshRequest,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Log the authenticated user out by revoking the given refresh token.

    After ``revoke_refresh_token`` has been called with the token from
    *data*, a ``"logout"`` audit entry for the current user is written via
    ``log_action``.

    Returns:
        ``{"detail": "Logged out"}``.
    """
    revoke_refresh_token(db, data.refresh_token)

    # ``request.client`` may be missing; record no IP address in that case.
    remote_ip = None
    if request.client:
        remote_ip = request.client.host
    agent = request.headers.get("user-agent")

    log_action(
        db,
        user_id=current_user.id,
        action="logout",
        entity_type="user",
        entity_id=current_user.id,
        ip_address=remote_ip,
        user_agent=agent,
    )
    return {"detail": "Logged out"}
@router.post("/mfa/setup", response_model=MFASetupResponse)
def mfa_setup(current_user: User = Depends(get_current_user)):
    """Start MFA enrolment for the authenticated user.

    ``setup_mfa`` yields a TOTP secret and a provisioning URI for a QR code.
    The client is expected to keep the returned *secret* and submit it,
    together with a valid TOTP code, to ``POST /mfa/verify`` in order to
    complete activation.

    Returns:
        An ``MFASetupResponse`` with ``secret`` and ``qr_uri``.
    """
    totp_secret, provisioning_uri = setup_mfa(current_user)
    return MFASetupResponse(secret=totp_secret, qr_uri=provisioning_uri)
@router.post("/mfa/verify")
def mfa_verify(
    data: MFAVerifyRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Finish MFA enrolment by checking a TOTP code against the setup secret.

    Enrolment flow:

    1. ``POST /mfa/setup`` returns ``{secret, qr_uri}``.
    2. The user scans the QR code with an authenticator app.
    3. ``POST /mfa/verify`` with ``{secret, code}`` enables MFA.

    The secret and code from *data* are handed to ``activate_mfa`` for the
    current user.

    Returns:
        ``{"detail": "MFA enabled"}``.
    """
    activate_mfa(db, current_user, data.secret, data.code)
    confirmation = {"detail": "MFA enabled"}
    return confirmation
@router.post("/change-password")
def change_password_endpoint(
    data: ChangePasswordRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Replace the authenticated user's password.

    The old and new passwords from *data* are passed to ``change_password``
    for the current user.

    Returns:
        ``{"detail": "Password changed"}``.
    """
    change_password(db, current_user, data.old_password, data.new_password)
    confirmation = {"detail": "Password changed"}
    return confirmation
@router.get("/me", response_model=UserResponse)
def me(current_user: User = Depends(get_current_user)):
    """Describe the user resolved by ``get_current_user``.

    Returns:
        The current user serialised as ``UserResponse``.
    """
    own_profile = UserResponse.model_validate(current_user)
    return own_profile
@router.put("/profile", response_model=UserResponse)
def update_profile_endpoint(
    data: ProfileUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply a self-service profile update for the authenticated user.

    Only the fields the client actually set are dumped from *data*
    (``exclude_unset=True``) and passed to ``update_profile`` as keyword
    arguments.

    Returns:
        The user returned by ``update_profile``, serialised as
        ``UserResponse``.
    """
    submitted_fields = data.model_dump(exclude_unset=True)
    updated_user = update_profile(db, current_user, **submitted_fields)
    return UserResponse.model_validate(updated_user)