"""FastAPI dependency functions for authentication and authorisation.""" from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import JWTError from sqlalchemy.orm import Session from app.core.security import decode_access_token from app.database import get_db from app.models.user import User security = HTTPBearer() def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db), ) -> User: """Extract and validate the JWT bearer token, then return the active user. Raises 401 if the token is invalid/expired or the user is inactive. """ try: payload = decode_access_token(credentials.credentials) user_id = int(payload["sub"]) except (JWTError, KeyError, ValueError): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token", ) user = ( db.query(User) .filter(User.id == user_id, User.is_active == True) # noqa: E712 .first() ) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive", ) return user def require_admin(user: User = Depends(get_current_user)) -> User: """Require the authenticated user to have the ``admin`` role. Raises 403 if the user is not an admin. """ if user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required", ) return user