feat: auth system — login, register, refresh, MFA, domain whitelist

Add complete authentication layer:
- Pydantic v2 schemas for auth requests/responses and user representation
- Auth service with login (account locking, MFA), registration (invitation
  tokens + domain whitelist), token management, MFA setup/activation, and
  password change
- FastAPI router with 8 endpoints: login, register, refresh, logout,
  mfa/setup, mfa/verify, change-password, me
- Router registered in main.py under /api/auth

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
CCS Admin 2026-02-24 07:46:04 +00:00
parent 84d11822e0
commit 518de3da27
5 changed files with 517 additions and 0 deletions

132
backend/app/api/auth.py Normal file
View file

@ -0,0 +1,132 @@
"""Auth API router: login, register, refresh, logout, MFA, password change."""
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.core.dependencies import get_current_user
from app.database import get_db
from app.models.user import User
from app.schemas.auth import (
ChangePasswordRequest,
LoginRequest,
MFASetupResponse,
MFAVerifyRequest,
RefreshRequest,
RegisterRequest,
TokenResponse,
)
from app.schemas.user import UserResponse
from app.services.auth_service import (
activate_mfa,
authenticate_user,
change_password,
create_tokens,
refresh_access_token,
register_user,
revoke_refresh_token,
setup_mfa,
)
router = APIRouter()
# ---------------------------------------------------------------------------
# Public endpoints (no auth required)
# ---------------------------------------------------------------------------
@router.post("/login", response_model=TokenResponse)
def login(data: LoginRequest, db: Session = Depends(get_db)):
"""Authenticate with email/password (+ optional MFA) and receive tokens."""
user = authenticate_user(db, data.email, data.password, data.mfa_code)
access, refresh = create_tokens(db, user)
return TokenResponse(
access_token=access,
refresh_token=refresh,
user=UserResponse.model_validate(user),
)
@router.post("/register", response_model=UserResponse)
def register(data: RegisterRequest, db: Session = Depends(get_db)):
"""Self-service registration (invitation token or whitelisted domain)."""
user = register_user(
db,
username=data.username,
email=data.email,
password=data.password,
invitation_token=data.invitation_token,
)
return UserResponse.model_validate(user)
@router.post("/refresh", response_model=TokenResponse)
def refresh(data: RefreshRequest, db: Session = Depends(get_db)):
"""Exchange a valid refresh token for a new access token."""
access, user = refresh_access_token(db, data.refresh_token)
return TokenResponse(
access_token=access,
refresh_token=data.refresh_token,
user=UserResponse.model_validate(user),
)
# ---------------------------------------------------------------------------
# Authenticated endpoints
# ---------------------------------------------------------------------------
@router.post("/logout")
def logout(
data: RefreshRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Revoke the supplied refresh token (logout)."""
revoke_refresh_token(db, data.refresh_token)
return {"detail": "Logged out"}
@router.post("/mfa/setup", response_model=MFASetupResponse)
def mfa_setup(current_user: User = Depends(get_current_user)):
"""Generate a new TOTP secret and QR provisioning URI.
The client must store the returned *secret* and send it back together
with a valid TOTP code to ``POST /mfa/verify`` to finalise activation.
"""
secret, uri = setup_mfa(current_user)
return MFASetupResponse(secret=secret, qr_uri=uri)
@router.post("/mfa/verify")
def mfa_verify(
data: MFAVerifyRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Verify the TOTP code against the secret from /mfa/setup and enable MFA.
MFA setup flow:
1. ``POST /mfa/setup`` --> ``{secret, qr_uri}``
2. User scans QR in authenticator app.
3. ``POST /mfa/verify`` with ``{secret, code}`` --> MFA enabled.
"""
activate_mfa(db, current_user, data.secret, data.code)
return {"detail": "MFA enabled"}
@router.post("/change-password")
def change_password_endpoint(
data: ChangePasswordRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Change the authenticated user's password."""
change_password(db, current_user, data.old_password, data.new_password)
return {"detail": "Password changed"}
@router.get("/me", response_model=UserResponse)
def me(current_user: User = Depends(get_current_user)):
"""Return the currently authenticated user's profile."""
return UserResponse.model_validate(current_user)

View file

@ -1,6 +1,8 @@
# backend/app/main.py # backend/app/main.py
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from app.api.auth import router as auth_router
from app.config import get_settings from app.config import get_settings
settings = get_settings() settings = get_settings()
@ -15,6 +17,9 @@ app.add_middleware(
allow_headers=["*"], allow_headers=["*"],
) )
# --- Routers ---
app.include_router(auth_router, prefix="/api/auth", tags=["auth"])
@app.get("/api/health") @app.get("/api/health")
def health_check(): def health_check():

View file

@ -0,0 +1,64 @@
"""Pydantic v2 schemas for authentication endpoints."""
from typing import Optional
from pydantic import BaseModel, EmailStr
from app.schemas.user import UserResponse
class LoginRequest(BaseModel):
"""Credentials submitted to POST /login."""
email: EmailStr
password: str
mfa_code: Optional[str] = None
class TokenResponse(BaseModel):
"""Returned on successful login or token refresh."""
access_token: str
refresh_token: str
token_type: str = "bearer"
user: UserResponse
class RegisterRequest(BaseModel):
"""Self-service registration (requires invitation token or whitelisted domain)."""
username: str
email: EmailStr
password: str
invitation_token: Optional[str] = None
class RefreshRequest(BaseModel):
"""Request body for POST /refresh."""
refresh_token: str
class MFASetupResponse(BaseModel):
"""Returned when a user initiates MFA setup."""
secret: str
qr_uri: str
class MFAVerifyRequest(BaseModel):
"""6-digit TOTP code + secret submitted to activate MFA.
The *secret* was returned by ``/mfa/setup`` and must be echoed back so
the server can verify the code before persisting.
"""
secret: str
code: str
class ChangePasswordRequest(BaseModel):
"""Request body for changing the authenticated user's password."""
old_password: str
new_password: str

View file

@ -0,0 +1,39 @@
"""Pydantic v2 schemas for User responses and mutations."""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr
class UserResponse(BaseModel):
"""Public representation of a user (returned by API endpoints)."""
id: int
username: str
email: str
role: str
mfa_enabled: bool
is_active: bool
last_login: Optional[datetime] = None
created_at: datetime
model_config = {"from_attributes": True}
class UserCreate(BaseModel):
"""Admin-only: create a user directly (bypasses invitation/domain check)."""
username: str
email: EmailStr
password: str
role: str = "dak_mitarbeiter"
class UserUpdate(BaseModel):
"""Admin-only: partial update of user fields."""
username: Optional[str] = None
email: Optional[EmailStr] = None
role: Optional[str] = None
is_active: Optional[bool] = None

View file

@ -0,0 +1,277 @@
"""Authentication business logic: login, register, tokens, MFA."""
from datetime import datetime, timedelta, timezone
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from app.config import get_settings
from app.core.exceptions import AccountLockedError, InvalidCredentialsError
from app.core.security import (
create_access_token,
create_refresh_token,
generate_mfa_secret,
get_mfa_uri,
hash_password,
hash_token,
verify_mfa_code,
verify_password,
)
from app.models.user import AllowedDomain, InvitationLink, RefreshToken, User
settings = get_settings()
# ---------------------------------------------------------------------------
# Login
# ---------------------------------------------------------------------------
def authenticate_user(
db: Session,
email: str,
password: str,
mfa_code: str | None = None,
) -> User:
"""Authenticate a user by email/password (+ optional MFA).
Handles:
* Account-lock detection (locked_until).
* Failed-attempt counting with auto-lock after 5 failures.
* TOTP verification when MFA is enabled.
"""
user = db.query(User).filter(User.email == email).first()
if not user or not user.is_active:
raise InvalidCredentialsError()
# Check whether the account is currently locked.
if user.locked_until and user.locked_until > datetime.now(timezone.utc):
raise AccountLockedError(
detail=f"Account locked until {user.locked_until.isoformat()}"
)
# Verify password.
if not verify_password(password, user.password_hash):
user.failed_login_attempts += 1
if user.failed_login_attempts >= 5:
user.locked_until = datetime.now(timezone.utc) + timedelta(minutes=30)
db.commit()
raise InvalidCredentialsError()
# MFA check (if enabled on this account).
if user.mfa_enabled:
if not mfa_code:
raise InvalidCredentialsError(detail="MFA code required")
if not verify_mfa_code(user.mfa_secret, mfa_code):
raise InvalidCredentialsError(detail="Invalid MFA code")
# Success -- reset counters and record last login.
user.failed_login_attempts = 0
user.locked_until = None
user.last_login = datetime.now(timezone.utc)
db.commit()
return user
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
def register_user(
db: Session,
username: str,
email: str,
password: str,
invitation_token: str | None = None,
) -> User:
"""Register a new user account.
Either a valid (unexpired, unused) invitation token OR an email whose
domain is in the ``allowed_domains`` whitelist is required.
"""
# Uniqueness check.
existing = (
db.query(User)
.filter((User.email == email) | (User.username == username))
.first()
)
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User already exists",
)
role = "dak_mitarbeiter"
if invitation_token:
invite = (
db.query(InvitationLink)
.filter(
InvitationLink.token == invitation_token,
InvitationLink.is_active == True, # noqa: E712
InvitationLink.used_at == None, # noqa: E711
InvitationLink.expires_at > datetime.now(timezone.utc),
)
.first()
)
if not invite:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired invitation",
)
if invite.email and invite.email.lower() != email.lower():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email does not match invitation",
)
role = invite.role
else:
# Domain whitelist.
domain = email.split("@")[1].lower()
allowed = (
db.query(AllowedDomain)
.filter(
AllowedDomain.domain == domain,
AllowedDomain.is_active == True, # noqa: E712
)
.first()
)
if not allowed:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email domain not allowed for registration",
)
role = allowed.role
user = User(
username=username,
email=email,
password_hash=hash_password(password),
role=role,
)
db.add(user)
db.commit()
db.refresh(user)
# Mark invitation as consumed.
if invitation_token:
invite.used_at = datetime.now(timezone.utc)
invite.used_by = user.id
invite.is_active = False
db.commit()
return user
# ---------------------------------------------------------------------------
# Token management
# ---------------------------------------------------------------------------
def create_tokens(db: Session, user: User) -> tuple[str, str]:
"""Create an access/refresh token pair and persist the refresh hash."""
access = create_access_token(user.id, user.role)
refresh = create_refresh_token()
rt = RefreshToken(
user_id=user.id,
token_hash=hash_token(refresh),
expires_at=datetime.now(timezone.utc)
+ timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS),
)
db.add(rt)
db.commit()
return access, refresh
def refresh_access_token(db: Session, refresh_token: str) -> tuple[str, User]:
"""Validate a refresh token and return a fresh access token + user."""
token_hash = hash_token(refresh_token)
rt = (
db.query(RefreshToken)
.filter(
RefreshToken.token_hash == token_hash,
RefreshToken.revoked == False, # noqa: E712
RefreshToken.expires_at > datetime.now(timezone.utc),
)
.first()
)
if not rt:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired refresh token",
)
user = (
db.query(User)
.filter(User.id == rt.user_id, User.is_active == True) # noqa: E712
.first()
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive",
)
access = create_access_token(user.id, user.role)
return access, user
def revoke_refresh_token(db: Session, refresh_token: str) -> None:
"""Revoke a refresh token (used on logout)."""
token_hash = hash_token(refresh_token)
rt = (
db.query(RefreshToken)
.filter(RefreshToken.token_hash == token_hash)
.first()
)
if rt:
rt.revoked = True
db.commit()
# ---------------------------------------------------------------------------
# MFA helpers
# ---------------------------------------------------------------------------
def setup_mfa(user: User) -> tuple[str, str]:
"""Generate a fresh MFA secret and provisioning URI for *user*.
Does NOT persist the secret -- the caller must save it after the user
confirms activation via ``activate_mfa``.
"""
secret = generate_mfa_secret()
uri = get_mfa_uri(secret, user.email)
return secret, uri
def activate_mfa(db: Session, user: User, secret: str, code: str) -> None:
"""Verify the TOTP *code* against *secret* and enable MFA on *user*."""
if not verify_mfa_code(secret, code):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid MFA code",
)
user.mfa_secret = secret
user.mfa_enabled = True
db.commit()
# ---------------------------------------------------------------------------
# Password change
# ---------------------------------------------------------------------------
def change_password(
db: Session,
user: User,
old_password: str,
new_password: str,
) -> None:
"""Change the authenticated user's password after verifying the old one."""
if not verify_password(old_password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Current password is incorrect",
)
user.password_hash = hash_password(new_password)
user.must_change_password = False
db.commit()