import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import json
from datetime import datetime
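
# The helpers below assume an input JSON roughly shaped as follows (a minimal
# sketch reconstructed from the keys accessed in this module; the field values
# are illustrative only):
#
#   {
#     "datacellar:description": "Household consumption data",
#     "datacellar:datasetSelfDescription": {
#       "datacellar:datasetFields": [
#         {
#           "datacellar:datasetFieldID": 0,
#           "datacellar:fieldName": "load",
#           "datacellar:type": {"datacellar:unitText": "kW"}
#         }
#       ]
#     },
#     "datacellar:timeSeriesList": [
#       {
#         "datacellar:datasetFieldID": 0,
#         "datacellar:timeSeriesMetadata": {"datacellar:loadType": "Heating"},
#         "datacellar:dataPoints": [
#           {"datacellar:timeStamp": "2024-01-01T00:00:00Z", "datacellar:value": 1.2}
#         ]
#       }
#     ]
#   }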


def get_series_name_and_unit(series, dataset_description):
    """
    Extract the name and unit from a time series using its dataset description.

    Args:
        series: Dictionary containing series data
        dataset_description: Dictionary containing dataset field descriptions

    Returns:
        tuple: (name, unit) of the series
    """
    field_id = series['datacellar:datasetFieldID']
    field = next((f for f in dataset_description['datacellar:datasetFields']
                  if f['datacellar:datasetFieldID'] == field_id), None)

    name = field['datacellar:fieldName'] if field else f'Series {field_id}'
    unit = field['datacellar:type']['datacellar:unitText'] if field else 'Unknown'

    if 'datacellar:timeSeriesMetadata' in series:
        metadata = series['datacellar:timeSeriesMetadata']
        if 'datacellar:loadType' in metadata:
            name = metadata['datacellar:loadType']

    return name, unit


def process_series(series, dataset_description, is_input=False):
    """
    Process a single time series into a pandas DataFrame.

    Args:
        series: Dictionary containing series data
        dataset_description: Dictionary containing dataset field descriptions
        is_input: Boolean indicating if this is input data

    Returns:
        tuple: (DataFrame, unit, name) of the processed series
    """
    name, unit = get_series_name_and_unit(series, dataset_description)
    df = pd.DataFrame(series['datacellar:dataPoints'])

    df['datacellar:timeStamp'] = pd.to_datetime(df['datacellar:timeStamp'])
    df['datacellar:value'] = pd.to_numeric(df['datacellar:value'], errors='coerce')

    df['series_id'] = f'{name} (Input)' if is_input else name

    return df, unit, name
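
# For reference, the resulting DataFrame has three columns (values illustrative):
#
#   datacellar:timeStamp         datacellar:value  series_id
#   2024-01-01 00:00:00+00:00    1.2               Heating (Input)
#   2024-01-01 00:15:00+00:00    0.9               Heating (Input)
#
# The "(Input)" suffix is only added when is_input=True.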


def load_and_process_data(json_data, input_data=None):
    """
    Load and process time series from the JSON data, filtering out empty series.

    Args:
        json_data: Dictionary containing the JSON data to visualise
        input_data: Optional dictionary with the original input data

    Returns:
        dict: Mapping of unit -> combined DataFrame of all non-empty series
    """
    series_by_unit = {}
    try:
        dataset_description = json_data['datacellar:datasetSelfDescription']
    except KeyError:
        # Fall back to a minimal self-description (e.g. for anomaly-detection
        # results, which carry a single boolean "anomaly" field).
        dataset_description = {
            "datacellar:datasetFields": [
                {
                    "@type": "datacellar:DatasetField",
                    "datacellar:datasetFieldID": 0,
                    "datacellar:fieldName": "anomaly",
                    "datacellar:description": "Anomalies",
                    "datacellar:type": {
                        "@type": "datacellar:boolean",
                        "datacellar:unitText": "-"
                    }
                }
            ]
        }

    try:
        for series in json_data['datacellar:timeSeriesList']:
            if series.get('datacellar:dataPoints'):
                df, unit, _ = process_series(series, dataset_description)
                if not df.empty and df['datacellar:value'].notna().any():
                    if unit not in series_by_unit:
                        series_by_unit[unit] = []
                    series_by_unit[unit].append(df)
    except Exception as e:
        st.error(f"Error processing series: {str(e)}")

    if input_data:
        input_description = input_data['datacellar:datasetSelfDescription']
        for series in input_data['datacellar:timeSeriesList']:
            if series.get('datacellar:dataPoints'):
                df, unit, _ = process_series(series, input_description, is_input=True)
                if not df.empty and df['datacellar:value'].notna().any():
                    if unit not in series_by_unit:
                        series_by_unit[unit] = []
                    series_by_unit[unit].append(df)

    result = {}
    for unit, dfs in series_by_unit.items():
        if dfs:
            combined_df = pd.concat(dfs)
            if not combined_df.empty and combined_df['datacellar:value'].notna().any():
                result[unit] = combined_df

    return result
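
# The returned dict maps each unit to one combined DataFrame, e.g. (sketch):
#   {"kWh": <DataFrame of all kWh series>, "-": <DataFrame of anomaly flags>}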


def create_time_series_plot(df, unit, service_type=None, fig=None):
    """
    Create visualization for time series data, handling empty series appropriately.
    """
    if service_type == "Anomaly Detection":
        if not fig:
            fig = go.Figure()

        # Input series: the energy consumption values the anomaly flags refer to.
        input_data = df[df['series_id'].str.contains('Input')]
        input_data = input_data[input_data['datacellar:value'].notna()]

        if not input_data.empty:
            fig.add_trace(go.Scatter(
                x=input_data['datacellar:timeStamp'],
                y=input_data['datacellar:value'],
                mode='lines',
                name='Energy Consumption',
                line=dict(color='blue')
            ))

        # Output series: boolean flags marking which timestamps are anomalous.
        anomalies = df[(~df['series_id'].str.contains('Input')) &
                       (df['datacellar:value'] == True) &
                       (df['datacellar:value'].notna())]
        if not anomalies.empty:
            # Look up the consumption value at each anomalous timestamp so the
            # markers sit on the consumption curve.
            anomaly_values = []
            for timestamp in anomalies['datacellar:timeStamp']:
                value = input_data.loc[input_data['datacellar:timeStamp'] == timestamp, 'datacellar:value']
                anomaly_values.append(value.iloc[0] if not value.empty else None)

            fig.add_trace(go.Scatter(
                x=anomalies['datacellar:timeStamp'],
                y=anomaly_values,
                mode='markers',
                name='Anomalies',
                marker=dict(color='red', size=8, symbol='x')
            ))

        fig.update_layout(
            title=f'Time Series Data with Anomalies ({unit})',
            xaxis_title="Time",
            yaxis_title=f"Value ({unit})",
            hovermode='x unified',
            legend_title="Series"
        )
        return fig
    else:
        # Keep only series that contain at least one valid value.
        valid_series = []
        for series_id in df['series_id'].unique():
            series_data = df[df['series_id'] == series_id]
            if not series_data.empty and series_data['datacellar:value'].notna().any():
                valid_series.append(series_id)

        if valid_series:
            filtered_df = df[df['series_id'].isin(valid_series)]
            return px.line(
                filtered_df,
                x='datacellar:timeStamp',
                y='datacellar:value',
                color='series_id',
                title=f'Time Series Data ({unit})'
            ).update_layout(
                xaxis_title="Time",
                yaxis_title=f"Value ({unit})",
                hovermode='x unified',
                legend_title="Series"
            )
        else:
            return None
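
# Typical rendering flow elsewhere in the app (a sketch; `data` is the parsed
# JSON response and `service` the selected service name):
#
#   dfs_by_unit = load_and_process_data(data, input_data=original_input)
#   for unit, df in dfs_by_unit.items():
#       fig = create_time_series_plot(df, unit, service_type=service)
#       if fig is not None:
#           st.plotly_chart(fig, use_container_width=True)
#   display_statistics(dfs_by_unit)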


def display_statistics(dfs_by_unit):
    """
    Display statistics only for non-empty series.
    """
    for unit, df in dfs_by_unit.items():
        st.write(f"## Measurements in {unit}")
        for series_id in df['series_id'].unique():
            series_data = df[df['series_id'] == series_id]

            if not series_data.empty and series_data['datacellar:value'].notna().any():
                st.write(f"### {series_id}")

                cols = st.columns(4)
                # "Total" integrates the values over time, scaling the sum by the
                # assumed sample duration in hours (6/3600, i.e. 6-second sampling).
                metrics = [
                    ("Average", series_data['datacellar:value'].mean()),
                    ("Max", series_data['datacellar:value'].max()),
                    ("Min", series_data['datacellar:value'].min()),
                    ("Total", series_data['datacellar:value'].sum() * 6 / 3600)
                ]

                for col, (label, value) in zip(cols, metrics):
                    with col:
                        # Energy totals get an "h" appended to the unit (e.g. kW -> kWh).
                        unit_suffix = "h" if label == "Total" else ""
                        st.metric(label, f"{value:.2f} {unit}{unit_suffix}")


def call_api(file_content, token, service_endpoint):
    """
    Call the analysis API with the provided data.

    Args:
        file_content: Binary content of the JSON file
        token: API authentication token
        service_endpoint: String indicating which API endpoint to call

    Returns:
        dict: JSON response from the API or None if the call fails
    """
    try:
        url = f'https://loki.linksfoundation.com/datacellar/{service_endpoint}'
        response = requests.post(
            url,
            headers={'Authorization': f'Bearer {token}'},
            files={'input_file': ('data.json', file_content, 'application/json')}
        )

        if response.status_code == 401:
            st.error("Authentication failed. Please check your API token.")
            return None

        return response.json()
    except Exception as e:
        st.error(f"API Error: {str(e)}")
        return None
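
# Example call from the app (a sketch; the uploader widget, token field and
# "anomalies" endpoint name are illustrative, not necessarily the real ones):
#
#   uploaded = st.file_uploader("Upload dataset", type="json")
#   token = st.text_input("API token", type="password")
#   if uploaded and token:
#       result = call_api(uploaded.getvalue(), token, "anomalies")
#       if result:
#           dfs_by_unit = load_and_process_data(result, input_data=json.loads(uploaded.getvalue()))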


def get_dataset_type(json_data):
    """
    Determine the type of dataset from its description.

    Args:
        json_data: Dictionary containing the JSON data

    Returns:
        str: "production", "consumption", or "other"
    """
    desc = json_data.get('datacellar:description', '').lower()
    if 'production' in desc:
        return "production"
    elif 'consumption' in desc:
        return "consumption"
    return "other"


def get_forecast_horizon(json_data):
    """
    Determine the forecast horizon from dataset description.

    Args:
        json_data: Dictionary containing the JSON data

    Returns:
        str: "long", "short", or None
    """
    desc = json_data.get('datacellar:description', '').lower()
    if 'long term' in desc:
        return "long"
    elif 'short term' in desc:
        return "short"
    return None
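
# For example (description strings are illustrative):
#   get_dataset_type({'datacellar:description': 'PV production measurements'})          -> "production"
#   get_forecast_horizon({'datacellar:description': 'Short term consumption forecast'}) -> "short"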