Spaces:
Runtime error
Runtime error
| import webbrowser | |
| import dash as dcc | |
| import dash as html | |
| import numpy as np | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| import networkx as nx | |
| import datetime | |
class Visualizer:
    """Stateless helpers that draw simulation results onto Plotly figures.

    Every method is a static method and is meant to be called on the class,
    e.g. ``Visualizer.plot_trajectory(fig, parsed_data)``.
    """

    @staticmethod
    def _parse_attributes(array_data):
        """Turn an ``attributes`` cell into a ``{timestamp: label}`` dict.

        The cell is expected to be a string of ``;``-separated entries, each
        of the form ``"<timestamp>|<label>"`` or ``"<timestamp>|<label>(<extra>"``.
        Anything from the first ``(`` onward is dropped from the label.

        Args:
            array_data: The raw cell value. Non-string values (e.g. NaN for an
                empty cell) yield an empty dict.

        Returns:
            A dict mapping ``float`` timestamps to ``str`` labels, inserted in
            ascending timestamp order.

        Raises:
            ValueError: If a non-empty entry has a timestamp that ``float``
                cannot parse.
        """
        if not isinstance(array_data, str):
            return {}

        entries = []
        for pair in array_data.split(';'):
            if not pair.strip():
                # Tolerate empty cells and trailing/duplicated ';' separators.
                continue
            fields = pair.split('|')
            label = fields[1] if len(fields) > 1 else ""
            # Split once so labels containing several '(' do not raise.
            label = label.split('(', 1)[0]
            entries.append((float(fields[0]), label))

        # Sort on the full numeric timestamp (not just its first character).
        entries.sort(key=lambda entry: entry[0])
        return dict(entries)

    @staticmethod
    def _is_unset_date(value):
        """Return True when *value* is the numeric ``0`` placeholder default."""
        return isinstance(value, (int, float)) and value == 0

    @staticmethod
    def wnd_visualization(figure_handle, dataframe, current_time, start_time=0, end_time=10**6):
        """Draw the real-time vs. comprehended-time diagram onto a figure.

        Adds, in order: the dashed diagonal ``y = x``, dotted guide lines from
        ``(current_time, current_time)`` to both axes, one black marker per
        row of ``dataframe`` on the diagonal, an annotation per event of the
        latest snapshot, and blue/gray step lines tracing how each label's
        timestamp changed between snapshots.

        Args:
            figure_handle: Figure exposing ``add_trace``, ``add_annotation``
                and ``update_layout`` (e.g. ``plotly.graph_objects.Figure``).
            dataframe: Table with ``time``, ``actionName`` and ``attributes``
                columns; ``attributes`` is parsed by ``_parse_attributes``.
            current_time: Snapshots with ``time`` greater than this are ignored.
            start_time: Lower bound of both axes.
            end_time: Upper bound of both axes.

        Returns:
            The same ``figure_handle``, mutated in place.
        """
        # Diagonal line y = x.
        figure_handle.add_trace(go.Scatter(
            x=[start_time, end_time],
            y=[start_time, end_time],
            mode='lines',
            line=dict(color='black', dash='dash'),
        ))
        # Dotted lines from the current point to both axes.
        figure_handle.add_trace(go.Scatter(
            x=[current_time, current_time, start_time],
            y=[start_time, current_time, current_time],
            mode='lines',
            line=dict(color='black', dash='dot'),
        ))

        # One marker on the diagonal per recorded action.
        for time, _name in dataframe[['time', 'actionName']].values:
            figure_handle.add_trace(go.Scatter(
                x=[time], y=[time], mode='markers',
                marker=dict(color='black', size=10),
            ))

        # Snapshot time -> {event timestamp: event label}.
        timestamps_and_ids_dict = {}
        for _index, row in dataframe.iterrows():
            timestamps_and_ids_dict[row['time']] = Visualizer._parse_attributes(row['attributes'])

        timestamps_before_simulation_time = {
            timestamp: data
            for timestamp, data in timestamps_and_ids_dict.items()
            if timestamp <= current_time
        }
        print(timestamps_before_simulation_time)

        # Label -> list of (snapshot time, event timestamp) in snapshot order.
        mental_model_over_time = {}
        if timestamps_before_simulation_time:
            for mm_time, snapshot in timestamps_before_simulation_time.items():
                for timestamp, value in snapshot.items():
                    mental_model_over_time.setdefault(value, []).append((mm_time, timestamp))

            # Get the last update; it is assumed to still hold at current_time.
            _latest_time, latest_snapshot = max(
                timestamps_before_simulation_time.items(), key=lambda item: item[0]
            )
            for timestamp, value in latest_snapshot.items():
                mental_model_over_time.setdefault(value, []).append((current_time, timestamp))
                # NOTE(review): nesting reconstructed from a flattened paste;
                # one annotation per event of the latest snapshot is assumed.
                figure_handle.add_annotation(
                    x=current_time, y=timestamp, text=value,
                    showarrow=True, arrowhead=1, arrowcolor='black', arrowwidth=2,
                )

        for _label, coordinates in mental_model_over_time.items():
            for (x0, y0), (x1, y1) in zip(coordinates, coordinates[1:]):
                # Horizontal move to the next snapshot, then vertical jump.
                x = [x0, x1, x1]
                y = [y0, y0, y1]
                # Blue when the whole step is on/above the diagonal, gray when
                # on/below it; a step exactly on the diagonal gets both traces.
                if all(xi <= yi for xi, yi in zip(x, y)):
                    figure_handle.add_trace(go.Scatter(
                        x=x, y=y, mode='lines',
                        line=dict(color='blue'), marker_symbol='triangle-up',
                    ))
                if all(xi >= yi for xi, yi in zip(x, y)):
                    figure_handle.add_trace(go.Scatter(
                        x=x, y=y, mode='lines',
                        line=dict(color='gray'), marker_symbol='circle',
                    ))

        figure_handle.update_layout(
            xaxis_title='Real Time',
            yaxis_title='Comprehension of Timeline of Events',
            showlegend=False,
            xaxis=dict(
                range=[start_time, end_time]
            ),
            yaxis=dict(
                range=[start_time, end_time],
                scaleanchor="x",
                scaleratio=1
            )
        )
        return figure_handle

    @staticmethod
    def plot_trajectory(figure_handle, parsed_data, uptime=float('inf'), background_image=None):
        """Draw the corridor network and per-aircraft tracks on a map figure.

        Args:
            figure_handle: Figure exposing ``add_trace``, ``add_layout_image``
                and ``update_layout``.
            parsed_data: Mapping of aircraft name to a table with ``time``,
                ``longitude_deg`` and ``latitude_deg`` columns (pandas-style
                boolean-mask indexing and ``.iloc`` are used).
            uptime: Only samples with ``0 <= time <= uptime`` are drawn.
            background_image: Optional image placed under the plot; when
                falsy, the OpenStreetMap mapbox style is used instead.

        Returns:
            The same figure, mutated in place.
        """
        fig = figure_handle
        if background_image:
            fig.add_layout_image(
                source=background_image,
                xref="x",
                yref="y",
                x=-96.88,
                y=33.16,
                sizex=0.23,
                sizey=0.44,
                sizing="fill",
                opacity=0.5,
                layer="below"
            )
        else:
            fig.update_layout(
                mapbox_style="open-street-map",
                mapbox_center_lon=-96.765,
                mapbox_center_lat=32.95,
                mapbox_zoom=10
            )

        # Waypoint name -> (longitude, latitude).
        waypoints = {
            "NTI": (-96.7535, 32.928), "NTHW": (-96.749, 32.8573), "RUBL": (-96.7588, 32.78),
            "HW342": (-96.8003, 32.7508), "TLWY": (-96.807, 32.769), "4DT": (-96.8508, 32.846),
            "T57": (-96.686, 32.888), "FSC": (-96.8213, 33.1413), "PLN": (-96.7275, 33.0297)
        }
        # Corridors between waypoints.
        edges = [("NTI", "NTHW"), ("NTHW", "RUBL"), ("RUBL", "HW342"), ("HW342", "TLWY"),
                 ("TLWY", "4DT"), ("T57", "NTI"), ("FSC", "PLN"), ("PLN", "NTI")]
        G = nx.Graph()
        G.add_edges_from(edges)

        # Corridors as gray lines, waypoints as small gray markers.
        for first, second in G.edges():
            fig.add_trace(go.Scattermapbox(
                lon=[waypoints[first][0], waypoints[second][0]],
                lat=[waypoints[first][1], waypoints[second][1]],
                mode='lines', line=dict(color='gray'), showlegend=False,
            ))
        for node in G.nodes():
            lon_value, lat_value = waypoints[node]
            fig.add_trace(go.Scattermapbox(
                lon=[lon_value], lat=[lat_value], mode='markers',
                marker=dict(size=5, color='gray'), showlegend=False,
            ))

        colors = ['blue', 'red', 'green', 'purple', 'orange', 'yellow', 'pink', 'cyan', 'magenta', 'brown']
        color_index = 0
        for ac_name, data in parsed_data.items():
            mask = (data['time'] >= 0) & (data['time'] <= uptime)
            filtered_xdata = data['longitude_deg'][mask]
            filtered_ydata = data['latitude_deg'][mask]

            # Flown track so far.
            fig.add_trace(go.Scattermapbox(
                mode="lines",
                lon=filtered_xdata,
                lat=filtered_ydata,
                line=dict(width=2, color=colors[color_index]),
                name=ac_name
            ))
            # Marker at the most recent position, if there is one.
            # Custom marker symbols would require a Mapbox access token.
            if len(filtered_xdata) > 0 and len(filtered_ydata) > 0:
                fig.add_trace(go.Scattermapbox(
                    mode="markers",
                    lon=[filtered_xdata.iloc[-1]],
                    lat=[filtered_ydata.iloc[-1]],
                    marker=dict(size=10, color=colors[color_index]),
                    name=ac_name + " Current Location"
                ))
            # NOTE(review): nesting reconstructed from a flattened paste; the
            # colour is assumed to advance once per aircraft, data or not.
            color_index = (color_index + 1) % len(colors)

        fig.update_layout(
            mapbox=dict(
                center=dict(lon=-96.765, lat=32.95),
                zoom=9
            ),
            margin=dict(l=20, r=20, t=20, b=20)
        )
        return fig

    @staticmethod
    def timeline(figure_handle, events, current_time=0, start_date=0, end_date=0):
        """Build a per-agent timeline with a dashed marker at the current time.

        Args:
            figure_handle: Ignored; a new figure is created by ``px.timeline``.
                Kept so existing callers do not break.
            events: Iterable of dict-like records with ``agent``, ``start``,
                ``end``, ``name``, ``attributes`` and ``duration`` keys.
            current_time: Offset in seconds from ``start_date`` at which the
                vertical marker is drawn.
            start_date: Date/time of simulation start. When left at the ``0``
                placeholder, the marker is not drawn.
            end_date: Right edge of the x-axis. The axis range is only set
                when both ``start_date`` and ``end_date`` are supplied.

        Returns:
            The newly created timeline figure.
        """
        category_order = sorted({e['agent'] for e in events})
        figure_handle = px.timeline(
            events, x_start="start", x_end="end", y="agent",
            hover_data=['attributes', 'duration'],
            color='name',
            category_orders={'agent': category_order}
        )

        has_start = not Visualizer._is_unset_date(start_date)
        has_end = not Visualizer._is_unset_date(end_date)

        if has_start:
            # Vertical dashed line at x = start_date + current_time seconds.
            current_datetime = start_date + datetime.timedelta(seconds=current_time)
            figure_handle.add_shape(
                type="line",
                x0=current_datetime, y0=0,
                x1=current_datetime, y1=1,
                yref="paper",
                line=dict(
                    color="Black",
                    width=2,
                    dash="dash",
                )
            )

        if has_start and has_end:
            figure_handle.update_layout(
                xaxis_range=[start_date, end_date]
            )
        return figure_handle