mirror of
https://github.com/complexcaresolutions/dak.c2s.git
synced 2026-03-17 20:43:41 +00:00
Add core security layer: - security.py: password hashing (bcrypt), JWT access/refresh tokens, SHA-256 token hashing, TOTP MFA (generate, verify, provisioning URI), plus passlib/bcrypt 5.x compatibility patch - dependencies.py: FastAPI deps for get_current_user (Bearer JWT) and require_admin (role check) - exceptions.py: domain-specific HTTP exceptions (CaseNotFound, DuplicateCase, InvalidImportFile, ICDValidation, AccountLocked, InvalidCredentials) - test_security.py: 9 tests covering all security functions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
114 lines
3.8 KiB
Python
114 lines
3.8 KiB
Python
"""Core security utilities: JWT, password hashing, MFA (TOTP)."""
|
||
|
||
import hashlib
|
||
import secrets
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
import pyotp
|
||
from jose import jwt, JWTError # noqa: F401 – re-exported for convenience
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Monkey-patch passlib for bcrypt >= 4.1 / 5.x compatibility.
|
||
# Newer bcrypt removed ``__about__`` and rejects >72-byte passwords in its
|
||
# internal wrap-bug detection. The patch is applied before CryptContext is
|
||
# instantiated so that passlib's backend initialisation succeeds.
|
||
# ---------------------------------------------------------------------------
|
||
import passlib.handlers.bcrypt as _bcrypt_mod # noqa: E402
|
||
|
||
# Keep a handle on the plain function behind passlib's original classmethod
# so the wrapper below can delegate to it.
_orig_finalize = _bcrypt_mod._BcryptBackend._finalize_backend_mixin.__func__  # type: ignore[attr-defined]


@classmethod  # type: ignore[misc]
def _patched_finalize(cls, name: str, dryrun: bool = False):  # type: ignore[no-untyped-def]
    """Delegate to passlib's original finaliser, tolerating ``ValueError``.

    Returns whatever the original returns; if the original raises
    ``ValueError`` the backend is reported as usable (``True``) instead.
    """
    try:
        outcome = _orig_finalize(cls, name, dryrun)
    except ValueError:
        # bcrypt 4.1+ raises ValueError on >72-byte secrets during
        # passlib's internal wrap-bug detection — safe to ignore.
        return True
    return outcome


# Install the tolerant wrapper in place of the original classmethod.
_bcrypt_mod._BcryptBackend._finalize_backend_mixin = _patched_finalize  # type: ignore[assignment]
|
||
# ---------------------------------------------------------------------------
|
||
|
||
from passlib.context import CryptContext # noqa: E402
|
||
|
||
from app.config import get_settings
|
||
|
||
# Application settings, looked up once when this module is imported.
settings = get_settings()

# Module-wide passlib context; bcrypt is the only configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Password hashing
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def hash_password(password: str) -> str:
    """Hash a plain-text password using bcrypt.

    bcrypt only takes the first 72 bytes of a secret into account.  Older
    bcrypt releases dropped the excess silently, whereas bcrypt 5.x raises
    ``ValueError`` for longer input.  The UTF-8 encoded password is therefore
    cut to 72 bytes here, which keeps the historical behaviour and stops
    long passwords from crashing the hashing call.

    Args:
        password: The plain-text password to hash.

    Returns:
        The encoded hash string produced by ``pwd_context``.
    """
    # Truncate on the byte level: the 72-byte limit applies to the encoded
    # secret, not to the number of characters.
    secret = password.encode("utf-8")[:72]
    return pwd_context.hash(secret)
|
||
|
||
|
||
def verify_password(plain: str, hashed: str) -> bool:
    """Verify a plain-text password against a bcrypt hash.

    bcrypt only takes the first 72 bytes of a secret into account.  Older
    bcrypt releases dropped the excess silently, whereas bcrypt 5.x raises
    ``ValueError`` for longer input.  The UTF-8 encoded candidate is
    therefore cut to 72 bytes before checking, so an over-long login
    password is compared the way older bcrypt releases compared it instead
    of raising.

    Args:
        plain: The plain-text password supplied by the user.
        hashed: The stored hash to check against.

    Returns:
        ``True`` if the password matches the hash, ``False`` otherwise.
    """
    # Truncate on the byte level: the 72-byte limit applies to the encoded
    # secret, not to the number of characters.
    secret = plain.encode("utf-8")[:72]
    return pwd_context.verify(secret, hashed)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# JWT tokens
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def create_access_token(user_id: int, role: str) -> str:
    """Build a signed JWT access token for a user.

    The token carries the user id (as a string) in ``sub``, the role in
    ``role`` and an ``exp`` claim lying
    ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes after the current UTC
    time.  It is signed with ``settings.JWT_SECRET_KEY`` using
    ``settings.JWT_ALGORITHM``.
    """
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"sub": str(user_id), "role": role, "exp": issued_at + lifetime}
    return jwt.encode(
        claims,
        settings.JWT_SECRET_KEY,
        algorithm=settings.JWT_ALGORITHM,
    )
|
||
|
||
|
||
def create_refresh_token() -> str:
    """Return a fresh random, URL-safe refresh token.

    The token is drawn from the ``secrets`` module using 64 random bytes.
    """
    entropy_bytes = 64
    token = secrets.token_urlsafe(entropy_bytes)
    return token
|
||
|
||
|
||
def hash_token(token: str) -> str:
    """Return the SHA-256 digest of *token* as a hex string.

    The token is encoded with the default ``str.encode()`` codec (UTF-8)
    before hashing.  Used for DB storage of tokens.
    """
    digest = hashlib.sha256()
    digest.update(token.encode())
    return digest.hexdigest()
|
||
|
||
|
||
def decode_access_token(token: str) -> dict:
    """Decode a JWT access token and return its claims.

    The token is checked against ``settings.JWT_SECRET_KEY`` and only the
    algorithm named by ``settings.JWT_ALGORITHM`` is accepted.

    Raises ``jose.JWTError`` on invalid or expired tokens.
    """
    signing_key = settings.JWT_SECRET_KEY
    accepted_algorithms = [settings.JWT_ALGORITHM]
    claims = jwt.decode(token, signing_key, algorithms=accepted_algorithms)
    return claims
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# MFA / TOTP
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def generate_mfa_secret() -> str:
    """Return a newly generated base-32 secret for TOTP enrolment."""
    new_secret = pyotp.random_base32()
    return new_secret
|
||
|
||
|
||
def verify_mfa_code(secret: str, code: str) -> bool:
    """Check a TOTP *code* against *secret*.

    Returns the result of ``pyotp.TOTP(secret).verify(code)`` with pyotp's
    default settings.
    """
    return pyotp.TOTP(secret).verify(code)
|
||
|
||
|
||
def get_mfa_uri(secret: str, email: str) -> str:
    """Build the ``otpauth://`` provisioning URI used for QR code generation.

    The account name is *email* and the issuer is ``settings.APP_NAME``.
    """
    return pyotp.TOTP(secret).provisioning_uri(
        name=email,
        issuer_name=settings.APP_NAME,
    )
|