feat: report service — all 5 sheet calculations + year-over-year

Implements report_service.py with:
- Sheet 1: Auswertung KW gesamt (weekly totals + year summary)
- Sheet 2: Auswertung nach Fachgebieten (per-KW per-Fallgruppe)
- Sheet 3: Auswertung Gutachten (alternative/bestaetigung per group)
- Sheet 4: Auswertung Therapieaenderungen (TA metrics per KW)
- Sheet 5: Auswertung ICD onko (ICD code frequency for onko)
- Dashboard KPIs (total_cases, pending_icd, pending_coding, etc.)
- generate_full_report() for all 5 sheets combined

Implements vorjahr_service.py with:
- Cached year-over-year comparison via yearly_summary table
- get_vorjahr_summary() for Sheet 1 comparison columns
- get_vorjahr_detail() for full previous-year breakdown
- refresh_vorjahr_cache() for cache invalidation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
CCS Admin 2026-02-24 08:02:53 +00:00
parent f4afea7f85
commit 72b6f784fc
2 changed files with 774 additions and 0 deletions

View file

@ -0,0 +1,570 @@
"""Report service — all 5 sheet calculations for the DAK Berichtswesen.
Sheet 1: Auswertung KW gesamt weekly totals + year summary
Sheet 2: Auswertung nach Fachgebieten per-KW per-Fallgruppe breakdown
Sheet 3: Auswertung Gutachten per-KW gutachten / alternative / bestaetigung
Sheet 4: Auswertung Therapieaenderungen per-KW therapy-change metrics
Sheet 5: Auswertung ICD onko ICD code frequency for onko cases
All queries use SQLAlchemy (not pandas) against the cases / case_icd_codes tables.
"""
from __future__ import annotations
import logging
from typing import Any
from sqlalchemy import and_, func
from sqlalchemy.orm import Session
from app.models.case import Case, CaseICDCode
logger = logging.getLogger(__name__)
# Canonical Fallgruppen in display order
FALLGRUPPEN = ("onko", "kardio", "intensiv", "galle", "sd")
# Number of calendar weeks to include (ISO weeks 1..52; 53 is rare)
MAX_KW = 52
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _int(val: Any) -> int:
"""Safely coerce a query result to int (None -> 0)."""
if val is None:
return 0
return int(val)
def _pct(part: int, total: int) -> float | None:
"""Return part/total as a float, or None if total==0."""
if total == 0:
return None
return round(part / total, 4)
def _empty_weekly_row(kw: int) -> dict:
"""Return a zeroed-out weekly row template for Sheet 1."""
return {
"kw": kw,
"erstberatungen": 0,
"unterlagen": 0,
"ablehnungen": 0,
"keine_rm": 0,
"gutachten": 0,
}
def _empty_fg_weekly_row(kw: int) -> dict:
"""Return a zeroed-out weekly row template for Sheet 2."""
row: dict[str, Any] = {"kw": kw}
for fg in FALLGRUPPEN:
row[fg] = {"anzahl": 0, "gutachten": 0, "keine_rm": 0}
return row
def _empty_gutachten_weekly_row(kw: int) -> dict:
"""Return a zeroed-out weekly row template for Sheet 3."""
row: dict[str, Any] = {"kw": kw}
for group in ("gesamt",) + FALLGRUPPEN:
row[group] = {"gutachten": 0, "alternative": 0, "bestaetigung": 0}
return row
def _empty_ta_weekly_row(kw: int) -> dict:
"""Return a zeroed-out weekly row template for Sheet 4."""
return {
"kw": kw,
"gutachten": 0,
"ta_ja": 0,
"ta_nein": 0,
"diagnosekorrektur": 0,
"unterversorgung": 0,
"uebertherapie": 0,
}
# ---------------------------------------------------------------------------
# Sheet 1: Auswertung KW gesamt
# ---------------------------------------------------------------------------
def calculate_sheet1_data(db: Session, jahr: int) -> dict:
"""Calculate *Auswertung KW gesamt*.
Returns::
{
"summary": {
"erstberatungen": int,
"ablehnungen": int,
"unterlagen": int,
"keine_rueckmeldung": int,
"gutachten": int,
},
"weekly": [
{"kw": 1, "erstberatungen": X, "unterlagen": X,
"ablehnungen": X, "keine_rm": X, "gutachten": X},
... # kw 1..52
]
}
Business rules (matching the Excel formulas):
* Erstberatungen = total cases for the KW
* Unterlagen = cases where unterlagen == True
* Ablehnungen = cases where ablehnung == True
* Gutachten = cases where gutachten == True
* Keine RM = Unterlagen - Gutachten (derived, per KW row)
"""
# One query: group by kw, count the four flags
rows = (
db.query(
Case.kw,
func.count(Case.id).label("erstberatungen"),
func.sum(Case.unterlagen.cast(int)).label("unterlagen"),
func.sum(Case.ablehnung.cast(int)).label("ablehnungen"),
func.sum(Case.gutachten.cast(int)).label("gutachten"),
)
.filter(Case.jahr == jahr)
.group_by(Case.kw)
.all()
)
# Build a lookup kw -> values
kw_map: dict[int, dict] = {}
for row in rows:
kw = _int(row.kw)
unterlagen = _int(row.unterlagen)
gutachten = _int(row.gutachten)
kw_map[kw] = {
"kw": kw,
"erstberatungen": _int(row.erstberatungen),
"unterlagen": unterlagen,
"ablehnungen": _int(row.ablehnungen),
"keine_rm": unterlagen - gutachten,
"gutachten": gutachten,
}
# Build full 1..52 list (filling gaps with zeros)
weekly = []
for kw in range(1, MAX_KW + 1):
weekly.append(kw_map.get(kw, _empty_weekly_row(kw)))
# Summary (sums across all weeks)
total_erst = sum(w["erstberatungen"] for w in weekly)
total_abl = sum(w["ablehnungen"] for w in weekly)
total_unt = sum(w["unterlagen"] for w in weekly)
total_keine = sum(w["keine_rm"] for w in weekly)
total_gut = sum(w["gutachten"] for w in weekly)
summary = {
"erstberatungen": total_erst,
"ablehnungen": total_abl,
"unterlagen": total_unt,
"keine_rueckmeldung": total_keine,
"gutachten": total_gut,
}
return {"summary": summary, "weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 2: Auswertung nach Fachgebieten
# ---------------------------------------------------------------------------
def calculate_sheet2_data(db: Session, jahr: int) -> dict:
"""Calculate *Auswertung nach Fachgebieten*.
Per KW, per Fallgruppe: Anzahl, Gutachten, Keine RM/Ablehnung.
Returns::
{
"weekly": [
{
"kw": 1,
"onko": {"anzahl": X, "gutachten": X, "keine_rm": X},
"kardio": {...},
"intensiv": {...},
"galle": {...},
"sd": {...},
},
...
]
}
Keine RM/Ablehnung = Anzahl - Gutachten (per the Excel formula).
"""
rows = (
db.query(
Case.kw,
Case.fallgruppe,
func.count(Case.id).label("anzahl"),
func.sum(Case.gutachten.cast(int)).label("gutachten"),
)
.filter(Case.jahr == jahr)
.group_by(Case.kw, Case.fallgruppe)
.all()
)
# Build kw -> fg -> values
kw_map: dict[int, dict] = {}
for row in rows:
kw = _int(row.kw)
fg = row.fallgruppe
if fg not in FALLGRUPPEN:
logger.warning("Unknown fallgruppe '%s' in case data, skipping", fg)
continue
if kw not in kw_map:
kw_map[kw] = _empty_fg_weekly_row(kw)
anzahl = _int(row.anzahl)
gutachten = _int(row.gutachten)
kw_map[kw][fg] = {
"anzahl": anzahl,
"gutachten": gutachten,
"keine_rm": anzahl - gutachten,
}
weekly = []
for kw in range(1, MAX_KW + 1):
weekly.append(kw_map.get(kw, _empty_fg_weekly_row(kw)))
return {"weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 3: Auswertung Gutachten
# ---------------------------------------------------------------------------
def calculate_sheet3_data(db: Session, jahr: int) -> dict:
"""Calculate *Auswertung Gutachten*.
Per KW, per group (gesamt + 5 Fallgruppen):
Gutachten count, Alternative, Bestaetigung.
Returns::
{
"weekly": [
{
"kw": 1,
"gesamt": {"gutachten": X, "alternative": X, "bestaetigung": X},
"onko": {...},
"kardio": {...},
"intensiv": {...},
"galle": {...},
"sd": {...},
},
...
]
}
In the Excel:
- Per Fallgruppe: Gutachten = count, Alternative = count where typ='Alternative',
Bestaetigung = Gutachten - Alternative
- Gesamt = sum across all Fallgruppen
"""
rows = (
db.query(
Case.kw,
Case.fallgruppe,
func.count(Case.id).label("gutachten"),
func.sum(
(Case.gutachten_typ == "Alternative").cast(int)
).label("alternative"),
)
.filter(Case.jahr == jahr, Case.gutachten == True) # noqa: E712
.group_by(Case.kw, Case.fallgruppe)
.all()
)
kw_map: dict[int, dict] = {}
for row in rows:
kw = _int(row.kw)
fg = row.fallgruppe
if fg not in FALLGRUPPEN:
continue
if kw not in kw_map:
kw_map[kw] = _empty_gutachten_weekly_row(kw)
gutachten = _int(row.gutachten)
alternative = _int(row.alternative)
kw_map[kw][fg] = {
"gutachten": gutachten,
"alternative": alternative,
"bestaetigung": gutachten - alternative,
}
# Compute gesamt (sum of all Fallgruppen per KW)
for kw_data in kw_map.values():
total_g = sum(kw_data[fg]["gutachten"] for fg in FALLGRUPPEN)
total_a = sum(kw_data[fg]["alternative"] for fg in FALLGRUPPEN)
kw_data["gesamt"] = {
"gutachten": total_g,
"alternative": total_a,
"bestaetigung": total_g - total_a,
}
weekly = []
for kw in range(1, MAX_KW + 1):
weekly.append(kw_map.get(kw, _empty_gutachten_weekly_row(kw)))
return {"weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 4: Auswertung Therapieaenderungen
# ---------------------------------------------------------------------------
def calculate_sheet4_data(db: Session, jahr: int) -> dict:
"""Calculate *Auswertung Therapieaenderungen*.
Per KW: Gutachten count, TA Ja, TA Nein, Diagnosekorrektur,
Unterversorgung, Uebertherapie.
Returns::
{
"weekly": [
{
"kw": 1,
"gutachten": X,
"ta_ja": X,
"ta_nein": X,
"diagnosekorrektur": X,
"unterversorgung": X,
"uebertherapie": X,
},
...
]
}
"""
rows = (
db.query(
Case.kw,
func.count(Case.id).label("gutachten"),
func.sum(
(Case.therapieaenderung == "Ja").cast(int)
).label("ta_ja"),
func.sum(
(Case.therapieaenderung == "Nein").cast(int)
).label("ta_nein"),
func.sum(Case.ta_diagnosekorrektur.cast(int)).label("diagnosekorrektur"),
func.sum(Case.ta_unterversorgung.cast(int)).label("unterversorgung"),
func.sum(Case.ta_uebertherapie.cast(int)).label("uebertherapie"),
)
.filter(Case.jahr == jahr, Case.gutachten == True) # noqa: E712
.group_by(Case.kw)
.all()
)
kw_map: dict[int, dict] = {}
for row in rows:
kw = _int(row.kw)
kw_map[kw] = {
"kw": kw,
"gutachten": _int(row.gutachten),
"ta_ja": _int(row.ta_ja),
"ta_nein": _int(row.ta_nein),
"diagnosekorrektur": _int(row.diagnosekorrektur),
"unterversorgung": _int(row.unterversorgung),
"uebertherapie": _int(row.uebertherapie),
}
weekly = []
for kw in range(1, MAX_KW + 1):
weekly.append(kw_map.get(kw, _empty_ta_weekly_row(kw)))
return {"weekly": weekly}
# ---------------------------------------------------------------------------
# Sheet 5: Auswertung ICD onko
# ---------------------------------------------------------------------------
def calculate_sheet5_data(db: Session, jahr: int) -> dict:
"""Calculate *Auswertung ICD onko*.
Returns sorted list of ICD codes from onko cases with counts.
Query: case_icd_codes JOIN cases
WHERE cases.fallgruppe = 'onko' AND cases.jahr = jahr
GROUP BY UPPER(icd_code)
ORDER BY count DESC, icd_code ASC
Returns::
{
"icd_codes": [
{"icd": "C18", "count": 17},
{"icd": "C50", "count": 12},
...
]
}
"""
rows = (
db.query(
func.upper(CaseICDCode.icd_code).label("icd"),
func.count(CaseICDCode.id).label("cnt"),
)
.join(Case, CaseICDCode.case_id == Case.id)
.filter(
and_(
Case.fallgruppe == "onko",
Case.jahr == jahr,
)
)
.group_by(func.upper(CaseICDCode.icd_code))
.order_by(func.count(CaseICDCode.id).desc(), func.upper(CaseICDCode.icd_code))
.all()
)
icd_codes = [{"icd": row.icd, "count": _int(row.cnt)} for row in rows]
return {"icd_codes": icd_codes}
# ---------------------------------------------------------------------------
# Dashboard KPIs
# ---------------------------------------------------------------------------
def calculate_dashboard_kpis(db: Session, jahr: int) -> dict:
"""Calculate live KPIs for the dashboard.
Returns::
{
"total_cases": int,
"pending_icd": int,
"pending_coding": int,
"total_gutachten": int,
"total_ablehnungen": int,
"total_unterlagen": int,
"fallgruppen": {"onko": X, "kardio": X, "intensiv": X, "galle": X, "sd": X},
"gutachten_typen": {"alternative": X, "bestaetigung": X, "uncodiert": X},
}
"""
# Total cases for the year
total_cases = (
db.query(func.count(Case.id)).filter(Case.jahr == jahr).scalar() or 0
)
# Cases without ICD codes entered
pending_icd = (
db.query(func.count(Case.id))
.filter(Case.jahr == jahr, Case.icd == None) # noqa: E711
.scalar()
or 0
)
# Gutachten without gutachten_typ (need coding)
pending_coding = (
db.query(func.count(Case.id))
.filter(
Case.jahr == jahr,
Case.gutachten == True, # noqa: E712
Case.gutachten_typ == None, # noqa: E711
)
.scalar()
or 0
)
# Gutachten totals
total_gutachten = (
db.query(func.count(Case.id))
.filter(Case.jahr == jahr, Case.gutachten == True) # noqa: E712
.scalar()
or 0
)
# Ablehnungen
total_ablehnungen = (
db.query(func.count(Case.id))
.filter(Case.jahr == jahr, Case.ablehnung == True) # noqa: E712
.scalar()
or 0
)
# Unterlagen
total_unterlagen = (
db.query(func.count(Case.id))
.filter(Case.jahr == jahr, Case.unterlagen == True) # noqa: E712
.scalar()
or 0
)
# Per-Fallgruppe counts
fg_rows = (
db.query(Case.fallgruppe, func.count(Case.id).label("cnt"))
.filter(Case.jahr == jahr)
.group_by(Case.fallgruppe)
.all()
)
fallgruppen = {fg: 0 for fg in FALLGRUPPEN}
for row in fg_rows:
if row.fallgruppe in fallgruppen:
fallgruppen[row.fallgruppe] = _int(row.cnt)
# Gutachten type breakdown
typ_rows = (
db.query(Case.gutachten_typ, func.count(Case.id).label("cnt"))
.filter(Case.jahr == jahr, Case.gutachten == True) # noqa: E712
.group_by(Case.gutachten_typ)
.all()
)
gutachten_typen = {"alternative": 0, "bestaetigung": 0, "uncodiert": 0}
for row in typ_rows:
if row.gutachten_typ == "Alternative":
gutachten_typen["alternative"] = _int(row.cnt)
elif row.gutachten_typ == "Bestätigung":
gutachten_typen["bestaetigung"] = _int(row.cnt)
else:
gutachten_typen["uncodiert"] = _int(row.cnt)
return {
"total_cases": total_cases,
"pending_icd": pending_icd,
"pending_coding": pending_coding,
"total_gutachten": total_gutachten,
"total_ablehnungen": total_ablehnungen,
"total_unterlagen": total_unterlagen,
"fallgruppen": fallgruppen,
"gutachten_typen": gutachten_typen,
}
# ---------------------------------------------------------------------------
# Full report generation (all 5 sheets)
# ---------------------------------------------------------------------------
def generate_full_report(db: Session, jahr: int, kw: int | None = None) -> dict:
"""Generate complete report data for all 5 sheets.
The *kw* parameter is recorded for metadata but does not filter the data
-- all sheets always contain the full year up to the latest available KW.
Returns::
{
"jahr": int,
"kw": int | None,
"sheet1": {...},
"sheet2": {...},
"sheet3": {...},
"sheet4": {...},
"sheet5": {...},
}
"""
logger.info("Generating full report for jahr=%d, kw=%s", jahr, kw)
return {
"jahr": jahr,
"kw": kw,
"sheet1": calculate_sheet1_data(db, jahr),
"sheet2": calculate_sheet2_data(db, jahr),
"sheet3": calculate_sheet3_data(db, jahr),
"sheet4": calculate_sheet4_data(db, jahr),
"sheet5": calculate_sheet5_data(db, jahr),
}

View file

@ -0,0 +1,204 @@
"""Vorjahr (previous-year) comparison service.
Provides aggregated previous-year data for Sheet 1's year-over-year columns.
Uses the yearly_summary table as a cache if summary rows exist for the
previous year they are aggregated directly; otherwise the live cases table
is queried and the results are stored for future lookups.
"""
from __future__ import annotations
import logging
from sqlalchemy.orm import Session
from app.models.report import YearlySummary
from app.services.report_service import (
FALLGRUPPEN,
calculate_sheet1_data,
calculate_sheet2_data,
calculate_sheet3_data,
calculate_sheet4_data,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def get_vorjahr_summary(db: Session, jahr: int) -> dict:
"""Get previous year's aggregated data for comparison.
First checks the ``yearly_summary`` table (cached per-KW rows).
If no cached data exists, calculates live from the cases table and
persists the results for future use.
Returns::
{
"jahr": int,
"erstberatungen": int,
"ablehnungen": int,
"unterlagen": int,
"keine_rueckmeldung": int,
"gutachten": int,
"gutachten_alternative": int,
"gutachten_bestaetigung": int,
}
"""
vorjahr = jahr - 1
summaries = (
db.query(YearlySummary)
.filter(YearlySummary.jahr == vorjahr)
.all()
)
if summaries:
logger.debug("Vorjahr %d: using %d cached summary rows", vorjahr, len(summaries))
return _aggregate_summaries(vorjahr, summaries)
# No cache — calculate live and store
logger.info("Vorjahr %d: no cache, calculating from cases table", vorjahr)
return _calculate_and_cache(db, vorjahr)
def get_vorjahr_detail(db: Session, jahr: int) -> dict:
"""Get previous year's full breakdown (all sheets) for side-by-side comparison.
This always calculates live from the cases table (no caching of detail data).
Returns the same structure as ``generate_full_report`` but for *jahr - 1*.
"""
vorjahr = jahr - 1
from app.services.report_service import generate_full_report
return generate_full_report(db, vorjahr)
def refresh_vorjahr_cache(db: Session, jahr: int) -> int:
"""Recalculate and store yearly summary rows for the given year.
This is useful after historical data has been imported or corrected.
Returns the number of KW rows written.
"""
logger.info("Refreshing yearly summary cache for jahr=%d", jahr)
# Delete existing cache rows for this year
db.query(YearlySummary).filter(YearlySummary.jahr == jahr).delete()
db.flush()
return _store_summaries(db, jahr)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _aggregate_summaries(jahr: int, summaries: list[YearlySummary]) -> dict:
"""Sum up weekly YearlySummary rows into yearly totals."""
result = {
"jahr": jahr,
"erstberatungen": sum(_safe(s.erstberatungen) for s in summaries),
"ablehnungen": sum(_safe(s.ablehnungen) for s in summaries),
"unterlagen": sum(_safe(s.unterlagen) for s in summaries),
"keine_rueckmeldung": sum(_safe(s.keine_rueckmeldung) for s in summaries),
"gutachten": sum(_safe(s.gutachten_gesamt) for s in summaries),
"gutachten_alternative": sum(_safe(s.gutachten_alternative) for s in summaries),
"gutachten_bestaetigung": sum(_safe(s.gutachten_bestaetigung) for s in summaries),
}
return result
def _calculate_and_cache(db: Session, jahr: int) -> dict:
"""Calculate from cases, store in yearly_summary, return aggregated."""
written = _store_summaries(db, jahr)
logger.info("Cached %d KW rows for jahr=%d", written, jahr)
# Re-read from cache to return consistent data
summaries = (
db.query(YearlySummary)
.filter(YearlySummary.jahr == jahr)
.all()
)
return _aggregate_summaries(jahr, summaries)
def _store_summaries(db: Session, jahr: int) -> int:
"""Calculate all sheet data and store one YearlySummary row per KW.
Returns number of KW rows written.
"""
sheet1 = calculate_sheet1_data(db, jahr)
sheet2 = calculate_sheet2_data(db, jahr)
sheet3 = calculate_sheet3_data(db, jahr)
sheet4 = calculate_sheet4_data(db, jahr)
written = 0
for i, s1_week in enumerate(sheet1["weekly"]):
kw = s1_week["kw"]
s2_week = sheet2["weekly"][i]
s3_week = sheet3["weekly"][i]
s4_week = sheet4["weekly"][i]
# Skip KWs with zero data everywhere (no need to store empty rows)
if s1_week["erstberatungen"] == 0 and s1_week["gutachten"] == 0:
continue
summary = YearlySummary(
jahr=jahr,
kw=kw,
# Sheet 1 totals
erstberatungen=s1_week["erstberatungen"],
ablehnungen=s1_week["ablehnungen"],
unterlagen=s1_week["unterlagen"],
keine_rueckmeldung=s1_week["keine_rm"],
gutachten_gesamt=s1_week["gutachten"],
# Sheet 3 gesamt
gutachten_alternative=s3_week["gesamt"]["alternative"],
gutachten_bestaetigung=s3_week["gesamt"]["bestaetigung"],
# Sheet 2: per-Fallgruppe
onko_anzahl=s2_week["onko"]["anzahl"],
onko_gutachten=s2_week["onko"]["gutachten"],
onko_keine_rm=s2_week["onko"]["keine_rm"],
kardio_anzahl=s2_week["kardio"]["anzahl"],
kardio_gutachten=s2_week["kardio"]["gutachten"],
kardio_keine_rm=s2_week["kardio"]["keine_rm"],
intensiv_anzahl=s2_week["intensiv"]["anzahl"],
intensiv_gutachten=s2_week["intensiv"]["gutachten"],
intensiv_keine_rm=s2_week["intensiv"]["keine_rm"],
galle_anzahl=s2_week["galle"]["anzahl"],
galle_gutachten=s2_week["galle"]["gutachten"],
galle_keine_rm=s2_week["galle"]["keine_rm"],
sd_anzahl=s2_week["sd"]["anzahl"],
sd_gutachten=s2_week["sd"]["gutachten"],
sd_keine_rm=s2_week["sd"]["keine_rm"],
# Sheet 3: per-Fallgruppe gutachten types
onko_alternative=s3_week["onko"]["alternative"],
onko_bestaetigung=s3_week["onko"]["bestaetigung"],
kardio_alternative=s3_week["kardio"]["alternative"],
kardio_bestaetigung=s3_week["kardio"]["bestaetigung"],
intensiv_alternative=s3_week["intensiv"]["alternative"],
intensiv_bestaetigung=s3_week["intensiv"]["bestaetigung"],
galle_alternative=s3_week["galle"]["alternative"],
galle_bestaetigung=s3_week["galle"]["bestaetigung"],
sd_alternative=s3_week["sd"]["alternative"],
sd_bestaetigung=s3_week["sd"]["bestaetigung"],
# Sheet 4: therapy changes
ta_ja=s4_week["ta_ja"],
ta_nein=s4_week["ta_nein"],
ta_diagnosekorrektur=s4_week["diagnosekorrektur"],
ta_unterversorgung=s4_week["unterversorgung"],
ta_uebertherapie=s4_week["uebertherapie"],
)
db.add(summary)
written += 1
db.commit()
return written
def _safe(val) -> int:
"""Coerce None to 0."""
return val if val is not None else 0