from datetime import date, timedelta from sqlalchemy import select from sqlalchemy.orm import Session from app.models import VacationPeriod from app.services.workdays import DEFAULT_WORKING_DAYS def daterange(start: date, end: date): current = start while current <= end: yield current current += timedelta(days=1) def list_vacations_for_user( db: Session, user_id: str, from_date: date, to_date: date, ) -> list[VacationPeriod]: stmt = ( select(VacationPeriod) .where( VacationPeriod.user_id == user_id, VacationPeriod.end_date >= from_date, VacationPeriod.start_date <= to_date, ) .order_by(VacationPeriod.start_date.asc()) ) return db.execute(stmt).scalars().all() def expand_vacation_dates( periods: list[VacationPeriod], from_date: date, to_date: date, relevant_weekdays: set[int] | None = None, ) -> set[date]: dates: set[date] = set() for period in periods: start = max(period.start_date, from_date) end = min(period.end_date, to_date) if end < start: continue for day in daterange(start, end): if not period.include_weekends: if relevant_weekdays is None: if day.weekday() >= 5: continue elif day.weekday() not in relevant_weekdays: continue dates.add(day) return dates def collapse_dates_to_ranges(days: set[date]) -> list[tuple[date, date]]: if not days: return [] ordered = sorted(days) ranges: list[tuple[date, date]] = [] start = ordered[0] end = ordered[0] for current in ordered[1:]: if current == end + timedelta(days=1): end = current continue ranges.append((start, end)) start = current end = current ranges.append((start, end)) return ranges def vacation_workdays_in_week( vacation_dates: set[date], week_start: date, relevant_weekdays: set[int] | None = None, ) -> int: relevant_weekdays = relevant_weekdays or set(DEFAULT_WORKING_DAYS) count = 0 for index in range(7): day = week_start + timedelta(days=index) if day in vacation_dates and day.weekday() in relevant_weekdays: count += 1 return count def effective_week_target( base_target_minutes: int, vacation_workdays: int, *, workdays_per_week: int = 5, ) -> int: if vacation_workdays <= 0: return base_target_minutes workdays_per_week = max(1, workdays_per_week) vacation_workdays = min(vacation_workdays, workdays_per_week) day_target = base_target_minutes / workdays_per_week reduced = int(round(base_target_minutes - (day_target * vacation_workdays))) return max(0, reduced) def apply_vacation_to_week_targets( base_target_map: dict[date, int], vacation_dates: set[date], relevant_weekdays: set[int] | None = None, ) -> dict[date, int]: relevant_weekdays = relevant_weekdays or set(DEFAULT_WORKING_DAYS) workdays_per_week = max(1, len(relevant_weekdays)) effective_map: dict[date, int] = {} for week_start, base_target in base_target_map.items(): vacation_days = vacation_workdays_in_week(vacation_dates, week_start, relevant_weekdays) effective_map[week_start] = effective_week_target( base_target, vacation_days, workdays_per_week=workdays_per_week, ) return effective_map def vacation_dates_for_weeks( periods: list[VacationPeriod], week_starts: list[date], relevant_weekdays: set[int] | None = None, ) -> set[date]: if not week_starts: return set() from_date = min(week_starts) to_date = max(week_starts) + timedelta(days=6) return expand_vacation_dates(periods, from_date, to_date, relevant_weekdays=relevant_weekdays) def week_target_map_with_vacations( base_target_map: dict[date, int], periods: list[VacationPeriod], relevant_weekdays: set[int] | None = None, ) -> dict[date, int]: vacation_dates = vacation_dates_for_weeks(periods, list(base_target_map.keys()), relevant_weekdays=relevant_weekdays) return apply_vacation_to_week_targets(base_target_map, vacation_dates, relevant_weekdays) def vacations_by_week( periods: list[VacationPeriod], week_starts: list[date], relevant_weekdays: set[int] | None = None, ) -> dict[date, int]: relevant_weekdays = relevant_weekdays or set(DEFAULT_WORKING_DAYS) vacation_dates = vacation_dates_for_weeks(periods, week_starts, relevant_weekdays=relevant_weekdays) result: dict[date, int] = {} for week_start in week_starts: result[week_start] = vacation_workdays_in_week(vacation_dates, week_start, relevant_weekdays) return result