mirror of
https://github.com/complexcaresolutions/dak.c2s.git
synced 2026-03-17 23:03:41 +00:00
Add core security layer: - security.py: password hashing (bcrypt), JWT access/refresh tokens, SHA-256 token hashing, TOTP MFA (generate, verify, provisioning URI), plus passlib/bcrypt 5.x compatibility patch - dependencies.py: FastAPI deps for get_current_user (Bearer JWT) and require_admin (role check) - exceptions.py: domain-specific HTTP exceptions (CaseNotFound, DuplicateCase, InvalidImportFile, ICDValidation, AccountLocked, InvalidCredentials) - test_security.py: 9 tests covering all security functions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
55 lines
1.6 KiB
Python
55 lines
1.6 KiB
Python
"""FastAPI dependency functions for authentication and authorisation."""
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from jose import JWTError
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.security import decode_access_token
|
|
from app.database import get_db
|
|
from app.models.user import User
|
|
|
|
security = HTTPBearer()
|
|
|
|
|
|
def get_current_user(
|
|
credentials: HTTPAuthorizationCredentials = Depends(security),
|
|
db: Session = Depends(get_db),
|
|
) -> User:
|
|
"""Extract and validate the JWT bearer token, then return the active user.
|
|
|
|
Raises 401 if the token is invalid/expired or the user is inactive.
|
|
"""
|
|
try:
|
|
payload = decode_access_token(credentials.credentials)
|
|
user_id = int(payload["sub"])
|
|
except (JWTError, KeyError, ValueError):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid token",
|
|
)
|
|
|
|
user = (
|
|
db.query(User)
|
|
.filter(User.id == user_id, User.is_active == True) # noqa: E712
|
|
.first()
|
|
)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="User not found or inactive",
|
|
)
|
|
return user
|
|
|
|
|
|
def require_admin(user: User = Depends(get_current_user)) -> User:
|
|
"""Require the authenticated user to have the ``admin`` role.
|
|
|
|
Raises 403 if the user is not an admin.
|
|
"""
|
|
if user.role != "admin":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Admin access required",
|
|
)
|
|
return user
|