chore: initialize public repository
CI / checks (push) Has been cancelled

This commit is contained in:
maddin
2026-03-22 12:57:09 +00:00
commit 6fbd1bb3c2
142 changed files with 19826 additions and 0 deletions
View File
+332
View File
@@ -0,0 +1,332 @@
from datetime import date, timedelta
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models import AutoEntrySkip, OvertimeAdjustment, SpecialDayStatus, TimeEntry, User
from app.services.calculations import automatic_break_minutes_for_net_minutes, compute_net_minutes
from app.services.targets import list_rules_for_user, monday_of, target_for_week
from app.services.vacations import expand_vacation_dates, list_vacations_for_user
from app.services.workdays import parse_working_days_csv
# Entry modes: users either record their times manually or have working days
# up to "today" pre-filled automatically by the sync job.
ENTRY_MODE_MANUAL = "manual"
ENTRY_MODE_AUTO_UNTIL_TODAY = "auto_until_today"
# Marker note stored on auto-generated TimeEntry rows so they can be
# recognized (and deleted) later. Runtime string — must not be changed.
AUTO_ENTRY_NOTE = "Automatisch vorausgefuellt"
# Status values stored on SpecialDayStatus rows.
SPECIAL_DAY_STATUS_HOLIDAY = "holiday"
SPECIAL_DAY_STATUS_SICK = "sick"
def get_user_working_days(user: User) -> set[int]:
    """Return the user's working weekdays (0=Monday .. 6=Sunday) parsed from the stored CSV."""
    return parse_working_days_csv(user.working_days_csv)
def list_special_statuses_for_user(
    db: Session,
    user_id: str,
    from_date: date,
    to_date: date,
) -> list[SpecialDayStatus]:
    """Return the user's special-day statuses within [from_date, to_date], ordered by date."""
    query = select(SpecialDayStatus).where(
        SpecialDayStatus.user_id == user_id,
        SpecialDayStatus.date >= from_date,
        SpecialDayStatus.date <= to_date,
    )
    query = query.order_by(SpecialDayStatus.date.asc())
    return db.execute(query).scalars().all()
def special_status_map(periods: list[SpecialDayStatus]) -> dict[date, str]:
    """Map each special day's date to its stored status value."""
    result: dict[date, str] = {}
    for period in periods:
        result[period.date] = period.status
    return result
def special_status_dates(periods: list[SpecialDayStatus]) -> set[date]:
    """Collect the set of dates that carry any special status."""
    dates: set[date] = set()
    for period in periods:
        dates.add(period.date)
    return dates
def count_as_worktime_dates_for_user(
    *,
    user: User,
    vacation_dates: set[date],
    special_statuses: list[SpecialDayStatus],
) -> set[date]:
    """Dates that count as worked time according to the user's opt-in flags.

    Vacation days and holiday/sick special days are only included when the
    corresponding ``count_*_as_worktime`` flag is enabled on the user.
    """
    counted: set[date] = set()
    if user.count_vacation_as_worktime:
        counted |= vacation_dates
    status_flags = {
        SPECIAL_DAY_STATUS_HOLIDAY: user.count_holiday_as_worktime,
        SPECIAL_DAY_STATUS_SICK: user.count_sick_as_worktime,
    }
    for period in special_statuses:
        if status_flags.get(period.status):
            counted.add(period.date)
    return counted
def effective_non_working_dates_for_user(
    *,
    user: User,
    special_statuses: list[SpecialDayStatus],
) -> set[date]:
    """Special-status dates that block normal work entries for this user.

    A holiday/sick day does NOT block when the user counts that status as
    worktime; every other special day is treated as non-working.
    """
    counts_as_work = {
        SPECIAL_DAY_STATUS_HOLIDAY: bool(user.count_holiday_as_worktime),
        SPECIAL_DAY_STATUS_SICK: bool(user.count_sick_as_worktime),
    }
    return {
        period.date
        for period in special_statuses
        if not counts_as_work.get(period.status, False)
    }
def clear_special_status_for_date(*, db: Session, user_id: str, day: date) -> None:
    """Delete the user's special status row for *day*, if one exists."""
    row = db.execute(
        select(SpecialDayStatus).where(
            SpecialDayStatus.user_id == user_id,
            SpecialDayStatus.date == day,
        )
    ).scalar_one_or_none()
    if row is not None:
        db.delete(row)
def list_overtime_adjustments_for_user(
    db: Session,
    user_id: str,
    from_date: date,
    to_date: date,
) -> list[OvertimeAdjustment]:
    """Return the user's overtime adjustments within [from_date, to_date], ordered by date."""
    query = select(OvertimeAdjustment).where(
        OvertimeAdjustment.user_id == user_id,
        OvertimeAdjustment.date >= from_date,
        OvertimeAdjustment.date <= to_date,
    )
    query = query.order_by(OvertimeAdjustment.date.asc())
    return db.execute(query).scalars().all()
def overtime_adjustment_map(adjustments: list[OvertimeAdjustment]) -> dict[date, OvertimeAdjustment]:
    """Index the adjustment rows by their date."""
    indexed: dict[date, OvertimeAdjustment] = {}
    for adjustment in adjustments:
        indexed[adjustment.date] = adjustment
    return indexed
def overtime_adjustment_minutes_map(adjustments: list[OvertimeAdjustment]) -> dict[date, int]:
    """Map each adjustment date to its minute delta."""
    minutes_by_date: dict[date, int] = {}
    for adjustment in adjustments:
        minutes_by_date[adjustment.date] = adjustment.minutes
    return minutes_by_date
def clear_overtime_adjustment_for_date(*, db: Session, user_id: str, day: date) -> None:
    """Delete the user's overtime adjustment for *day*, if one exists."""
    row = db.execute(
        select(OvertimeAdjustment).where(
            OvertimeAdjustment.user_id == user_id,
            OvertimeAdjustment.date == day,
        )
    ).scalar_one_or_none()
    if row is not None:
        db.delete(row)
def auto_entry_skip_dates_for_user(
    db: Session,
    user_id: str,
    from_date: date,
    to_date: date,
) -> set[date]:
    """Dates in [from_date, to_date] the user explicitly excluded from auto-fill."""
    query = (
        select(AutoEntrySkip.date)
        .where(
            AutoEntrySkip.user_id == user_id,
            AutoEntrySkip.date >= from_date,
            AutoEntrySkip.date <= to_date,
        )
        .order_by(AutoEntrySkip.date.asc())
    )
    rows = db.execute(query).scalars().all()
    return set(rows)
def mark_auto_entry_skip_for_date(*, db: Session, user_id: str, day: date) -> None:
    """Record that auto-fill must skip *day* for this user (idempotent)."""
    query = select(AutoEntrySkip).where(
        AutoEntrySkip.user_id == user_id,
        AutoEntrySkip.date == day,
    )
    if db.execute(query).scalar_one_or_none() is None:
        db.add(AutoEntrySkip(user_id=user_id, date=day))
def clear_auto_entry_skip_for_date(*, db: Session, user_id: str, day: date) -> None:
    """Remove the auto-fill skip marker for *day*, if present."""
    query = select(AutoEntrySkip).where(
        AutoEntrySkip.user_id == user_id,
        AutoEntrySkip.date == day,
    )
    marker = db.execute(query).scalar_one_or_none()
    if marker is not None:
        db.delete(marker)
def build_auto_day_entry(
    *,
    weekly_target_minutes: int,
    workdays_per_week: int,
    automatic_break_rules_enabled: bool,
    default_break_minutes: int,
) -> tuple[int, int, int] | None:
    """Derive (start_minutes, end_minutes, break_minutes) for one auto-filled day.

    The day's net time is the weekly target split evenly across workdays.
    Returns None when no positive net time can be scheduled. Work starts at a
    fixed 08:30; the end is clamped to 23:59 and the break shrunk so at least
    one net minute remains inside the span.
    """
    if workdays_per_week <= 0:
        return None
    daily_net = int(round(weekly_target_minutes / workdays_per_week))
    if daily_net <= 0:
        return None
    start = 8 * 60 + 30  # fixed 08:30 start of day
    if automatic_break_rules_enabled:
        pause = automatic_break_minutes_for_net_minutes(daily_net)
    else:
        pause = max(0, default_break_minutes)
    end = min(start + daily_net + pause, 24 * 60 - 1)  # clamp to 23:59
    span = end - start
    if span <= 0:
        return None
    # Keep the break inside the (possibly clamped) span, leaving >= 1 net minute.
    pause = min(pause, max(0, span - 1))
    return start, end, pause
def auto_entry_sync_start_date(user: User) -> date:
    """First date auto-fill considers: the overtime start date if set, else the account creation date."""
    start = user.overtime_start_date
    return start if start else user.created_at.date()
def delete_future_auto_entries(
    *,
    db: Session,
    user_id: str,
    after_date: date,
) -> int:
    """Delete all auto-generated entries strictly after *after_date*; return the count removed."""
    query = (
        select(TimeEntry)
        .where(
            TimeEntry.user_id == user_id,
            TimeEntry.date > after_date,
            # Auto entries are identified by their marker note.
            TimeEntry.notes == AUTO_ENTRY_NOTE,
        )
        .order_by(TimeEntry.date.asc())
    )
    removed = 0
    for entry in db.execute(query).scalars().all():
        db.delete(entry)
        removed += 1
    return removed
def autofill_entries_for_range(
    *,
    db: Session,
    user: User,
    range_start: date,
    range_end: date,
) -> int:
    """Create auto time entries for the user's missing working days in a range.

    Only applies to users in auto mode. The effective range is clipped to
    [auto-entry start date, today]. A day is skipped when it already has an
    entry, is a vacation or special-status day, carries an overtime
    adjustment, was explicitly excluded, or is not a configured working day.
    Returns the number of entries added to the session (no commit here).
    """
    if user.entry_mode != ENTRY_MODE_AUTO_UNTIL_TODAY:
        return 0
    if range_end < range_start:
        return 0
    # Never fill into the future, nor before the user's tracking start.
    effective_end = min(range_end, date.today())
    effective_start = max(range_start, auto_entry_sync_start_date(user))
    if effective_start > effective_end:
        return 0
    working_days = get_user_working_days(user)
    if not working_days:
        return 0
    workdays_per_week = len(working_days)
    # Pre-load everything that can block a day, once for the whole range.
    rules = list_rules_for_user(db, user.id)
    vacations = list_vacations_for_user(db, user.id, effective_start, effective_end)
    vacation_dates = expand_vacation_dates(vacations, effective_start, effective_end, relevant_weekdays=working_days)
    special_statuses = list_special_statuses_for_user(db, user.id, effective_start, effective_end)
    special_dates = special_status_dates(special_statuses)
    overtime_adjustments = list_overtime_adjustments_for_user(db, user.id, effective_start, effective_end)
    adjustment_dates = set(overtime_adjustment_minutes_map(overtime_adjustments).keys())
    skipped_auto_dates = auto_entry_skip_dates_for_user(db, user.id, effective_start, effective_end)
    existing_dates_stmt = (
        select(TimeEntry.date)
        .where(
            TimeEntry.user_id == user.id,
            TimeEntry.date >= effective_start,
            TimeEntry.date <= effective_end,
        )
        .order_by(TimeEntry.date.asc())
    )
    existing_dates = set(db.execute(existing_dates_stmt).scalars().all())
    created = 0
    cursor = effective_start
    while cursor <= effective_end:
        # Skip days that must not be auto-filled (existing entry, vacation,
        # special status, manual adjustment, explicit skip, non-working day).
        if cursor in existing_dates:
            cursor += timedelta(days=1)
            continue
        if cursor in vacation_dates:
            cursor += timedelta(days=1)
            continue
        if cursor in special_dates:
            cursor += timedelta(days=1)
            continue
        if cursor in adjustment_dates:
            cursor += timedelta(days=1)
            continue
        if cursor in skipped_auto_dates:
            cursor += timedelta(days=1)
            continue
        if cursor.weekday() not in working_days:
            cursor += timedelta(days=1)
            continue
        # The weekly target may change over time via target rules.
        weekly_target_minutes = target_for_week(rules, monday_of(cursor), user.weekly_target_minutes)
        entry_values = build_auto_day_entry(
            weekly_target_minutes=weekly_target_minutes,
            workdays_per_week=workdays_per_week,
            automatic_break_rules_enabled=bool(user.automatic_break_rules_enabled),
            default_break_minutes=user.default_break_minutes,
        )
        if entry_values is None:
            cursor += timedelta(days=1)
            continue
        start_minutes, end_minutes, break_minutes = entry_values
        db.add(
            TimeEntry(
                user_id=user.id,
                date=cursor,
                start_minutes=start_minutes,
                end_minutes=end_minutes,
                break_minutes=break_minutes,
                break_rule_mode="auto",
                notes=AUTO_ENTRY_NOTE,
            )
        )
        existing_dates.add(cursor)
        created += 1
        cursor += timedelta(days=1)
    return created
def sync_auto_entries_for_all_users(
    *,
    db: Session,
    up_to_date: date | None = None,
) -> dict[str, int]:
    """Run auto-entry maintenance for every active auto-mode user.

    Deletes auto entries dated after the cutoff (default: today) and fills
    missing days up to it. Returns counters for users processed, entries
    created and future entries deleted.
    """
    cutoff = up_to_date or date.today()
    users_stmt = (
        select(User)
        .where(
            User.is_active.is_(True),
            User.entry_mode == ENTRY_MODE_AUTO_UNTIL_TODAY,
        )
        .order_by(User.created_at.asc())
    )
    users = db.execute(users_stmt).scalars().all()
    total_created = 0
    total_deleted = 0
    for user in users:
        total_deleted += delete_future_auto_entries(db=db, user_id=user.id, after_date=cutoff)
        total_created += autofill_entries_for_range(
            db=db,
            user=user,
            range_start=auto_entry_sync_start_date(user),
            range_end=cutoff,
        )
    return {"users": len(users), "created": total_created, "deleted_future": total_deleted}
+126
View File
@@ -0,0 +1,126 @@
from collections import defaultdict
from datetime import date, datetime, timedelta
import re
def parse_time_to_minutes(value: str) -> int:
    """Parse a strict ``HH:MM`` (24-hour) time string into minutes since midnight.

    Raises:
        ValueError: if *value* is not exactly two zero-padded fields within
            00:00..23:59.
    """
    if not re.fullmatch(r"([01]\d|2[0-3]):[0-5]\d", value):
        raise ValueError("Uhrzeit muss im Format HH:MM sein")
    # The regex fully validates the format, so the former strptime round-trip
    # was dead code — split/int cannot fail here.
    hours, minutes = value.split(":")
    return int(hours) * 60 + int(minutes)
def minutes_to_hhmm(minutes: int) -> str:
    """Format a (possibly negative) minute count as ``[-]HH:MM``."""
    prefix = "-" if minutes < 0 else ""
    hours, mins = divmod(abs(minutes), 60)
    return f"{prefix}{hours:02d}:{mins:02d}"
def validate_entry(start_minutes: int, end_minutes: int, break_minutes: int) -> None:
    """Validate a raw time entry; raise ValueError (German message) when inconsistent."""
    span = end_minutes - start_minutes
    if span <= 0:
        raise ValueError("Arbeitsende muss nach Arbeitsbeginn liegen")
    if break_minutes < 0:
        raise ValueError("Pause darf nicht negativ sein")
    if break_minutes > span:
        raise ValueError("Pause darf nicht laenger als die Arbeitszeit sein")
def required_break_minutes_for_span(work_span_minutes: int) -> int:
    """Statutory break for a gross work span: >9h → 45 min, >6h → 30 min, else none."""
    for threshold, pause in ((9 * 60, 45), (6 * 60, 30)):
        if work_span_minutes > threshold:
            return pause
    return 0
def automatic_break_minutes(start_minutes: int, end_minutes: int) -> int:
    """Automatic break length for a start/end pair, based on the gross span rule."""
    span = end_minutes - start_minutes
    if span <= 0:
        raise ValueError("Arbeitsende muss nach Arbeitsbeginn liegen")
    return required_break_minutes_for_span(span)
def automatic_break_minutes_for_net_minutes(net_minutes: int) -> int:
    """Automatic break derived from NET time (gross thresholds shifted by the break itself)."""
    if net_minutes < 0:
        raise ValueError("Nettoarbeitszeit darf nicht negativ sein")
    for threshold, pause in ((9 * 60 - 45, 45), (6 * 60 - 30, 30)):
        if net_minutes > threshold:
            return pause
    return 0
def compute_net_minutes(start_minutes: int, end_minutes: int, break_minutes: int) -> int:
    """Net worked minutes: gross span minus break; validates the entry first."""
    validate_entry(start_minutes, end_minutes, break_minutes)
    gross = end_minutes - start_minutes
    return gross - break_minutes
def iso_week_bounds(day: date) -> tuple[date, date]:
    """Return (Monday, Sunday) of the ISO week containing *day*."""
    monday = day - timedelta(days=day.weekday())
    return monday, monday + timedelta(days=6)
def daterange(start: date, end: date):
    """Yield every date from *start* through *end* inclusive (nothing if end < start)."""
    for offset in range((end - start).days + 1):
        yield start + timedelta(days=offset)
def aggregate_week(entries: list, week_start: date, weekly_target_minutes: int) -> dict:
    """Aggregate one ISO week into per-day rows plus ist/soll/delta totals."""
    week_end = week_start + timedelta(days=6)
    by_date = {entry.date: entry for entry in entries}
    days = []
    total_net = 0
    for day in daterange(week_start, week_end):
        entry = by_date.get(day)
        net = 0
        if entry is not None:
            net = compute_net_minutes(entry.start_minutes, entry.end_minutes, entry.break_minutes)
            total_net += net
        days.append({"date": day, "entry": entry, "net_minutes": net})
    return {
        "week_start": week_start,
        "week_end": week_end,
        "days": days,
        "weekly_ist": total_net,
        "weekly_soll": weekly_target_minutes,
        "weekly_delta": total_net - weekly_target_minutes,
    }
def cumulative_delta(entries: list, selected_week_start: date, weekly_target_minutes: int) -> int:
    """Sum of (ist - soll) over every week from the first entry's week through *selected_week_start*.

    Weeks without entries contribute the full negative target; returns 0 when
    there are no entries at all.
    """
    if not entries:
        return 0
    first_date = min(entry.date for entry in entries)
    net_per_week: defaultdict = defaultdict(int)
    for entry in entries:
        week_start, _ = iso_week_bounds(entry.date)
        net_per_week[week_start] += compute_net_minutes(
            entry.start_minutes, entry.end_minutes, entry.break_minutes
        )
    total = 0
    week = first_date - timedelta(days=first_date.weekday())
    while week <= selected_week_start:
        total += net_per_week[week] - weekly_target_minutes
        week += timedelta(days=7)
    return total
+70
View File
@@ -0,0 +1,70 @@
from __future__ import annotations

import smtplib
import ssl
from dataclasses import dataclass
from email.message import EmailMessage
from email.utils import formataddr
@dataclass
class MailServerSettings:
    """Connection and identity settings for outgoing SMTP mail."""

    smtp_host: str
    smtp_port: int
    smtp_username: str | None  # falsy value disables authentication
    smtp_password: str | None
    from_email: str
    from_name: str  # display name used in the From header
    use_starttls: bool  # upgrade a plain connection via STARTTLS
    use_ssl: bool  # connect with implicit TLS (checked before STARTTLS)
    verify_tls: bool  # False disables certificate and hostname verification
    timeout_seconds: int = 15
def _build_context(verify_tls: bool) -> ssl.SSLContext:
context = ssl.create_default_context()
if verify_tls:
return context
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
def send_email(
    *,
    settings: MailServerSettings,
    to_email: str,
    subject: str,
    text_body: str,
) -> None:
    """Send a plain-text email using the given SMTP settings.

    Supports implicit TLS (``use_ssl``), STARTTLS (``use_starttls``) and
    optional authentication. Raises ValueError for a missing host or sender
    address; smtplib/ssl errors propagate to the caller.
    """
    if not settings.smtp_host.strip():
        raise ValueError("SMTP host is empty")
    if not settings.from_email.strip():
        raise ValueError("From email is empty")
    msg = EmailMessage()
    msg["Subject"] = subject
    # formataddr quotes/encodes the display name; the previous f-string
    # produced a malformed (or injectable) header for names containing
    # commas, quotes or non-ASCII characters.
    msg["From"] = formataddr((settings.from_name, settings.from_email))
    msg["To"] = to_email
    msg.set_content(text_body)
    ssl_context = _build_context(settings.verify_tls)

    def _deliver(smtp: smtplib.SMTP) -> None:
        # Authenticate only when a username is configured; password may be empty.
        if settings.smtp_username:
            smtp.login(settings.smtp_username, settings.smtp_password or "")
        smtp.send_message(msg)

    if settings.use_ssl:
        with smtplib.SMTP_SSL(
            settings.smtp_host,
            settings.smtp_port,
            timeout=settings.timeout_seconds,
            context=ssl_context,
        ) as smtp:
            _deliver(smtp)
        return
    with smtplib.SMTP(settings.smtp_host, settings.smtp_port, timeout=settings.timeout_seconds) as smtp:
        smtp.ehlo()
        if settings.use_starttls:
            smtp.starttls(context=ssl_context)
            smtp.ehlo()
        _deliver(smtp)
+237
View File
@@ -0,0 +1,237 @@
import json
from datetime import date
from io import BytesIO
from openpyxl import Workbook
from reportlab.lib.pagesizes import A4, landscape
from reportlab.pdfgen import canvas
from app.services.calculations import minutes_to_hhmm
from app.services.targets import monday_of
def create_excel_export(rows: list[dict], week_summaries: list[dict], totals: dict, title: str) -> bytes:
    """Build an XLSX workbook with "Tage", "Wochen" and "Meta" sheets and return it as bytes.

    *rows* are per-day dicts from build_export_rows; *week_summaries* carry
    ist/soll/delta per week; *totals* must contain ist/delta minutes and the
    export date range.
    """
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "Tage"
    headers = [
        "Datum",
        "Wochentag",
        "KW",
        "Start",
        "Ende",
        "Pause (min)",
        "Brutto",
        "Netto",
        "Stundenausgleich",
        "Sonderstatus",
        "Wochen-Soll",
        "Wochen-Delta",
        "Notiz",
    ]
    sheet.append(headers)
    for row in rows:
        sheet.append(
            [
                row["date"].isoformat(),
                row["weekday_name"],
                row["iso_week"],
                row["start_time"] or "",
                row["end_time"] or "",
                row["break_minutes"],
                minutes_to_hhmm(row["gross_minutes"]),
                minutes_to_hhmm(row["net_minutes"]),
                minutes_to_hhmm(row["overtime_adjustment_minutes"]),
                row["special_status_label"] or "",
                minutes_to_hhmm(row["weekly_target_minutes"]),
                minutes_to_hhmm(row["weekly_delta_minutes"]),
                row["notes"] or "",
            ]
        )
    # Uniform column width for all 13 day-sheet columns.
    for col in ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M"]:
        sheet.column_dimensions[col].width = 16
    summary = workbook.create_sheet("Wochen")
    summary_headers = ["KW-Start", "KW-Ende", "Ist", "Soll", "Delta"]
    summary.append(summary_headers)
    for item in week_summaries:
        summary.append(
            [
                item["week_start"].isoformat(),
                item["week_end"].isoformat(),
                minutes_to_hhmm(item["ist_minutes"]),
                minutes_to_hhmm(item["soll_minutes"]),
                minutes_to_hhmm(item["delta_minutes"]),
            ]
        )
    summary.append([])
    summary.append(["Gesamt", "", minutes_to_hhmm(totals["ist_minutes"]), "", minutes_to_hhmm(totals["delta_minutes"])])
    meta = workbook.create_sheet("Meta")
    meta.append([title])
    meta.append([f"Zeitraum: {totals['from_date'].isoformat()} bis {totals['to_date'].isoformat()}"])
    # Serialize in-memory; openpyxl writes the zip container to the buffer.
    output = BytesIO()
    workbook.save(output)
    return output.getvalue()
def create_pdf_export(rows: list[dict], week_summaries: list[dict], totals: dict, title: str) -> bytes:
    """Render the export as a landscape-A4 PDF (day table, week summary, totals) and return bytes.

    Uses absolute x positions per column; y decreases per line and a new page
    starts whenever y drops below the bottom margin.
    """
    output = BytesIO()
    pdf = canvas.Canvas(output, pagesize=landscape(A4))
    width, height = landscape(A4)
    y = height - 35
    pdf.setFont("Helvetica-Bold", 13)
    pdf.drawString(24, y, title)
    y -= 18
    pdf.setFont("Helvetica", 10)
    pdf.drawString(24, y, f"Zeitraum: {totals['from_date'].isoformat()} bis {totals['to_date'].isoformat()}")
    y -= 24
    # Column header row (fixed x offsets, matched by the data rows below).
    pdf.setFont("Helvetica-Bold", 8)
    pdf.drawString(24, y, "Datum")
    pdf.drawString(88, y, "Tag")
    pdf.drawString(124, y, "KW")
    pdf.drawString(154, y, "Start")
    pdf.drawString(198, y, "Ende")
    pdf.drawString(242, y, "Pause")
    pdf.drawString(286, y, "Brutto")
    pdf.drawString(338, y, "Netto")
    pdf.drawString(390, y, "Ausgl.")
    pdf.drawString(436, y, "Status")
    pdf.drawString(490, y, "Soll")
    pdf.drawString(542, y, "W-Delta")
    pdf.drawString(610, y, "Notiz")
    y -= 12
    pdf.setFont("Helvetica", 8)
    for row in rows:
        if y < 40:
            # Page break: reset cursor and restore the body font.
            pdf.showPage()
            y = height - 30
            pdf.setFont("Helvetica", 8)
        # Truncate long notes so they fit the fixed-width column.
        note = (row["notes"] or "").strip()
        if len(note) > 18:
            note = f"{note[:15]}..."
        pdf.drawString(24, y, row["date"].isoformat())
        pdf.drawString(88, y, row["weekday_short"])
        pdf.drawString(124, y, str(row["iso_week"]))
        pdf.drawString(154, y, row["start_time"] or "-")
        pdf.drawString(198, y, row["end_time"] or "-")
        pdf.drawString(242, y, str(row["break_minutes"]))
        pdf.drawString(286, y, minutes_to_hhmm(row["gross_minutes"]))
        pdf.drawString(338, y, minutes_to_hhmm(row["net_minutes"]))
        pdf.drawString(390, y, minutes_to_hhmm(row["overtime_adjustment_minutes"]))
        pdf.drawString(436, y, row["special_status_label"] or "-")
        pdf.drawString(490, y, minutes_to_hhmm(row["weekly_target_minutes"]))
        pdf.drawString(542, y, minutes_to_hhmm(row["weekly_delta_minutes"]))
        pdf.drawString(610, y, note)
        y -= 11
    y -= 12
    pdf.setFont("Helvetica-Bold", 10)
    pdf.drawString(24, y, "Wochenzusammenfassung")
    y -= 14
    pdf.setFont("Helvetica", 9)
    for item in week_summaries:
        if y < 40:
            pdf.showPage()
            y = height - 30
            pdf.setFont("Helvetica", 9)
        line = (
            f"{item['week_start'].isoformat()} - {item['week_end'].isoformat()} | "
            f"Ist {minutes_to_hhmm(item['ist_minutes'])} | "
            f"Soll {minutes_to_hhmm(item['soll_minutes'])} | "
            f"Delta {minutes_to_hhmm(item['delta_minutes'])}"
        )
        pdf.drawString(24, y, line)
        y -= 12
    y -= 10
    pdf.setFont("Helvetica-Bold", 10)
    pdf.drawString(
        24,
        y,
        f"Gesamt Ist: {minutes_to_hhmm(totals['ist_minutes'])} | Gesamt Delta: {minutes_to_hhmm(totals['delta_minutes'])}",
    )
    pdf.save()
    return output.getvalue()
def create_backup_export(payload: dict) -> bytes:
    """Serialize the backup payload as pretty-printed UTF-8 JSON bytes."""
    text = json.dumps(payload, ensure_ascii=False, indent=2)
    return text.encode("utf-8")
def build_export_rows(
    days: list[date],
    entries_by_date: dict,
    week_target_map: dict[date, int],
    week_ist_map: dict[date, int],
    week_delta_map: dict[date, int],
    special_status_map: dict[date, str] | None = None,
    overtime_adjustment_map: dict[date, int] | None = None,
) -> list[dict]:
    """Build one export row dict per day, merging entries, weekly figures and special statuses."""
    weekday_names = ["Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", "Sonntag"]
    weekday_short = ["Mo", "Di", "Mi", "Do", "Fr", "Sa", "So"]
    statuses = special_status_map or {}
    adjustments = overtime_adjustment_map or {}
    status_labels = {
        "holiday": "Feiertag",
        "sick": "Krankheit",
    }
    rows: list[dict] = []
    for day in days:
        entry = entries_by_date.get(day)
        week_start = monday_of(day)
        if entry:
            gross = entry.end_minutes - entry.start_minutes
            net = gross - entry.break_minutes
            start_time = f"{entry.start_minutes // 60:02d}:{entry.start_minutes % 60:02d}"
            end_time = f"{entry.end_minutes // 60:02d}:{entry.end_minutes % 60:02d}"
            break_minutes = entry.break_minutes
            notes = entry.notes
        else:
            # Days without an entry still get a row with zeroed figures.
            gross = net = break_minutes = 0
            start_time = end_time = notes = None
        rows.append(
            {
                "date": day,
                "weekday_name": weekday_names[day.weekday()],
                "weekday_short": weekday_short[day.weekday()],
                "iso_week": day.isocalendar()[1],
                "start_time": start_time,
                "end_time": end_time,
                "break_minutes": break_minutes,
                "gross_minutes": gross,
                "net_minutes": net,
                "overtime_adjustment_minutes": adjustments.get(day, 0),
                "special_status_label": status_labels.get(statuses.get(day, "")),
                "weekly_target_minutes": week_target_map[week_start],
                "weekly_delta_minutes": week_delta_map[week_start],
                "notes": notes,
            }
        )
    return rows
+712
View File
@@ -0,0 +1,712 @@
import json
from datetime import date, datetime, timedelta, timezone
from typing import Any
from sqlalchemy import delete, select
from sqlalchemy.orm import Session
from app.models import (
AutoEntrySkip,
ImportPreview,
OvertimeAdjustment,
SpecialDayStatus,
TimeEntry,
User,
VacationPeriod,
WeeklyTargetRule,
)
from app.services.auto_entries import (
ENTRY_MODE_AUTO_UNTIL_TODAY,
ENTRY_MODE_MANUAL,
delete_future_auto_entries,
)
from app.services.calculations import compute_net_minutes
from app.services.public_holidays import normalize_german_state_code
from app.services.security import utc_now
from app.services.targets import ensure_user_has_default_target_rule
from app.services.workdays import serialize_working_days
# Backup format versioning: version 2 is written; versions 1 and 2 are accepted.
CURRENT_BACKUP_VERSION = 2
SUPPORTED_BACKUP_VERSIONS = {1, 2}
# Import strategies: merge into existing data, or wipe the user's data first.
IMPORT_MODE_MERGE = "merge"
IMPORT_MODE_REPLACE = "replace_user_data"
# Stored import previews expire after this many hours.
IMPORT_PREVIEW_TTL_HOURS = 24
# Hard cap on uploaded backup size (5 MiB).
MAX_BACKUP_BYTES = 5 * 1024 * 1024
# Allowed enum-like values inside a backup payload.
SPECIAL_STATUS_VALUES = {"holiday", "sick"}
PREFERRED_HOME_VIEWS = {"week", "month"}
PREFERRED_MONTH_VIEWS = {"flat", "weeks"}
BREAK_RULE_MODES = {"manual", "auto"}
class BackupImportError(ValueError):
    """Raised when a backup payload is malformed or fails validation (messages are user-facing German)."""
    pass
def supported_import_modes() -> set[str]:
    """Return the set of import mode identifiers accepted by the importer."""
    return {IMPORT_MODE_MERGE, IMPORT_MODE_REPLACE}
def _require_mapping(value: Any, *, label: str) -> dict[str, Any]:
if not isinstance(value, dict):
raise BackupImportError(f"{label} ist nicht korrekt aufgebaut.")
return value
def _require_list(value: Any, *, label: str) -> list[Any]:
if value is None:
return []
if not isinstance(value, list):
raise BackupImportError(f"{label} ist nicht korrekt aufgebaut.")
return value
def _parse_date(value: Any, *, label: str) -> date:
if not isinstance(value, str) or not value.strip():
raise BackupImportError(f"{label} fehlt oder ist ungültig.")
try:
return date.fromisoformat(value)
except ValueError as exc:
raise BackupImportError(f"{label} hat kein gültiges Datum.") from exc
def _parse_datetime(value: Any, *, label: str) -> str | None:
if value in (None, ""):
return None
if not isinstance(value, str):
raise BackupImportError(f"{label} hat kein gültiges Datum.")
try:
datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError as exc:
raise BackupImportError(f"{label} hat kein gültiges Datum.") from exc
return value
def _parse_int(value: Any, *, label: str, minimum: int | None = None) -> int:
if not isinstance(value, int):
raise BackupImportError(f"{label} ist keine ganze Zahl.")
if minimum is not None and value < minimum:
raise BackupImportError(f"{label} ist zu klein.")
return value
def _parse_optional_int(value: Any, *, label: str, minimum: int | None = None) -> int | None:
if value is None:
return None
return _parse_int(value, label=label, minimum=minimum)
def _parse_bool(value: Any, *, label: str) -> bool:
if not isinstance(value, bool):
raise BackupImportError(f"{label} muss true oder false sein.")
return value
def _parse_optional_text(value: Any, *, label: str) -> str | None:
if value in (None, ""):
return None
if not isinstance(value, str):
raise BackupImportError(f"{label} ist ungültig.")
return value.strip() or None
def _normalize_settings(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate the settings section of a backup payload and return a normalized dict.

    Accepts the settings either at the payload top level or nested under
    ``user.settings`` (older backup layout). Unknown view-mode values fall
    back to defaults; invalid working days, entry mode or federal state raise
    BackupImportError. All dates are returned as ISO strings or None.
    """
    settings_value = payload.get("settings")
    if settings_value is None:
        # Backup version 1 nested the settings inside the user section.
        user_section = payload.get("user")
        if isinstance(user_section, dict):
            settings_value = user_section.get("settings")
    settings_data = _require_mapping(settings_value, label="Backup-Einstellungen")
    working_days_raw = settings_data.get("working_days")
    if not isinstance(working_days_raw, list) or not working_days_raw:
        raise BackupImportError("Die relevanten Arbeitstage im Backup sind ungültig.")
    # Deduplicate while validating each weekday index (0=Monday .. 6=Sunday).
    working_days: list[int] = []
    for item in working_days_raw:
        if not isinstance(item, int) or item < 0 or item > 6:
            raise BackupImportError("Die relevanten Arbeitstage im Backup sind ungültig.")
        if item not in working_days:
            working_days.append(item)
    if not working_days:
        raise BackupImportError("Im Backup ist kein relevanter Arbeitstag hinterlegt.")
    # View preferences silently fall back to defaults when unrecognized.
    preferred_home_view = settings_data.get("preferred_home_view", "week")
    if preferred_home_view not in PREFERRED_HOME_VIEWS:
        preferred_home_view = "week"
    preferred_month_view_mode = settings_data.get("preferred_month_view_mode", "flat")
    if preferred_month_view_mode not in PREFERRED_MONTH_VIEWS:
        preferred_month_view_mode = "flat"
    # Legacy value "auto" maps to the current auto-until-today mode.
    entry_mode = settings_data.get("entry_mode", ENTRY_MODE_MANUAL)
    if entry_mode == "auto":
        entry_mode = ENTRY_MODE_AUTO_UNTIL_TODAY
    if entry_mode not in {ENTRY_MODE_MANUAL, ENTRY_MODE_AUTO_UNTIL_TODAY}:
        raise BackupImportError("Der Erfassungsmodus im Backup ist ungültig.")
    federal_state = None
    if settings_data.get("federal_state"):
        federal_state = normalize_german_state_code(str(settings_data.get("federal_state")))
        if federal_state is None:
            raise BackupImportError("Das Bundesland im Backup ist ungültig.")
    overtime_start_date = None
    if settings_data.get("overtime_start_date"):
        overtime_start_date = _parse_date(settings_data.get("overtime_start_date"), label="Überstunden-Startdatum")
    workhours_counter_start_date = None
    if settings_data.get("workhours_counter_start_date"):
        workhours_counter_start_date = _parse_date(
            settings_data.get("workhours_counter_start_date"),
            label="Arbeitsstunden-Counter Startdatum",
        )
    workhours_counter_end_date = None
    if settings_data.get("workhours_counter_end_date"):
        workhours_counter_end_date = _parse_date(
            settings_data.get("workhours_counter_end_date"),
            label="Arbeitsstunden-Counter Enddatum",
        )
    return {
        "weekly_target_minutes": _parse_int(settings_data.get("weekly_target_minutes", 1500), label="Wochenstunden", minimum=1),
        "preferred_home_view": preferred_home_view,
        "preferred_month_view_mode": preferred_month_view_mode,
        "entry_mode": entry_mode,
        "working_days": sorted(working_days),
        "count_vacation_as_worktime": _parse_bool(
            settings_data.get("count_vacation_as_worktime", False),
            label="Urlaubstage-wie-Arbeitstage",
        ),
        "count_holiday_as_worktime": _parse_bool(
            settings_data.get("count_holiday_as_worktime", False),
            label="Feiertage-wie-Arbeitstage",
        ),
        "count_sick_as_worktime": _parse_bool(
            settings_data.get("count_sick_as_worktime", False),
            label="Kranktage-wie-Arbeitstage",
        ),
        "automatic_break_rules_enabled": _parse_bool(
            settings_data.get("automatic_break_rules_enabled", False),
            label="Automatische Pausenregel",
        ),
        "default_break_minutes": _parse_int(
            settings_data.get("default_break_minutes", 0),
            label="Tägliche Pause",
            minimum=0,
        ),
        "overtime_start_date": overtime_start_date.isoformat() if overtime_start_date else None,
        "overtime_expiry_days": _parse_optional_int(
            settings_data.get("overtime_expiry_days"),
            label="Überstunden-Verfall",
            minimum=1,
        ),
        "expire_negative_overtime": _parse_bool(
            settings_data.get("expire_negative_overtime", False),
            label="Negative Stunden verfallen",
        ),
        "vacation_days_total": _parse_int(
            settings_data.get("vacation_days_total", 0),
            label="Urlaubstage gesamt",
            minimum=0,
        ),
        "vacation_show_in_header": _parse_bool(
            settings_data.get("vacation_show_in_header", True),
            label="Urlaub im Header anzeigen",
        ),
        "workhours_counter_enabled": _parse_bool(
            settings_data.get("workhours_counter_enabled", False),
            label="Arbeitsstunden-Counter aktiviert",
        ),
        "workhours_counter_show_in_header": _parse_bool(
            settings_data.get("workhours_counter_show_in_header", False),
            label="Arbeitsstunden-Counter im Header anzeigen",
        ),
        "workhours_counter_start_date": (
            workhours_counter_start_date.isoformat() if workhours_counter_start_date else None
        ),
        "workhours_counter_end_date": (
            workhours_counter_end_date.isoformat() if workhours_counter_end_date else None
        ),
        "workhours_counter_manual_offset_minutes": _parse_int(
            settings_data.get("workhours_counter_manual_offset_minutes", 0),
            label="Zusatzstunden",
            minimum=0,
        ),
        "workhours_counter_target_minutes": _parse_optional_int(
            settings_data.get("workhours_counter_target_minutes"),
            label="Arbeitsstunden-Ziel",
            minimum=1,
        ),
        "workhours_counter_target_email_enabled": _parse_bool(
            settings_data.get("workhours_counter_target_email_enabled", False),
            label="Counter-Zielwarnung per E-Mail",
        ),
        "federal_state": federal_state,
    }
def _normalize_weekly_target_rules(items: list[Any]) -> list[dict[str, Any]]:
    """Validate and dedupe weekly target rules (first occurrence per start date wins), sorted by date."""
    rules: list[dict[str, Any]] = []
    seen_dates: set[str] = set()
    for raw in _require_list(items, label="Wochenziel-Regeln"):
        rule = _require_mapping(raw, label="Wochenziel-Regel")
        start = _parse_date(rule.get("effective_from"), label="Wochenziel Startdatum").isoformat()
        if start in seen_dates:
            continue
        seen_dates.add(start)
        minutes = _parse_int(
            rule.get("weekly_target_minutes"),
            label="Wochenziel in Minuten",
            minimum=1,
        )
        rules.append({"effective_from": start, "weekly_target_minutes": minutes})
    rules.sort(key=lambda entry: entry["effective_from"])
    return rules
def _normalize_time_entries(items: list[Any]) -> list[dict[str, Any]]:
    """Validate and dedupe time entries (first occurrence per date wins), sorted by date."""
    entries: list[dict[str, Any]] = []
    seen_dates: set[str] = set()
    for raw in _require_list(items, label="Arbeitszeiteinträge"):
        entry = _require_mapping(raw, label="Arbeitszeiteintrag")
        entry_date = _parse_date(entry.get("date"), label="Arbeitszeiteintrag Datum").isoformat()
        if entry_date in seen_dates:
            continue
        seen_dates.add(entry_date)
        start = _parse_int(entry.get("start_minutes"), label="Arbeitsbeginn", minimum=0)
        end = _parse_int(entry.get("end_minutes"), label="Arbeitsende", minimum=0)
        pause = _parse_int(entry.get("break_minutes", 0), label="Pause", minimum=0)
        mode = entry.get("break_rule_mode", "manual")
        if mode not in BREAK_RULE_MODES:
            mode = "manual"
        # Raises on inconsistent start/end/break, rejecting the backup.
        compute_net_minutes(start, end, pause)
        entries.append(
            {
                "date": entry_date,
                "start_minutes": start,
                "end_minutes": end,
                "break_minutes": pause,
                "break_rule_mode": mode,
                "notes": _parse_optional_text(entry.get("notes"), label="Notiz"),
            }
        )
    entries.sort(key=lambda row: row["date"])
    return entries
def _normalize_vacation_periods(items: list[Any]) -> list[dict[str, Any]]:
    """Validate and dedupe vacation periods, sorted by (start, end)."""
    periods: list[dict[str, Any]] = []
    seen_keys: set[tuple[str, str, bool, str | None]] = set()
    for raw in _require_list(items, label="Urlaubszeiträume"):
        period = _require_mapping(raw, label="Urlaubszeitraum")
        start = _parse_date(period.get("start_date"), label="Urlaubsbeginn")
        end = _parse_date(period.get("end_date"), label="Urlaubsende")
        if end < start:
            raise BackupImportError("Ein Urlaubszeitraum endet vor seinem Startdatum.")
        weekends = _parse_bool(period.get("include_weekends", False), label="Wochenenden einschließen")
        notes = _parse_optional_text(period.get("notes"), label="Urlaubsnotiz")
        # Exact duplicates (same span, weekend flag and note) are dropped.
        key = (start.isoformat(), end.isoformat(), weekends, notes)
        if key in seen_keys:
            continue
        seen_keys.add(key)
        periods.append(
            {
                "start_date": start.isoformat(),
                "end_date": end.isoformat(),
                "include_weekends": weekends,
                "notes": notes,
            }
        )
    periods.sort(key=lambda row: (row["start_date"], row["end_date"]))
    return periods
def _normalize_special_day_statuses(items: list[Any]) -> list[dict[str, Any]]:
    """Validate and deduplicate special day statuses; first entry per date wins.

    Raises BackupImportError when a status value is not in SPECIAL_STATUS_VALUES.
    """
    result: list[dict[str, Any]] = []
    dates_seen: set[str] = set()
    for raw in _require_list(items, label="Sondertage"):
        mapping = _require_mapping(raw, label="Sondertag")
        iso_date = _parse_date(mapping.get("date"), label="Sondertag Datum").isoformat()
        if iso_date in dates_seen:
            continue
        dates_seen.add(iso_date)
        status = mapping.get("status")
        if status not in SPECIAL_STATUS_VALUES:
            raise BackupImportError("Ein Sondertag im Backup hat einen ungültigen Status.")
        result.append(
            {
                "date": iso_date,
                "status": status,
                "notes": _parse_optional_text(mapping.get("notes"), label="Sondertag-Notiz"),
            }
        )
    return sorted(result, key=lambda row: row["date"])
def _normalize_overtime_adjustments(items: list[Any]) -> list[dict[str, Any]]:
    """Validate and deduplicate overtime adjustments; first entry per date wins."""
    result: list[dict[str, Any]] = []
    dates_seen: set[str] = set()
    for raw in _require_list(items, label="Stundenausgleich"):
        mapping = _require_mapping(raw, label="Stundenausgleich-Eintrag")
        iso_date = _parse_date(mapping.get("date"), label="Stundenausgleich Datum").isoformat()
        if iso_date in dates_seen:
            continue
        dates_seen.add(iso_date)
        result.append(
            {
                "date": iso_date,
                "minutes": _parse_int(mapping.get("minutes"), label="Stundenausgleich Minuten"),
                "notes": _parse_optional_text(mapping.get("notes"), label="Stundenausgleich-Notiz"),
            }
        )
    return sorted(result, key=lambda row: row["date"])
def load_backup_payload_from_bytes(payload_bytes: bytes) -> dict[str, Any]:
    """Parse, validate and normalize a raw backup file into a payload dict.

    Raises BackupImportError for empty or oversized files, invalid UTF-8 or
    JSON, unsupported backup versions, and any malformed section (delegated
    to the per-section normalizers).
    """
    if not payload_bytes:
        raise BackupImportError("Die Backup-Datei ist leer.")
    if len(payload_bytes) > MAX_BACKUP_BYTES:
        raise BackupImportError("Die Backup-Datei ist zu groß.")
    try:
        decoded = json.loads(payload_bytes.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise BackupImportError("Die Backup-Datei ist kein gültiges JSON.") from exc
    payload = _require_mapping(decoded, label="Backup-Datei")
    backup_version = payload.get("backup_version")
    if backup_version not in SUPPORTED_BACKUP_VERSIONS:
        raise BackupImportError("Diese Backup-Version wird noch nicht unterstützt.")
    return {
        "backup_version": backup_version,
        "source_app_name": str(payload.get("app_name") or "Stundenfuchs"),
        "source_app_version": str(payload.get("app_version") or "unbekannt"),
        "exported_at": _parse_datetime(payload.get("exported_at"), label="Exportdatum"),
        "settings": _normalize_settings(payload),
        "weekly_target_rules": _normalize_weekly_target_rules(payload.get("weekly_target_rules")),
        "time_entries": _normalize_time_entries(payload.get("time_entries")),
        "vacation_periods": _normalize_vacation_periods(payload.get("vacation_periods")),
        "special_day_statuses": _normalize_special_day_statuses(payload.get("special_day_statuses")),
        "overtime_adjustments": _normalize_overtime_adjustments(payload.get("overtime_adjustments")),
    }
def summarize_backup_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Condense a normalized backup payload into display-ready summary data."""
    settings = payload["settings"]
    summary_keys = (
        "entry_mode",
        "weekly_target_minutes",
        "working_days",
        "federal_state",
        "vacation_days_total",
        "workhours_counter_enabled",
    )
    count_keys = (
        "weekly_target_rules",
        "time_entries",
        "vacation_periods",
        "special_day_statuses",
        "overtime_adjustments",
    )
    return {
        "backup_version": payload["backup_version"],
        "source_app_name": payload["source_app_name"],
        "source_app_version": payload["source_app_version"],
        "exported_at": payload["exported_at"],
        "settings_summary": {key: settings[key] for key in summary_keys},
        "counts": {key: len(payload[key]) for key in count_keys},
    }
def build_import_preview(*, db: Session, user: User, payload: dict[str, Any], mode: str) -> dict[str, Any]:
    """Summarize a backup payload and count rows that would collide with existing data.

    Each section is checked against its natural key: single date for time
    entries, special day statuses and overtime adjustments; effective_from for
    weekly target rules; the full (start, end, weekends, notes) tuple for
    vacation periods.  Raises BackupImportError for an unknown *mode*.
    """
    if mode not in supported_import_modes():
        raise BackupImportError("Ungültiger Importmodus.")
    # Natural-key snapshots of the user's current data, one query per section.
    existing_time_entry_dates = set(
        db.execute(select(TimeEntry.date).where(TimeEntry.user_id == user.id)).scalars().all()
    )
    existing_special_dates = set(
        db.execute(select(SpecialDayStatus.date).where(SpecialDayStatus.user_id == user.id)).scalars().all()
    )
    existing_adjustment_dates = set(
        db.execute(select(OvertimeAdjustment.date).where(OvertimeAdjustment.user_id == user.id)).scalars().all()
    )
    existing_rule_dates = set(
        db.execute(select(WeeklyTargetRule.effective_from).where(WeeklyTargetRule.user_id == user.id)).scalars().all()
    )
    existing_vacation_keys = set(
        db.execute(
            select(
                VacationPeriod.start_date,
                VacationPeriod.end_date,
                VacationPeriod.include_weekends,
                VacationPeriod.notes,
            ).where(VacationPeriod.user_id == user.id)
        ).all()
    )
    # Payload dates are stored as ISO strings (see the normalizers), so parse
    # them back to date objects before comparing against the DB snapshots.
    conflicts = {
        "time_entries": sum(1 for row in payload["time_entries"] if date.fromisoformat(row["date"]) in existing_time_entry_dates),
        "special_day_statuses": sum(
            1 for row in payload["special_day_statuses"] if date.fromisoformat(row["date"]) in existing_special_dates
        ),
        "overtime_adjustments": sum(
            1 for row in payload["overtime_adjustments"] if date.fromisoformat(row["date"]) in existing_adjustment_dates
        ),
        "weekly_target_rules": sum(
            1 for row in payload["weekly_target_rules"] if date.fromisoformat(row["effective_from"]) in existing_rule_dates
        ),
        "vacation_periods": sum(
            1
            for row in payload["vacation_periods"]
            if (
                date.fromisoformat(row["start_date"]),
                date.fromisoformat(row["end_date"]),
                row["include_weekends"],
                row["notes"],
            )
            in existing_vacation_keys
        ),
    }
    return {
        **summarize_backup_payload(payload),
        "mode": mode,
        "mode_label": "Zusammenführen" if mode == IMPORT_MODE_MERGE else "Alle bisherigen Daten ersetzen",
        "conflicts": conflicts,
    }
def cleanup_import_previews(*, db: Session, user_id: str | None = None) -> None:
    """Delete import previews older than the TTL, optionally scoped to one user."""
    expiry_threshold = utc_now() - timedelta(hours=IMPORT_PREVIEW_TTL_HOURS)
    delete_stmt = delete(ImportPreview).where(ImportPreview.created_at < expiry_threshold)
    if user_id:
        delete_stmt = delete_stmt.where(ImportPreview.user_id == user_id)
    db.execute(delete_stmt)
def _preview_created_at(value: datetime) -> datetime:
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value
def create_import_preview_record(*, db: Session, user: User, payload: dict[str, Any], mode: str) -> ImportPreview:
    """Persist a fresh import preview for *user*, replacing any previous one.

    Expired previews are purged first, then the user's remaining preview (if
    any) is dropped so each user holds at most one pending preview.
    """
    cleanup_import_previews(db=db, user_id=user.id)
    db.execute(delete(ImportPreview).where(ImportPreview.user_id == user.id))
    record = ImportPreview(
        user_id=user.id,
        mode=mode,
        payload_json=json.dumps(payload, ensure_ascii=False),
    )
    db.add(record)
    db.flush()
    return record
def get_import_preview_record(*, db: Session, user: User, preview_id: str) -> ImportPreview | None:
    """Fetch a user's import preview by id.

    Previews older than the TTL are deleted on access and reported as missing.
    """
    record = db.execute(
        select(ImportPreview).where(ImportPreview.id == preview_id, ImportPreview.user_id == user.id)
    ).scalar_one_or_none()
    if record is None:
        return None
    expiry_threshold = utc_now() - timedelta(hours=IMPORT_PREVIEW_TTL_HOURS)
    if _preview_created_at(record.created_at) < expiry_threshold:
        db.delete(record)
        db.flush()
        return None
    return record
def parse_preview_payload(preview: ImportPreview) -> dict[str, Any]:
    """Re-validate a stored preview's JSON through the regular backup loader."""
    raw_bytes = preview.payload_json.encode("utf-8")
    return load_backup_payload_from_bytes(raw_bytes)
def _apply_settings_from_backup(*, user: User, settings_data: dict[str, Any]) -> None:
    """Copy all normalized backup settings onto *user*, converting serialized values."""

    def _optional_date(key: str) -> date | None:
        # Optional dates are stored as ISO strings or None/empty in the payload.
        raw = settings_data[key]
        return date.fromisoformat(raw) if raw else None

    # Fields copied verbatim from the normalized settings dict.
    for field in (
        "weekly_target_minutes",
        "preferred_home_view",
        "preferred_month_view_mode",
        "entry_mode",
        "count_vacation_as_worktime",
        "count_holiday_as_worktime",
        "count_sick_as_worktime",
        "automatic_break_rules_enabled",
        "default_break_minutes",
        "overtime_expiry_days",
        "expire_negative_overtime",
        "vacation_days_total",
        "vacation_show_in_header",
        "workhours_counter_enabled",
        "workhours_counter_show_in_header",
        "workhours_counter_manual_offset_minutes",
        "workhours_counter_target_minutes",
        "workhours_counter_target_email_enabled",
        "federal_state",
    ):
        setattr(user, field, settings_data[field])
    # Fields that need conversion from their serialized form.
    user.working_days_csv = serialize_working_days(settings_data["working_days"])
    user.overtime_start_date = _optional_date("overtime_start_date")
    user.workhours_counter_start_date = _optional_date("workhours_counter_start_date")
    user.workhours_counter_end_date = _optional_date("workhours_counter_end_date")
def clear_importable_user_data(*, db: Session, user_id: str) -> None:
    """Hard-delete every importable record of *user_id* (used by replace mode)."""
    importable_models = (
        TimeEntry,
        WeeklyTargetRule,
        VacationPeriod,
        SpecialDayStatus,
        OvertimeAdjustment,
        AutoEntrySkip,
    )
    for model in importable_models:
        db.execute(delete(model).where(model.user_id == user_id))
def execute_backup_import(*, db: Session, user: User, payload: dict[str, Any], mode: str) -> dict[str, Any]:
    """Apply a normalized backup payload to *user*'s account.

    Replace mode deletes all importable data first and then overwrites the
    user's settings; merge mode keeps existing data and skips payload rows
    whose natural key already exists.  Returns per-section created/skipped
    counts plus the number of future auto entries removed afterwards.

    Raises BackupImportError for an unknown *mode*.
    """
    if mode not in supported_import_modes():
        raise BackupImportError("Ungültiger Importmodus.")
    created = {
        "weekly_target_rules": 0,
        "time_entries": 0,
        "vacation_periods": 0,
        "special_day_statuses": 0,
        "overtime_adjustments": 0,
    }
    skipped = {
        "weekly_target_rules": 0,
        "time_entries": 0,
        "vacation_periods": 0,
        "special_day_statuses": 0,
        "overtime_adjustments": 0,
    }
    if mode == IMPORT_MODE_REPLACE:
        clear_importable_user_data(db=db, user_id=user.id)
    # Settings are always overwritten from the backup, regardless of mode.
    _apply_settings_from_backup(user=user, settings_data=payload["settings"])
    # Snapshot natural keys AFTER the optional replace-mode wipe, so merge
    # checks see the current state (empty sets in replace mode: deletes were
    # issued on this session before the selects).
    existing_rule_dates = set(
        db.execute(select(WeeklyTargetRule.effective_from).where(WeeklyTargetRule.user_id == user.id)).scalars().all()
    )
    existing_entry_dates = set(db.execute(select(TimeEntry.date).where(TimeEntry.user_id == user.id)).scalars().all())
    existing_vacation_keys = set(
        db.execute(
            select(
                VacationPeriod.start_date,
                VacationPeriod.end_date,
                VacationPeriod.include_weekends,
                VacationPeriod.notes,
            ).where(VacationPeriod.user_id == user.id)
        ).all()
    )
    existing_special_dates = set(
        db.execute(select(SpecialDayStatus.date).where(SpecialDayStatus.user_id == user.id)).scalars().all()
    )
    existing_adjustment_dates = set(
        db.execute(select(OvertimeAdjustment.date).where(OvertimeAdjustment.user_id == user.id)).scalars().all()
    )
    # Each insert also records its key in the existing-* set, so duplicate
    # rows within the payload itself cannot be inserted twice.
    for row in payload["weekly_target_rules"]:
        effective_from = date.fromisoformat(row["effective_from"])
        if mode == IMPORT_MODE_MERGE and effective_from in existing_rule_dates:
            skipped["weekly_target_rules"] += 1
            continue
        db.add(
            WeeklyTargetRule(
                user_id=user.id,
                effective_from=effective_from,
                weekly_target_minutes=row["weekly_target_minutes"],
            )
        )
        existing_rule_dates.add(effective_from)
        created["weekly_target_rules"] += 1
    for row in payload["time_entries"]:
        entry_date = date.fromisoformat(row["date"])
        if mode == IMPORT_MODE_MERGE and entry_date in existing_entry_dates:
            skipped["time_entries"] += 1
            continue
        db.add(
            TimeEntry(
                user_id=user.id,
                date=entry_date,
                start_minutes=row["start_minutes"],
                end_minutes=row["end_minutes"],
                break_minutes=row["break_minutes"],
                break_rule_mode=row["break_rule_mode"],
                notes=row["notes"],
            )
        )
        existing_entry_dates.add(entry_date)
        created["time_entries"] += 1
    for row in payload["vacation_periods"]:
        key = (
            date.fromisoformat(row["start_date"]),
            date.fromisoformat(row["end_date"]),
            row["include_weekends"],
            row["notes"],
        )
        if mode == IMPORT_MODE_MERGE and key in existing_vacation_keys:
            skipped["vacation_periods"] += 1
            continue
        db.add(
            VacationPeriod(
                user_id=user.id,
                start_date=key[0],
                end_date=key[1],
                include_weekends=key[2],
                notes=key[3],
            )
        )
        existing_vacation_keys.add(key)
        created["vacation_periods"] += 1
    for row in payload["special_day_statuses"]:
        status_date = date.fromisoformat(row["date"])
        if mode == IMPORT_MODE_MERGE and status_date in existing_special_dates:
            skipped["special_day_statuses"] += 1
            continue
        db.add(
            SpecialDayStatus(
                user_id=user.id,
                date=status_date,
                status=row["status"],
                notes=row["notes"],
            )
        )
        existing_special_dates.add(status_date)
        created["special_day_statuses"] += 1
    for row in payload["overtime_adjustments"]:
        adjustment_date = date.fromisoformat(row["date"])
        if mode == IMPORT_MODE_MERGE and adjustment_date in existing_adjustment_dates:
            skipped["overtime_adjustments"] += 1
            continue
        db.add(
            OvertimeAdjustment(
                user_id=user.id,
                date=adjustment_date,
                minutes=row["minutes"],
                notes=row["notes"],
            )
        )
        existing_adjustment_dates.add(adjustment_date)
        created["overtime_adjustments"] += 1
    db.flush()
    # The import may have removed every rule (replace mode with an empty
    # backup); make sure the user keeps a usable default target rule.
    ensure_user_has_default_target_rule(db, user)
    if user.entry_mode == ENTRY_MODE_AUTO_UNTIL_TODAY:
        removed_future_auto_entries = delete_future_auto_entries(db=db, user_id=user.id, after_date=date.today())
    else:
        removed_future_auto_entries = 0
    return {
        "mode": mode,
        "created": created,
        "skipped": skipped,
        "removed_future_auto_entries": removed_future_auto_entries,
    }
+242
View File
@@ -0,0 +1,242 @@
from __future__ import annotations
import markdown as markdown_lib
import bleach
# Keys identifying the editable legal-content pages (Impressum / privacy policy).
SITE_CONTENT_IMPRESSUM = 'impressum'
SITE_CONTENT_PRIVACY = 'datenschutz'
DEFAULT_SITE_CONTENT_MARKDOWN = {
SITE_CONTENT_IMPRESSUM: """# Impressum
Bitte vor dem produktiven Einsatz im Admin-Bereich vollständig ausfüllen.
## Diensteanbieter
Firmenname / Name
Straße und Hausnummer
PLZ Ort
Land
## Kontakt
E-Mail: [kontakt@example.com](mailto:kontakt@example.com)
## Verantwortlich für den Inhalt nach § 18 Abs. 2 MStV
Name der verantwortlichen Person
Straße und Hausnummer
PLZ Ort
Land
""",
SITE_CONTENT_PRIVACY: """# Datenschutzerklärung
## 1. Verantwortlicher
Bitte vor dem produktiven Einsatz im Admin-Bereich prüfen und anpassen.
Verantwortlich für die Verarbeitung personenbezogener Daten im Zusammenhang mit dieser Website und Anwendung ist:
Firmenname / Name
Straße und Hausnummer
PLZ Ort
Land
E-Mail: [kontakt@example.com](mailto:kontakt@example.com)
## 2. Allgemeines zur Datenverarbeitung
Ich verarbeite personenbezogene Daten nur, soweit dies zur Bereitstellung einer funktionsfähigen Website und Anwendung, zur Bearbeitung von Anfragen, zur Sicherheit des Dienstes sowie zur Erbringung der angebotenen Funktionen erforderlich ist.
## 3. Aufruf der Website
Beim Aufruf der Website werden technisch erforderliche Daten verarbeitet, um die Seite auszuliefern und die Stabilität und Sicherheit des Dienstes zu gewährleisten.
Dabei können insbesondere folgende Daten verarbeitet werden:
- IP-Adresse
- Datum und Uhrzeit des Abrufs
- aufgerufene Seite bzw. Ressource
- Informationen über Browser und Betriebssystem
- Referrer-Informationen
- Protokolldaten zu Sicherheits- und Fehlervorgängen
Die Verarbeitung erfolgt zur technischen Bereitstellung, Systemsicherheit und Missbrauchserkennung.
## 4. Registrierung und Benutzerkonto
Wenn du ein Benutzerkonto anlegst, verarbeite ich die von dir angegebenen Registrierungsdaten, insbesondere:
- E-Mail-Adresse
- Passwort in gehashter Form
- von dir hinterlegte Einstellungen innerhalb der Anwendung
Die Verarbeitung erfolgt zum Zweck der Einrichtung und Verwaltung deines Benutzerkontos sowie zur Nutzung der Funktionen von Stundenfuchs.
## 5. Nutzung der Anwendung
Im Rahmen der Nutzung von Stundenfuchs verarbeite ich die von dir eingegebenen oder erzeugten Inhalte, insbesondere:
- Arbeitszeiteinträge
- Pausenangaben
- Urlaubs-, Krankheits- und Feiertagseinträge
- Stundenausgleich
- Einstellungen zu Wochenstunden, relevanten Arbeitstagen und Auswertungen
- Backup-, Export- und Importdaten
- Angaben im Arbeitsstunden-Counter
Diese Daten werden verarbeitet, um dir die Funktionen der Anwendung bereitzustellen.
## 6. Anmeldung, Sitzungen und Sicherheit
Zur Anmeldung und sicheren Nutzung der Anwendung werden technisch notwendige Sitzungsdaten verarbeitet. Außerdem können sicherheitsrelevante Daten verarbeitet werden, insbesondere zur:
- Login-Verwaltung
- Erkennung missbräuchlicher Zugriffe
- Durchsetzung von Sicherheitsmaßnahmen
- Begrenzung fehlerhafter Login- oder Formularversuche
## 7. E-Mail-Funktionen
Im Zusammenhang mit der Nutzung von Stundenfuchs können E-Mails versendet werden, insbesondere für:
- E-Mail-Bestätigung
- Passwort-Reset
- sicherheitsrelevante Hinweise
- Benachrichtigungen innerhalb der Anwendung
- Kontaktanfragen bzw. Tickets
Dafür werden insbesondere E-Mail-Adresse und die jeweils zur Nachricht erforderlichen Metadaten verarbeitet.
## 8. Zwei-Faktor-Authentifizierung
Wenn du die Zwei-Faktor-Authentifizierung aktivierst, werden die dafür erforderlichen Sicherheitsdaten verarbeitet, um die zusätzliche Anmeldung per Authenticator-App zu ermöglichen.
## 9. Kontaktformular und Ticketsystem
Wenn du das Kontaktformular nutzt oder ein Ticket erstellst, verarbeite ich die von dir übermittelten Angaben, insbesondere:
- Name
- E-Mail-Adresse
- Kategorie der Anfrage
- Betreff
- Nachricht
- technische Missbrauchsschutzdaten
Die Verarbeitung erfolgt zur Bearbeitung deiner Anfrage, zur Kommunikation mit dir sowie zur Abwehr von Missbrauch und Spam.
## 10. Export und Backup
Wenn du Export- oder Backup-Funktionen nutzt, werden die von dir innerhalb der Anwendung gespeicherten Daten zusammengestellt und zum Download bereitgestellt. Diese Verarbeitung erfolgt ausschließlich zur Durchführung der von dir ausgelösten Funktion.
## 11. Rechtsgrundlagen
Soweit die Verarbeitung zur Bereitstellung und Durchführung der Funktionen von Stundenfuchs erforderlich ist, erfolgt sie auf Grundlage von Art. 6 Abs. 1 lit. b DSGVO.
Soweit die Verarbeitung zur Gewährleistung der Sicherheit, Stabilität und Missbrauchsvermeidung erfolgt, beruht sie auf Art. 6 Abs. 1 lit. f DSGVO. Das berechtigte Interesse liegt in der sicheren, funktionsfähigen und wirtschaftlichen Bereitstellung des Dienstes.
Soweit du mich kontaktierst, erfolgt die Verarbeitung je nach Inhalt deiner Anfrage auf Art. 6 Abs. 1 lit. b DSGVO oder Art. 6 Abs. 1 lit. f DSGVO.
## 12. Empfänger von Daten
Personenbezogene Daten werden nur insoweit weitergegeben, wie dies für den Betrieb der Anwendung technisch erforderlich ist oder eine gesetzliche Verpflichtung besteht.
Hosting-, E-Mail- und sonstige Empfängerangaben müssen für den konkreten Produktivbetrieb ergänzt werden.
## 13. Speicherdauer
Personenbezogene Daten werden nur so lange gespeichert, wie dies für die jeweiligen Zwecke erforderlich ist oder gesetzliche Aufbewahrungspflichten bestehen.
Kontodaten und in der Anwendung gespeicherte Inhalte werden grundsätzlich so lange gespeichert, wie dein Benutzerkonto besteht, sofern keine gesetzlichen Pflichten entgegenstehen.
Kontaktanfragen und Tickets werden gespeichert, soweit dies zur Bearbeitung, Dokumentation und Missbrauchsabwehr erforderlich ist.
## 14. Deine Rechte
Du hast nach Maßgabe der gesetzlichen Vorschriften das Recht auf:
- Auskunft über die verarbeiteten personenbezogenen Daten
- Berichtigung unrichtiger Daten
- Löschung
- Einschränkung der Verarbeitung
- Datenübertragbarkeit
- Widerspruch gegen Verarbeitungen auf Grundlage berechtigter Interessen
Wenn eine Verarbeitung auf einer Einwilligung beruht, kannst du diese jederzeit mit Wirkung für die Zukunft widerrufen.
## 15. Beschwerderecht
Du hast das Recht, dich bei einer Datenschutzaufsichtsbehörde zu beschweren.
## 16. Pflicht zur Bereitstellung von Daten
Soweit personenbezogene Daten für die Registrierung, Anmeldung oder Nutzung der Anwendung erforderlich sind, ist die Bereitstellung dieser Daten notwendig. Ohne diese Daten kann Stundenfuchs ganz oder teilweise nicht genutzt werden.
## 17. Keine automatisierte Entscheidungsfindung
Eine automatisierte Entscheidungsfindung einschließlich Profiling im Sinne von Art. 22 DSGVO findet nicht statt.
## 18. Keine Analyse- oder Drittinhalte
Es werden keine Analyse- oder Trackingdienste eingesetzt.
Es werden keine externen Schriftarten, kein externes Fehlertracking und keine eingebetteten Drittinhalte verwendet.
## 19. Stand
Stand: März 2026
""",
}
# Bleach sanitizer allowlists used by render_safe_markdown(): only these
# tags, attributes and link protocols survive; everything else is stripped.
_ALLOWED_TAGS = [
    'a', 'blockquote', 'br', 'code', 'em', 'h1', 'h2', 'h3', 'h4', 'li', 'ol', 'p', 'pre', 'strong', 'ul'
]
_ALLOWED_ATTRIBUTES = {
    'a': ['href', 'title', 'rel', 'target'],
}
_ALLOWED_PROTOCOLS = ['http', 'https', 'mailto']
def default_site_content_markdown(key: str) -> str:
    """Return the built-in markdown fallback for a site content *key* ('' if unknown)."""
    try:
        return DEFAULT_SITE_CONTENT_MARKDOWN[key]
    except KeyError:
        return ''
def render_safe_markdown(markdown_text: str) -> str:
    """Convert untrusted markdown to sanitized HTML with auto-linked URLs."""
    rendered = markdown_lib.markdown(
        markdown_text or '',
        extensions=['extra', 'sane_lists'],
        output_format='html5',
    )
    # Strip everything outside the conservative allowlists before linkifying.
    sanitized = bleach.clean(
        rendered,
        tags=_ALLOWED_TAGS,
        attributes=_ALLOWED_ATTRIBUTES,
        protocols=_ALLOWED_PROTOCOLS,
        strip=True,
    )
    return bleach.linkify(sanitized)
def normalize_markdown_input(value: str) -> str:
    """Trim surrounding whitespace from user-entered markdown; None/empty become ''."""
    if not value:
        return ''
    return value.strip()
def ticket_status_label(status: str) -> str:
    """Map a ticket status code to its German display label (code itself if unknown)."""
    labels = {'open': 'Offen', 'closed': 'Geschlossen'}
    return labels.get(status, status)
def ticket_category_options() -> list[dict[str, str]]:
    """Return the selectable ticket categories as value/label dicts (fresh list each call)."""
    pairs = (
        ('problem', 'Problem'),
        ('feature', 'Featurerequest'),
        ('other', 'Sonstiges'),
    )
    return [{'value': value, 'label': label} for value, label in pairs]
def ticket_category_label(value: str) -> str:
    """Resolve a category value to its display label (the value itself if unknown)."""
    lookup = {option['value']: option['label'] for option in ticket_category_options()}
    return lookup.get(value, value)
+181
View File
@@ -0,0 +1,181 @@
from sqlalchemy import text
from sqlalchemy.engine import Engine
def _table_columns(engine: Engine, table_name: str) -> set[str]:
    """Return the column names of *table_name* via SQLite's PRAGMA table_info.

    NOTE(review): table_name is interpolated into the SQL text; callers pass
    only hard-coded table names, so this is safe as used.
    """
    pragma = text(f"PRAGMA table_info({table_name})")
    with engine.connect() as conn:
        return {row["name"] for row in conn.execute(pragma).mappings().all()}
def run_startup_migrations(engine: Engine) -> None:
    """Apply idempotent, additive schema migrations for SQLite deployments.

    Adds columns that older databases are missing, backfills the renamed
    entry_mode value, and creates new tables/indexes.  Non-SQLite dialects
    are skipped (assumed to be provisioned from the current models).

    If no column is missing, the whole migration step (including backfill
    and table creation) is skipped, matching the original behavior.
    """
    if engine.dialect.name != "sqlite":
        return
    # Missing-column DDL per table: (column name, type/constraint tail).
    # Previously this was ~40 copy-pasted if-blocks; the data-driven form
    # emits the exact same ALTER statements in the same order.
    user_column_ddl: list[tuple[str, str]] = [
        ("preferred_home_view", "VARCHAR(16) NOT NULL DEFAULT 'week'"),
        ("preferred_month_view_mode", "VARCHAR(16) NOT NULL DEFAULT 'flat'"),
        ("entry_mode", "VARCHAR(16) NOT NULL DEFAULT 'manual'"),
        ("working_days_csv", "VARCHAR(32) NOT NULL DEFAULT '0,1,2,3,4'"),
        ("count_vacation_as_worktime", "BOOLEAN NOT NULL DEFAULT 0"),
        ("count_holiday_as_worktime", "BOOLEAN NOT NULL DEFAULT 0"),
        ("count_sick_as_worktime", "BOOLEAN NOT NULL DEFAULT 0"),
        ("automatic_break_rules_enabled", "BOOLEAN NOT NULL DEFAULT 0"),
        ("default_break_minutes", "INTEGER NOT NULL DEFAULT 0"),
        ("overtime_start_date", "DATE"),
        ("overtime_expiry_days", "INTEGER"),
        ("expire_negative_overtime", "BOOLEAN NOT NULL DEFAULT 0"),
        ("vacation_days_total", "INTEGER NOT NULL DEFAULT 0"),
        ("vacation_show_in_header", "BOOLEAN NOT NULL DEFAULT 1"),
        ("workhours_counter_enabled", "BOOLEAN NOT NULL DEFAULT 0"),
        ("workhours_counter_show_in_header", "BOOLEAN NOT NULL DEFAULT 0"),
        ("workhours_counter_start_date", "DATE"),
        ("workhours_counter_end_date", "DATE"),
        ("workhours_counter_manual_offset_minutes", "INTEGER NOT NULL DEFAULT 0"),
        ("workhours_counter_target_minutes", "INTEGER"),
        ("workhours_counter_target_email_enabled", "BOOLEAN NOT NULL DEFAULT 0"),
        ("workhours_counter_warning_last_sent_on", "DATE"),
        ("workhours_counter_warning_last_sent_key", "VARCHAR(120)"),
        ("federal_state", "VARCHAR(8)"),
        # Pre-existing accounts are grandfathered in as verified.
        ("email_verified", "BOOLEAN NOT NULL DEFAULT 1"),
        ("email_verification_token_hash", "VARCHAR(128)"),
        ("email_verification_expires_at", "DATETIME"),
        ("email_verification_sent_at", "DATETIME"),
        ("mfa_method", "VARCHAR(16) NOT NULL DEFAULT 'none'"),
        ("mfa_totp_secret_encrypted", "TEXT"),
        ("mfa_email_code_hash", "VARCHAR(255)"),
        ("mfa_email_code_expires_at", "DATETIME"),
        ("mfa_email_code_sent_at", "DATETIME"),
    ]
    email_config_column_ddl: list[tuple[str, str]] = [
        ("registration_admin_notify_enabled", "BOOLEAN NOT NULL DEFAULT 1"),
        ("registration_admin_notify_admin_ids_csv", "VARCHAR(1024)"),
    ]
    time_entry_column_ddl: list[tuple[str, str]] = [
        ("break_rule_mode", "VARCHAR(16) NOT NULL DEFAULT 'manual'"),
    ]
    statements: list[str] = []
    for table_name, column_ddl in (
        ("users", user_column_ddl),
        ("email_server_config", email_config_column_ddl),
        ("time_entries", time_entry_column_ddl),
    ):
        existing_columns = _table_columns(engine, table_name)
        statements.extend(
            f"ALTER TABLE {table_name} ADD COLUMN {name} {ddl}"
            for name, ddl in column_ddl
            if name not in existing_columns
        )
    if not statements:
        return
    # Tables and indexes created after the column additions; all statements
    # are IF NOT EXISTS so re-running is harmless.
    post_migration_ddl = [
        "CREATE INDEX IF NOT EXISTS ix_users_email_verification_token_hash ON users (email_verification_token_hash)",
        """
        CREATE TABLE IF NOT EXISTS overtime_adjustments (
            id VARCHAR(36) PRIMARY KEY NOT NULL,
            user_id VARCHAR(36) NOT NULL,
            date DATE NOT NULL,
            minutes INTEGER NOT NULL,
            notes TEXT,
            created_at DATETIME NOT NULL,
            FOREIGN KEY(user_id) REFERENCES users (id) ON DELETE CASCADE,
            CONSTRAINT uq_user_overtime_adjustment_date UNIQUE (user_id, date)
        )
        """,
        "CREATE INDEX IF NOT EXISTS ix_overtime_adjustments_user_id ON overtime_adjustments (user_id)",
        "CREATE INDEX IF NOT EXISTS ix_overtime_adjustments_date ON overtime_adjustments (date)",
        """
        CREATE TABLE IF NOT EXISTS auto_entry_skips (
            id VARCHAR(36) PRIMARY KEY NOT NULL,
            user_id VARCHAR(36) NOT NULL,
            date DATE NOT NULL,
            created_at DATETIME NOT NULL,
            FOREIGN KEY(user_id) REFERENCES users (id) ON DELETE CASCADE,
            CONSTRAINT uq_user_auto_entry_skip_date UNIQUE (user_id, date)
        )
        """,
        "CREATE INDEX IF NOT EXISTS ix_auto_entry_skips_user_id ON auto_entry_skips (user_id)",
        "CREATE INDEX IF NOT EXISTS ix_auto_entry_skips_date ON auto_entry_skips (date)",
        """
        CREATE TABLE IF NOT EXISTS site_content (
            id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
            key VARCHAR(64) NOT NULL UNIQUE,
            markdown_text TEXT NOT NULL DEFAULT '',
            updated_by_user_id VARCHAR(36),
            updated_at DATETIME,
            FOREIGN KEY(updated_by_user_id) REFERENCES users (id) ON DELETE SET NULL
        )
        """,
        "CREATE INDEX IF NOT EXISTS ix_site_content_key ON site_content (key)",
        """
        CREATE TABLE IF NOT EXISTS support_tickets (
            id VARCHAR(36) PRIMARY KEY NOT NULL,
            user_id VARCHAR(36),
            category VARCHAR(24) NOT NULL DEFAULT 'problem',
            status VARCHAR(24) NOT NULL DEFAULT 'open',
            name VARCHAR(255) NOT NULL DEFAULT '',
            email VARCHAR(255) NOT NULL,
            subject VARCHAR(255) NOT NULL,
            message TEXT NOT NULL,
            admin_notes TEXT,
            source_ip_hash VARCHAR(128),
            source_user_agent VARCHAR(512),
            created_at DATETIME NOT NULL,
            updated_at DATETIME,
            closed_at DATETIME,
            FOREIGN KEY(user_id) REFERENCES users (id) ON DELETE SET NULL
        )
        """,
        "CREATE INDEX IF NOT EXISTS ix_support_tickets_user_id ON support_tickets (user_id)",
        "CREATE INDEX IF NOT EXISTS ix_support_tickets_email ON support_tickets (email)",
        "CREATE INDEX IF NOT EXISTS ix_support_tickets_status ON support_tickets (status)",
        "CREATE INDEX IF NOT EXISTS ix_support_tickets_source_ip_hash ON support_tickets (source_ip_hash)",
        "CREATE INDEX IF NOT EXISTS ix_support_tickets_created_at ON support_tickets (created_at)",
    ]
    with engine.begin() as conn:
        for statement in statements:
            conn.execute(text(statement))
        # Backfill: the legacy 'auto' entry mode was renamed.
        conn.execute(text("UPDATE users SET entry_mode = 'auto_until_today' WHERE entry_mode = 'auto'"))
        for ddl in post_migration_ddl:
            conn.execute(text(ddl))
+246
View File
@@ -0,0 +1,246 @@
from datetime import date, timedelta
from app.services.calculations import compute_net_minutes
from app.services.targets import monday_of, target_map_for_weeks, week_starts_between
from app.services.vacations import expand_vacation_dates
from app.services.workdays import DEFAULT_WORKING_DAYS, is_workday
def compute_effective_span_totals(
    *,
    entries: list,
    range_start: date,
    range_end: date,
    weekly_target_minutes: int,
    vacation_dates: set[date] | None,
    non_working_dates: set[date] | None,
    count_as_worktime_dates: set[date] | None,
    overtime_adjustment_minutes_by_date: dict[date, int] | None,
    overtime_start_date: date | None,
    relevant_weekdays: set[int] | None = None,
) -> dict[str, int]:
    """Aggregate actual ("ist") vs. target ("soll") minutes over a date span.

    Walks every day in [range_start, range_end]. Days before
    overtime_start_date contribute nothing to ist/soll or the workday
    counters; manual adjustments are summed for the whole span regardless.
    A day in count_as_worktime_dates that falls on a relevant weekday is
    credited with its per-day target instead of recorded entry minutes.

    Returns a dict with keys: ist_minutes, soll_minutes, delta_minutes,
    eligible_workdays, vacation_workdays, overtime_adjustment_minutes.

    Fix: the empty-range early return previously omitted the
    "overtime_adjustment_minutes" key that the normal path returns, so
    callers indexing it could raise KeyError; the key is now always present.
    """
    if range_end < range_start:
        # Empty span: zeroed result with the same shape as the normal path.
        return {
            "ist_minutes": 0,
            "soll_minutes": 0,
            "delta_minutes": 0,
            "eligible_workdays": 0,
            "vacation_workdays": 0,
            "overtime_adjustment_minutes": 0,
        }
    # Days strictly before this date are excluded from ist/soll accounting.
    blocked_before = overtime_start_date
    vacation_dates = vacation_dates or set()
    non_working_dates = non_working_dates or set()
    count_as_worktime_dates = count_as_worktime_dates or set()
    overtime_adjustment_minutes_by_date = overtime_adjustment_minutes_by_date or {}
    relevant_weekdays = relevant_weekdays or set(DEFAULT_WORKING_DAYS)
    workdays_per_week = max(1, len(relevant_weekdays))
    # Net worked minutes per day, restricted to the requested span.
    net_by_date: dict[date, int] = {}
    for entry in entries:
        if entry.date < range_start or entry.date > range_end:
            continue
        net_by_date[entry.date] = compute_net_minutes(
            entry.start_minutes,
            entry.end_minutes,
            entry.break_minutes,
        )
    eligible_workdays = 0
    vacation_workdays = 0
    ist_minutes = 0
    overtime_adjustment_minutes = 0
    current = range_start
    while current <= range_end:
        # Adjustments apply even before overtime accounting starts.
        overtime_adjustment_minutes += int(overtime_adjustment_minutes_by_date.get(current, 0))
        if blocked_before is None or current >= blocked_before:
            day_counts_as_worktime = current in count_as_worktime_dates and is_workday(current, relevant_weekdays)
            day_target_minutes = int(round(weekly_target_minutes / workdays_per_week)) if is_workday(current, relevant_weekdays) else 0
            if day_counts_as_worktime:
                # Credited day (e.g. paid holiday): counts as a full target day.
                ist_minutes += day_target_minutes
            elif current not in non_working_dates:
                ist_minutes += net_by_date.get(current, 0)
            if is_workday(current, relevant_weekdays):
                if current in vacation_dates and not day_counts_as_worktime:
                    vacation_workdays += 1
                elif current in non_working_dates and not day_counts_as_worktime:
                    # Explicit non-working day: neither eligible nor vacation.
                    pass
                else:
                    eligible_workdays += 1
        current += timedelta(days=1)
    soll_minutes = int(round((weekly_target_minutes / workdays_per_week) * eligible_workdays))
    delta_minutes = ist_minutes - soll_minutes + overtime_adjustment_minutes
    return {
        "ist_minutes": ist_minutes,
        "soll_minutes": soll_minutes,
        "delta_minutes": delta_minutes,
        "eligible_workdays": eligible_workdays,
        "vacation_workdays": vacation_workdays,
        "overtime_adjustment_minutes": overtime_adjustment_minutes,
    }
def compute_effective_week_totals(
    *,
    entries: list,
    week_start: date,
    weekly_target_minutes: int,
    vacation_dates: set[date] | None,
    non_working_dates: set[date] | None,
    count_as_worktime_dates: set[date] | None,
    overtime_adjustment_minutes_by_date: dict[date, int] | None,
    overtime_start_date: date | None,
    relevant_weekdays: set[int] | None = None,
) -> dict[str, int]:
    """Ist/Soll/Delta minutes for the single week beginning at *week_start*."""
    span = compute_effective_span_totals(
        entries=entries,
        range_start=week_start,
        range_end=week_start + timedelta(days=6),
        weekly_target_minutes=weekly_target_minutes,
        vacation_dates=vacation_dates,
        non_working_dates=non_working_dates,
        count_as_worktime_dates=count_as_worktime_dates,
        overtime_adjustment_minutes_by_date=overtime_adjustment_minutes_by_date,
        overtime_start_date=overtime_start_date,
        relevant_weekdays=relevant_weekdays,
    )
    # Re-key the span totals under the weekly_* names expected by callers.
    return {
        "weekly_ist": span["ist_minutes"],
        "weekly_soll": span["soll_minutes"],
        "weekly_delta": span["delta_minutes"],
    }
def compute_cumulative_overtime_minutes(
    *,
    entries: list,
    rules: list,
    weekly_target_fallback: int,
    vacation_periods: list,
    non_working_dates: set[date] | None,
    count_as_worktime_dates: set[date] | None,
    overtime_adjustment_minutes_by_date: dict[date, int] | None,
    selected_week_start: date,
    overtime_start_date: date | None,
    overtime_expiry_days: int | None,
    expire_negative_overtime: bool,
    relevant_weekdays: set[int] | None = None,
) -> int:
    """Cumulative overtime balance up to (and including) the selected week.

    Thin wrapper: delegates to compute_cumulative_overtime_until_date with
    the selected week's Sunday as the as-of date.
    """
    week_end = selected_week_start + timedelta(days=6)
    return compute_cumulative_overtime_until_date(
        entries=entries,
        rules=rules,
        weekly_target_fallback=weekly_target_fallback,
        vacation_periods=vacation_periods,
        non_working_dates=non_working_dates,
        count_as_worktime_dates=count_as_worktime_dates,
        overtime_adjustment_minutes_by_date=overtime_adjustment_minutes_by_date,
        as_of_date=week_end,
        overtime_start_date=overtime_start_date,
        overtime_expiry_days=overtime_expiry_days,
        expire_negative_overtime=expire_negative_overtime,
        relevant_weekdays=relevant_weekdays,
    )
def compute_cumulative_overtime_until_date(
    *,
    entries: list,
    rules: list,
    weekly_target_fallback: int,
    vacation_periods: list,
    non_working_dates: set[date] | None,
    count_as_worktime_dates: set[date] | None,
    overtime_adjustment_minutes_by_date: dict[date, int] | None,
    as_of_date: date,
    overtime_start_date: date | None,
    overtime_expiry_days: int | None,
    expire_negative_overtime: bool,
    relevant_weekdays: set[int] | None = None,
) -> int:
    """Cumulative overtime balance in minutes up to and including *as_of_date*.

    Walks day by day from the earliest recorded entry or adjustment,
    comparing each day's net worked minutes against its share of the
    weekly target (weekly target / number of relevant weekdays), plus any
    manual adjustment for that day.  Days before overtime_start_date
    contribute only their manual adjustments.  With a positive
    overtime_expiry_days, per-day deltas older than the cutoff are dropped:
    positive deltas always, negative ones only when
    expire_negative_overtime is set.  Returns the rounded total.
    """
    relevant_weekdays = relevant_weekdays or set(DEFAULT_WORKING_DAYS)
    workdays_per_week = max(1, len(relevant_weekdays))
    overtime_adjustment_minutes_by_date = overtime_adjustment_minutes_by_date or {}
    # The accounting range starts at the earliest entry or adjustment date.
    earliest_entry_date = min((entry.date for entry in entries), default=None)
    earliest_adjustment_date = min(overtime_adjustment_minutes_by_date.keys(), default=None)
    range_start_candidates = [candidate for candidate in [earliest_entry_date, earliest_adjustment_date] if candidate is not None]
    if not range_start_candidates:
        # No entries and no adjustments: nothing to accumulate.
        return 0
    range_start = min(range_start_candidates)
    if range_start > as_of_date:
        return 0
    # Pre-resolve the weekly target for every week touched by the range.
    first_week_start = monday_of(range_start)
    relevant_weeks = week_starts_between(first_week_start, monday_of(as_of_date))
    base_target_map = target_map_for_weeks(rules, relevant_weeks, weekly_target_fallback)
    vacation_dates = expand_vacation_dates(
        vacation_periods,
        range_start,
        as_of_date,
        relevant_weekdays=relevant_weekdays,
    )
    non_working_dates = non_working_dates or set()
    count_as_worktime_dates = count_as_worktime_dates or set()
    # Net worked minutes per day, restricted to the accounting range.
    net_by_date: dict[date, int] = {}
    for entry in entries:
        if entry.date < range_start or entry.date > as_of_date:
            continue
        net_by_date[entry.date] = compute_net_minutes(
            entry.start_minutes,
            entry.end_minutes,
            entry.break_minutes,
        )
    # Deltas on days strictly before cutoff_date are subject to expiry.
    cutoff_date: date | None = None
    if overtime_expiry_days is not None and overtime_expiry_days > 0:
        cutoff_date = as_of_date - timedelta(days=overtime_expiry_days)
    # Accumulate in float to avoid per-day rounding drift; round once at the end.
    total = 0.0
    current = range_start
    while current <= as_of_date:
        week_start = monday_of(current)
        weekly_target = base_target_map.get(week_start, weekly_target_fallback)
        day_adjustment = float(overtime_adjustment_minutes_by_date.get(current, 0))
        # Before overtime_start_date only manual adjustments count.
        regular_delta_allowed = overtime_start_date is None or current >= overtime_start_date
        # Credited day (e.g. paid holiday) on a relevant weekday: net == target.
        day_counts_as_worktime = current in count_as_worktime_dates and current.weekday() in relevant_weekdays
        if regular_delta_allowed and current.weekday() in relevant_weekdays and (current not in vacation_dates or day_counts_as_worktime):
            if current in non_working_dates and not day_counts_as_worktime:
                day_target = 0.0
            else:
                day_target = weekly_target / workdays_per_week
        else:
            # Weekend, vacation, or before accounting start: no target owed.
            day_target = 0.0
        if regular_delta_allowed:
            if day_counts_as_worktime:
                # Credited day contributes exactly its target -> delta of zero
                # (apart from any manual adjustment).
                day_net = day_target
            else:
                day_net = 0.0 if current in non_working_dates else float(net_by_date.get(current, 0))
        else:
            day_net = 0.0
        delta = day_net - day_target + day_adjustment
        expired = cutoff_date is not None and current < cutoff_date
        if expired:
            if delta > 0:
                # Positive overtime older than the cutoff always expires.
                current += timedelta(days=1)
                continue
            if delta < 0 and expire_negative_overtime:
                # Negative balance expires only when configured to.
                current += timedelta(days=1)
                continue
        total += delta
        current += timedelta(days=1)
    return int(round(total))
+54
View File
@@ -0,0 +1,54 @@
from __future__ import annotations
from datetime import date
import holidays
# All sixteen German federal states as code/label pairs for selection UIs.
# The codes are passed as the `subdiv` argument to
# holidays.country_holidays("DE", ...) in list_public_holiday_dates.
GERMAN_STATE_OPTIONS: list[dict[str, str]] = [
    {"code": "BW", "label": "Baden-Württemberg"},
    {"code": "BY", "label": "Bayern"},
    {"code": "BE", "label": "Berlin"},
    {"code": "BB", "label": "Brandenburg"},
    {"code": "HB", "label": "Bremen"},
    {"code": "HH", "label": "Hamburg"},
    {"code": "HE", "label": "Hessen"},
    {"code": "MV", "label": "Mecklenburg-Vorpommern"},
    {"code": "NI", "label": "Niedersachsen"},
    {"code": "NW", "label": "Nordrhein-Westfalen"},
    {"code": "RP", "label": "Rheinland-Pfalz"},
    {"code": "SL", "label": "Saarland"},
    {"code": "SN", "label": "Sachsen"},
    {"code": "ST", "label": "Sachsen-Anhalt"},
    {"code": "SH", "label": "Schleswig-Holstein"},
    {"code": "TH", "label": "Thüringen"},
]
# Fast-lookup set of valid state codes, used for input validation.
GERMAN_STATE_CODES = {item["code"] for item in GERMAN_STATE_OPTIONS}
def normalize_german_state_code(value: str | None) -> str | None:
if value is None:
return None
normalized = value.strip().upper()
if not normalized:
return None
if normalized not in GERMAN_STATE_CODES:
return None
return normalized
def list_public_holiday_dates(
    *,
    federal_state: str,
    from_date: date,
    to_date: date,
) -> set[date]:
    """Public-holiday dates for a German federal state within [from_date, to_date].

    Returns an empty set for an inverted range.  federal_state must be a
    valid subdivision code for country "DE" in the holidays library.
    """
    if to_date < from_date:
        return set()
    year_span = list(range(from_date.year, to_date.year + 1))
    holiday_map = holidays.country_holidays("DE", subdiv=federal_state, years=year_span)
    # Iterating the mapping yields its holiday dates (keys).
    return {day for day in holiday_map if from_date <= day <= to_date}
+70
View File
@@ -0,0 +1,70 @@
from __future__ import annotations
from base64 import urlsafe_b64encode
from datetime import datetime, timezone
import hashlib
import secrets
from cryptography.fernet import Fernet, InvalidToken
import pyotp
def utc_now() -> datetime:
    """Current moment as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
def _derive_fernet_key(source: str) -> bytes:
digest = hashlib.sha256(source.encode("utf-8")).digest()
return urlsafe_b64encode(digest)
def build_fernet(secret_source: str) -> Fernet:
    """Construct a Fernet cipher keyed from the given secret string."""
    key = _derive_fernet_key(secret_source)
    return Fernet(key)
def encrypt_secret(fernet: Fernet, value: str) -> str:
    """Encrypt a UTF-8 string and return the Fernet token as text."""
    token = fernet.encrypt(value.encode("utf-8"))
    return token.decode("utf-8")
def decrypt_secret(fernet: Fernet, value: str | None) -> str | None:
    """Decrypt a stored token; None for empty input or an invalid token."""
    if not value:
        return None
    try:
        plaintext = fernet.decrypt(value.encode("utf-8"))
    except InvalidToken:
        # Wrong key or tampered ciphertext: treat as absent rather than raising.
        return None
    return plaintext.decode("utf-8")
def generate_numeric_code(length: int = 6) -> str:
    """Cryptographically random numeric code of exactly *length* digits.

    The first digit is never zero, so the string length is guaranteed.
    Raises ValueError for a non-positive length.
    """
    if length <= 0:
        raise ValueError("length must be positive")
    lowest = 10 ** (length - 1)
    highest = 10**length - 1
    value = lowest + secrets.randbelow(highest - lowest + 1)
    return str(value)
def hash_token(token: str) -> str:
    """Hex-encoded SHA-256 digest of *token* (for storing tokens at rest)."""
    digest = hashlib.sha256(token.encode("utf-8"))
    return digest.hexdigest()
def generate_reset_token() -> str:
    """URL-safe random token for password-reset links (48 bytes of entropy)."""
    entropy_bytes = 48  # yields a 64-character urlsafe string
    return secrets.token_urlsafe(entropy_bytes)
def normalize_otp_code(code: str) -> str:
    """Keep only the digit characters of a user-entered one-time code."""
    digits = [ch for ch in code.strip() if ch.isdigit()]
    return "".join(digits)
def generate_totp_secret() -> str:
    """Fresh random base32 secret for TOTP enrollment."""
    secret = pyotp.random_base32()
    return secret
def build_totp_uri(*, secret: str, account_name: str, issuer: str = "Stundenfuchs") -> str:
    """otpauth:// provisioning URI for authenticator apps (QR enrollment)."""
    totp = pyotp.TOTP(secret)
    return totp.provisioning_uri(name=account_name, issuer_name=issuer)
def verify_totp_code(*, secret: str, code: str) -> bool:
    """Check a user-supplied TOTP code against *secret*.

    Non-digit characters are stripped first; anything other than exactly six
    digits is rejected.  valid_window=1 tolerates one time-step of clock drift.
    """
    digits = normalize_otp_code(code)
    if len(digits) != 6:
        return False
    verifier = pyotp.TOTP(secret)
    return bool(verifier.verify(digits, valid_window=1))
+148
View File
@@ -0,0 +1,148 @@
from datetime import date, timedelta
from sqlalchemy import delete, select
from sqlalchemy.orm import Session
from app.models import User, WeeklyTargetRule
# Anchor week for "applies to all weeks" target rules; 1970-01-05 is a
# Monday, so it aligns with monday_of() week starts.
DEFAULT_REFERENCE_WEEK_START = date(1970, 1, 5)  # Monday
def monday_of(day: date) -> date:
    """Return the Monday of the ISO week containing *day*."""
    offset = timedelta(days=day.weekday())
    return day - offset
def week_starts_between(start_week_start: date, end_week_start: date) -> list[date]:
    """Every week start from start through end inclusive, stepping 7 days."""
    if end_week_start < start_week_start:
        return []
    full_weeks = (end_week_start - start_week_start).days // 7
    return [start_week_start + timedelta(days=7 * index) for index in range(full_weeks + 1)]
def list_rules_for_user(db: Session, user_id: str) -> list[WeeklyTargetRule]:
    """Fetch the user's weekly-target rules ordered by effective_from ascending."""
    query = select(WeeklyTargetRule)
    query = query.where(WeeklyTargetRule.user_id == user_id)
    query = query.order_by(WeeklyTargetRule.effective_from.asc())
    return db.execute(query).scalars().all()
def target_for_week(
    rules: list[WeeklyTargetRule],
    week_start: date,
    fallback_minutes: int,
) -> int:
    """Weekly target for the week at *week_start*.

    Picks the latest rule whose effective_from is on or before the week
    start; assumes *rules* is sorted ascending by effective_from (as
    produced by list_rules_for_user).  Falls back to fallback_minutes when
    no rule applies.
    """
    chosen = fallback_minutes
    for rule in rules:
        if rule.effective_from > week_start:
            break  # sorted input: no later rule can apply either
        chosen = rule.weekly_target_minutes
    return chosen
def target_map_for_weeks(
    rules: list[WeeklyTargetRule],
    week_starts: list[date],
    fallback_minutes: int,
) -> dict[date, int]:
    """Map each week start to its resolved weekly target."""
    return {
        week_start: target_for_week(rules, week_start, fallback_minutes)
        for week_start in week_starts
    }
def upsert_rule(db: Session, user_id: str, effective_from: date, weekly_target_minutes: int) -> None:
    """Create or update the weekly-target rule for (user_id, effective_from)."""
    lookup = select(WeeklyTargetRule).where(
        WeeklyTargetRule.user_id == user_id,
        WeeklyTargetRule.effective_from == effective_from,
    )
    existing = db.execute(lookup).scalar_one_or_none()
    if existing is not None:
        # A rule already anchors this date: refresh its target in place.
        existing.weekly_target_minutes = weekly_target_minutes
    else:
        db.add(
            WeeklyTargetRule(
                user_id=user_id,
                effective_from=effective_from,
                weekly_target_minutes=weekly_target_minutes,
            )
        )
def ensure_user_has_default_target_rule(db: Session, user: User) -> None:
    """Insert a baseline target rule for the user if they have none yet."""
    has_any = db.execute(
        select(WeeklyTargetRule.id).where(WeeklyTargetRule.user_id == user.id).limit(1)
    ).scalar_one_or_none()
    if has_any:
        return
    # Anchor at the fixed reference Monday so the rule covers all history.
    baseline = WeeklyTargetRule(
        user_id=user.id,
        effective_from=DEFAULT_REFERENCE_WEEK_START,
        weekly_target_minutes=user.weekly_target_minutes,
    )
    db.add(baseline)
def ensure_all_users_have_default_target_rules(db: Session) -> None:
    """Backfill a baseline target rule for every user missing one.

    Commits only when at least one rule was added.
    """
    added_any = False
    for account in db.execute(select(User)).scalars().all():
        already_has_rule = db.execute(
            select(WeeklyTargetRule.id).where(WeeklyTargetRule.user_id == account.id).limit(1)
        ).scalar_one_or_none()
        if already_has_rule:
            continue
        # The helper re-checks existence, which is redundant but harmless here.
        ensure_user_has_default_target_rule(db, account)
        added_any = True
    if added_any:
        db.commit()
def apply_weekly_target_change(
    db: Session,
    *,
    user: User,
    selected_week_start: date,
    new_target_minutes: int,
    scope: str,
) -> None:
    """Apply a weekly-target change with one of three scopes.

    "all_weeks": wipe every rule and install a single baseline rule.
    "from_current_week": drop rules from the selected week onward, then
    anchor the new target at that week.
    "current_week": set the new target for the selected week only, and
    re-pin the following week to whatever target it had before.

    Raises ValueError for any other scope value.
    """
    # Snapshot the rules before mutating so the "current_week" branch can
    # read the following week's old target.
    existing_rules = list_rules_for_user(db, user.id)
    fallback_minutes = user.weekly_target_minutes
    if scope == "all_weeks":
        db.execute(delete(WeeklyTargetRule).where(WeeklyTargetRule.user_id == user.id))
        db.add(
            WeeklyTargetRule(
                user_id=user.id,
                effective_from=DEFAULT_REFERENCE_WEEK_START,
                weekly_target_minutes=new_target_minutes,
            )
        )
    elif scope == "from_current_week":
        db.execute(
            delete(WeeklyTargetRule).where(
                WeeklyTargetRule.user_id == user.id,
                WeeklyTargetRule.effective_from >= selected_week_start,
            )
        )
        upsert_rule(db, user.id, selected_week_start, new_target_minutes)
    elif scope == "current_week":
        following_week_start = selected_week_start + timedelta(days=7)
        previous_following_target = target_for_week(existing_rules, following_week_start, fallback_minutes)
        upsert_rule(db, user.id, selected_week_start, new_target_minutes)
        upsert_rule(db, user.id, following_week_start, previous_following_target)
    else:
        raise ValueError("Ungueltiger Scope")
+162
View File
@@ -0,0 +1,162 @@
from datetime import date, timedelta
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models import VacationPeriod
from app.services.workdays import DEFAULT_WORKING_DAYS
def daterange(start: date, end: date):
    """Yield each date from *start* through *end* inclusive (nothing if end < start)."""
    total_days = (end - start).days
    for offset in range(total_days + 1):
        yield start + timedelta(days=offset)
def list_vacations_for_user(
    db: Session,
    user_id: str,
    from_date: date,
    to_date: date,
) -> list[VacationPeriod]:
    """Fetch the user's vacation periods overlapping [from_date, to_date].

    A period overlaps when it ends on/after from_date and starts on/before
    to_date.  Results are ordered by start date ascending.
    """
    query = select(VacationPeriod)
    query = query.where(
        VacationPeriod.user_id == user_id,
        VacationPeriod.end_date >= from_date,
        VacationPeriod.start_date <= to_date,
    )
    query = query.order_by(VacationPeriod.start_date.asc())
    return db.execute(query).scalars().all()
def expand_vacation_dates(
    periods: list[VacationPeriod],
    from_date: date,
    to_date: date,
    relevant_weekdays: set[int] | None = None,
) -> set[date]:
    """Expand vacation periods into individual dates within [from_date, to_date].

    Periods with include_weekends=True contribute every day.  Otherwise only
    working days count: the caller's relevant_weekdays when given, else
    Monday-Friday.
    """
    expanded: set[date] = set()
    for period in periods:
        clipped_start = max(period.start_date, from_date)
        clipped_end = min(period.end_date, to_date)
        if clipped_end < clipped_start:
            continue  # period lies entirely outside the requested range
        if period.include_weekends:
            allowed_weekdays = None  # every weekday counts
        elif relevant_weekdays is not None:
            allowed_weekdays = relevant_weekdays
        else:
            allowed_weekdays = {0, 1, 2, 3, 4}  # default Mon-Fri
        for day in daterange(clipped_start, clipped_end):
            if allowed_weekdays is not None and day.weekday() not in allowed_weekdays:
                continue
            expanded.add(day)
    return expanded
def collapse_dates_to_ranges(days: set[date]) -> list[tuple[date, date]]:
    """Collapse dates into sorted, inclusive (start, end) runs of consecutive days."""
    ordered = sorted(days)
    if not ordered:
        return []
    ranges: list[tuple[date, date]] = []
    run_start = run_end = ordered[0]
    for day in ordered[1:]:
        if (day - run_end).days == 1:
            run_end = day  # extend the current consecutive run
        else:
            ranges.append((run_start, run_end))
            run_start = run_end = day
    ranges.append((run_start, run_end))
    return ranges
def vacation_workdays_in_week(
vacation_dates: set[date],
week_start: date,
relevant_weekdays: set[int] | None = None,
) -> int:
relevant_weekdays = relevant_weekdays or set(DEFAULT_WORKING_DAYS)
count = 0
for index in range(7):
day = week_start + timedelta(days=index)
if day in vacation_dates and day.weekday() in relevant_weekdays:
count += 1
return count
def effective_week_target(
    base_target_minutes: int,
    vacation_workdays: int,
    *,
    workdays_per_week: int = 5,
) -> int:
    """Reduce a weekly target by the share of workdays spent on vacation.

    Each vacation workday removes one per-day share of the target; the
    result is rounded and never negative.
    """
    if vacation_workdays <= 0:
        return base_target_minutes
    days_per_week = max(1, workdays_per_week)
    counted_vacation = min(vacation_workdays, days_per_week)
    per_day_share = base_target_minutes / days_per_week
    remaining = int(round(base_target_minutes - per_day_share * counted_vacation))
    return max(0, remaining)
def apply_vacation_to_week_targets(
    base_target_map: dict[date, int],
    vacation_dates: set[date],
    relevant_weekdays: set[int] | None = None,
) -> dict[date, int]:
    """Reduce each week's base target by its vacation workdays."""
    weekdays = relevant_weekdays or set(DEFAULT_WORKING_DAYS)
    per_week = max(1, len(weekdays))
    return {
        week_start: effective_week_target(
            base_target,
            vacation_workdays_in_week(vacation_dates, week_start, weekdays),
            workdays_per_week=per_week,
        )
        for week_start, base_target in base_target_map.items()
    }
def vacation_dates_for_weeks(
    periods: list[VacationPeriod],
    week_starts: list[date],
    relevant_weekdays: set[int] | None = None,
) -> set[date]:
    """Expand vacation dates covering every full week listed in *week_starts*."""
    if not week_starts:
        return set()
    first_day = min(week_starts)
    last_day = max(week_starts) + timedelta(days=6)  # include the whole final week
    return expand_vacation_dates(periods, first_day, last_day, relevant_weekdays=relevant_weekdays)
def week_target_map_with_vacations(
    base_target_map: dict[date, int],
    periods: list[VacationPeriod],
    relevant_weekdays: set[int] | None = None,
) -> dict[date, int]:
    """Vacation-adjusted weekly targets for every week in *base_target_map*."""
    weeks = list(base_target_map.keys())
    dates_on_vacation = vacation_dates_for_weeks(periods, weeks, relevant_weekdays=relevant_weekdays)
    return apply_vacation_to_week_targets(base_target_map, dates_on_vacation, relevant_weekdays)
def vacations_by_week(
    periods: list[VacationPeriod],
    week_starts: list[date],
    relevant_weekdays: set[int] | None = None,
) -> dict[date, int]:
    """Map each week start to its number of vacation workdays."""
    weekdays = relevant_weekdays or set(DEFAULT_WORKING_DAYS)
    dates_on_vacation = vacation_dates_for_weeks(periods, week_starts, relevant_weekdays=weekdays)
    return {
        week_start: vacation_workdays_in_week(dates_on_vacation, week_start, weekdays)
        for week_start in week_starts
    }
+37
View File
@@ -0,0 +1,37 @@
from datetime import date
# Default working weekdays: Monday(0) through Friday(4), using
# date.weekday() indexing.
DEFAULT_WORKING_DAYS = (0, 1, 2, 3, 4)
def normalize_working_days(days: list[int] | set[int] | tuple[int, ...]) -> list[int]:
normalized = sorted({int(day) for day in days if 0 <= int(day) <= 6})
if not normalized:
return list(DEFAULT_WORKING_DAYS)
return normalized
def serialize_working_days(days: list[int] | set[int] | tuple[int, ...]) -> str:
    """Serialize working days as a comma-separated string of weekday indices."""
    normalized = normalize_working_days(days)
    return ",".join(map(str, normalized))
def parse_working_days_csv(value: str | None) -> set[int]:
    """Parse a comma-separated weekday string into a validated set of indices.

    Blank and non-numeric tokens are silently dropped; a missing or empty
    value yields the Monday-Friday default.
    """
    if not value:
        return set(DEFAULT_WORKING_DAYS)
    collected: list[int] = []
    for token in value.split(","):
        stripped = token.strip()
        if not stripped:
            continue
        try:
            collected.append(int(stripped))
        except ValueError:
            continue  # ignore non-numeric tokens
    return set(normalize_working_days(collected))
def is_workday(day: date, relevant_weekdays: set[int]) -> bool:
    """True when *day*'s weekday index is one of the relevant working weekdays."""
    weekday_index = day.weekday()
    return weekday_index in relevant_weekdays