dak.c2s/backend/app/api/notifications.py
CCS Admin df26b51e14 feat: admin API, audit logging, notifications, create_admin script
Add audit_service for compliance logging, admin endpoints (user CRUD,
invitation management, audit log), notification endpoints (list, mark
read), and interactive create_admin script.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 07:48:41 +00:00

83 lines
2.3 KiB
Python

"""Notification API endpoints for authenticated users."""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.dependencies import get_current_user
from app.database import get_db
from app.models.audit import Notification
from app.models.user import User
from app.schemas.notification import NotificationList, NotificationResponse
# Router that the notification endpoints in this module register on.
# No prefix is configured here; the mount path is presumably supplied by
# whoever includes this router — verify against the application setup.
router = APIRouter()
@router.get("", response_model=NotificationList)
def list_notifications(
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List the caller's newest notifications together with their unread total.

    At most 50 rows are returned, ordered by ``created_at`` descending. The
    unread total comes from a separate COUNT query, so it is not capped by
    that 50-row limit.
    """
    # Both queries share the same "addressed to the caller" selection.
    mine = db.query(Notification).filter(Notification.recipient_id == user.id)

    latest = mine.order_by(Notification.created_at.desc()).limit(50).all()
    unread_total = mine.filter(
        Notification.is_read == False,  # noqa: E712
    ).count()

    return NotificationList(items=latest, unread_count=unread_total)
@router.put("/{notification_id}/read", response_model=NotificationResponse)
def mark_notification_read(
    notification_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Flag one of the caller's notifications as read and return it.

    The lookup matches on both the notification id and the caller as
    recipient, so a notification addressed to someone else yields the same
    404 as one that does not exist.

    Raises:
        HTTPException: 404 when no matching notification is found.
    """
    owned_by_caller = db.query(Notification).filter(
        Notification.id == notification_id,
        Notification.recipient_id == user.id,
    )
    target = owned_by_caller.first()

    if target:
        target.is_read = True
        db.commit()
        # Reload the row so the response is built from the committed state.
        db.refresh(target)
        return target

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Notification not found",
    )
@router.put("/read-all", status_code=status.HTTP_200_OK)
def mark_all_notifications_read(
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Set ``is_read`` on every unread notification of the caller in one bulk update.

    Returns:
        A dict ``{"marked_read": n}`` where ``n`` is the row count reported
        by the bulk update.
    """
    pending = db.query(Notification).filter(
        Notification.recipient_id == user.id,
        Notification.is_read == False,  # noqa: E712
    )
    changed = pending.update({"is_read": True})
    db.commit()
    return {"marked_read": changed}