Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
import os | |
import sqlite3 | |
from filter import filter_data, sdg_column_mapping | |
from tar import sdg_indicators_mapping, sdg_input_mapping ,sdg_formulas_mapping, sdg_progress_mapping | |
from tqdm import tqdm | |
import time | |
import uuid | |
from datetime import datetime | |
# Load your dataset | |
cwd = os.getcwd() | |
csv_file_path = os.path.join(cwd,'Sustainable_Development_Report_2023_(with_indicators)__-2086263501583264136.csv') | |
data = pd.read_csv(csv_file_path) | |
# Function to create a SQLite database and table if it doesn't exist | |
def sign_in(username, password): | |
try: | |
conn = sqlite3.connect('data_storage.db') | |
c = conn.cursor() | |
# Check if the user_sessions table exists | |
c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='user_sessions'") | |
if c.fetchone() is None: | |
# If the table doesn't exist, create it | |
c.execute(""" | |
CREATE TABLE user_sessions ( | |
user_id TEXT PRIMARY KEY, | |
timestamp TEXT | |
) | |
""") | |
c.execute("SELECT * FROM user_info WHERE username=? AND password=?", (username, password)) | |
user = c.fetchone() | |
conn.close() | |
if user: | |
user_id = str(uuid.uuid4()) | |
timestamp = datetime.now() | |
conn = sqlite3.connect('data_storage.db') | |
c = conn.cursor() | |
c.execute("INSERT INTO user_sessions (user_id, timestamp) VALUES (?, ?)", (user_id, timestamp)) | |
conn.commit() | |
conn.close() | |
return "Welcome, " + username | |
else: | |
return "Invalid username or password" | |
except Exception as e: | |
return str(e) | |
def sign_up(username, password, email): | |
try: | |
conn = sqlite3.connect('data_storage.db') | |
c = conn.cursor() | |
# Check if the user_info table exists | |
c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='user_info'") | |
if c.fetchone() is None: | |
# If the table doesn't exist, create it | |
c.execute(""" | |
CREATE TABLE user_info ( | |
username TEXT PRIMARY KEY, | |
password TEXT, | |
email TEXT | |
) | |
""") | |
c.execute("INSERT INTO user_info (username, password, email) VALUES (?, ?, ?)", (username, password, email)) | |
conn.commit() | |
conn.close() | |
return "Sign up successful" | |
except Exception as e: | |
return str(e) | |
def save_inputs(indicator, names, inputs,user_id, timestamp): | |
# Connect to the SQLite database | |
conn = sqlite3.connect('data_storage.db') | |
c = conn.cursor() | |
timestamp = time.time() | |
# Create table if it doesn't exist | |
c.execute('''CREATE TABLE IF NOT EXISTS inputs | |
(indicator text, names text, inputs text, user_id, timestamp)''') | |
# Insert a row of data | |
c.execute("INSERT INTO inputs VALUES (?, ?, ?, ?,?)", (indicator, str(names), str(inputs), user_id, timestamp)) | |
# Save (commit) the changes | |
conn.commit() | |
# Close the connection | |
conn.close() | |
def create_database(): | |
conn = sqlite3.connect("data_storage.db") | |
cursor = conn.cursor() | |
cursor.execute(""" | |
CREATE TABLE IF NOT EXISTS user_data ( | |
user_id TEXT PRIMARY KEY, | |
username TEXT, | |
country_name TEXT, | |
sdg_number INTEGER, | |
result TEXT, | |
indicator TEXT, | |
names TEXT, | |
input_first REAL, | |
input_2nd REAL, | |
input_3rd REAL, | |
input_4th REAL, | |
timestamp REAL | |
) | |
""") | |
conn.commit() | |
conn.close() | |
def filter_and_store_data(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp): | |
try: | |
# Convert the selected SDG number to an integer | |
sdg_number = int(sdg_number) | |
# Get the list of columns based on the selected SDG number | |
selected_columns = ['Name'] + sdg_column_mapping.get(sdg_number, []) | |
# Filter data based on country and selected columns | |
selected_data = data[(data['Name'] == country_name)][selected_columns] | |
# Convert the selected data to an HTML table with more rows | |
html_table = selected_data.to_html(index=False) | |
# Wrap the HTML table in a div with a fixed height and scrollable overflow | |
scrollable_html_table = f'<div style="height: 300px; overflow-y: scroll;">{html_table}</div>' | |
# Get the current timestamp | |
timestamp = time.time() | |
timestamp = datetime.now() | |
# Store the data in the SQLite database | |
# Store the data in the SQLite database | |
conn = sqlite3.connect("data_storage.db") | |
cursor = conn.cursor() | |
cursor.execute("INSERT INTO user_data (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | |
(username, country_name, sdg_number, str(scrollable_html_table), str(indicator), str(names), input_first, input_2nd, input_3rd, input_4th, user_id, timestamp)) | |
conn.commit() | |
conn.close() | |
return scrollable_html_table | |
except ValueError: | |
return "Please select a valid SDG number.", [] | |
except Exception as e: | |
return f"An error occurred: {e}", [] | |
def gradio_app(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp): | |
# Call the filter_and_store_data function | |
return filter_and_store_data(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp) | |
def gradios(username, country_name, sdg_number, indicator, names, input_first, input_2nd, input_3rd, input_4th): | |
# Get the formula for the selected indicator | |
formula = sdg_formulas_mapping[indicator] | |
# Calculate the result using the formula | |
result = formula(input_first, input_2nd, input_3rd, input_4th) | |
# Store the result in the database | |
conn = sqlite3.connect("data_storage.db") | |
cursor = conn.cursor() | |
cursor.execute(""" | |
INSERT INTO user_data (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, timestamp) | |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
""", (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th)) | |
conn.commit() | |
conn.close() | |
return f"Result: {result}" | |
def user_latest_data(): | |
conn = sqlite3.connect("data_storage.db") | |
df = pd.read_sql_query("SELECT * FROM user_data ORDER BY username DESC LIMIT 1", conn) | |
conn.close() | |
# Convert numeric values to strings | |
return str(df['username'].values[0]) | |
def user_late_data(): | |
conn = sqlite3.connect("data_storage.db") | |
df = pd.read_sql_query("SELECT * FROM user_data ORDER BY username DESC LIMIT 1", conn) | |
conn.close() | |
# Check if the DataFrame is not empty and contains the 'username' column | |
if not df.empty and 'username' in df.columns and not df['username'].empty: | |
return str(df['username'].values[0]) | |
else: | |
return "No username available" | |
def get_nam(indicator): | |
if indicator in sdg_input_mapping: | |
names = [item['name'] for item in sdg_input_mapping[indicator]] | |
return names | |
def update_nam_dropdown(indicator): | |
return get_nam(indicator) | |
def add_indicator(sdg_number): | |
max_inputs = 5 | |
def variable_output2(k): | |
k = int(k) | |
return [gr.Number(visible=True)]*k + [gr.Number(visible=False)]*(max_inputs-k) | |
with gr.Row(variant='panel') as row: | |
indicator_dropdown2 = gr.Dropdown(choices=list(sdg_input_mapping.keys()), interactive=True, type="value", label="Please choose an Indicator") | |
names_dropdown = gr.Dropdown(choices=[update_nam_dropdown(indicator_dropdown2.choices[0])], allow_custom_value=True,multiselect=True, interactive=True, type="value", label="The indicators Inputs needed respectively") | |
indicator_dropdown2.change(fn=update_nam_dropdown, inputs=indicator_dropdown2, outputs=names_dropdown) | |
with gr.Column(variant='panel'): | |
u = gr.Slider(1, max_inputs, value=max_inputs, step=1, label="How many inputs to fill:") | |
textboxes = [] | |
for i in range(max_inputs): | |
t = gr.Number(label=f"Please fill with the input number {i}", interactive=True) | |
textboxes.append(t) | |
u.change(variable_output2, u, textboxes) | |
save_button = gr.Button("Save") | |
#save_button.click(save_inputs, inputs=[indicator_dropdown2, names_dropdown, t], outputs=[]) | |
def save_inputs_wrapper(indicator, names, *textboxes): | |
return save_inputs(indicator, names, [str(t) if isinstance(t, float) else str(t.value) for t in textboxes]) | |
save_button.click(save_inputs_wrapper, inputs=[indicator_dropdown2, names_dropdown, *textboxes], outputs=[]) | |
return row | |
max_indicators = 15 | |
def variable_outputs(k): | |
k = int(k) | |
num_rows = sdg_progress_mapping.get(k, 0) | |
return [gr.Textbox(visible=True)]*num_rows + [gr.Textbox(visible=False)]*(max_indicators-num_rows) | |
def get_new_ind(sdg_number): | |
if sdg_number in sdg_indicators_mapping: | |
ind = sdg_indicators_mapping.get(sdg_number, []) | |
return list(ind) | |
def update_indicator_dropdown(sdg_number): | |
return get_new_ind(sdg_number) | |
def fetch_all_data(): | |
conn = sqlite3.connect("data_storage.db") | |
# Fetch user data | |
user_df = pd.read_sql_query("SELECT * FROM user_info", conn) | |
latest_user_row = user_df.iloc[[-1]] | |
user_data = str(latest_user_row['username'].values[0]) | |
# Fetch user data | |
user_data_df = pd.read_sql_query("SELECT * FROM user_data", conn) | |
latest_user_data_row = user_data_df.iloc[[-1]] | |
user_data_values = ( | |
str(latest_user_data_row['country_name'].values[0]), | |
str(latest_user_data_row['sdg_number'].values[0]), | |
str(latest_user_data_row['result'].values[0]) | |
) | |
# Fetch input data | |
input_df = pd.read_sql_query("SELECT * FROM inputs", conn) | |
latest_input_row = input_df.iloc[[-1]] | |
input_data = ( | |
str(latest_input_row['indicator'].values[0]), | |
str(latest_input_row['names'].values[0]), | |
str(latest_input_row['inputs'].values[0]) | |
) | |
conn.close() | |
return [user_data] + list(user_data_values) + list(input_data) | |
def calculate_indicator(user_id, timestamp): | |
try: | |
conn = sqlite3.connect("data_storage.db") | |
# Fetch user data based on user_id and timestamp | |
user_data_df = pd.read_sql_query("SELECT * FROM user_data WHERE user_id=? AND timestamp=?", conn, params=(user_id, timestamp)) | |
if user_data_df.empty: | |
return "No data found for the specified user_id and timestamp" | |
# Extract relevant information | |
indicator = user_data_df['indicator'].values[0] | |
names = user_data_df['names'].values[0] | |
input_first = user_data_df['input_first'].values[0] | |
# Perform the calculation | |
if indicator in sdg_formulas_mapping: | |
formula = sdg_formulas_mapping[indicator] | |
result = formula(input_first) | |
return f"Result for {indicator}: {result}" | |
else: | |
return f"Indicator {indicator} not found in the dictionary" | |
except Exception as e: | |
return f"An error occurred: {e}" |