"""Notification service -- in-app notifications and SMTP email delivery.""" from __future__ import annotations import logging import smtplib from datetime import datetime, timezone from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import Optional from sqlalchemy.orm import Session from app.config import get_settings from app.models.audit import Notification from app.models.user import User logger = logging.getLogger(__name__) settings = get_settings() # --------------------------------------------------------------------------- # Low-level email helper # --------------------------------------------------------------------------- def send_email( to: str, subject: str, body: str, html_body: str | None = None, ) -> bool: """Send an email via SMTP SSL. Returns *True* on success, *False* on failure (the error is logged). """ try: msg = MIMEMultipart("alternative") msg["From"] = settings.SMTP_FROM msg["To"] = to msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) if html_body: msg.attach(MIMEText(html_body, "html")) with smtplib.SMTP_SSL(settings.SMTP_HOST, settings.SMTP_PORT) as server: server.login(settings.SMTP_USER, settings.SMTP_PASSWORD) server.send_message(msg) logger.info("Email sent to %s: %s", to, subject) return True except Exception: logger.exception("Failed to send email to %s", to) return False # --------------------------------------------------------------------------- # Core notification creation # --------------------------------------------------------------------------- def create_notification( db: Session, recipient_id: int, notification_type: str, title: str, message: Optional[str] = None, related_entity_type: Optional[str] = None, related_entity_id: Optional[int] = None, send_email_notification: bool = True, ) -> Notification: """Create an in-app notification and optionally send an email.""" notif = Notification( recipient_id=recipient_id, notification_type=notification_type, title=title, message=message, related_entity_type=related_entity_type, related_entity_id=related_entity_id, ) db.add(notif) if send_email_notification: user = db.query(User).filter(User.id == recipient_id).first() if user and user.email: success = send_email( to=user.email, subject=f"[DAK Portal] {title}", body=message or title, ) if success: notif.email_sent = True notif.email_sent_at = datetime.now(timezone.utc) db.commit() db.refresh(notif) return notif # --------------------------------------------------------------------------- # Bulk / role-based helpers # --------------------------------------------------------------------------- def notify_all_users_with_role( db: Session, role: str, notification_type: str, title: str, message: Optional[str] = None, related_entity_type: Optional[str] = None, related_entity_id: Optional[int] = None, ) -> list[Notification]: """Send a notification to every active user that has *role*.""" users = ( db.query(User) .filter(User.role == role, User.is_active == True) # noqa: E712 .all() ) notifications: list[Notification] = [] for user in users: notif = create_notification( db=db, recipient_id=user.id, notification_type=notification_type, title=title, message=message, related_entity_type=related_entity_type, related_entity_id=related_entity_id, ) notifications.append(notif) return notifications # --------------------------------------------------------------------------- # Convenience functions for specific business events # --------------------------------------------------------------------------- def notify_new_cases_uploaded(db: Session, count: int, filename: str) -> None: """Notify DAK users when the admin uploads new cases.""" notify_all_users_with_role( db, "dak_mitarbeiter", "new_cases_uploaded", title=f"{count} neue Fälle importiert", message=( f"Aus {filename} wurden {count} neue Fälle importiert. " "Bitte ICD-Codes ergänzen." ), ) def notify_icd_entered(db: Session, case_id: int, case_name: str) -> None: """Notify admins when a DAK user enters an ICD code for a case.""" notify_all_users_with_role( db, "admin", "icd_entered", title=f"ICD eingetragen: {case_name}", message=f"Für Fall {case_name} wurde der ICD-Code eingetragen.", related_entity_type="case", related_entity_id=case_id, ) def notify_icd_uploaded(db: Session, count: int) -> None: """Notify admins when DAK uploads an ICD Excel file.""" notify_all_users_with_role( db, "admin", "icd_uploaded", title=f"ICD-Codes hochgeladen ({count} Fälle)", message=f"Es wurden ICD-Codes für {count} Fälle per Excel hochgeladen.", ) def notify_report_ready(db: Session, report_id: int, jahr: int, kw: int) -> None: """Notify all users when a weekly report is generated.""" for role in ("admin", "dak_mitarbeiter"): notify_all_users_with_role( db, role, "report_ready", title=f"Bericht KW {kw}/{jahr} erstellt", message=( f"Der Wochenbericht für KW {kw}/{jahr} steht zum Download bereit." ), related_entity_type="report", related_entity_id=report_id, ) def notify_coding_completed(db: Session, count: int) -> None: """Notify DAK users when coding is completed for a batch of cases.""" notify_all_users_with_role( db, "dak_mitarbeiter", "coding_completed", title=f"Coding abgeschlossen ({count} Fälle)", message=f"Für {count} Fälle wurde das Gutachten-Coding abgeschlossen.", )