Spaces:
Sleeping
Sleeping
from typing import List, Dict, Any | |
from typing import Tuple | |
import gradio as gr | |
import polars as pl | |
import time | |
from datetime import datetime, timedelta | |
from datetime import time as dt_time | |
import pandas as pd | |
from tqdm import tqdm | |
import geopy.distance | |
import requests | |
import re | |
import os | |
from bs4 import BeautifulSoup | |
from cachetools import cached, TTLCache | |
import concurrent.futures | |
from functools import partial | |
import traceback | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
def get_geo_optimal_stop( | |
method: str, selected_stops: List[str], show_top: int = 20 | |
) -> List[str]: | |
""" | |
Calculate and return the top optimal geographic stops based on a specified method. | |
Args: | |
method (str): Optimization method, either "minimize-worst-case" or "minimize-total". | |
selected_stops (List[str]): A list of selected stop identifiers. | |
show_top (int, optional): Number of top results to return. Defaults to 20. | |
Returns: | |
List[str]: A list of the top optimal stops based on the selected method. | |
Raises: | |
ValueError: If the method is not recognized. | |
""" | |
global DISTANCE_TABLE | |
dfs = [] | |
for si, stop in tqdm( | |
enumerate(selected_stops), | |
desc="Calculating optimal stops", | |
total=len(selected_stops), | |
): | |
df = ( | |
DISTANCE_TABLE.filter(pl.col("from") == stop) | |
.drop("from") | |
.with_columns( | |
pl.col("to").alias("target_stop"), | |
pl.col("distance_in_km").alias(f"distance_in_km_{si}"), | |
) | |
.select("target_stop", f"distance_in_km_{si}") | |
) | |
dfs.append(df) | |
print("Joining dataframes ...") | |
df = dfs[0] | |
for i in range(1, len(dfs)): | |
df = df.join(dfs[i], on="target_stop") | |
print("Finidng optimal stops ...") | |
df = df.with_columns( | |
pl.max_horizontal( | |
*[f"distance_in_km_{si}" for si in range(len(selected_stops))] | |
).alias("worst_case_km"), | |
pl.sum_horizontal( | |
*[f"distance_in_km_{si}" for si in range(len(selected_stops))] | |
).alias("total_km"), | |
) | |
if method == "minimize-worst-case": | |
df = df.sort("worst_case_km") | |
elif method == "minimize-total": | |
df = df.sort("total_km") | |
return df.head(show_top)["target_stop"].to_list() | |
def get_time_optimal_stop( | |
method: str, selected_stops: List[str], show_top: int = 20 | |
) -> List[str]: | |
""" | |
Calculate and return the top optimal geographic stops based on a specified method using total travel time. | |
Args: | |
method (str): Optimization method, either "minimize-worst-case" or "minimize-total". | |
selected_stops (List[str]): A list of selected stop identifiers. | |
show_top (int, optional): Number of top results to return. Defaults to 20. | |
Returns: | |
List[str]: A list of the top optimal stops based on the selected method | |
Raises: | |
ValueError: If the method is not recognized. | |
""" | |
global DISTANCE_TABLE | |
dfs = [] | |
for si, stop in tqdm( | |
enumerate(selected_stops), | |
desc="Calculating optimal stops", | |
total=len(selected_stops), | |
): | |
df = ( | |
DISTANCE_TABLE.filter(pl.col("from") == stop) | |
.drop("from") | |
.with_columns( | |
pl.col("to").alias("target_stop"), | |
pl.col("total_minutes").alias(f"total_minutes_{si}"), | |
) | |
.select("target_stop", f"total_minutes_{si}") | |
) | |
dfs.append(df) | |
print("Joining dataframes ...") | |
df = dfs[0] | |
for i in range(1, len(dfs)): | |
df = df.join(dfs[i], on="target_stop") | |
print("Finding optimal stops ...") | |
df = df.with_columns( | |
pl.max_horizontal( | |
*[f"total_minutes_{si}" for si in range(len(selected_stops))] | |
).alias("worst_case_minutes"), | |
pl.sum_horizontal( | |
*[f"total_minutes_{si}" for si in range(len(selected_stops))] | |
).alias("total_minutes"), | |
) | |
if method == "minimize-worst-case": | |
df = df.sort("worst_case_minutes") | |
elif method == "minimize-total": | |
df = df.sort("total_minutes") | |
else: | |
raise ValueError(f"Unknown method: {method}") | |
return df.head(show_top)["target_stop"].to_list() | |
def get_optimal_stop( | |
method: str, selected_stops: List[str], show_top_geo: int = 20, show_top_time: int = 20 | |
) -> List[str]: | |
""" | |
Calculate and return the top optimal geographic stops based on a specified method. | |
Args: | |
method (str): Optimization method, either "minimize-worst-case" or "minimize-total". | |
selected_stops (List[str]): A list of selected stop identifiers. | |
show_top_geo (int, optional): Number of top results to return for geographic optimization. Defaults to 20. | |
show_top_time (int, optional): Number of top results to return for time optimization. Defaults to 20. | |
Returns: | |
List[str]: A list of the top optimal stops based on the selected method. | |
Raises: | |
ValueError: If the method is not recognized. | |
""" | |
global DISTANCE_TABLE | |
geo_optimal_stops = get_geo_optimal_stop(method, selected_stops, show_top_geo) | |
print(geo_optimal_stops) | |
time_optimal_stops = get_time_optimal_stop(method, selected_stops, show_top_time) | |
print(time_optimal_stops) | |
print() | |
print([s for s in geo_optimal_stops if s not in time_optimal_stops]) | |
print([s for s in time_optimal_stops if s not in geo_optimal_stops]) | |
print() | |
print(list(set(geo_optimal_stops) & set(time_optimal_stops))) | |
print(list(set(geo_optimal_stops) | set(time_optimal_stops))) | |
return list(set(geo_optimal_stops) | set(time_optimal_stops)) | |
def validate_date_time(date_str: str, time_str: str) -> Tuple[bool, str]: | |
""" | |
Validates a date and time string against specific criteria. | |
Args: | |
date_str (str): The date string to validate, in the format 'DD/MM/YYYY'. | |
time_str (str): The time string to validate, in the format 'HH:MM'. | |
Returns: | |
Tuple[bool, str]: A tuple containing a boolean indicating if the input is valid, | |
and a string message indicating the error if invalid, or an empty string if valid. | |
""" | |
try: | |
event_datetime = datetime.strptime(f"{date_str} {time_str}", "%d/%m/%Y %H:%M") | |
except ValueError: | |
return ( | |
False, | |
"Invalid date or time format. Please ensure date is DD/MM/YYYY and time is HH:MM.", | |
) | |
now = datetime.now() | |
three_months_later = now + timedelta(days=90) # Approximation of 3 months | |
if event_datetime <= now: | |
return False, "The selected date and time must be in the future." | |
if event_datetime > three_months_later: | |
return ( | |
False, | |
"The selected date and time must not be more than 3 months in the future.", | |
) | |
return True, "" | |
def get_next_meetup_time(target_weekday: int, target_hour: int) -> datetime: | |
""" | |
Calculate the next occurrence of a meetup based on the target weekday and hour. | |
Args: | |
target_weekday (int): The day of the week for the meetup, where Monday is 0 | |
and Sunday is 6. | |
target_hour (int): The hour of the day for the meetup (24-hour format). | |
Returns: | |
datetime: A datetime object representing the next occurrence of the meetup | |
with the specified weekday and hour. | |
Raises: | |
ValueError: If `target_hour` is not between 0 and 23 inclusive. | |
""" | |
start_dt = datetime.now() | |
current_weekday = start_dt.weekday() | |
days_ahead = target_weekday - current_weekday | |
if days_ahead == 0: | |
if start_dt.time() >= dt_time(target_hour, 0): | |
days_ahead = 7 | |
else: | |
days_ahead = 0 | |
elif days_ahead < 0: | |
days_ahead += 7 | |
next_dt = start_dt + timedelta(days=days_ahead) | |
next_dt = next_dt.replace(hour=target_hour, minute=0, second=0, microsecond=0) | |
return next_dt | |
def parse_time_to_minutes(time_str: str) -> int: | |
""" | |
Parses a time string and converts it to a total number of minutes. | |
Args: | |
time_str (str): A string representing the time in hours and/or minutes. | |
Returns: | |
int: The total number of minutes calculated from the given time string. | |
Raises: | |
ValueError: If the time string is in an invalid format, or if negative values | |
or invalid values for hours or minutes are provided. | |
""" | |
pattern = r"^\s*(?:(\d+)\s*hod)?(?:\s*(\d+)\s*min)?\s*$" | |
match = re.match(pattern, time_str, re.IGNORECASE) | |
if not match: | |
raise ValueError(f"Invalid time format: '{time_str}'") | |
hours_str, minutes_str = match.groups() | |
hours = int(hours_str) if hours_str else 0 | |
minutes = int(minutes_str) if minutes_str else 0 | |
if hours < 0: | |
raise ValueError("Hours cannot be negative.") | |
if minutes < 0: | |
raise ValueError("Minutes cannot be negative.") | |
if minutes >= 60: | |
raise ValueError("Minutes must be less than 60.") | |
total_minutes = hours * 60 + minutes | |
return total_minutes | |
def get_total_minutes(from_stop: str, to_stop: str, dt: datetime) -> int: | |
""" | |
Sends a POST request to the specified URL using Webshare's rotating proxy and parses the response to extract time in minutes. | |
Args: | |
from_stop (str): The departure stop. | |
to_stop (str): The arrival stop. | |
dt (datetime.datetime): The date and time for the query. | |
Returns: | |
int: The total time in minutes extracted from the response. | |
Raises: | |
requests.HTTPError: If the HTTP request returned an unsuccessful status code. | |
ValueError: If expected HTML elements are not found in the response. | |
""" | |
if from_stop == to_stop: | |
return 0 | |
day_abbreviations = { | |
0: "po", # Monday -> po | |
1: "út", # Tuesday -> út | |
2: "st", # Wednesday -> st | |
3: "čt", # Thursday -> čt | |
4: "pá", # Friday -> pá | |
5: "so", # Saturday -> so | |
6: "ne", # Sunday -> ne | |
} | |
day = dt.day | |
month = dt.month | |
year = dt.year | |
weekday = dt.weekday() | |
abbreviation = day_abbreviations.get(weekday, "") | |
date_str = f"{day}.{month}.{year} {abbreviation}" | |
time_str = dt.strftime("%H:%M") | |
headers = { | |
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", | |
"accept-language": "en-US,en;q=0.9", | |
"cache-control": "max-age=0", | |
"content-type": "application/x-www-form-urlencoded", | |
"dnt": "1", | |
"origin": "https://idos.cz", | |
"priority": "u=0, i", | |
"referer": "https://idos.cz/pid/spojeni/", | |
} | |
data = [ | |
("From", from_stop), | |
("positionACPosition", ""), | |
("To", to_stop), | |
("positionACPosition", ""), | |
("AdvancedForm.Via[0]", ""), | |
("AdvancedForm.ViaHidden[0]", ""), | |
("Date", date_str), | |
("Time", time_str), | |
("IsArr", "True"), | |
] | |
url = "https://idos.cz/pid/spojeni/" | |
proxy_domain = os.getenv("PROXY_DOMAIN") | |
proxy_port = os.getenv("PROXY_PORT") | |
proxy_username = os.getenv("PROXY_USERNAME") | |
proxy_password = os.getenv("PROXY_PASSWORD") | |
proxy_url = f"http://{proxy_username}:{proxy_password}@{proxy_domain}:{proxy_port}" | |
proxies = { | |
"http": proxy_url, | |
"https": proxy_url, | |
} | |
try: | |
if proxy_domain is None: | |
response = requests.post(url, headers=headers, data=data, timeout=15) | |
else: | |
response = requests.post( | |
url, headers=headers, data=data, proxies=proxies, timeout=15 | |
) | |
response.raise_for_status() | |
except requests.RequestException as e: | |
raise requests.HTTPError(f"Failed to retrieve data from {url}.") from e | |
soup = BeautifulSoup(response.content, "html.parser") | |
connection_head = soup.find(class_="connection-head") | |
if not connection_head: | |
raise ValueError("No elements found with the class 'connection-head'.") | |
strong_tag = connection_head.find("strong") | |
if not strong_tag: | |
raise ValueError( | |
"No <strong> tag found within the first 'connection-head' element." | |
) | |
time_str_response = strong_tag.get_text(strip=True) | |
total_minutes = parse_time_to_minutes(time_str_response) | |
return total_minutes | |
def get_total_minutes_with_retries( | |
from_stop: str, | |
to_stop: str, | |
dt: datetime, | |
max_retries: int = 3, | |
retry_delay: int = 2, | |
) -> int: | |
""" | |
Calculate the total travel time in minutes between two stops with retry functionality. | |
Parameters: | |
from_stop (str): The name of the starting stop. | |
to_stop (str): The name of the destination stop. | |
dt (datetime): The date and time for which the travel time is being calculated. | |
max_retries (int, optional): Maximum number of retry attempts if an error occurs. Default is 3. | |
retry_delay (int, optional): Delay in seconds between retry attempts. Default is 2 seconds. | |
Returns: | |
int: The total travel time in minutes if successful, or `None` if all attempts fail. | |
""" | |
attempt = 0 | |
while attempt < max_retries: | |
try: | |
total_minutes = get_total_minutes(from_stop, to_stop, dt) | |
return total_minutes | |
except Exception as e: | |
attempt += 1 | |
if attempt < max_retries: | |
print( | |
f"Error processing pair ({from_stop}, {to_stop}): {e}. Retrying in {retry_delay} seconds... (Attempt {attempt}/{max_retries})" | |
) | |
time.sleep(retry_delay) | |
else: | |
print( | |
f"Failed to process pair ({from_stop}, {to_stop}) after {max_retries} attempts." | |
) | |
return None | |
return None | |
def get_actual_time_optimal_stop( | |
method: str, | |
selected_stops: List[str], | |
target_stops: List[str], | |
event_datetime: datetime, | |
show_top: int = 20, | |
) -> pl.DataFrame: | |
"""Calculate optimal stop times for a list of target stops. | |
Args: | |
method (str): The method for optimization. Can be 'minimize-worst-case' or 'minimize-total'. | |
selected_stops (List[str]): A list of selected stops to calculate travel times from. | |
target_stops (List[str]): A list of target stops to calculate travel times to. | |
event_datetime (datetime.datetime): The date and time of the event for which travel times are calculated. | |
show_top (int, optional): The number of top optimal stops to display, defaults to 20. | |
Returns: | |
polars.DataFrame: A DataFrame containing the calculated stop times, sorted according to the selected method. | |
Raises: | |
Exception: If there's an error processing any stop pair, it's logged, and the function continues. | |
""" | |
def process_target_stop(args): | |
target_stop, selected_stops, event_datetime = args | |
row = {"target_stop": target_stop} | |
for si, from_stop in enumerate(selected_stops): | |
try: | |
total_minutes = get_total_minutes_with_retries( | |
from_stop, target_stop, event_datetime | |
) | |
row[f"total_minutes_{si}"] = total_minutes | |
except Exception as e: | |
print(f"Error processing pair ({from_stop}, {target_stop}): {e}") | |
traceback.print_exc() | |
row[f"total_minutes_{si}"] = None | |
return row | |
rows = [] | |
arguments = [ | |
(target_stop, selected_stops, event_datetime) for target_stop in target_stops | |
] | |
with ThreadPoolExecutor(max_workers=5) as executor: | |
futures = { | |
executor.submit(process_target_stop, arg): arg[0] for arg in arguments | |
} | |
for future in tqdm(as_completed(futures), total=len(arguments)): | |
try: | |
result = future.result() | |
rows.append(result) | |
except Exception as e: | |
print(f"An error occurred with target_stop={futures[future]}: {e}") | |
df_times = pl.DataFrame(rows).with_columns( | |
pl.max_horizontal( | |
*[f"total_minutes_{si}" for si in range(len(selected_stops))] | |
).alias("worst_case_minutes"), | |
pl.sum_horizontal( | |
*[f"total_minutes_{si}" for si in range(len(selected_stops))] | |
).alias("total_minutes"), | |
) | |
if method == "minimize-worst-case": | |
df_times = df_times.sort("worst_case_minutes") | |
elif method == "minimize-total": | |
df_times = df_times.sort("total_minutes") | |
df_times = df_times.rename( | |
{"worst_case_minutes": "Worst Case Minutes", "total_minutes": "Total Minutes"} | |
) | |
for si in range(len(selected_stops)): | |
df_times = df_times.rename({f"total_minutes_{si}": f"t{si+1} mins"}) | |
df_times = df_times.drop_nulls() | |
return df_times.head(show_top) | |
def cerate_app(): | |
with gr.Blocks() as app: | |
gr.Markdown("## Optimal Public Transport Stop Finder in Prague") | |
gr.Markdown( | |
""" | |
Consider you are in Prague and you want to meet with your friends. What is the optimal stop to meet? Now you can find that with this app! | |
Time table data are being scraped from IDOS API, IDOS uses PID timetable data.""" | |
) | |
number_of_stops = gr.Slider( | |
minimum=2, maximum=12, step=1, value=3, label="Number of People" | |
) | |
method = gr.Radio( | |
choices=["Minimize worst case for each", "Minimize total time"], | |
value="Minimize worst case for each", | |
label="Optimization Method", | |
) | |
next_dt = get_next_meetup_time(4, 20) # Friday 20:00 | |
next_date = next_dt.strftime("%d/%m/%Y") | |
next_time = next_dt.strftime("%H:%M") | |
date_input = gr.Textbox( | |
label="Date (DD/MM/YYYY)", placeholder=f"e.g., {next_date}", value=next_date | |
) | |
time_input = gr.Textbox( | |
label="Time (HH:MM)", placeholder=f"e.g., {next_time}", value=next_time | |
) | |
dropdowns = [] | |
for i in range(12): | |
dd = gr.Dropdown( | |
choices=ALL_STOPS, label=f"Choose Starting Stop #{i+1}", visible=False | |
) | |
dropdowns.append(dd) | |
def update_dropdowns(n): | |
updates = [] | |
for i in range(12): | |
if i < n: | |
updates.append(gr.update(visible=True)) | |
else: | |
updates.append(gr.update(visible=False)) | |
return updates | |
number_of_stops.change( | |
fn=update_dropdowns, inputs=number_of_stops, outputs=dropdowns | |
) | |
search_button = gr.Button("Search") | |
def search_optimal_stop( | |
num_stops, chosen_method, date_str, time_str, *all_stops | |
): | |
is_valid, error_message = validate_date_time(date_str, time_str) | |
if not is_valid: | |
raise gr.Error(error_message) | |
selected_stops = [stop for stop in all_stops[:num_stops] if stop] | |
print("Number of stops:", num_stops) | |
print("Method selected:", chosen_method) | |
print("Selected stops:", selected_stops) | |
print("Selected date:", date_str) | |
print("Selected time:", time_str) | |
if chosen_method == "Minimize worst case for each": | |
method = "minimize-worst-case" | |
else: | |
method = "minimize-total" | |
try: | |
event_datetime = datetime.strptime( | |
f"{date_str} {time_str}", "%d/%m/%Y %H:%M" | |
) | |
print("Event DateTime:", event_datetime) | |
except ValueError as e: | |
raise gr.Error(f"Error parsing date and time: {e}") | |
target_stops = get_optimal_stop(method, selected_stops, show_top_geo=10, show_top_time=SHOW_TOP+10) | |
print(target_stops) | |
df_times = get_actual_time_optimal_stop( | |
method, selected_stops, target_stops, event_datetime, show_top=SHOW_TOP | |
) | |
df_times = df_times.with_row_index("#", offset=1) | |
return df_times | |
results_table = gr.Dataframe( | |
headers=["Target Stop", "Worst Case Minutes", "Total Minutes"], | |
datatype=["str", "number", "str"], | |
label="Optimal Stops", | |
) | |
search_button.click( | |
fn=search_optimal_stop, | |
inputs=[number_of_stops, method, date_input, time_input] + dropdowns, | |
outputs=results_table, | |
) | |
app.load( | |
lambda: [gr.update(visible=True) for _ in range(3)] | |
+ [gr.update(visible=False) for _ in range(9)], | |
inputs=[], | |
outputs=dropdowns, | |
) | |
gr.Markdown("---") | |
gr.Markdown( | |
""" | |
Created by [Daniel Herman](https://www.hermandaniel.com), check out the code [detrin/pub-finder](https://github.com/detrin/pub-finder). | |
""" | |
) | |
return app | |
# print("Loading time table file ...") | |
# prague_stops = pl.read_csv("Prague_stops_geo.csv") | |
# print("Calculating distances between stops ...") | |
# stops_geo_dist = ( | |
# prague_stops.join(prague_stops, how="cross") | |
# .with_columns( | |
# pl.struct(["lat", "lon", "lat_right", "lon_right"]) | |
# .map_elements( | |
# lambda x: geopy.distance.geodesic( | |
# (x["lat"], x["lon"]), (x["lat_right"], x["lon_right"]) | |
# ).km, | |
# return_dtype=pl.Float64, | |
# ) | |
# .alias("distance_in_km") | |
# ) | |
# .rename({"name": "from", "name_right": "to"}) | |
# .select(["from", "to", "distance_in_km"]) | |
# ) | |
stops_geo_dist = pl.read_parquet("Prague_stops_combinations.parquet") | |
print(stops_geo_dist) | |
DISTANCE_TABLE = stops_geo_dist | |
from_stops = DISTANCE_TABLE["from"].unique().sort().to_list() | |
to_stops = DISTANCE_TABLE["to"].unique().sort().to_list() | |
ALL_STOPS = sorted(list(set(from_stops) & set(to_stops))) | |
SHOW_TOP = 15 | |
if __name__ == "__main__": | |
app = cerate_app() | |
print("Starting app ...") | |
app.launch() | |