"""Auth API router: login, register, refresh, logout, MFA, password change.""" import os import time from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile from sqlalchemy.orm import Session from app.core.dependencies import get_current_user from app.database import get_db from app.services.audit_service import log_action from app.models.user import User from app.schemas.auth import ( ChangePasswordRequest, LoginRequest, MFADisableRequest, MFASetupResponse, MFAVerifyRequest, RefreshRequest, RegisterRequest, TokenResponse, ) from app.schemas.user import ProfileUpdate, UserResponse from app.services.auth_service import ( activate_mfa, authenticate_user, change_password, create_tokens, disable_mfa, refresh_access_token, register_user, revoke_refresh_token, setup_mfa, update_profile, ) router = APIRouter() # --------------------------------------------------------------------------- # Public endpoints (no auth required) # --------------------------------------------------------------------------- @router.post("/login", response_model=TokenResponse) def login(data: LoginRequest, request: Request, db: Session = Depends(get_db)): """Authenticate with email/password (+ optional MFA) and receive tokens.""" user = authenticate_user(db, data.email, data.password, data.mfa_code) access, refresh = create_tokens(db, user) log_action( db, user_id=user.id, action="login", entity_type="user", entity_id=user.id, ip_address=request.client.host if request.client else None, user_agent=request.headers.get("user-agent"), ) return TokenResponse( access_token=access, refresh_token=refresh, user=UserResponse.model_validate(user), ) @router.post("/register", response_model=UserResponse) def register(data: RegisterRequest, db: Session = Depends(get_db)): """Self-service registration (invitation token or whitelisted domain).""" user = register_user( db, username=data.username, email=data.email, password=data.password, invitation_token=data.invitation_token, ) return UserResponse.model_validate(user) @router.post("/refresh", response_model=TokenResponse) def refresh(data: RefreshRequest, db: Session = Depends(get_db)): """Exchange a valid refresh token for a new access token.""" access, user = refresh_access_token(db, data.refresh_token) return TokenResponse( access_token=access, refresh_token=data.refresh_token, user=UserResponse.model_validate(user), ) # --------------------------------------------------------------------------- # Authenticated endpoints # --------------------------------------------------------------------------- @router.post("/logout") def logout( data: RefreshRequest, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Revoke the supplied refresh token (logout).""" revoke_refresh_token(db, data.refresh_token) log_action( db, user_id=current_user.id, action="logout", entity_type="user", entity_id=current_user.id, ip_address=request.client.host if request.client else None, user_agent=request.headers.get("user-agent"), ) return {"detail": "Logged out"} @router.post("/mfa/setup", response_model=MFASetupResponse) def mfa_setup(current_user: User = Depends(get_current_user)): """Generate a new TOTP secret and QR provisioning URI. The client must store the returned *secret* and send it back together with a valid TOTP code to ``POST /mfa/verify`` to finalise activation. """ secret, uri = setup_mfa(current_user) return MFASetupResponse(secret=secret, qr_uri=uri) @router.post("/mfa/verify") def mfa_verify( data: MFAVerifyRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Verify the TOTP code against the secret from /mfa/setup and enable MFA. MFA setup flow: 1. ``POST /mfa/setup`` --> ``{secret, qr_uri}`` 2. User scans QR in authenticator app. 3. ``POST /mfa/verify`` with ``{secret, code}`` --> MFA enabled. """ activate_mfa(db, current_user, data.secret, data.code) return {"detail": "MFA enabled"} @router.delete("/mfa") def mfa_disable( data: MFADisableRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Disable MFA on the authenticated user's account (requires password).""" disable_mfa(db, current_user, data.password) return {"detail": "MFA disabled"} @router.post("/change-password") def change_password_endpoint( data: ChangePasswordRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Change the authenticated user's password.""" change_password(db, current_user, data.old_password, data.new_password) return {"detail": "Password changed"} @router.get("/me", response_model=UserResponse) def me(current_user: User = Depends(get_current_user)): """Return the currently authenticated user's profile.""" return UserResponse.model_validate(current_user) @router.put("/profile", response_model=UserResponse) def update_profile_endpoint( data: ProfileUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Update the authenticated user's profile.""" update_data = data.model_dump(exclude_unset=True) user = update_profile(db, current_user, **update_data) return UserResponse.model_validate(user) # --------------------------------------------------------------------------- # Avatar upload / delete # --------------------------------------------------------------------------- ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png"} MAX_AVATAR_SIZE = 2 * 1024 * 1024 # 2MB @router.post("/avatar", response_model=UserResponse) async def upload_avatar( file: UploadFile = File(...), db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Upload or replace the authenticated user's avatar image.""" if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=400, detail="Only JPG and PNG images are allowed") contents = await file.read() if len(contents) > MAX_AVATAR_SIZE: raise HTTPException(status_code=400, detail="Image must be smaller than 2MB") ext = "jpg" if file.content_type == "image/jpeg" else "png" filename = f"{current_user.id}_{int(time.time())}.{ext}" uploads_dir = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "uploads", "avatars", ) os.makedirs(uploads_dir, exist_ok=True) # Delete old avatar file if exists if current_user.avatar_url: old_file = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), current_user.avatar_url.lstrip("/"), ) if os.path.isfile(old_file): os.remove(old_file) filepath = os.path.join(uploads_dir, filename) with open(filepath, "wb") as f: f.write(contents) current_user.avatar_url = f"/uploads/avatars/{filename}" db.commit() db.refresh(current_user) return UserResponse.model_validate(current_user) @router.delete("/avatar", response_model=UserResponse) def delete_avatar( db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Remove the authenticated user's avatar image.""" if current_user.avatar_url: old_file = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), current_user.avatar_url.lstrip("/"), ) if os.path.isfile(old_file): os.remove(old_file) current_user.avatar_url = None db.commit() db.refresh(current_user) return UserResponse.model_validate(current_user)