Files
maddin 6fbd1bb3c2
CI / checks (push) Has been cancelled
chore: initialize public repository
2026-03-22 12:57:09 +00:00

333 lines
10 KiB
Python

from datetime import date, timedelta
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.models import AutoEntrySkip, OvertimeAdjustment, SpecialDayStatus, TimeEntry, User
from app.services.calculations import automatic_break_minutes_for_net_minutes, compute_net_minutes
from app.services.targets import list_rules_for_user, monday_of, target_for_week
from app.services.vacations import expand_vacation_dates, list_vacations_for_user
from app.services.workdays import parse_working_days_csv
ENTRY_MODE_MANUAL = "manual"
ENTRY_MODE_AUTO_UNTIL_TODAY = "auto_until_today"
AUTO_ENTRY_NOTE = "Automatisch vorausgefuellt"
SPECIAL_DAY_STATUS_HOLIDAY = "holiday"
SPECIAL_DAY_STATUS_SICK = "sick"
def get_user_working_days(user: User) -> set[int]:
return parse_working_days_csv(user.working_days_csv)
def list_special_statuses_for_user(
db: Session,
user_id: str,
from_date: date,
to_date: date,
) -> list[SpecialDayStatus]:
stmt = (
select(SpecialDayStatus)
.where(
SpecialDayStatus.user_id == user_id,
SpecialDayStatus.date >= from_date,
SpecialDayStatus.date <= to_date,
)
.order_by(SpecialDayStatus.date.asc())
)
return db.execute(stmt).scalars().all()
def special_status_map(periods: list[SpecialDayStatus]) -> dict[date, str]:
return {period.date: period.status for period in periods}
def special_status_dates(periods: list[SpecialDayStatus]) -> set[date]:
return {period.date for period in periods}
def count_as_worktime_dates_for_user(
*,
user: User,
vacation_dates: set[date],
special_statuses: list[SpecialDayStatus],
) -> set[date]:
dates: set[date] = set()
if user.count_vacation_as_worktime:
dates.update(vacation_dates)
if user.count_holiday_as_worktime:
dates.update(period.date for period in special_statuses if period.status == SPECIAL_DAY_STATUS_HOLIDAY)
if user.count_sick_as_worktime:
dates.update(period.date for period in special_statuses if period.status == SPECIAL_DAY_STATUS_SICK)
return dates
def effective_non_working_dates_for_user(
*,
user: User,
special_statuses: list[SpecialDayStatus],
) -> set[date]:
blocked: set[date] = set()
for period in special_statuses:
if period.status == SPECIAL_DAY_STATUS_HOLIDAY and user.count_holiday_as_worktime:
continue
if period.status == SPECIAL_DAY_STATUS_SICK and user.count_sick_as_worktime:
continue
blocked.add(period.date)
return blocked
def clear_special_status_for_date(*, db: Session, user_id: str, day: date) -> None:
stmt = select(SpecialDayStatus).where(SpecialDayStatus.user_id == user_id, SpecialDayStatus.date == day)
existing = db.execute(stmt).scalar_one_or_none()
if existing:
db.delete(existing)
def list_overtime_adjustments_for_user(
db: Session,
user_id: str,
from_date: date,
to_date: date,
) -> list[OvertimeAdjustment]:
stmt = (
select(OvertimeAdjustment)
.where(
OvertimeAdjustment.user_id == user_id,
OvertimeAdjustment.date >= from_date,
OvertimeAdjustment.date <= to_date,
)
.order_by(OvertimeAdjustment.date.asc())
)
return db.execute(stmt).scalars().all()
def overtime_adjustment_map(adjustments: list[OvertimeAdjustment]) -> dict[date, OvertimeAdjustment]:
return {adjustment.date: adjustment for adjustment in adjustments}
def overtime_adjustment_minutes_map(adjustments: list[OvertimeAdjustment]) -> dict[date, int]:
return {adjustment.date: adjustment.minutes for adjustment in adjustments}
def clear_overtime_adjustment_for_date(*, db: Session, user_id: str, day: date) -> None:
stmt = select(OvertimeAdjustment).where(OvertimeAdjustment.user_id == user_id, OvertimeAdjustment.date == day)
existing = db.execute(stmt).scalar_one_or_none()
if existing:
db.delete(existing)
def auto_entry_skip_dates_for_user(
db: Session,
user_id: str,
from_date: date,
to_date: date,
) -> set[date]:
stmt = (
select(AutoEntrySkip.date)
.where(
AutoEntrySkip.user_id == user_id,
AutoEntrySkip.date >= from_date,
AutoEntrySkip.date <= to_date,
)
.order_by(AutoEntrySkip.date.asc())
)
return set(db.execute(stmt).scalars().all())
def mark_auto_entry_skip_for_date(*, db: Session, user_id: str, day: date) -> None:
stmt = select(AutoEntrySkip).where(AutoEntrySkip.user_id == user_id, AutoEntrySkip.date == day)
existing = db.execute(stmt).scalar_one_or_none()
if not existing:
db.add(AutoEntrySkip(user_id=user_id, date=day))
def clear_auto_entry_skip_for_date(*, db: Session, user_id: str, day: date) -> None:
stmt = select(AutoEntrySkip).where(AutoEntrySkip.user_id == user_id, AutoEntrySkip.date == day)
existing = db.execute(stmt).scalar_one_or_none()
if existing:
db.delete(existing)
def build_auto_day_entry(
*,
weekly_target_minutes: int,
workdays_per_week: int,
automatic_break_rules_enabled: bool,
default_break_minutes: int,
) -> tuple[int, int, int] | None:
if workdays_per_week <= 0:
return None
day_net_minutes = int(round(weekly_target_minutes / workdays_per_week))
if day_net_minutes <= 0:
return None
start_minutes = 8 * 60 + 30
break_minutes = (
automatic_break_minutes_for_net_minutes(day_net_minutes)
if automatic_break_rules_enabled
else max(0, default_break_minutes)
)
end_minutes = start_minutes + day_net_minutes + break_minutes
if end_minutes > (24 * 60 - 1):
end_minutes = 24 * 60 - 1
available_span = end_minutes - start_minutes
if available_span <= 0:
return None
break_minutes = min(break_minutes, max(0, available_span - 1))
return start_minutes, end_minutes, break_minutes
def auto_entry_sync_start_date(user: User) -> date:
if user.overtime_start_date:
return user.overtime_start_date
return user.created_at.date()
def delete_future_auto_entries(
*,
db: Session,
user_id: str,
after_date: date,
) -> int:
stmt = (
select(TimeEntry)
.where(
TimeEntry.user_id == user_id,
TimeEntry.date > after_date,
TimeEntry.notes == AUTO_ENTRY_NOTE,
)
.order_by(TimeEntry.date.asc())
)
entries = db.execute(stmt).scalars().all()
for entry in entries:
db.delete(entry)
return len(entries)
def autofill_entries_for_range(
*,
db: Session,
user: User,
range_start: date,
range_end: date,
) -> int:
if user.entry_mode != ENTRY_MODE_AUTO_UNTIL_TODAY:
return 0
if range_end < range_start:
return 0
effective_end = min(range_end, date.today())
effective_start = max(range_start, auto_entry_sync_start_date(user))
if effective_start > effective_end:
return 0
working_days = get_user_working_days(user)
if not working_days:
return 0
workdays_per_week = len(working_days)
rules = list_rules_for_user(db, user.id)
vacations = list_vacations_for_user(db, user.id, effective_start, effective_end)
vacation_dates = expand_vacation_dates(vacations, effective_start, effective_end, relevant_weekdays=working_days)
special_statuses = list_special_statuses_for_user(db, user.id, effective_start, effective_end)
special_dates = special_status_dates(special_statuses)
overtime_adjustments = list_overtime_adjustments_for_user(db, user.id, effective_start, effective_end)
adjustment_dates = set(overtime_adjustment_minutes_map(overtime_adjustments).keys())
skipped_auto_dates = auto_entry_skip_dates_for_user(db, user.id, effective_start, effective_end)
existing_dates_stmt = (
select(TimeEntry.date)
.where(
TimeEntry.user_id == user.id,
TimeEntry.date >= effective_start,
TimeEntry.date <= effective_end,
)
.order_by(TimeEntry.date.asc())
)
existing_dates = set(db.execute(existing_dates_stmt).scalars().all())
created = 0
cursor = effective_start
while cursor <= effective_end:
if cursor in existing_dates:
cursor += timedelta(days=1)
continue
if cursor in vacation_dates:
cursor += timedelta(days=1)
continue
if cursor in special_dates:
cursor += timedelta(days=1)
continue
if cursor in adjustment_dates:
cursor += timedelta(days=1)
continue
if cursor in skipped_auto_dates:
cursor += timedelta(days=1)
continue
if cursor.weekday() not in working_days:
cursor += timedelta(days=1)
continue
weekly_target_minutes = target_for_week(rules, monday_of(cursor), user.weekly_target_minutes)
entry_values = build_auto_day_entry(
weekly_target_minutes=weekly_target_minutes,
workdays_per_week=workdays_per_week,
automatic_break_rules_enabled=bool(user.automatic_break_rules_enabled),
default_break_minutes=user.default_break_minutes,
)
if entry_values is None:
cursor += timedelta(days=1)
continue
start_minutes, end_minutes, break_minutes = entry_values
db.add(
TimeEntry(
user_id=user.id,
date=cursor,
start_minutes=start_minutes,
end_minutes=end_minutes,
break_minutes=break_minutes,
break_rule_mode="auto",
notes=AUTO_ENTRY_NOTE,
)
)
existing_dates.add(cursor)
created += 1
cursor += timedelta(days=1)
return created
def sync_auto_entries_for_all_users(
*,
db: Session,
up_to_date: date | None = None,
) -> dict[str, int]:
effective_date = up_to_date or date.today()
stmt = (
select(User)
.where(
User.is_active.is_(True),
User.entry_mode == ENTRY_MODE_AUTO_UNTIL_TODAY,
)
.order_by(User.created_at.asc())
)
users = db.execute(stmt).scalars().all()
created = 0
deleted = 0
for user in users:
deleted += delete_future_auto_entries(db=db, user_id=user.id, after_date=effective_date)
created += autofill_entries_for_range(
db=db,
user=user,
range_start=auto_entry_sync_start_date(user),
range_end=effective_date,
)
return {"users": len(users), "created": created, "deleted_future": deleted}