dak.c2s/backend/app/api/auth.py
CCS Admin d5db84d93f feat: add self-service password reset via email
Adds "Passwort vergessen?" to login page with email-based password
reset flow. Backend generates secure token (SHA-256 hashed, 1h expiry),
sends reset link via SMTP, and validates on submission. Includes rate
limiting (3 requests/hour/email), audit logging, and account unlock
on successful reset. New ResetPasswordPage with password confirmation.

New DB table: password_reset_tokens (migration 008).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 14:56:07 +00:00

381 lines
13 KiB
Python

"""Auth API router: login, register, refresh, logout, MFA, password change, reset."""
import logging
import os
import secrets
import time
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile
from sqlalchemy.orm import Session
from app.config import get_settings
from app.core.dependencies import get_current_user
from app.core.security import hash_password, hash_token
from app.database import get_db
from app.models.user import PasswordResetToken, User
from app.schemas.auth import (
ChangePasswordRequest,
ForgotPasswordRequest,
LoginRequest,
MFADisableRequest,
MFASetupResponse,
MFAVerifyRequest,
RefreshRequest,
RegisterRequest,
ResetPasswordRequest,
TokenResponse,
)
from app.schemas.user import ProfileUpdate, UserResponse
from app.services.audit_service import log_action
from app.services.auth_service import (
activate_mfa,
authenticate_user,
change_password,
create_tokens,
disable_mfa,
refresh_access_token,
register_user,
revoke_refresh_token,
setup_mfa,
update_profile,
)
from app.services.notification_service import send_email
logger = logging.getLogger(__name__)
settings = get_settings()
router = APIRouter()
# ---------------------------------------------------------------------------
# Public endpoints (no auth required)
# ---------------------------------------------------------------------------
@router.post("/login", response_model=TokenResponse)
def login(data: LoginRequest, request: Request, db: Session = Depends(get_db)):
"""Authenticate with email/password (+ optional MFA) and receive tokens."""
user = authenticate_user(db, data.email, data.password, data.mfa_code)
access, refresh = create_tokens(db, user)
log_action(
db,
user_id=user.id,
action="login",
entity_type="user",
entity_id=user.id,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return TokenResponse(
access_token=access,
refresh_token=refresh,
user=UserResponse.model_validate(user),
)
@router.post("/register", response_model=UserResponse)
def register(data: RegisterRequest, db: Session = Depends(get_db)):
"""Self-service registration (invitation token or whitelisted domain)."""
user = register_user(
db,
username=data.username,
email=data.email,
password=data.password,
invitation_token=data.invitation_token,
)
return UserResponse.model_validate(user)
@router.post("/refresh", response_model=TokenResponse)
def refresh(data: RefreshRequest, db: Session = Depends(get_db)):
"""Exchange a valid refresh token for a new access token."""
access, user = refresh_access_token(db, data.refresh_token)
return TokenResponse(
access_token=access,
refresh_token=data.refresh_token,
user=UserResponse.model_validate(user),
)
# ---------------------------------------------------------------------------
# Password reset (public, no auth required)
# ---------------------------------------------------------------------------
RESET_TOKEN_EXPIRE_HOURS = 1
RESET_MAX_REQUESTS_PER_HOUR = 3
@router.post("/forgot-password")
def forgot_password(data: ForgotPasswordRequest, request: Request, db: Session = Depends(get_db)):
"""Request a password reset link via email.
Always returns 200 to prevent user enumeration.
"""
user = (
db.query(User)
.filter(User.email == data.email, User.is_active == True) # noqa: E712
.first()
)
if user:
# Rate limiting: max N requests per email per hour
one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
recent_count = (
db.query(PasswordResetToken)
.filter(
PasswordResetToken.user_id == user.id,
PasswordResetToken.created_at >= one_hour_ago,
)
.count()
)
if recent_count < RESET_MAX_REQUESTS_PER_HOUR:
# Generate token
raw_token = secrets.token_urlsafe(32)
token_h = hash_token(raw_token)
prt = PasswordResetToken(
user_id=user.id,
token_hash=token_h,
expires_at=datetime.now(timezone.utc) + timedelta(hours=RESET_TOKEN_EXPIRE_HOURS),
)
db.add(prt)
db.commit()
# Send email
reset_url = f"{settings.FRONTEND_BASE_URL}/reset-password?token={raw_token}"
send_email(
to=user.email,
subject="[DAK Portal] Passwort zurücksetzen",
body=(
f"Hallo {user.first_name or user.username},\n\n"
f"Sie haben eine Passwort-Zurücksetzung angefordert.\n\n"
f"Klicken Sie auf folgenden Link, um ein neues Passwort zu vergeben:\n"
f"{reset_url}\n\n"
f"Der Link ist {RESET_TOKEN_EXPIRE_HOURS} Stunde gültig.\n\n"
f"Falls Sie diese Anfrage nicht gestellt haben, ignorieren Sie diese E-Mail.\n\n"
f"Mit freundlichen Grüßen\n"
f"DAK Zweitmeinungs-Portal"
),
)
log_action(
db, user_id=user.id, action="password_reset_requested",
entity_type="user", entity_id=user.id,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return {"detail": "Falls ein Konto mit dieser E-Mail existiert, wurde ein Reset-Link gesendet."}
@router.post("/reset-password")
def reset_password(data: ResetPasswordRequest, request: Request, db: Session = Depends(get_db)):
"""Reset password using a valid reset token."""
if len(data.new_password) < 8:
raise HTTPException(status_code=422, detail="Das Passwort muss mindestens 8 Zeichen lang sein.")
token_h = hash_token(data.token)
prt = (
db.query(PasswordResetToken)
.filter(
PasswordResetToken.token_hash == token_h,
PasswordResetToken.used_at == None, # noqa: E711
PasswordResetToken.expires_at >= datetime.now(timezone.utc),
)
.first()
)
if not prt:
raise HTTPException(status_code=400, detail="Ungültiger oder abgelaufener Reset-Link.")
user = db.query(User).filter(User.id == prt.user_id).first()
if not user:
raise HTTPException(status_code=400, detail="Benutzer nicht gefunden.")
# Update password
user.password_hash = hash_password(data.new_password)
user.must_change_password = False
user.failed_login_attempts = 0
user.locked_until = None
# Invalidate all reset tokens for this user
db.query(PasswordResetToken).filter(
PasswordResetToken.user_id == user.id,
PasswordResetToken.used_at == None, # noqa: E711
).update({"used_at": datetime.now(timezone.utc)})
db.commit()
log_action(
db, user_id=user.id, action="password_reset_completed",
entity_type="user", entity_id=user.id,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return {"detail": "Passwort erfolgreich zurückgesetzt. Sie können sich jetzt anmelden."}
# ---------------------------------------------------------------------------
# Authenticated endpoints
# ---------------------------------------------------------------------------
@router.post("/logout")
def logout(
data: RefreshRequest,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Revoke the supplied refresh token (logout)."""
revoke_refresh_token(db, data.refresh_token)
log_action(
db,
user_id=current_user.id,
action="logout",
entity_type="user",
entity_id=current_user.id,
ip_address=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
)
return {"detail": "Logged out"}
@router.post("/mfa/setup", response_model=MFASetupResponse)
def mfa_setup(current_user: User = Depends(get_current_user)):
"""Generate a new TOTP secret and QR provisioning URI.
The client must store the returned *secret* and send it back together
with a valid TOTP code to ``POST /mfa/verify`` to finalise activation.
"""
secret, uri = setup_mfa(current_user)
return MFASetupResponse(secret=secret, qr_uri=uri)
@router.post("/mfa/verify")
def mfa_verify(
data: MFAVerifyRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Verify the TOTP code against the secret from /mfa/setup and enable MFA.
MFA setup flow:
1. ``POST /mfa/setup`` --> ``{secret, qr_uri}``
2. User scans QR in authenticator app.
3. ``POST /mfa/verify`` with ``{secret, code}`` --> MFA enabled.
"""
activate_mfa(db, current_user, data.secret, data.code)
return {"detail": "MFA enabled"}
@router.delete("/mfa")
def mfa_disable(
data: MFADisableRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Disable MFA on the authenticated user's account (requires password)."""
disable_mfa(db, current_user, data.password)
return {"detail": "MFA disabled"}
@router.post("/change-password")
def change_password_endpoint(
data: ChangePasswordRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Change the authenticated user's password."""
change_password(db, current_user, data.old_password, data.new_password)
return {"detail": "Password changed"}
@router.get("/me", response_model=UserResponse)
def me(current_user: User = Depends(get_current_user)):
"""Return the currently authenticated user's profile."""
return UserResponse.model_validate(current_user)
@router.put("/profile", response_model=UserResponse)
def update_profile_endpoint(
data: ProfileUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Update the authenticated user's profile."""
update_data = data.model_dump(exclude_unset=True)
user = update_profile(db, current_user, **update_data)
return UserResponse.model_validate(user)
# ---------------------------------------------------------------------------
# Avatar upload / delete
# ---------------------------------------------------------------------------
ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png"}
MAX_AVATAR_SIZE = 2 * 1024 * 1024 # 2MB
@router.post("/avatar", response_model=UserResponse)
async def upload_avatar(
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Upload or replace the authenticated user's avatar image."""
if file.content_type not in ALLOWED_IMAGE_TYPES:
raise HTTPException(status_code=400, detail="Only JPG and PNG images are allowed")
contents = await file.read()
if len(contents) > MAX_AVATAR_SIZE:
raise HTTPException(status_code=400, detail="Image must be smaller than 2MB")
ext = "jpg" if file.content_type == "image/jpeg" else "png"
filename = f"{current_user.id}_{int(time.time())}.{ext}"
uploads_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"uploads", "avatars",
)
os.makedirs(uploads_dir, exist_ok=True)
# Delete old avatar file if exists
if current_user.avatar_url:
old_file = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
current_user.avatar_url.lstrip("/"),
)
if os.path.isfile(old_file):
os.remove(old_file)
filepath = os.path.join(uploads_dir, filename)
with open(filepath, "wb") as f:
f.write(contents)
current_user.avatar_url = f"/uploads/avatars/{filename}"
db.commit()
db.refresh(current_user)
return UserResponse.model_validate(current_user)
@router.delete("/avatar", response_model=UserResponse)
def delete_avatar(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Remove the authenticated user's avatar image."""
if current_user.avatar_url:
old_file = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
current_user.avatar_url.lstrip("/"),
)
if os.path.isfile(old_file):
os.remove(old_file)
current_user.avatar_url = None
db.commit()
db.refresh(current_user)
return UserResponse.model_validate(current_user)