mirror of
https://github.com/complexcaresolutions/dak.c2s.git
synced 2026-03-17 17:13:42 +00:00
feat: Excel export in Berichtswesen format + historical import
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d6fb04d5a7
commit
1748379253
2 changed files with 989 additions and 0 deletions
486
backend/app/services/excel_export.py
Normal file
486
backend/app/services/excel_export.py
Normal file
|
|
@ -0,0 +1,486 @@
|
||||||
|
"""Excel export service for Berichtswesen files.
|
||||||
|
|
||||||
|
Generates .xlsx files in the exact format used by the DAK Zweitmeinungs-Portal
|
||||||
|
Berichtswesen, matching the structure of historical files (2023-2026).
|
||||||
|
|
||||||
|
The workbook contains 4 sheets:
|
||||||
|
1. Auswertung KW gesamt - Overall weekly summary + year comparison
|
||||||
|
2. Auswertung nach Fachgebieten - Weekly breakdown by Fallgruppe
|
||||||
|
3. Auswertung Gutachten - Gutachten details (Alternative/Bestaetigung)
|
||||||
|
4. Auswertung ICD onko - ICD code frequency listing
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from io import BytesIO
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from openpyxl import Workbook
|
||||||
|
from openpyxl.styles import Alignment, Font, PatternFill
|
||||||
|
from openpyxl.utils import get_column_letter
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Styles matching the reference Berichtswesen files
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Header row fill: theme colour 0 with tint -0.15 (light grey).
|
||||||
|
# openpyxl cannot replicate theme-based tints portably, so we use a concrete
|
||||||
|
# grey that closely matches the rendered colour in the original files.
|
||||||
|
HEADER_FILL = PatternFill(start_color="FFD9D9D9", end_color="FFD9D9D9", fill_type="solid")
|
||||||
|
BOLD_FONT = Font(bold=True)
|
||||||
|
HEADER_FONT = Font(bold=True, name="Calibri", size=11)
|
||||||
|
TITLE_FONT = Font(bold=True, name="Calibri", size=12)
|
||||||
|
PCT_FORMAT = "0%"
|
||||||
|
|
||||||
|
# Fallgruppe keys in canonical order
|
||||||
|
FALLGRUPPEN = ["onko", "kardio", "intensiv", "galle", "sd"]
|
||||||
|
FALLGRUPPEN_LABELS = {
|
||||||
|
"onko": "Fallgruppe onko",
|
||||||
|
"kardio": "Fallgruppe kardio",
|
||||||
|
"intensiv": "Fallgruppe intensiv",
|
||||||
|
"galle": "Fallgruppe Gallenblase",
|
||||||
|
"sd": "Fallgruppe Schilddr\u00fcse",
|
||||||
|
}
|
||||||
|
|
||||||
|
MAX_KW = 52 # always output 52 rows
|
||||||
|
|
||||||
|
|
||||||
|
def generate_berichtswesen_xlsx(
|
||||||
|
report_data: dict[str, Any],
|
||||||
|
jahr: int,
|
||||||
|
vorjahr_data: dict[str, Any] | None = None,
|
||||||
|
) -> bytes:
|
||||||
|
"""Generate a Berichtswesen Excel file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
report_data: Output from ``report_service.generate_full_report()``.
|
||||||
|
Expected keys: ``sheet1``, ``sheet2``, ``sheet3``, ``icd_codes``.
|
||||||
|
jahr: The report year.
|
||||||
|
vorjahr_data: Previous-year summary data for the year-over-year
|
||||||
|
comparison block on Sheet 1. Structure same as *report_data*
|
||||||
|
``sheet1`` (i.e. containing ``totals`` and ``weeks``).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The ``.xlsx`` file contents as *bytes*.
|
||||||
|
"""
|
||||||
|
wb = Workbook()
|
||||||
|
|
||||||
|
_write_sheet1_kw_gesamt(wb, report_data.get("sheet1", {}), jahr, vorjahr_data)
|
||||||
|
_write_sheet2_fachgebiete(wb, report_data.get("sheet2", {}), jahr)
|
||||||
|
_write_sheet3_gutachten(wb, report_data.get("sheet3", {}), jahr)
|
||||||
|
_write_sheet4_icd_onko(wb, report_data.get("icd_codes", []), jahr)
|
||||||
|
|
||||||
|
# Remove the default empty sheet created by Workbook()
|
||||||
|
if "Sheet" in wb.sheetnames:
|
||||||
|
del wb["Sheet"]
|
||||||
|
|
||||||
|
buf = BytesIO()
|
||||||
|
wb.save(buf)
|
||||||
|
return buf.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Internal helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _safe(val: Any, default: int = 0) -> int:
|
||||||
|
"""Return *val* as int, falling back to *default*."""
|
||||||
|
if val is None:
|
||||||
|
return default
|
||||||
|
try:
|
||||||
|
return int(val)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return default
|
||||||
|
|
||||||
|
|
||||||
|
def _pct(numerator: int, denominator: int) -> float | None:
|
||||||
|
"""Return fraction or *None* if denominator is zero."""
|
||||||
|
if denominator == 0:
|
||||||
|
return None
|
||||||
|
return numerator / denominator
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_header_style(ws, row: int, min_col: int, max_col: int) -> None:
|
||||||
|
"""Apply header fill + bold font to a row range."""
|
||||||
|
for col in range(min_col, max_col + 1):
|
||||||
|
cell = ws.cell(row=row, column=col)
|
||||||
|
cell.fill = HEADER_FILL
|
||||||
|
cell.font = HEADER_FONT
|
||||||
|
|
||||||
|
|
||||||
|
def _auto_col_width(ws, min_col: int = 1, max_col: int | None = None, min_width: float = 8.0) -> None:
|
||||||
|
"""Set column widths based on content length, with a sensible minimum."""
|
||||||
|
if max_col is None:
|
||||||
|
max_col = ws.max_column or 1
|
||||||
|
for col in range(min_col, max_col + 1):
|
||||||
|
max_len = min_width
|
||||||
|
col_letter = get_column_letter(col)
|
||||||
|
for row in range(1, (ws.max_row or 1) + 1):
|
||||||
|
val = ws.cell(row=row, column=col).value
|
||||||
|
if val is not None:
|
||||||
|
cell_len = len(str(val)) + 2
|
||||||
|
if cell_len > max_len:
|
||||||
|
max_len = cell_len
|
||||||
|
# Cap at reasonable width
|
||||||
|
ws.column_dimensions[col_letter].width = min(max_len, 35)
|
||||||
|
|
||||||
|
|
||||||
|
def _weeks_lookup(weeks: list[dict]) -> dict[int, dict]:
|
||||||
|
"""Convert a list of week dicts to ``{kw: dict}`` for fast lookup."""
|
||||||
|
return {w["kw"]: w for w in weeks} if weeks else {}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Sheet 1: Auswertung KW gesamt
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _write_sheet1_kw_gesamt(
|
||||||
|
wb: Workbook,
|
||||||
|
data: dict[str, Any],
|
||||||
|
jahr: int,
|
||||||
|
vorjahr_data: dict[str, Any] | None,
|
||||||
|
) -> None:
|
||||||
|
"""Write the 'Auswertung KW gesamt' sheet.
|
||||||
|
|
||||||
|
Layout (matching reference files):
|
||||||
|
Row 1: A1 = "Gesamtuebersicht"
|
||||||
|
Row 2: C2 = <jahr>, E2 = <vorjahr>
|
||||||
|
Row 3: A3 = "Gesamtzahl an Erstberatungen", C3 = count, E3 = vorjahr
|
||||||
|
Row 4: A4 = "Anzahl Ablehnungen", C4 = count, D4 = %, E4 = vorjahr, F4 = %
|
||||||
|
Row 5: A5 = "Anzahl versendeter Unterlagen", C5 = count, D5 = %, E5 = vorjahr, F5 = %
|
||||||
|
Row 6: A6 = "Anzahl keine Rueckmeldung", C6 = count, D6 = %, E6 = vorjahr, F6 = %
|
||||||
|
Row 7: A7 = "Anzahl erstellter Gutachten", C7 = count, D7 = %, E7 = vorjahr, F7 = %
|
||||||
|
Row 8-9: (empty)
|
||||||
|
Row 10: Column headers (KW | Erstberatungen | Unterlagen | Ablehnung | Keine Rueckmeldung | Gutachten)
|
||||||
|
Row 11-62: KW 1-52 data
|
||||||
|
Row 63: Summe row
|
||||||
|
"""
|
||||||
|
ws = wb.create_sheet(title="Auswertung KW gesamt")
|
||||||
|
|
||||||
|
totals = data.get("totals", {})
|
||||||
|
weeks = _weeks_lookup(data.get("weeks", []))
|
||||||
|
|
||||||
|
erst = _safe(totals.get("erstberatungen"))
|
||||||
|
abl = _safe(totals.get("ablehnungen"))
|
||||||
|
unt = _safe(totals.get("unterlagen"))
|
||||||
|
keine_rm = _safe(totals.get("keine_rueckmeldung"))
|
||||||
|
gutachten = _safe(totals.get("gutachten_gesamt"))
|
||||||
|
|
||||||
|
# Vorjahr totals
|
||||||
|
vj = vorjahr_data or {}
|
||||||
|
vj_totals = vj.get("totals", {}) if isinstance(vj, dict) else {}
|
||||||
|
vj_erst = _safe(vj_totals.get("erstberatungen"))
|
||||||
|
vj_abl = _safe(vj_totals.get("ablehnungen"))
|
||||||
|
vj_unt = _safe(vj_totals.get("unterlagen"))
|
||||||
|
vj_keine = _safe(vj_totals.get("keine_rueckmeldung"))
|
||||||
|
vj_ga = _safe(vj_totals.get("gutachten_gesamt"))
|
||||||
|
|
||||||
|
# --- Title ---
|
||||||
|
ws["A1"] = "Gesamt\u00fcbersicht"
|
||||||
|
ws["A1"].font = TITLE_FONT
|
||||||
|
|
||||||
|
# --- Year headers ---
|
||||||
|
ws["C2"] = jahr
|
||||||
|
ws["E2"] = jahr - 1
|
||||||
|
|
||||||
|
# --- Summary rows ---
|
||||||
|
summary_rows = [
|
||||||
|
("Gesamtzahl an Erstberatungen", erst, None, vj_erst, None),
|
||||||
|
("Anzahl Ablehnungen", abl, _pct(abl, erst), vj_abl, _pct(vj_abl, vj_erst)),
|
||||||
|
("Anzahl versendeter Unterlagen", unt, _pct(unt, erst), vj_unt, _pct(vj_unt, vj_erst)),
|
||||||
|
("Anzahl keine R\u00fcckmeldung", keine_rm, _pct(keine_rm, erst), vj_keine, _pct(vj_keine, vj_erst)),
|
||||||
|
("Anzahl erstellter Gutachten", gutachten, _pct(gutachten, erst), vj_ga, _pct(vj_ga, vj_erst)),
|
||||||
|
]
|
||||||
|
for i, (label, val, pct, vj_val, vj_pct) in enumerate(summary_rows):
|
||||||
|
row = 3 + i
|
||||||
|
ws.cell(row=row, column=1, value=label)
|
||||||
|
ws.cell(row=row, column=3, value=val)
|
||||||
|
if pct is not None:
|
||||||
|
c = ws.cell(row=row, column=4, value=pct)
|
||||||
|
c.number_format = PCT_FORMAT
|
||||||
|
if vj_val:
|
||||||
|
ws.cell(row=row, column=5, value=vj_val)
|
||||||
|
if vj_pct is not None:
|
||||||
|
c = ws.cell(row=row, column=6, value=vj_pct)
|
||||||
|
c.number_format = PCT_FORMAT
|
||||||
|
|
||||||
|
# --- Column headers (row 10) ---
|
||||||
|
headers = ["KW", "Anzahl an Erstberatungen", "Unterlagen", "Ablehnung", "Keine R\u00fcckmeldung", "Gutachten"]
|
||||||
|
for ci, h in enumerate(headers, start=1):
|
||||||
|
ws.cell(row=10, column=ci, value=h)
|
||||||
|
_apply_header_style(ws, 10, 1, 6)
|
||||||
|
|
||||||
|
# --- Weekly data (rows 11-62) ---
|
||||||
|
for kw in range(1, MAX_KW + 1):
|
||||||
|
row = 10 + kw # row 11 = KW 1
|
||||||
|
w = weeks.get(kw, {})
|
||||||
|
ws.cell(row=row, column=1, value=kw)
|
||||||
|
ws.cell(row=row, column=2, value=_safe(w.get("erstberatungen")))
|
||||||
|
ws.cell(row=row, column=3, value=_safe(w.get("unterlagen")))
|
||||||
|
ws.cell(row=row, column=4, value=_safe(w.get("ablehnungen")))
|
||||||
|
ws.cell(row=row, column=5, value=_safe(w.get("keine_rueckmeldung")))
|
||||||
|
ws.cell(row=row, column=6, value=_safe(w.get("gutachten_gesamt")))
|
||||||
|
|
||||||
|
# --- Summe row (row 63) ---
|
||||||
|
summe_row = 10 + MAX_KW + 1 # 63
|
||||||
|
ws.cell(row=summe_row, column=1, value="Summe")
|
||||||
|
ws.cell(row=summe_row, column=2, value=erst)
|
||||||
|
ws.cell(row=summe_row, column=3, value=unt)
|
||||||
|
ws.cell(row=summe_row, column=4, value=abl)
|
||||||
|
ws.cell(row=summe_row, column=5, value=keine_rm)
|
||||||
|
ws.cell(row=summe_row, column=6, value=gutachten)
|
||||||
|
_apply_header_style(ws, summe_row, 1, 6)
|
||||||
|
|
||||||
|
# --- Column widths ---
|
||||||
|
ws.column_dimensions["A"].width = 10
|
||||||
|
ws.column_dimensions["B"].width = 24
|
||||||
|
ws.column_dimensions["C"].width = 12
|
||||||
|
ws.column_dimensions["D"].width = 12
|
||||||
|
ws.column_dimensions["E"].width = 20
|
||||||
|
ws.column_dimensions["F"].width = 12
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Sheet 2: Auswertung nach Fachgebieten
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _write_sheet2_fachgebiete(
|
||||||
|
wb: Workbook,
|
||||||
|
data: dict[str, Any],
|
||||||
|
jahr: int,
|
||||||
|
) -> None:
|
||||||
|
"""Write the 'Auswertung nach Fachgebieten' sheet.
|
||||||
|
|
||||||
|
Layout:
|
||||||
|
Row 1: A1 = "Uebersicht nach Fallgruppen"
|
||||||
|
Row 3: Merged group headers (B3:D3, E3:G3, H3:J3, K3:M3, N3:P3)
|
||||||
|
Row 4: Sub-headers: KW | Anzahl | Gutachten | Keine RM/Ablehnung (x5)
|
||||||
|
Row 5-56: KW 1-52 data
|
||||||
|
Row 57: Summe row
|
||||||
|
"""
|
||||||
|
ws = wb.create_sheet(title="Auswertung nach Fachgebieten")
|
||||||
|
|
||||||
|
weeks = _weeks_lookup(data.get("weeks", []))
|
||||||
|
|
||||||
|
# --- Title ---
|
||||||
|
ws["A1"] = "\u00dcbersicht nach Fallgruppen"
|
||||||
|
ws["A1"].font = TITLE_FONT
|
||||||
|
|
||||||
|
# --- Group headers (row 3) with merged cells ---
|
||||||
|
group_start_cols = [2, 5, 8, 11, 14] # B, E, H, K, N
|
||||||
|
for fg_key, start_col in zip(FALLGRUPPEN, group_start_cols):
|
||||||
|
label = FALLGRUPPEN_LABELS[fg_key]
|
||||||
|
cell = ws.cell(row=3, column=start_col, value=label)
|
||||||
|
cell.fill = HEADER_FILL
|
||||||
|
cell.font = HEADER_FONT
|
||||||
|
cell.alignment = Alignment(horizontal="center")
|
||||||
|
# Merge across 3 columns
|
||||||
|
end_col = start_col + 2
|
||||||
|
ws.merge_cells(
|
||||||
|
start_row=3, start_column=start_col,
|
||||||
|
end_row=3, end_column=end_col,
|
||||||
|
)
|
||||||
|
# Apply fill to merged cells too
|
||||||
|
for c in range(start_col, end_col + 1):
|
||||||
|
ws.cell(row=3, column=c).fill = HEADER_FILL
|
||||||
|
|
||||||
|
# --- Sub-headers (row 4) ---
|
||||||
|
ws.cell(row=4, column=1, value="KW")
|
||||||
|
for start_col in group_start_cols:
|
||||||
|
ws.cell(row=4, column=start_col, value="Anzahl")
|
||||||
|
ws.cell(row=4, column=start_col + 1, value="Gutachten")
|
||||||
|
ws.cell(row=4, column=start_col + 2, value="Keine R\u00fcckmeldung/Ablehnung")
|
||||||
|
_apply_header_style(ws, 4, 1, 16)
|
||||||
|
|
||||||
|
# --- Weekly data (rows 5-56) ---
|
||||||
|
sums = {fg: {"anzahl": 0, "gutachten": 0, "keine_rm": 0} for fg in FALLGRUPPEN}
|
||||||
|
|
||||||
|
for kw in range(1, MAX_KW + 1):
|
||||||
|
row = 4 + kw # row 5 = KW 1
|
||||||
|
w = weeks.get(kw, {})
|
||||||
|
ws.cell(row=row, column=1, value=kw)
|
||||||
|
|
||||||
|
for fg_key, start_col in zip(FALLGRUPPEN, group_start_cols):
|
||||||
|
fg_data = w.get(fg_key, {})
|
||||||
|
anz = _safe(fg_data.get("anzahl"))
|
||||||
|
ga = _safe(fg_data.get("gutachten"))
|
||||||
|
keine = _safe(fg_data.get("keine_rm"))
|
||||||
|
|
||||||
|
ws.cell(row=row, column=start_col, value=anz)
|
||||||
|
ws.cell(row=row, column=start_col + 1, value=ga)
|
||||||
|
ws.cell(row=row, column=start_col + 2, value=keine)
|
||||||
|
|
||||||
|
sums[fg_key]["anzahl"] += anz
|
||||||
|
sums[fg_key]["gutachten"] += ga
|
||||||
|
sums[fg_key]["keine_rm"] += keine
|
||||||
|
|
||||||
|
# --- Summe row (row 57) ---
|
||||||
|
summe_row = 4 + MAX_KW + 1 # 57
|
||||||
|
for fg_key, start_col in zip(FALLGRUPPEN, group_start_cols):
|
||||||
|
ws.cell(row=summe_row, column=start_col, value=sums[fg_key]["anzahl"])
|
||||||
|
ws.cell(row=summe_row, column=start_col + 1, value=sums[fg_key]["gutachten"])
|
||||||
|
ws.cell(row=summe_row, column=start_col + 2, value=sums[fg_key]["keine_rm"])
|
||||||
|
_apply_header_style(ws, summe_row, 1, 16)
|
||||||
|
|
||||||
|
# --- Column widths ---
|
||||||
|
ws.column_dimensions["A"].width = 6
|
||||||
|
for start_col in group_start_cols:
|
||||||
|
ws.column_dimensions[get_column_letter(start_col)].width = 8
|
||||||
|
ws.column_dimensions[get_column_letter(start_col + 1)].width = 10
|
||||||
|
ws.column_dimensions[get_column_letter(start_col + 2)].width = 14
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Sheet 3: Auswertung Gutachten
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _write_sheet3_gutachten(
|
||||||
|
wb: Workbook,
|
||||||
|
data: dict[str, Any],
|
||||||
|
jahr: int,
|
||||||
|
) -> None:
|
||||||
|
"""Write the 'Auswertung Gutachten' sheet.
|
||||||
|
|
||||||
|
Layout:
|
||||||
|
Row 1: A1 = "Uebersicht nach Fallgruppen"
|
||||||
|
Row 3: Group headers: Gesamt (B3) + 5 Fallgruppen
|
||||||
|
Row 4: Sub-headers: KW | Gutachten | Alternative | Bestaetigung (x6)
|
||||||
|
Row 5-56: KW 1-52 data
|
||||||
|
Row 57: Summe row
|
||||||
|
"""
|
||||||
|
ws = wb.create_sheet(title="Auswertung Gutachten")
|
||||||
|
|
||||||
|
weeks = _weeks_lookup(data.get("weeks", []))
|
||||||
|
|
||||||
|
# --- Title ---
|
||||||
|
ws["A1"] = "\u00dcbersicht nach Fallgruppen"
|
||||||
|
ws["A1"].font = TITLE_FONT
|
||||||
|
|
||||||
|
# --- Group headers (row 3) ---
|
||||||
|
# Gesamt: B3 (no merge since it's a single-column-start group header,
|
||||||
|
# but in the reference the Gesamt label sits in B3 without a merge)
|
||||||
|
cell = ws.cell(row=3, column=2, value="Gesamt")
|
||||||
|
cell.fill = HEADER_FILL
|
||||||
|
cell.font = HEADER_FONT
|
||||||
|
cell.alignment = Alignment(horizontal="center")
|
||||||
|
|
||||||
|
# Fallgruppen start at columns E, H, K, N, Q (each 3 cols wide)
|
||||||
|
fg_start_cols = [5, 8, 11, 14, 17]
|
||||||
|
for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols):
|
||||||
|
label = FALLGRUPPEN_LABELS[fg_key]
|
||||||
|
cell = ws.cell(row=3, column=start_col, value=label)
|
||||||
|
cell.fill = HEADER_FILL
|
||||||
|
cell.font = HEADER_FONT
|
||||||
|
cell.alignment = Alignment(horizontal="center")
|
||||||
|
# Merge: the reference merges first 2 of 3 columns (E3:F3, H3:I3, etc.)
|
||||||
|
end_col = start_col + 1
|
||||||
|
ws.merge_cells(
|
||||||
|
start_row=3, start_column=start_col,
|
||||||
|
end_row=3, end_column=end_col,
|
||||||
|
)
|
||||||
|
for c in range(start_col, end_col + 1):
|
||||||
|
ws.cell(row=3, column=c).fill = HEADER_FILL
|
||||||
|
|
||||||
|
# --- Sub-headers (row 4) ---
|
||||||
|
ws.cell(row=4, column=1, value="KW")
|
||||||
|
|
||||||
|
# Gesamt columns: B=Gutachten, C=Alternative, D=Bestaetigung
|
||||||
|
ws.cell(row=4, column=2, value="Gutachten")
|
||||||
|
ws.cell(row=4, column=3, value="Alternative")
|
||||||
|
ws.cell(row=4, column=4, value="Best\u00e4tigung")
|
||||||
|
|
||||||
|
for start_col in fg_start_cols:
|
||||||
|
ws.cell(row=4, column=start_col, value="Gutachten")
|
||||||
|
ws.cell(row=4, column=start_col + 1, value="Alternative")
|
||||||
|
ws.cell(row=4, column=start_col + 2, value="Best\u00e4tigung")
|
||||||
|
_apply_header_style(ws, 4, 1, 19)
|
||||||
|
|
||||||
|
# --- Weekly data (rows 5-56) ---
|
||||||
|
sums_gesamt = {"gutachten": 0, "alternative": 0, "bestaetigung": 0}
|
||||||
|
sums_fg = {fg: {"gutachten": 0, "alternative": 0, "bestaetigung": 0} for fg in FALLGRUPPEN}
|
||||||
|
|
||||||
|
for kw in range(1, MAX_KW + 1):
|
||||||
|
row = 4 + kw
|
||||||
|
w = weeks.get(kw, {})
|
||||||
|
ws.cell(row=row, column=1, value=kw)
|
||||||
|
|
||||||
|
# Gesamt
|
||||||
|
g_ges = _safe(w.get("gutachten_gesamt"))
|
||||||
|
g_alt = _safe(w.get("gutachten_alternative"))
|
||||||
|
g_best = _safe(w.get("gutachten_bestaetigung"))
|
||||||
|
ws.cell(row=row, column=2, value=g_ges)
|
||||||
|
ws.cell(row=row, column=3, value=g_alt)
|
||||||
|
ws.cell(row=row, column=4, value=g_best)
|
||||||
|
sums_gesamt["gutachten"] += g_ges
|
||||||
|
sums_gesamt["alternative"] += g_alt
|
||||||
|
sums_gesamt["bestaetigung"] += g_best
|
||||||
|
|
||||||
|
# Per Fallgruppe
|
||||||
|
for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols):
|
||||||
|
fg_data = w.get(fg_key, {})
|
||||||
|
ga = _safe(fg_data.get("gutachten"))
|
||||||
|
alt = _safe(fg_data.get("alternative"))
|
||||||
|
best = _safe(fg_data.get("bestaetigung"))
|
||||||
|
ws.cell(row=row, column=start_col, value=ga)
|
||||||
|
ws.cell(row=row, column=start_col + 1, value=alt)
|
||||||
|
ws.cell(row=row, column=start_col + 2, value=best)
|
||||||
|
sums_fg[fg_key]["gutachten"] += ga
|
||||||
|
sums_fg[fg_key]["alternative"] += alt
|
||||||
|
sums_fg[fg_key]["bestaetigung"] += best
|
||||||
|
|
||||||
|
# --- Summe row (row 57) ---
|
||||||
|
summe_row = 4 + MAX_KW + 1
|
||||||
|
ws.cell(row=summe_row, column=2, value=sums_gesamt["gutachten"])
|
||||||
|
ws.cell(row=summe_row, column=3, value=sums_gesamt["alternative"])
|
||||||
|
ws.cell(row=summe_row, column=4, value=sums_gesamt["bestaetigung"])
|
||||||
|
for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols):
|
||||||
|
ws.cell(row=summe_row, column=start_col, value=sums_fg[fg_key]["gutachten"])
|
||||||
|
ws.cell(row=summe_row, column=start_col + 1, value=sums_fg[fg_key]["alternative"])
|
||||||
|
ws.cell(row=summe_row, column=start_col + 2, value=sums_fg[fg_key]["bestaetigung"])
|
||||||
|
_apply_header_style(ws, summe_row, 1, 19)
|
||||||
|
|
||||||
|
# --- Column widths ---
|
||||||
|
ws.column_dimensions["A"].width = 6
|
||||||
|
for col in range(2, 20):
|
||||||
|
ws.column_dimensions[get_column_letter(col)].width = 12
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Sheet 4: Auswertung ICD onko
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _write_sheet4_icd_onko(
|
||||||
|
wb: Workbook,
|
||||||
|
icd_data: list[dict[str, Any]],
|
||||||
|
jahr: int,
|
||||||
|
) -> None:
|
||||||
|
"""Write the 'Auswertung ICD onko' sheet.
|
||||||
|
|
||||||
|
Layout:
|
||||||
|
Row 1: A1 = "ICD", B1 = "Anzahl von ICD"
|
||||||
|
Row 2: A2 = "ICD", B2 = "AnzahlvonICD" (pivot-table artefact in originals)
|
||||||
|
Row 3+: ICD code | count (sorted by count descending)
|
||||||
|
"""
|
||||||
|
ws = wb.create_sheet(title="Auswertung ICD onko")
|
||||||
|
|
||||||
|
# Header row (matches reference format exactly)
|
||||||
|
ws["A1"] = "ICD"
|
||||||
|
ws["B1"] = "Anzahl von ICD"
|
||||||
|
ws["A1"].font = HEADER_FONT
|
||||||
|
ws["B1"].font = HEADER_FONT
|
||||||
|
|
||||||
|
# Second "header" row (artefact from pivot table in originals)
|
||||||
|
ws["A2"] = "ICD"
|
||||||
|
ws["B2"] = "AnzahlvonICD"
|
||||||
|
|
||||||
|
# Sort by count descending
|
||||||
|
sorted_icd = sorted(icd_data, key=lambda x: _safe(x.get("count")), reverse=True)
|
||||||
|
|
||||||
|
for i, entry in enumerate(sorted_icd):
|
||||||
|
row = 3 + i
|
||||||
|
ws.cell(row=row, column=1, value=entry.get("icd", ""))
|
||||||
|
ws.cell(row=row, column=2, value=_safe(entry.get("count")))
|
||||||
|
|
||||||
|
# Column widths
|
||||||
|
ws.column_dimensions["A"].width = 12
|
||||||
|
ws.column_dimensions["B"].width = 16
|
||||||
503
backend/scripts/import_berichtswesen.py
Normal file
503
backend/scripts/import_berichtswesen.py
Normal file
|
|
@ -0,0 +1,503 @@
|
||||||
|
"""Import historical Berichtswesen .xlsx files into yearly_summary table.
|
||||||
|
|
||||||
|
Reads Sheet 1 (KW gesamt), Sheet 2 (Fachgebiete), Sheet 3 (Gutachten) and
|
||||||
|
stores the aggregated per-week data in the yearly_summary table for
|
||||||
|
year-over-year comparison in future reports.
|
||||||
|
|
||||||
|
Supports two file formats:
|
||||||
|
- 2023+ format: 4 sheets (KW gesamt, Fachgebiete, Gutachten, ICD onko)
|
||||||
|
- 2022 format: 9 sheets (separate per-Fallgruppe sheets)
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
cd /home/frontend/dak_c2s/backend
|
||||||
|
source venv/bin/activate
|
||||||
|
|
||||||
|
# Import a single file:
|
||||||
|
python -m scripts.import_berichtswesen /path/to/Berichtswesen_2025.xlsx 2025
|
||||||
|
|
||||||
|
# Import all Berichtswesen files from data/ directory:
|
||||||
|
python -m scripts.import_berichtswesen
|
||||||
|
|
||||||
|
# Dry run (parse only, no DB writes):
|
||||||
|
python -m scripts.import_berichtswesen --dry-run
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import glob
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# Ensure the backend package is importable
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||||
|
|
||||||
|
from openpyxl import load_workbook
|
||||||
|
|
||||||
|
from app.database import SessionLocal
|
||||||
|
from app.models.report import YearlySummary
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Fallgruppe keys in canonical order
|
||||||
|
FALLGRUPPEN = ["onko", "kardio", "intensiv", "galle", "sd"]
|
||||||
|
|
||||||
|
# Mapping from 2022-style sheet suffixes to Fallgruppe keys
|
||||||
|
FALLGRUPPE_2022_MAP = {
|
||||||
|
"O": "onko",
|
||||||
|
"K": "kardio",
|
||||||
|
"I": "intensiv",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Parsing helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _parse_kw(value) -> int | None:
|
||||||
|
"""Extract KW number from a cell value.
|
||||||
|
|
||||||
|
Handles both formats:
|
||||||
|
- Integer: 1, 2, ... 52
|
||||||
|
- String: "KW01", "KW02", etc.
|
||||||
|
"""
|
||||||
|
if isinstance(value, (int, float)):
|
||||||
|
kw = int(value)
|
||||||
|
if 1 <= kw <= 53:
|
||||||
|
return kw
|
||||||
|
return None
|
||||||
|
if isinstance(value, str):
|
||||||
|
m = re.match(r"KW\s*(\d+)", value, re.IGNORECASE)
|
||||||
|
if m:
|
||||||
|
return int(m.group(1))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_int(value, default: int = 0) -> int:
|
||||||
|
"""Convert a cell value to int, defaulting to *default*."""
|
||||||
|
if value is None:
|
||||||
|
return default
|
||||||
|
try:
|
||||||
|
return int(value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return default
|
||||||
|
|
||||||
|
|
||||||
|
def _detect_year_from_filename(filename: str) -> int | None:
|
||||||
|
"""Try to extract the year from a Berichtswesen filename.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
Berichtswesen_2025_31122025_final.xlsx -> 2025
|
||||||
|
Berichtswesen_2022.xlsx -> 2022
|
||||||
|
"""
|
||||||
|
m = re.search(r"Berichtswesen_(\d{4})", os.path.basename(filename))
|
||||||
|
if m:
|
||||||
|
return int(m.group(1))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _is_2022_format(wb) -> bool:
|
||||||
|
"""Detect whether this is the older 2022 format (separate per-Fallgruppe sheets)."""
|
||||||
|
return "Auswertung KW-O" in wb.sheetnames
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 2023+ format parser
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _parse_modern_format(wb, year: int) -> list[dict]:
|
||||||
|
"""Parse Berichtswesen files in the 2023+ format.
|
||||||
|
|
||||||
|
Returns a list of dicts, one per KW, with all fields matching YearlySummary columns.
|
||||||
|
"""
|
||||||
|
rows_by_kw: dict[int, dict] = {}
|
||||||
|
|
||||||
|
# --- Sheet 1: Auswertung KW gesamt ---
|
||||||
|
if "Auswertung KW gesamt" in wb.sheetnames:
|
||||||
|
ws = wb["Auswertung KW gesamt"]
|
||||||
|
for row_idx in range(11, 63): # Rows 11-62 = KW 1-52
|
||||||
|
kw_val = ws.cell(row=row_idx, column=1).value
|
||||||
|
kw = _parse_kw(kw_val)
|
||||||
|
if kw is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw})
|
||||||
|
d = rows_by_kw[kw]
|
||||||
|
d["erstberatungen"] = _safe_int(ws.cell(row=row_idx, column=2).value)
|
||||||
|
d["unterlagen"] = _safe_int(ws.cell(row=row_idx, column=3).value)
|
||||||
|
d["ablehnungen"] = _safe_int(ws.cell(row=row_idx, column=4).value)
|
||||||
|
d["keine_rueckmeldung"] = _safe_int(ws.cell(row=row_idx, column=5).value)
|
||||||
|
d["gutachten_gesamt"] = _safe_int(ws.cell(row=row_idx, column=6).value)
|
||||||
|
|
||||||
|
# --- Sheet 2: Auswertung nach Fachgebieten ---
|
||||||
|
if "Auswertung nach Fachgebieten" in wb.sheetnames:
|
||||||
|
ws = wb["Auswertung nach Fachgebieten"]
|
||||||
|
# Fallgruppe start columns: B=onko, E=kardio, H=intensiv, K=galle, N=sd
|
||||||
|
fg_start_cols = [2, 5, 8, 11, 14]
|
||||||
|
|
||||||
|
for row_idx in range(5, 57): # Rows 5-56 = KW 1-52
|
||||||
|
kw_val = ws.cell(row=row_idx, column=1).value
|
||||||
|
kw = _parse_kw(kw_val)
|
||||||
|
if kw is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw})
|
||||||
|
d = rows_by_kw[kw]
|
||||||
|
|
||||||
|
for fg_key, start_col in zip(FALLGRUPPEN, fg_start_cols):
|
||||||
|
d[f"{fg_key}_anzahl"] = _safe_int(ws.cell(row=row_idx, column=start_col).value)
|
||||||
|
d[f"{fg_key}_gutachten"] = _safe_int(ws.cell(row=row_idx, column=start_col + 1).value)
|
||||||
|
d[f"{fg_key}_keine_rm"] = _safe_int(ws.cell(row=row_idx, column=start_col + 2).value)
|
||||||
|
|
||||||
|
# --- Sheet 3: Auswertung Gutachten ---
|
||||||
|
if "Auswertung Gutachten" in wb.sheetnames:
|
||||||
|
ws = wb["Auswertung Gutachten"]
|
||||||
|
# Gesamt: B=Gutachten, C=Alternative, D=Bestaetigung
|
||||||
|
# Fallgruppen: E/H/K/N/Q (each 3 cols: Gutachten, Alternative, Bestaetigung)
|
||||||
|
fg_gut_start_cols = [5, 8, 11, 14, 17]
|
||||||
|
|
||||||
|
for row_idx in range(5, 57): # Rows 5-56 = KW 1-52
|
||||||
|
kw_val = ws.cell(row=row_idx, column=1).value
|
||||||
|
kw = _parse_kw(kw_val)
|
||||||
|
if kw is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw})
|
||||||
|
d = rows_by_kw[kw]
|
||||||
|
|
||||||
|
d["gutachten_alternative"] = _safe_int(ws.cell(row=row_idx, column=3).value)
|
||||||
|
d["gutachten_bestaetigung"] = _safe_int(ws.cell(row=row_idx, column=4).value)
|
||||||
|
|
||||||
|
for fg_key, start_col in zip(FALLGRUPPEN, fg_gut_start_cols):
|
||||||
|
d[f"{fg_key}_alternative"] = _safe_int(ws.cell(row=row_idx, column=start_col + 1).value)
|
||||||
|
d[f"{fg_key}_bestaetigung"] = _safe_int(ws.cell(row=row_idx, column=start_col + 2).value)
|
||||||
|
|
||||||
|
return list(rows_by_kw.values())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# 2022 format parser
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _parse_2022_format(wb, year: int) -> list[dict]:
|
||||||
|
"""Parse the older 2022 Berichtswesen format.
|
||||||
|
|
||||||
|
The 2022 format has:
|
||||||
|
- 'Auswertung KW gesamt': KW | Summe von EG | Summe von Gutachten | Summe von Keine RM
|
||||||
|
- 'Auswertung KW-O/K/I': Per-Fallgruppe weekly counts
|
||||||
|
- 'Auswertung Gutachten gesamt': KW | Gutachten | Alternative | Bestaetigung
|
||||||
|
- 'Auswertung Gutachten O/K/I': Per-Fallgruppe gutachten detail
|
||||||
|
|
||||||
|
Note: 2022 only has 3 Fallgruppen (onko, kardio, intensiv).
|
||||||
|
No Unterlagen/Ablehnungen columns, no Gallenblase/Schilddruese.
|
||||||
|
"""
|
||||||
|
rows_by_kw: dict[int, dict] = {}
|
||||||
|
|
||||||
|
# --- KW gesamt ---
|
||||||
|
if "Auswertung KW gesamt" in wb.sheetnames:
|
||||||
|
ws = wb["Auswertung KW gesamt"]
|
||||||
|
for row_idx in range(2, ws.max_row + 1):
|
||||||
|
kw_val = ws.cell(row=row_idx, column=1).value
|
||||||
|
kw = _parse_kw(kw_val)
|
||||||
|
if kw is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw})
|
||||||
|
d = rows_by_kw[kw]
|
||||||
|
d["erstberatungen"] = _safe_int(ws.cell(row=row_idx, column=2).value)
|
||||||
|
d["gutachten_gesamt"] = _safe_int(ws.cell(row=row_idx, column=3).value)
|
||||||
|
d["keine_rueckmeldung"] = _safe_int(ws.cell(row=row_idx, column=4).value)
|
||||||
|
# 2022 doesn't separate Unterlagen/Ablehnungen
|
||||||
|
d["unterlagen"] = 0
|
||||||
|
d["ablehnungen"] = 0
|
||||||
|
|
||||||
|
# --- Per-Fallgruppe KW sheets ---
|
||||||
|
for suffix, fg_key in FALLGRUPPE_2022_MAP.items():
|
||||||
|
sheet_name = f"Auswertung KW-{suffix}"
|
||||||
|
if sheet_name not in wb.sheetnames:
|
||||||
|
continue
|
||||||
|
ws = wb[sheet_name]
|
||||||
|
for row_idx in range(2, ws.max_row + 1):
|
||||||
|
kw_val = ws.cell(row=row_idx, column=1).value
|
||||||
|
kw = _parse_kw(kw_val)
|
||||||
|
if kw is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw})
|
||||||
|
d = rows_by_kw[kw]
|
||||||
|
d[f"{fg_key}_anzahl"] = _safe_int(ws.cell(row=row_idx, column=2).value)
|
||||||
|
d[f"{fg_key}_gutachten"] = _safe_int(ws.cell(row=row_idx, column=3).value)
|
||||||
|
d[f"{fg_key}_keine_rm"] = _safe_int(ws.cell(row=row_idx, column=4).value)
|
||||||
|
|
||||||
|
# --- Gutachten gesamt ---
|
||||||
|
if "Auswertung Gutachten gesamt" in wb.sheetnames:
|
||||||
|
ws = wb["Auswertung Gutachten gesamt"]
|
||||||
|
for row_idx in range(2, ws.max_row + 1):
|
||||||
|
kw_val = ws.cell(row=row_idx, column=1).value
|
||||||
|
kw = _parse_kw(kw_val)
|
||||||
|
if kw is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw})
|
||||||
|
d = rows_by_kw[kw]
|
||||||
|
d["gutachten_alternative"] = _safe_int(ws.cell(row=row_idx, column=3).value)
|
||||||
|
d["gutachten_bestaetigung"] = _safe_int(ws.cell(row=row_idx, column=4).value)
|
||||||
|
|
||||||
|
# --- Per-Fallgruppe Gutachten sheets ---
|
||||||
|
for suffix, fg_key in FALLGRUPPE_2022_MAP.items():
|
||||||
|
sheet_name = f"Auswertung Gutachten {suffix}"
|
||||||
|
if sheet_name not in wb.sheetnames:
|
||||||
|
continue
|
||||||
|
ws = wb[sheet_name]
|
||||||
|
for row_idx in range(2, ws.max_row + 1):
|
||||||
|
kw_val = ws.cell(row=row_idx, column=1).value
|
||||||
|
kw = _parse_kw(kw_val)
|
||||||
|
if kw is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
rows_by_kw.setdefault(kw, {"jahr": year, "kw": kw})
|
||||||
|
d = rows_by_kw[kw]
|
||||||
|
d[f"{fg_key}_alternative"] = _safe_int(ws.cell(row=row_idx, column=3).value)
|
||||||
|
d[f"{fg_key}_bestaetigung"] = _safe_int(ws.cell(row=row_idx, column=4).value)
|
||||||
|
|
||||||
|
return list(rows_by_kw.values())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# DB upsert
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Columns on YearlySummary that we populate
|
||||||
|
_YEARLY_SUMMARY_FIELDS = [
|
||||||
|
"erstberatungen", "ablehnungen", "unterlagen", "keine_rueckmeldung",
|
||||||
|
"gutachten_gesamt", "gutachten_alternative", "gutachten_bestaetigung",
|
||||||
|
"onko_anzahl", "onko_gutachten", "onko_keine_rm",
|
||||||
|
"onko_alternative", "onko_bestaetigung",
|
||||||
|
"kardio_anzahl", "kardio_gutachten", "kardio_keine_rm",
|
||||||
|
"kardio_alternative", "kardio_bestaetigung",
|
||||||
|
"intensiv_anzahl", "intensiv_gutachten", "intensiv_keine_rm",
|
||||||
|
"intensiv_alternative", "intensiv_bestaetigung",
|
||||||
|
"galle_anzahl", "galle_gutachten", "galle_keine_rm",
|
||||||
|
"galle_alternative", "galle_bestaetigung",
|
||||||
|
"sd_anzahl", "sd_gutachten", "sd_keine_rm",
|
||||||
|
"sd_alternative", "sd_bestaetigung",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _upsert_yearly_summary(db, rows: list[dict], dry_run: bool = False) -> dict:
|
||||||
|
"""Insert or update yearly_summary rows.
|
||||||
|
|
||||||
|
Returns summary dict with counts.
|
||||||
|
"""
|
||||||
|
inserted = 0
|
||||||
|
updated = 0
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
for row_data in rows:
|
||||||
|
jahr = row_data.get("jahr")
|
||||||
|
kw = row_data.get("kw")
|
||||||
|
if jahr is None or kw is None:
|
||||||
|
errors.append(f"Missing jahr/kw in row: {row_data}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
existing = (
|
||||||
|
db.query(YearlySummary)
|
||||||
|
.filter(YearlySummary.jahr == jahr, YearlySummary.kw == kw)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
|
||||||
|
if existing:
|
||||||
|
# Update existing record
|
||||||
|
for field in _YEARLY_SUMMARY_FIELDS:
|
||||||
|
if field in row_data:
|
||||||
|
setattr(existing, field, row_data[field])
|
||||||
|
updated += 1
|
||||||
|
else:
|
||||||
|
# Create new record
|
||||||
|
obj = YearlySummary(jahr=jahr, kw=kw)
|
||||||
|
for field in _YEARLY_SUMMARY_FIELDS:
|
||||||
|
if field in row_data:
|
||||||
|
setattr(obj, field, row_data[field])
|
||||||
|
db.add(obj)
|
||||||
|
inserted += 1
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
errors.append(f"Error for KW {kw}/{jahr}: {e}")
|
||||||
|
|
||||||
|
if not dry_run:
|
||||||
|
db.flush()
|
||||||
|
|
||||||
|
return {"inserted": inserted, "updated": updated, "errors": errors}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Public import function
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def import_berichtswesen_file(
|
||||||
|
db,
|
||||||
|
filepath: str,
|
||||||
|
year: int | None = None,
|
||||||
|
dry_run: bool = False,
|
||||||
|
) -> dict:
|
||||||
|
"""Import a single Berichtswesen file into yearly_summary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: SQLAlchemy session.
|
||||||
|
filepath: Path to the .xlsx file.
|
||||||
|
year: Report year (auto-detected from filename if None).
|
||||||
|
dry_run: If True, parse but don't write to DB.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with 'year', 'weeks_parsed', 'inserted', 'updated', 'errors'.
|
||||||
|
"""
|
||||||
|
if year is None:
|
||||||
|
year = _detect_year_from_filename(filepath)
|
||||||
|
if year is None:
|
||||||
|
raise ValueError(
|
||||||
|
f"Cannot detect year from filename: {os.path.basename(filepath)}. "
|
||||||
|
"Please specify the year explicitly."
|
||||||
|
)
|
||||||
|
|
||||||
|
wb = load_workbook(filepath, read_only=True, data_only=True)
|
||||||
|
try:
|
||||||
|
if _is_2022_format(wb):
|
||||||
|
rows = _parse_2022_format(wb, year)
|
||||||
|
else:
|
||||||
|
rows = _parse_modern_format(wb, year)
|
||||||
|
finally:
|
||||||
|
wb.close()
|
||||||
|
|
||||||
|
result = _upsert_yearly_summary(db, rows, dry_run=dry_run)
|
||||||
|
result["year"] = year
|
||||||
|
result["weeks_parsed"] = len(rows)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# CLI
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Import historical Berichtswesen .xlsx files into yearly_summary table"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"filepath",
|
||||||
|
nargs="?",
|
||||||
|
default=None,
|
||||||
|
help="Path to a single Berichtswesen .xlsx file. "
|
||||||
|
"If omitted, imports all Berichtswesen_*.xlsx from ../data/",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"year",
|
||||||
|
nargs="?",
|
||||||
|
type=int,
|
||||||
|
default=None,
|
||||||
|
help="Report year (auto-detected from filename if omitted)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--dry-run",
|
||||||
|
action="store_true",
|
||||||
|
help="Parse and validate without writing to DB",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--verbose", "-v",
|
||||||
|
action="store_true",
|
||||||
|
help="Show detailed per-row info",
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
log_level = logging.DEBUG if args.verbose else logging.INFO
|
||||||
|
logging.basicConfig(
|
||||||
|
level=log_level,
|
||||||
|
format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
|
||||||
|
datefmt="%H:%M:%S",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Determine files to import
|
||||||
|
if args.filepath:
|
||||||
|
files = [(args.filepath, args.year)]
|
||||||
|
else:
|
||||||
|
data_dir = os.path.join(
|
||||||
|
os.path.dirname( # dak_c2s/
|
||||||
|
os.path.dirname( # dak_c2s/backend/
|
||||||
|
os.path.dirname(os.path.abspath(__file__)) # dak_c2s/backend/scripts/
|
||||||
|
)
|
||||||
|
),
|
||||||
|
"data",
|
||||||
|
)
|
||||||
|
pattern = os.path.join(data_dir, "Berichtswesen_*.xlsx")
|
||||||
|
found = sorted(glob.glob(pattern))
|
||||||
|
if not found:
|
||||||
|
print(f"ERROR: No Berichtswesen files found in {data_dir}")
|
||||||
|
sys.exit(1)
|
||||||
|
files = [(f, None) for f in found]
|
||||||
|
|
||||||
|
print(f"Importing {len(files)} Berichtswesen file(s)")
|
||||||
|
if args.dry_run:
|
||||||
|
print("*** DRY RUN -- no changes will be committed ***")
|
||||||
|
print()
|
||||||
|
|
||||||
|
db = SessionLocal()
|
||||||
|
total_inserted = 0
|
||||||
|
total_updated = 0
|
||||||
|
total_errors = 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
for filepath, year in files:
|
||||||
|
if not os.path.exists(filepath):
|
||||||
|
print(f" ERROR: File not found: {filepath}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
basename = os.path.basename(filepath)
|
||||||
|
try:
|
||||||
|
result = import_berichtswesen_file(db, filepath, year=year, dry_run=args.dry_run)
|
||||||
|
|
||||||
|
status = "OK" if not result["errors"] else f"{len(result['errors'])} errors"
|
||||||
|
print(
|
||||||
|
f" {basename:45s} year={result['year']} "
|
||||||
|
f"weeks={result['weeks_parsed']:2d} "
|
||||||
|
f"ins={result['inserted']:3d} "
|
||||||
|
f"upd={result['updated']:3d} [{status}]"
|
||||||
|
)
|
||||||
|
|
||||||
|
if args.verbose and result["errors"]:
|
||||||
|
for err in result["errors"]:
|
||||||
|
print(f" - {err}")
|
||||||
|
|
||||||
|
total_inserted += result["inserted"]
|
||||||
|
total_updated += result["updated"]
|
||||||
|
total_errors += len(result["errors"])
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f" {basename:45s} ERROR: {e}")
|
||||||
|
total_errors += 1
|
||||||
|
|
||||||
|
if args.dry_run:
|
||||||
|
db.rollback()
|
||||||
|
print(f"\nDry run complete -- rolled back. "
|
||||||
|
f"Would have inserted {total_inserted}, updated {total_updated}.")
|
||||||
|
else:
|
||||||
|
db.commit()
|
||||||
|
print(f"\nImport committed. "
|
||||||
|
f"Inserted {total_inserted}, updated {total_updated}.")
|
||||||
|
|
||||||
|
if total_errors:
|
||||||
|
print(f"Total errors: {total_errors}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
db.rollback()
|
||||||
|
print(f"\nERROR: Import failed: {e}")
|
||||||
|
logging.exception("Import failed")
|
||||||
|
sys.exit(1)
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Reference in a new issue