dak.c2s/backend/app/core/dependencies.py
CCS Admin 178d40d036 feat: JWT auth, bcrypt, MFA, dependency injection, security tests
Add core security layer:
- security.py: password hashing (bcrypt), JWT access/refresh tokens,
  SHA-256 token hashing, TOTP MFA (generate, verify, provisioning URI),
  plus passlib/bcrypt 5.x compatibility patch
- dependencies.py: FastAPI deps for get_current_user (Bearer JWT) and
  require_admin (role check)
- exceptions.py: domain-specific HTTP exceptions (CaseNotFound,
  DuplicateCase, InvalidImportFile, ICDValidation, AccountLocked,
  InvalidCredentials)
- test_security.py: 9 tests covering all security functions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 07:41:35 +00:00

55 lines
1.6 KiB
Python

"""FastAPI dependency functions for authentication and authorisation."""
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError
from sqlalchemy.orm import Session
from app.core.security import decode_access_token
from app.database import get_db
from app.models.user import User
# Shared HTTP bearer scheme instance; get_current_user depends on it to
# receive the request's HTTPAuthorizationCredentials.
security = HTTPBearer()
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    """Extract and validate the JWT bearer token, then return the active user.

    Args:
        credentials: Bearer credentials supplied by the ``security``
            dependency; ``credentials.credentials`` is the raw token string.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` whose ``id`` equals the token's ``sub`` claim and whose
        ``is_active`` flag is true.

    Raises:
        HTTPException: 401 if the token is invalid/expired, its ``sub`` claim
            is missing or not convertible to ``int``, or no active user with
            that id exists.
    """
    try:
        payload = decode_access_token(credentials.credentials)
        user_id = int(payload["sub"])
    except (JWTError, KeyError, TypeError, ValueError) as exc:
        # TypeError covers a null / non-scalar ``sub`` claim (``int(None)``)
        # or a payload that is not subscriptable; without it a malformed
        # token would surface as a 500 instead of a 401.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            # RFC 6750 requires a bearer challenge on 401 responses.
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    user = (
        db.query(User)
        # ``== True`` is intentional: it builds a SQL expression, so the
        # usual ``is True`` / truthiness idiom does not apply here.
        .filter(User.id == user_id, User.is_active == True)  # noqa: E712
        .first()
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
def require_admin(user: User = Depends(get_current_user)) -> User:
    """Allow the request through only for users whose role is ``admin``.

    Args:
        user: The authenticated user resolved by ``get_current_user``.

    Returns:
        The same ``user`` object when its ``role`` equals ``"admin"``.

    Raises:
        HTTPException: 403 when the user's role is anything else.
    """
    if user.role == "admin":
        return user

    # Authenticated but not privileged: forbidden rather than unauthorised.
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required",
    )