Spaces:
Running
Running
""" | |
Helper utilities for parsing durations - 1s, 1d, 10d, 30d, 1mo, 2mo

duration_in_seconds is used in different parts of the code base, for example:
- Router - Provider budget routing
- Proxy - Key, Team Generation
""" | |
import re | |
import time | |
from datetime import datetime, timedelta, timezone | |
from typing import Optional, Tuple | |
def _extract_from_regex(duration: str) -> Tuple[int, str]:
    """
    Split a duration string into its leading integer and its unit suffix.

    The pattern is matched at the start of the string only and the unit group
    is optional, so an input without a recognised suffix (e.g. "10" or "5x")
    yields an empty-string unit rather than an error.

    Parameters:
        - duration: Duration string such as "30s", "2h" or "1mo"

    Returns:
        - (value, unit) where value is an int and unit is one of
          "s", "m", "h", "d", "w", "mo" or ""

    Raises:
        - ValueError: if the string does not start with at least one digit
    """
    parsed = re.match(r"(\d+)(mo|[smhdw]?)", duration)
    if parsed is None:
        raise ValueError("Invalid duration format")
    amount_text = parsed.group(1)
    suffix = parsed.group(2)
    return int(amount_text), suffix
def get_last_day_of_month(year, month):
    """
    Return the number of the last calendar day (28-31) of the given month.

    Parameters:
        - year: Calendar year of the month in question
        - month: Month number; 12 is special-cased because there is no
          month 13 to step back from

    Returns:
        - Day-of-month of the final day of that month
    """
    if month == 12:
        # December always has 31 days; avoids building a datetime for month 13.
        return 31

    # The day before the 1st of the following month is the last day of this one.
    first_of_following_month = datetime(year=year, month=month + 1, day=1)
    return (first_of_following_month - timedelta(days=1)).day
def duration_in_seconds(duration: str) -> int:
    """
    Convert a duration string into a number of seconds.

    Parameters:
        - duration:
            - "<number>s" - seconds
            - "<number>m" - minutes
            - "<number>h" - hours
            - "<number>d" - days
            - "<number>w" - weeks
            - "<number>mo" - months

    Returns time in seconds till when budget needs to be reset.

    For the fixed-length units this is a plain multiplication. For "mo" the
    result depends on the current local time: it is the number of seconds
    between now and the same day/time `<number>` months ahead, with the day
    clamped to the last day of the target month (e.g. Jan 31 + 1mo -> Feb 28/29).

    Raises:
        - ValueError: if the string is malformed or the unit is not supported
    """
    value, unit = _extract_from_regex(duration=duration)

    fixed_unit_seconds = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
    if unit in fixed_unit_seconds:
        return value * fixed_unit_seconds[unit]

    if unit != "mo":
        raise ValueError(f"Unsupported duration unit, passed duration: {duration}")

    current_time = datetime.fromtimestamp(time.time())

    # Count months on a single axis so that adding `value` carries into the
    # year correctly (previously December ignored `value`, and other months
    # could produce a month number > 12).
    months_on_axis = current_time.year * 12 + (current_time.month - 1) + value
    target_year, zero_based_month = divmod(months_on_axis, 12)
    target_month = zero_based_month + 1

    # Clamp the day so that e.g. the 31st maps onto a shorter target month.
    target_day = min(
        current_time.day, get_last_day_of_month(target_year, target_month)
    )

    target_time = current_time.replace(
        year=target_year, month=target_month, day=target_day
    )
    return int((target_time - current_time).total_seconds())
def get_next_standardized_reset_time(
    duration: str, current_time: datetime, timezone_str: str = "UTC"
) -> datetime:
    """
    Compute the next reset time for a duration, aligned to a predictable boundary.

    The work is delegated per unit:
    - "d": handled by _handle_day_reset
    - "h": handled by _handle_hour_reset
    - "m": handled by _handle_minute_reset
    - "s": handled by _handle_second_reset
    Any other unit, or a duration string that cannot be parsed, falls back to
    the next midnight in the target timezone.

    Parameters:
    - duration: Duration string (e.g. "30s", "30m", "30h", "30d")
    - current_time: Current datetime
    - timezone_str: Timezone string (e.g. "UTC", "US/Eastern", "Asia/Kolkata")

    Returns:
    - Next reset time at a standardized interval in the specified timezone
    """
    # Normalise the timestamp into the requested timezone first; everything
    # below works on that localized value.
    current_time, tz = _setup_timezone(current_time, timezone_str)

    value, unit = _parse_duration(duration)

    # Midnight of the current day in the target timezone.
    start_of_today = current_time.replace(hour=0, minute=0, second=0, microsecond=0)

    if value is None:
        # Unparseable duration: default to the next midnight.
        return start_of_today + timedelta(days=1)

    if unit == "d":
        return _handle_day_reset(current_time, start_of_today, value, tz)
    if unit == "h":
        return _handle_hour_reset(current_time, start_of_today, value)
    if unit == "m":
        return _handle_minute_reset(current_time, start_of_today, value)
    if unit == "s":
        return _handle_second_reset(current_time, start_of_today, value)

    # Unrecognized unit: default to the next midnight.
    return start_of_today + timedelta(days=1)
def _setup_timezone(
    current_time: datetime, timezone_str: str = "UTC"
) -> Tuple[datetime, timezone]:
    """
    Set up timezone and normalize current time to that timezone.

    The timezone name is resolved through the IANA database (``zoneinfo``)
    and converted to the fixed UTC offset that is in effect at ``current_time``,
    so daylight-saving zones get the right offset for the given date and any
    valid IANA name is honoured. If ``zoneinfo`` or its data is unavailable, or
    the name is unknown, a small static map of common zones is used, and
    anything still unresolved falls back to UTC.

    Parameters:
        - current_time: Naive datetimes are assumed to be UTC; aware
          datetimes are converted.
        - timezone_str: IANA timezone name, or None for UTC.

    Returns:
        - (current_time converted to the target timezone, the fixed-offset
          timezone that was applied)
    """
    # Static summer-time offsets, kept only as a fallback for environments
    # without zoneinfo / tz data.
    legacy_offsets = {
        "US/Eastern": timezone(timedelta(hours=-4)),  # EDT
        "US/Pacific": timezone(timedelta(hours=-7)),  # PDT
        "Asia/Kolkata": timezone(timedelta(hours=5, minutes=30)),  # IST
        "Europe/London": timezone(timedelta(hours=1)),  # BST
        "UTC": timezone.utc,
    }

    # Naive datetime - assume it's UTC.
    if current_time.tzinfo is None:
        aware_time = current_time.replace(tzinfo=timezone.utc)
    else:
        aware_time = current_time

    tz = timezone.utc
    if timezone_str is not None:
        try:
            from zoneinfo import ZoneInfo  # Python 3.9+; local import keeps older runtimes working

            offset = aware_time.astimezone(ZoneInfo(timezone_str)).utcoffset()
            if offset is not None:
                tz = timezone(offset)
        except Exception:
            # Deliberately best-effort: missing module, missing tz data or an
            # invalid name must never break reset-time computation.
            try:
                tz = legacy_offsets.get(timezone_str, timezone.utc)
            except TypeError:
                # Unhashable / nonsensical timezone value - fall back to UTC.
                tz = timezone.utc

    return aware_time.astimezone(tz), tz
def _parse_duration(duration: str) -> Tuple[Optional[int], Optional[str]]:
    """
    Parse the duration string into value and unit.

    The string must start with digits immediately followed by one or more
    lowercase letters; only that leading portion is consumed.

    Returns:
        - (value, unit) on success, or (None, None) when the string does not
          start with that shape
    """
    parsed = re.match(r"(\d+)([a-z]+)", duration)
    if parsed is not None:
        return int(parsed.group(1)), parsed.group(2)
    return None, None
def _handle_day_reset(
    current_time: datetime, base_midnight: datetime, value: int, timezone: timezone
) -> datetime:
    """
    Handle day-based reset times.

    - value == 1: the midnight following base_midnight
    - value == 7: the next Monday at midnight (a full week ahead if today is Monday)
    - value == 30: the 1st of the next month at midnight, built in `timezone`
    - anything else: midnight of current_time's day plus `value` days
    """
    if value == 1:
        return base_midnight + timedelta(days=1)

    if value == 7:
        # weekday() is 0 on Monday, so this is 7 on a Monday and 6..1 for
        # Tuesday..Sunday - i.e. always the upcoming Monday.
        days_to_monday = 7 - current_time.weekday()
        return base_midnight + timedelta(days=days_to_monday)

    if value == 30:
        rolls_into_new_year = current_time.month == 12
        return datetime(
            year=current_time.year + 1 if rolls_into_new_year else current_time.year,
            month=1 if rolls_into_new_year else current_time.month + 1,
            day=1,
            hour=0,
            minute=0,
            second=0,
            microsecond=0,
            tzinfo=timezone,
        )

    # Custom day count: measured from the start of current_time's own day.
    start_of_day = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
    return start_of_day + timedelta(days=value)
def _handle_hour_reset(
    current_time: datetime, base_midnight: datetime, value: int
) -> datetime:
    """
    Handle hour-based reset times.

    Returns the next time whose hour is a multiple of `value` and strictly
    later than the current hour, with minutes/seconds zeroed. When that hour
    count runs past the end of the day, the overflow is carried into whole
    days from `base_midnight`, so large values (e.g. 48h) are not truncated
    to "tomorrow".

    Parameters:
        - current_time: Current (timezone-normalized) datetime
        - base_midnight: Midnight of the current day
        - value: Interval in hours; must be non-zero (0 raises ZeroDivisionError)
    """
    current_hour = current_time.hour

    # Next multiple of `value` strictly after the current hour. When the hour
    # is already aligned, the modulo is 0 and this advances a full interval.
    next_hour = current_hour - (current_hour % value) + value

    if next_hour < 24:
        return current_time.replace(
            hour=next_hour, minute=0, second=0, microsecond=0
        )

    # Overnight (or multi-day) case: carry every full 24h block into days.
    days_ahead, hour_of_day = divmod(next_hour, 24)
    return (base_midnight + timedelta(days=days_ahead)).replace(hour=hour_of_day)
def _handle_minute_reset(
    current_time: datetime, base_midnight: datetime, value: int
) -> datetime:
    """
    Handle minute-based reset times.

    Returns the next time whose minute is a multiple of `value` and strictly
    later than the current minute, with seconds zeroed. Overflow is carried
    into hours and then into whole days from `base_midnight`, so values that
    span more than a day are not truncated to "tomorrow".

    Parameters:
        - current_time: Current (timezone-normalized) datetime
        - base_midnight: Midnight of the current day
        - value: Interval in minutes; must be non-zero (0 raises ZeroDivisionError)
    """
    current_minute = current_time.minute

    # Next multiple of `value` strictly after the current minute. When the
    # minute is already aligned, this advances a full interval.
    next_minute_total = current_minute - (current_minute % value) + value

    # Carry whole hours out of the minute count.
    hours_carried, minute_of_hour = divmod(next_minute_total, 60)
    next_hour = current_time.hour + hours_carried

    if next_hour < 24:
        return current_time.replace(
            hour=next_hour, minute=minute_of_hour, second=0, microsecond=0
        )

    # Overnight (or multi-day) case: carry every full 24h block into days.
    days_ahead, hour_of_day = divmod(next_hour, 24)
    return (base_midnight + timedelta(days=days_ahead)).replace(
        hour=hour_of_day, minute=minute_of_hour, second=0, microsecond=0
    )
def _handle_second_reset(
    current_time: datetime, base_midnight: datetime, value: int
) -> datetime:
    """
    Handle second-based reset times.

    Returns the next time whose second is a multiple of `value` and strictly
    later than the current second, with microseconds zeroed. Overflow is
    carried into minutes, hours and then whole days from `base_midnight`, so
    values that span more than a day are not truncated to "tomorrow".

    Parameters:
        - current_time: Current (timezone-normalized) datetime
        - base_midnight: Midnight of the current day
        - value: Interval in seconds; must be non-zero (0 raises ZeroDivisionError)
    """
    current_second = current_time.second

    # Next multiple of `value` strictly after the current second. When the
    # second is already aligned, this advances a full interval.
    next_second_total = current_second - (current_second % value) + value

    # Carry whole minutes, then whole hours, out of the running totals.
    minutes_carried, second_of_minute = divmod(next_second_total, 60)
    hours_carried, minute_of_hour = divmod(
        current_time.minute + minutes_carried, 60
    )
    next_hour = current_time.hour + hours_carried

    if next_hour < 24:
        return current_time.replace(
            hour=next_hour,
            minute=minute_of_hour,
            second=second_of_minute,
            microsecond=0,
        )

    # Overnight (or multi-day) case: carry every full 24h block into days.
    days_ahead, hour_of_day = divmod(next_hour, 24)
    return (base_midnight + timedelta(days=days_ahead)).replace(
        hour=hour_of_day,
        minute=minute_of_hour,
        second=second_of_minute,
        microsecond=0,
    )