# Source listing metadata: 719 lines, 29 KiB, Python.
import json
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
from sqlalchemy import delete, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models import (
|
|
AutoEntrySkip,
|
|
ImportPreview,
|
|
OvertimeAdjustment,
|
|
SpecialDayStatus,
|
|
TimeEntry,
|
|
User,
|
|
VacationPeriod,
|
|
WeeklyTargetRule,
|
|
)
|
|
from app.services.auto_entries import (
|
|
ENTRY_MODE_AUTO_UNTIL_TODAY,
|
|
ENTRY_MODE_MANUAL,
|
|
delete_future_auto_entries,
|
|
)
|
|
from app.services.calculations import compute_net_minutes
|
|
from app.services.public_holidays import normalize_german_state_code
|
|
from app.services.security import utc_now
|
|
from app.services.targets import ensure_user_has_default_target_rule
|
|
from app.services.workdays import serialize_working_days
|
|
|
|
# Backup format version marker and the set of versions the importer accepts.
CURRENT_BACKUP_VERSION = 2
SUPPORTED_BACKUP_VERSIONS = {1, 2}

# Import strategies understood by the preview/execute functions.
IMPORT_MODE_MERGE = "merge"
IMPORT_MODE_REPLACE = "replace_user_data"

# Stored import previews older than this many hours count as expired.
IMPORT_PREVIEW_TTL_HOURS = 24

# Upper bound for the raw size of an uploaded backup (5 MiB).
MAX_BACKUP_BYTES = 5 * 1024 * 1024

# Closed value sets for enumerated backup fields.
SPECIAL_STATUS_VALUES = {"holiday", "sick"}
PREFERRED_HOME_VIEWS = {"week", "month"}
PREFERRED_MONTH_VIEWS = {"flat", "weeks"}
BREAK_RULE_MODES = {"manual", "auto"}
|
|
|
|
|
|
class BackupImportError(ValueError):
    """Raised when a backup file or an import request fails validation.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    keep working.
    """
|
|
|
|
|
|
def supported_import_modes() -> set[str]:
    """Return a fresh set containing every import mode the importer accepts."""
    modes: set[str] = set()
    modes.add(IMPORT_MODE_MERGE)
    modes.add(IMPORT_MODE_REPLACE)
    return modes
|
|
|
|
|
|
def _require_mapping(value: Any, *, label: str) -> dict[str, Any]:
|
|
if not isinstance(value, dict):
|
|
raise BackupImportError(f"{label} ist nicht korrekt aufgebaut.")
|
|
return value
|
|
|
|
|
|
def _require_list(value: Any, *, label: str) -> list[Any]:
|
|
if value is None:
|
|
return []
|
|
if not isinstance(value, list):
|
|
raise BackupImportError(f"{label} ist nicht korrekt aufgebaut.")
|
|
return value
|
|
|
|
|
|
def _parse_date(value: Any, *, label: str) -> date:
|
|
if not isinstance(value, str) or not value.strip():
|
|
raise BackupImportError(f"{label} fehlt oder ist ungültig.")
|
|
try:
|
|
return date.fromisoformat(value)
|
|
except ValueError as exc:
|
|
raise BackupImportError(f"{label} hat kein gültiges Datum.") from exc
|
|
|
|
|
|
def _parse_datetime(value: Any, *, label: str) -> str | None:
|
|
if value in (None, ""):
|
|
return None
|
|
if not isinstance(value, str):
|
|
raise BackupImportError(f"{label} hat kein gültiges Datum.")
|
|
try:
|
|
datetime.fromisoformat(value.replace("Z", "+00:00"))
|
|
except ValueError as exc:
|
|
raise BackupImportError(f"{label} hat kein gültiges Datum.") from exc
|
|
return value
|
|
|
|
|
|
def _parse_int(value: Any, *, label: str, minimum: int | None = None) -> int:
|
|
if not isinstance(value, int):
|
|
raise BackupImportError(f"{label} ist keine ganze Zahl.")
|
|
if minimum is not None and value < minimum:
|
|
raise BackupImportError(f"{label} ist zu klein.")
|
|
return value
|
|
|
|
|
|
def _parse_optional_int(value: Any, *, label: str, minimum: int | None = None) -> int | None:
|
|
if value is None:
|
|
return None
|
|
return _parse_int(value, label=label, minimum=minimum)
|
|
|
|
|
|
def _parse_bool(value: Any, *, label: str) -> bool:
|
|
if not isinstance(value, bool):
|
|
raise BackupImportError(f"{label} muss true oder false sein.")
|
|
return value
|
|
|
|
|
|
def _parse_optional_text(value: Any, *, label: str) -> str | None:
|
|
if value in (None, ""):
|
|
return None
|
|
if not isinstance(value, str):
|
|
raise BackupImportError(f"{label} ist ungültig.")
|
|
return value.strip() or None
|
|
|
|
|
|
def _normalize_settings(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate the settings section of a backup and return a normalized dict.

    Settings are read from ``payload["settings"]``; if that key is missing or
    ``None``, ``payload["user"]["settings"]`` is used instead when ``user`` is
    a dict.

    View and theme preferences silently fall back to their defaults when the
    stored value is not recognized. Every other invalid field raises.

    Args:
        payload: Decoded top-level backup mapping.

    Returns:
        A dict with one entry per setting; dates are ISO strings or ``None``
        and ``working_days`` is a sorted list without duplicates.

    Raises:
        BackupImportError: If the settings section or any strictly validated
            field is malformed.
    """
    settings_value = payload.get("settings")
    if settings_value is None:
        user_section = payload.get("user")
        if isinstance(user_section, dict):
            settings_value = user_section.get("settings")
    settings_data = _require_mapping(settings_value, label="Backup-Einstellungen")

    def _choice(key: str, allowed: set[str], default: str) -> str:
        """Return the stored choice if it is a known string, else ``default``."""
        candidate = settings_data.get(key, default)
        # The isinstance guard matters: the data is untrusted JSON, and an
        # unhashable value (list/dict) would make the set lookup raise TypeError.
        if isinstance(candidate, str) and candidate in allowed:
            return candidate
        return default

    def _optional_iso_date(key: str, label: str) -> str | None:
        """Validate an optional date field and return it as an ISO string."""
        raw = settings_data.get(key)
        if not raw:
            return None
        return _parse_date(raw, label=label).isoformat()

    def _flag(key: str, label: str, default: bool = False) -> bool:
        """Validate a boolean setting, using ``default`` when the key is absent."""
        return _parse_bool(settings_data.get(key, default), label=label)

    working_days_raw = settings_data.get("working_days")
    if not isinstance(working_days_raw, list) or not working_days_raw:
        raise BackupImportError("Die relevanten Arbeitstage im Backup sind ungültig.")
    working_days: list[int] = []
    for item in working_days_raw:
        # bool is a subclass of int, so JSON true/false must be excluded explicitly.
        if isinstance(item, bool) or not isinstance(item, int) or not 0 <= item <= 6:
            raise BackupImportError("Die relevanten Arbeitstage im Backup sind ungültig.")
        if item not in working_days:
            working_days.append(item)
    # working_days cannot be empty here: the raw list is non-empty and every
    # item was either appended or already present.

    preferred_home_view = _choice("preferred_home_view", PREFERRED_HOME_VIEWS, "week")
    theme_preference = _choice("theme_preference", {"auto", "dark", "light"}, "auto")
    preferred_month_view_mode = _choice("preferred_month_view_mode", PREFERRED_MONTH_VIEWS, "flat")

    entry_mode = settings_data.get("entry_mode", ENTRY_MODE_MANUAL)
    if entry_mode == "auto":
        # "auto" is accepted as an alias for the auto-until-today mode.
        entry_mode = ENTRY_MODE_AUTO_UNTIL_TODAY
    if not isinstance(entry_mode, str) or entry_mode not in {ENTRY_MODE_MANUAL, ENTRY_MODE_AUTO_UNTIL_TODAY}:
        raise BackupImportError("Der Erfassungsmodus im Backup ist ungültig.")

    federal_state = None
    if settings_data.get("federal_state"):
        federal_state = normalize_german_state_code(str(settings_data.get("federal_state")))
        if federal_state is None:
            raise BackupImportError("Das Bundesland im Backup ist ungültig.")

    overtime_start_date = _optional_iso_date("overtime_start_date", "Überstunden-Startdatum")
    workhours_counter_start_date = _optional_iso_date(
        "workhours_counter_start_date",
        "Arbeitsstunden-Counter Startdatum",
    )
    workhours_counter_end_date = _optional_iso_date(
        "workhours_counter_end_date",
        "Arbeitsstunden-Counter Enddatum",
    )

    return {
        "weekly_target_minutes": _parse_int(
            settings_data.get("weekly_target_minutes", 1500),
            label="Wochenstunden",
            minimum=1,
        ),
        "preferred_home_view": preferred_home_view,
        "theme_preference": theme_preference,
        "preferred_month_view_mode": preferred_month_view_mode,
        "entry_mode": entry_mode,
        "working_days": sorted(working_days),
        "count_vacation_as_worktime": _flag("count_vacation_as_worktime", "Urlaubstage-wie-Arbeitstage"),
        "count_holiday_as_worktime": _flag("count_holiday_as_worktime", "Feiertage-wie-Arbeitstage"),
        "count_sick_as_worktime": _flag("count_sick_as_worktime", "Kranktage-wie-Arbeitstage"),
        "automatic_break_rules_enabled": _flag("automatic_break_rules_enabled", "Automatische Pausenregel"),
        "default_break_minutes": _parse_int(
            settings_data.get("default_break_minutes", 0),
            label="Tägliche Pause",
            minimum=0,
        ),
        "overtime_start_date": overtime_start_date,
        "overtime_expiry_days": _parse_optional_int(
            settings_data.get("overtime_expiry_days"),
            label="Überstunden-Verfall",
            minimum=1,
        ),
        "expire_negative_overtime": _flag("expire_negative_overtime", "Negative Stunden verfallen"),
        "vacation_days_total": _parse_int(
            settings_data.get("vacation_days_total", 0),
            label="Urlaubstage gesamt",
            minimum=0,
        ),
        "vacation_show_in_header": _flag("vacation_show_in_header", "Urlaub im Header anzeigen", default=True),
        "workhours_counter_enabled": _flag("workhours_counter_enabled", "Arbeitsstunden-Counter aktiviert"),
        "workhours_counter_show_in_header": _flag(
            "workhours_counter_show_in_header",
            "Arbeitsstunden-Counter im Header anzeigen",
        ),
        "workhours_counter_start_date": workhours_counter_start_date,
        "workhours_counter_end_date": workhours_counter_end_date,
        "workhours_counter_manual_offset_minutes": _parse_int(
            settings_data.get("workhours_counter_manual_offset_minutes", 0),
            label="Zusatzstunden",
            minimum=0,
        ),
        "workhours_counter_target_minutes": _parse_optional_int(
            settings_data.get("workhours_counter_target_minutes"),
            label="Arbeitsstunden-Ziel",
            minimum=1,
        ),
        "workhours_counter_target_email_enabled": _flag(
            "workhours_counter_target_email_enabled",
            "Counter-Zielwarnung per E-Mail",
        ),
        "federal_state": federal_state,
    }
|
|
|
|
|
|
def _normalize_weekly_target_rules(items: list[Any]) -> list[dict[str, Any]]:
    """Validate weekly-target rules and return them sorted by start date.

    Only the first rule per ``effective_from`` date is kept; later duplicates
    are dropped before their minutes are validated.

    Raises:
        BackupImportError: If the section or a kept row is malformed.
    """
    by_start: dict[str, dict[str, Any]] = {}
    for raw_rule in _require_list(items, label="Wochenziel-Regeln"):
        rule = _require_mapping(raw_rule, label="Wochenziel-Regel")
        start_iso = _parse_date(rule.get("effective_from"), label="Wochenziel Startdatum").isoformat()
        if start_iso in by_start:
            continue
        minutes = _parse_int(
            rule.get("weekly_target_minutes"),
            label="Wochenziel in Minuten",
            minimum=1,
        )
        by_start[start_iso] = {
            "effective_from": start_iso,
            "weekly_target_minutes": minutes,
        }
    return sorted(by_start.values(), key=lambda rule: rule["effective_from"])
|
|
|
|
|
|
def _normalize_time_entries(items: list[Any]) -> list[dict[str, Any]]:
    """Validate time entries and return them sorted by date.

    Only the first entry per date is kept; later duplicates are skipped
    without further validation. An unrecognized ``break_rule_mode`` falls back
    to ``"manual"``.

    Raises:
        BackupImportError: If the section or a kept row is malformed.
    """
    normalized: list[dict[str, Any]] = []
    seen: set[str] = set()
    for item in _require_list(items, label="Arbeitszeiteinträge"):
        row = _require_mapping(item, label="Arbeitszeiteintrag")
        entry_date = _parse_date(row.get("date"), label="Arbeitszeiteintrag Datum").isoformat()
        if entry_date in seen:
            continue
        seen.add(entry_date)

        start_minutes = _parse_int(row.get("start_minutes"), label="Arbeitsbeginn", minimum=0)
        end_minutes = _parse_int(row.get("end_minutes"), label="Arbeitsende", minimum=0)
        break_minutes = _parse_int(row.get("break_minutes", 0), label="Pause", minimum=0)

        break_rule_mode = row.get("break_rule_mode", "manual")
        # The isinstance guard keeps an unhashable JSON value (list/dict) from
        # raising TypeError in the set lookup; it falls back like any other
        # unknown value.
        if not isinstance(break_rule_mode, str) or break_rule_mode not in BREAK_RULE_MODES:
            break_rule_mode = "manual"

        # Called only for its checks; the result is discarded.
        # NOTE(review): presumably raises on inconsistent times — confirm the
        # exception type so callers catching BackupImportError also cover it.
        compute_net_minutes(start_minutes, end_minutes, break_minutes)

        normalized.append(
            {
                "date": entry_date,
                "start_minutes": start_minutes,
                "end_minutes": end_minutes,
                "break_minutes": break_minutes,
                "break_rule_mode": break_rule_mode,
                "notes": _parse_optional_text(row.get("notes"), label="Notiz"),
            }
        )
    normalized.sort(key=lambda entry: entry["date"])
    return normalized
|
|
|
|
|
|
def _normalize_vacation_periods(items: list[Any]) -> list[dict[str, Any]]:
    """Validate vacation periods and return them sorted by start and end date.

    Periods that are identical in start, end, weekend flag and notes are
    collapsed to the first occurrence.

    Raises:
        BackupImportError: If the section or a row is malformed, or a period
            ends before it starts.
    """
    unique: dict[tuple[str, str, bool, str | None], dict[str, Any]] = {}
    for raw_period in _require_list(items, label="Urlaubszeiträume"):
        period = _require_mapping(raw_period, label="Urlaubszeitraum")
        first_day = _parse_date(period.get("start_date"), label="Urlaubsbeginn")
        last_day = _parse_date(period.get("end_date"), label="Urlaubsende")
        if last_day < first_day:
            raise BackupImportError("Ein Urlaubszeitraum endet vor seinem Startdatum.")
        with_weekends = _parse_bool(period.get("include_weekends", False), label="Wochenenden einschließen")
        note = _parse_optional_text(period.get("notes"), label="Urlaubsnotiz")

        identity = (first_day.isoformat(), last_day.isoformat(), with_weekends, note)
        if identity in unique:
            continue
        unique[identity] = {
            "start_date": identity[0],
            "end_date": identity[1],
            "include_weekends": with_weekends,
            "notes": note,
        }
    # sorted() is stable, so periods sharing start and end keep insertion order.
    return sorted(unique.values(), key=lambda entry: (entry["start_date"], entry["end_date"]))
|
|
|
|
|
|
def _normalize_special_day_statuses(items: list[Any]) -> list[dict[str, Any]]:
    """Validate special-day statuses and return them sorted by date.

    Only the first status per date is kept; later duplicates are skipped
    without further validation.

    Raises:
        BackupImportError: If the section or a kept row is malformed, or its
            status is not one of ``SPECIAL_STATUS_VALUES``.
    """
    normalized: list[dict[str, Any]] = []
    seen: set[str] = set()
    for item in _require_list(items, label="Sondertage"):
        row = _require_mapping(item, label="Sondertag")
        status_date = _parse_date(row.get("date"), label="Sondertag Datum").isoformat()
        if status_date in seen:
            continue
        seen.add(status_date)

        status_value = row.get("status")
        # The isinstance guard turns an unhashable JSON value (list/dict) into
        # a proper validation error instead of a TypeError from the set lookup.
        if not isinstance(status_value, str) or status_value not in SPECIAL_STATUS_VALUES:
            raise BackupImportError("Ein Sondertag im Backup hat einen ungültigen Status.")

        normalized.append(
            {
                "date": status_date,
                "status": status_value,
                "notes": _parse_optional_text(row.get("notes"), label="Sondertag-Notiz"),
            }
        )
    normalized.sort(key=lambda entry: entry["date"])
    return normalized
|
|
|
|
|
|
def _normalize_overtime_adjustments(items: list[Any]) -> list[dict[str, Any]]:
    """Validate overtime adjustments and return them sorted by date.

    Only the first adjustment per date is kept; later duplicates are skipped
    without further validation. Minutes have no lower bound here.

    Raises:
        BackupImportError: If the section or a kept row is malformed.
    """
    by_date: dict[str, dict[str, Any]] = {}
    for raw_adjustment in _require_list(items, label="Stundenausgleich"):
        adjustment = _require_mapping(raw_adjustment, label="Stundenausgleich-Eintrag")
        day_iso = _parse_date(adjustment.get("date"), label="Stundenausgleich Datum").isoformat()
        if day_iso in by_date:
            continue
        by_date[day_iso] = {
            "date": day_iso,
            "minutes": _parse_int(adjustment.get("minutes"), label="Stundenausgleich Minuten"),
            "notes": _parse_optional_text(adjustment.get("notes"), label="Stundenausgleich-Notiz"),
        }
    return sorted(by_date.values(), key=lambda entry: entry["date"])
|
|
|
|
|
|
def load_backup_payload_from_bytes(payload_bytes: bytes) -> dict[str, Any]:
    """Decode, validate and normalize a raw backup file.

    Args:
        payload_bytes: UTF-8 encoded JSON document, at most
            ``MAX_BACKUP_BYTES`` long.

    Returns:
        A normalized payload dict with the keys ``backup_version``,
        ``source_app_name``, ``source_app_version``, ``exported_at``,
        ``settings`` and one list per data section.

    Raises:
        BackupImportError: If the file is empty, too large, not valid JSON,
            has an unsupported version, or any section fails validation.
    """
    if not payload_bytes:
        raise BackupImportError("Die Backup-Datei ist leer.")
    if len(payload_bytes) > MAX_BACKUP_BYTES:
        raise BackupImportError("Die Backup-Datei ist zu groß.")
    try:
        raw = json.loads(payload_bytes.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError, RecursionError) as exc:
        # RecursionError: the decoder can exceed the recursion limit on
        # extremely deeply nested documents; report that as invalid input too.
        raise BackupImportError("Die Backup-Datei ist kein gültiges JSON.") from exc

    payload = _require_mapping(raw, label="Backup-Datei")
    version = payload.get("backup_version")
    # Require a real int before the set lookup: an unhashable value would
    # raise TypeError, and True / 1.0 compare equal to 1 and would be accepted.
    if (
        isinstance(version, bool)
        or not isinstance(version, int)
        or version not in SUPPORTED_BACKUP_VERSIONS
    ):
        raise BackupImportError("Diese Backup-Version wird noch nicht unterstützt.")

    normalized = {
        "backup_version": version,
        "source_app_name": str(payload.get("app_name") or "Stundenfuchs"),
        "source_app_version": str(payload.get("app_version") or "unbekannt"),
        "exported_at": _parse_datetime(payload.get("exported_at"), label="Exportdatum"),
        "settings": _normalize_settings(payload),
        "weekly_target_rules": _normalize_weekly_target_rules(payload.get("weekly_target_rules")),
        "time_entries": _normalize_time_entries(payload.get("time_entries")),
        "vacation_periods": _normalize_vacation_periods(payload.get("vacation_periods")),
        "special_day_statuses": _normalize_special_day_statuses(payload.get("special_day_statuses")),
        "overtime_adjustments": _normalize_overtime_adjustments(payload.get("overtime_adjustments")),
    }
    return normalized
|
|
|
|
|
|
def summarize_backup_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Build a compact overview of a normalized backup payload.

    Args:
        payload: A normalized payload as produced by
            ``load_backup_payload_from_bytes``.

    Returns:
        The source metadata, a selection of settings under
        ``settings_summary`` and the number of rows per section under
        ``counts``.
    """
    settings = payload["settings"]

    summary: dict[str, Any] = {
        field: payload[field]
        for field in ("backup_version", "source_app_name", "source_app_version", "exported_at")
    }
    summary["settings_summary"] = {
        field: settings[field]
        for field in (
            "entry_mode",
            "weekly_target_minutes",
            "working_days",
            "federal_state",
            "vacation_days_total",
            "workhours_counter_enabled",
        )
    }
    summary["counts"] = {
        section: len(payload[section])
        for section in (
            "weekly_target_rules",
            "time_entries",
            "vacation_periods",
            "special_day_statuses",
            "overtime_adjustments",
        )
    }
    return summary
|
|
|
|
|
|
def build_import_preview(*, db: Session, user: User, payload: dict[str, Any], mode: str) -> dict[str, Any]:
    """Summarize a backup and count rows that collide with the user's data.

    A row counts as a conflict when the user already has a row with the same
    key: the date for time entries, special days and overtime adjustments,
    ``effective_from`` for weekly-target rules, and the full
    (start, end, include_weekends, notes) tuple for vacation periods.
    Conflicts are counted for both import modes.

    Args:
        db: Session used for the read-only lookups.
        user: Owner of the existing data.
        payload: Normalized backup payload.
        mode: One of ``supported_import_modes()``.

    Returns:
        The payload summary extended by ``mode``, ``mode_label`` and
        ``conflicts``.

    Raises:
        BackupImportError: If ``mode`` is not supported.
    """
    if mode not in supported_import_modes():
        raise BackupImportError("Ungültiger Importmodus.")

    def stored_values(column: Any, owner_column: Any) -> set[Any]:
        """Fetch the distinct values of ``column`` for the user's rows."""
        result = db.execute(select(column).where(owner_column == user.id))
        return set(result.scalars().all())

    def date_overlap(section: str, field: str, known: set[Any]) -> int:
        """Count payload rows whose date in ``field`` is already stored."""
        return sum(1 for row in payload[section] if date.fromisoformat(row[field]) in known)

    def vacation_identity(row: dict[str, Any]) -> tuple[date, date, bool, str | None]:
        """Build the comparison key for one vacation period row."""
        return (
            date.fromisoformat(row["start_date"]),
            date.fromisoformat(row["end_date"]),
            row["include_weekends"],
            row["notes"],
        )

    known_entry_dates = stored_values(TimeEntry.date, TimeEntry.user_id)
    known_special_dates = stored_values(SpecialDayStatus.date, SpecialDayStatus.user_id)
    known_adjustment_dates = stored_values(OvertimeAdjustment.date, OvertimeAdjustment.user_id)
    known_rule_dates = stored_values(WeeklyTargetRule.effective_from, WeeklyTargetRule.user_id)

    vacation_query = select(
        VacationPeriod.start_date,
        VacationPeriod.end_date,
        VacationPeriod.include_weekends,
        VacationPeriod.notes,
    ).where(VacationPeriod.user_id == user.id)
    known_vacations = set(db.execute(vacation_query).all())

    conflicts = {
        "time_entries": date_overlap("time_entries", "date", known_entry_dates),
        "special_day_statuses": date_overlap("special_day_statuses", "date", known_special_dates),
        "overtime_adjustments": date_overlap("overtime_adjustments", "date", known_adjustment_dates),
        "weekly_target_rules": date_overlap("weekly_target_rules", "effective_from", known_rule_dates),
        "vacation_periods": sum(
            1 for row in payload["vacation_periods"] if vacation_identity(row) in known_vacations
        ),
    }

    if mode == IMPORT_MODE_MERGE:
        mode_label = "Zusammenführen"
    else:
        mode_label = "Alle bisherigen Daten ersetzen"

    preview = dict(summarize_backup_payload(payload))
    preview["mode"] = mode
    preview["mode_label"] = mode_label
    preview["conflicts"] = conflicts
    return preview
|
|
|
|
|
|
def cleanup_import_previews(*, db: Session, user_id: str | None = None) -> None:
    """Delete import previews older than ``IMPORT_PREVIEW_TTL_HOURS``.

    Args:
        db: Session on which the DELETE is executed; this function neither
            flushes nor commits.
        user_id: When truthy, only that user's expired previews are removed;
            otherwise expired previews of all users are removed.
    """
    expired_before = utc_now() - timedelta(hours=IMPORT_PREVIEW_TTL_HOURS)
    criteria = [ImportPreview.created_at < expired_before]
    if user_id:
        criteria.append(ImportPreview.user_id == user_id)
    db.execute(delete(ImportPreview).where(*criteria))
|
|
|
|
|
|
def _preview_created_at(value: datetime) -> datetime:
|
|
if value.tzinfo is None:
|
|
return value.replace(tzinfo=timezone.utc)
|
|
return value
|
|
|
|
|
|
def create_import_preview_record(*, db: Session, user: User, payload: dict[str, Any], mode: str) -> ImportPreview:
    """Store ``payload`` as the user's only pending import preview.

    Expired previews of the user are purged, then all remaining previews of
    the user are deleted, and a new one holding the JSON-serialized payload is
    added and flushed.

    Returns:
        The newly added ``ImportPreview`` after ``db.flush()``.
    """
    owner_id = user.id
    cleanup_import_previews(db=db, user_id=owner_id)
    # Any previews still left for this user are removed as well.
    db.execute(delete(ImportPreview).where(ImportPreview.user_id == owner_id))

    serialized = json.dumps(payload, ensure_ascii=False)
    record = ImportPreview(user_id=owner_id, mode=mode, payload_json=serialized)
    db.add(record)
    db.flush()
    return record
|
|
|
|
|
|
def get_import_preview_record(*, db: Session, user: User, preview_id: str) -> ImportPreview | None:
    """Load one of the user's import previews, discarding it if expired.

    Returns:
        The preview, or ``None`` when no preview with ``preview_id`` belongs
        to ``user`` or it is older than ``IMPORT_PREVIEW_TTL_HOURS``. An
        expired preview is deleted and the session flushed before returning.
    """
    query = select(ImportPreview).where(
        ImportPreview.id == preview_id,
        ImportPreview.user_id == user.id,
    )
    record = db.execute(query).scalar_one_or_none()
    if record is None:
        return None

    created = _preview_created_at(record.created_at)
    oldest_allowed = utc_now() - timedelta(hours=IMPORT_PREVIEW_TTL_HOURS)
    if created >= oldest_allowed:
        return record

    db.delete(record)
    db.flush()
    return None
|
|
|
|
|
|
def parse_preview_payload(preview: ImportPreview) -> dict[str, Any]:
    """Re-validate and normalize the payload stored in an import preview.

    The stored JSON text goes through the same loader as an uploaded file, so
    the same ``BackupImportError`` conditions apply.
    """
    stored_bytes = preview.payload_json.encode("utf-8")
    return load_backup_payload_from_bytes(stored_bytes)
|
|
|
|
|
|
def _apply_settings_from_backup(*, user: User, settings_data: dict[str, Any]) -> None:
    """Copy normalized backup settings onto ``user``.

    Most values are assigned to the attribute of the same name. Working days
    are serialized into ``working_days_csv``, and ISO date strings are turned
    back into ``date`` objects (``None`` or empty stays ``None``).

    Args:
        user: Object whose setting attributes are overwritten in place.
        settings_data: Output of ``_normalize_settings``.
    """

    def copy_as_is(*names: str) -> None:
        """Assign each named setting to the attribute of the same name."""
        for name in names:
            setattr(user, name, settings_data[name])

    def as_date(key: str) -> date | None:
        """Convert a stored ISO date string back into a ``date``."""
        return date.fromisoformat(settings_data[key]) if settings_data[key] else None

    copy_as_is(
        "weekly_target_minutes",
        "preferred_home_view",
        "theme_preference",
        "preferred_month_view_mode",
        "entry_mode",
    )
    user.working_days_csv = serialize_working_days(settings_data["working_days"])
    copy_as_is(
        "count_vacation_as_worktime",
        "count_holiday_as_worktime",
        "count_sick_as_worktime",
        "automatic_break_rules_enabled",
        "default_break_minutes",
    )
    user.overtime_start_date = as_date("overtime_start_date")
    copy_as_is(
        "overtime_expiry_days",
        "expire_negative_overtime",
        "vacation_days_total",
        "vacation_show_in_header",
        "workhours_counter_enabled",
        "workhours_counter_show_in_header",
    )
    user.workhours_counter_start_date = as_date("workhours_counter_start_date")
    user.workhours_counter_end_date = as_date("workhours_counter_end_date")
    copy_as_is(
        "workhours_counter_manual_offset_minutes",
        "workhours_counter_target_minutes",
        "workhours_counter_target_email_enabled",
        "federal_state",
    )
|
|
|
|
|
|
def clear_importable_user_data(*, db: Session, user_id: str) -> None:
    """Delete all rows of the given user from every importable table.

    Affects time entries, weekly-target rules, vacation periods, special-day
    statuses, overtime adjustments and auto-entry skips. The DELETE statements
    are executed on ``db``; this function neither flushes nor commits.
    """
    importable_models = (
        TimeEntry,
        WeeklyTargetRule,
        VacationPeriod,
        SpecialDayStatus,
        OvertimeAdjustment,
        AutoEntrySkip,
    )
    for model in importable_models:
        db.execute(delete(model).where(model.user_id == user_id))
|
|
|
|
|
|
def execute_backup_import(*, db: Session, user: User, payload: dict[str, Any], mode: str) -> dict[str, Any]:
    """Apply a normalized backup to the user's account.

    In replace mode all importable rows of the user are deleted first. The
    settings from the backup are applied in both modes. In merge mode a
    payload row is skipped when the user already has a row with the same key
    (date, ``effective_from``, or the full vacation tuple); otherwise it is
    inserted. After flushing, a default target rule is ensured and, when the
    resulting entry mode is auto-until-today, auto entries after today are
    removed.

    Args:
        db: Session receiving the changes; it is flushed but not committed here.
        user: Account the backup is imported into.
        payload: Normalized backup payload.
        mode: One of ``supported_import_modes()``.

    Returns:
        A dict with ``mode``, per-section ``created`` and ``skipped`` counters
        and ``removed_future_auto_entries``.

    Raises:
        BackupImportError: If ``mode`` is not supported.
    """
    if mode not in supported_import_modes():
        raise BackupImportError("Ungültiger Importmodus.")

    sections = (
        "weekly_target_rules",
        "time_entries",
        "vacation_periods",
        "special_day_statuses",
        "overtime_adjustments",
    )
    created = dict.fromkeys(sections, 0)
    skipped = dict.fromkeys(sections, 0)
    merging = mode == IMPORT_MODE_MERGE

    if mode == IMPORT_MODE_REPLACE:
        clear_importable_user_data(db=db, user_id=user.id)

    _apply_settings_from_backup(user=user, settings_data=payload["settings"])

    def stored_values(column: Any, owner_column: Any) -> set[Any]:
        """Fetch the distinct values of ``column`` for the user's rows."""
        result = db.execute(select(column).where(owner_column == user.id))
        return set(result.scalars().all())

    def register(section: str, key: Any, known: set[Any], build: Any) -> None:
        """Insert the record from ``build()`` unless merge mode finds ``key``."""
        if merging and key in known:
            skipped[section] += 1
            return
        db.add(build())
        # Remember the key so the counters stay consistent within this run.
        known.add(key)
        created[section] += 1

    known_rule_dates = stored_values(WeeklyTargetRule.effective_from, WeeklyTargetRule.user_id)
    known_entry_dates = stored_values(TimeEntry.date, TimeEntry.user_id)
    vacation_query = select(
        VacationPeriod.start_date,
        VacationPeriod.end_date,
        VacationPeriod.include_weekends,
        VacationPeriod.notes,
    ).where(VacationPeriod.user_id == user.id)
    known_vacations = set(db.execute(vacation_query).all())
    known_special_dates = stored_values(SpecialDayStatus.date, SpecialDayStatus.user_id)
    known_adjustment_dates = stored_values(OvertimeAdjustment.date, OvertimeAdjustment.user_id)

    for row in payload["weekly_target_rules"]:
        starts_on = date.fromisoformat(row["effective_from"])
        register(
            "weekly_target_rules",
            starts_on,
            known_rule_dates,
            lambda: WeeklyTargetRule(
                user_id=user.id,
                effective_from=starts_on,
                weekly_target_minutes=row["weekly_target_minutes"],
            ),
        )

    for row in payload["time_entries"]:
        worked_on = date.fromisoformat(row["date"])
        register(
            "time_entries",
            worked_on,
            known_entry_dates,
            lambda: TimeEntry(
                user_id=user.id,
                date=worked_on,
                start_minutes=row["start_minutes"],
                end_minutes=row["end_minutes"],
                break_minutes=row["break_minutes"],
                break_rule_mode=row["break_rule_mode"],
                notes=row["notes"],
            ),
        )

    for row in payload["vacation_periods"]:
        identity = (
            date.fromisoformat(row["start_date"]),
            date.fromisoformat(row["end_date"]),
            row["include_weekends"],
            row["notes"],
        )
        first_day, last_day, with_weekends, note = identity
        register(
            "vacation_periods",
            identity,
            known_vacations,
            lambda: VacationPeriod(
                user_id=user.id,
                start_date=first_day,
                end_date=last_day,
                include_weekends=with_weekends,
                notes=note,
            ),
        )

    for row in payload["special_day_statuses"]:
        special_day = date.fromisoformat(row["date"])
        register(
            "special_day_statuses",
            special_day,
            known_special_dates,
            lambda: SpecialDayStatus(
                user_id=user.id,
                date=special_day,
                status=row["status"],
                notes=row["notes"],
            ),
        )

    for row in payload["overtime_adjustments"]:
        adjusted_on = date.fromisoformat(row["date"])
        register(
            "overtime_adjustments",
            adjusted_on,
            known_adjustment_dates,
            lambda: OvertimeAdjustment(
                user_id=user.id,
                date=adjusted_on,
                minutes=row["minutes"],
                notes=row["notes"],
            ),
        )

    db.flush()
    ensure_user_has_default_target_rule(db, user)

    removed_future_auto_entries = (
        delete_future_auto_entries(db=db, user_id=user.id, after_date=date.today())
        if user.entry_mode == ENTRY_MODE_AUTO_UNTIL_TODAY
        else 0
    )

    return {
        "mode": mode,
        "created": created,
        "skipped": skipped,
        "removed_future_auto_entries": removed_future_auto_entries,
    }
|