Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import os | |
| import sqlite3 | |
| from filter import filter_data, sdg_column_mapping | |
| from tar import sdg_indicators_mapping, sdg_input_mapping ,sdg_formulas_mapping, sdg_progress_mapping | |
| from tqdm import tqdm | |
| import time | |
| import uuid | |
| from datetime import datetime | |
# Read the Sustainable Development Report dataset once at import time.
# The CSV is looked up in the process's current working directory.
cwd = os.getcwd()
csv_file_path = os.path.join(
    cwd,
    'Sustainable_Development_Report_2023_(with_indicators)__-2086263501583264136.csv',
)
data = pd.read_csv(csv_file_path)
def sign_in(username, password):
    """Check a username/password pair and record a new session on success.

    The pair is looked up in the ``user_info`` table of ``data_storage.db``.
    On a match, a row holding a fresh UUID and the current local time is
    inserted into ``user_sessions`` (the table is created on first use).

    Args:
        username: Account name to look up.
        password: Password compared against the stored value.

    Returns:
        ``"Welcome, <username>"`` on success, ``"Invalid username or
        password"`` when no account matches (including when the
        ``user_info`` table does not exist yet), or the SQLite error text if
        the database operation fails.
    """
    # NOTE(review): passwords are compared in plain text because sign_up
    # stores them that way; both sides should move to salted hashes together.
    try:
        conn = sqlite3.connect('data_storage.db')
        try:
            c = conn.cursor()
            c.execute("""
                CREATE TABLE IF NOT EXISTS user_sessions (
                    user_id TEXT PRIMARY KEY,
                    timestamp TEXT
                )
            """)
            # Without this check a sign-in attempt made before the first
            # sign-up would show the user a raw "no such table" SQL error.
            c.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='user_info'")
            if c.fetchone() is None:
                return "Invalid username or password"
            c.execute("SELECT 1 FROM user_info WHERE username=? AND password=?", (username, password))
            if c.fetchone() is None:
                return "Invalid username or password"
            # Serialise the timestamp explicitly: binding a datetime object
            # relies on sqlite3's default adapter, deprecated since Python
            # 3.12. isoformat(" ") yields the same text that adapter wrote.
            session = (str(uuid.uuid4()), datetime.now().isoformat(" "))
            c.execute("INSERT INTO user_sessions (user_id, timestamp) VALUES (?, ?)", session)
            conn.commit()
        finally:
            # Release the connection on every path, including errors.
            conn.close()
        return f"Welcome, {username}"
    except sqlite3.Error as e:
        return str(e)
def sign_up(username, password, email):
    """Register a new account in the ``user_info`` table of ``data_storage.db``.

    The table is created on first use. ``username`` is its primary key, so a
    second registration under the same name is rejected by SQLite.

    Args:
        username: Account name (primary key).
        password: Password, stored exactly as given.
        email: Contact address stored with the account.

    Returns:
        ``"Sign up successful"`` when the row was inserted, otherwise the
        SQLite error text (for example the UNIQUE-constraint message for a
        duplicate username).
    """
    # NOTE(review): the password is persisted in plain text; hash it here and
    # in the sign-in lookup before this handles real credentials.
    try:
        conn = sqlite3.connect('data_storage.db')
        try:
            c = conn.cursor()
            c.execute("""
                CREATE TABLE IF NOT EXISTS user_info (
                    username TEXT PRIMARY KEY,
                    password TEXT,
                    email TEXT
                )
            """)
            c.execute("INSERT INTO user_info (username, password, email) VALUES (?, ?, ?)", (username, password, email))
            conn.commit()
        finally:
            # Close even when the INSERT fails, so a rejected sign-up does
            # not leave a connection (and its open transaction) behind.
            conn.close()
        return "Sign up successful"
    except sqlite3.Error as e:
        return str(e)
def save_inputs(indicator, names, inputs, user_id=None, timestamp=None):
    """Append one row of indicator inputs to the ``inputs`` table.

    Args:
        indicator: Indicator the inputs belong to.
        names: Names of the inputs; stored as ``str(names)``.
        inputs: The input values; stored as ``str(inputs)``.
        user_id: Optional identifier stored with the row. Defaults to
            ``None`` so callers without session information can still save.
        timestamp: Accepted for signature compatibility only; the stored
            value is always the current ``time.time()``.
    """
    timestamp = time.time()
    conn = sqlite3.connect('data_storage.db')
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS inputs
                     (indicator text, names text, inputs text, user_id, timestamp)''')
        c.execute("INSERT INTO inputs VALUES (?, ?, ?, ?, ?)", (indicator, str(names), str(inputs), user_id, timestamp))
        conn.commit()
    finally:
        # Close on every path so a failed insert does not leak the handle.
        conn.close()
def create_database():
    """Ensure the ``user_data`` table exists in ``data_storage.db``.

    Calling it again when the table is already present changes nothing.
    """
    schema = """
        CREATE TABLE IF NOT EXISTS user_data (
            user_id TEXT PRIMARY KEY,
            username TEXT,
            country_name TEXT,
            sdg_number INTEGER,
            result TEXT,
            indicator TEXT,
            names TEXT,
            input_first REAL,
            input_2nd REAL,
            input_3rd REAL,
            input_4th REAL,
            timestamp REAL
        )
    """
    connection = sqlite3.connect("data_storage.db")
    connection.execute(schema)
    connection.commit()
    connection.close()
def filter_and_store_data(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp):
    """Render one country's columns for an SDG as HTML and log the request.

    The rows of the module-level ``data`` frame whose ``Name`` equals
    *country_name* are reduced to ``Name`` plus the columns that
    ``sdg_column_mapping`` lists for the SDG, rendered as an HTML table in a
    scrollable ``<div>``, and the request is inserted into ``user_data``.

    Args:
        username, country_name: Stored with the row; *country_name* also
            selects the rows of ``data``.
        sdg_number: SDG number; anything ``int()`` accepts.
        result: Unused; the ``result`` column receives the rendered HTML.
        indicator, names: Stored as ``str(...)``.
        input_first, input_2nd, input_3rd, input_4th: Stored as given.
        user_id: Stored as given (primary key of ``user_data``).
        timestamp: Ignored; the current local time is stored instead.

    Returns:
        The HTML string on success. On failure a ``(message, [])`` tuple.
        NOTE(review): the two shapes differ; confirm which one the UI
        outputs expect before unifying them.
    """
    try:
        sdg_number = int(sdg_number)
    except (TypeError, ValueError):
        # TypeError covers a missing selection (None), which is just as
        # "not a valid SDG number" as an unparsable string.
        return "Please select a valid SDG number.", []

    try:
        selected_columns = ['Name'] + sdg_column_mapping.get(sdg_number, [])
        selected_data = data[data['Name'] == country_name][selected_columns]
        html_table = selected_data.to_html(index=False)
        # Fixed-height wrapper so long tables scroll instead of growing the page.
        scrollable_html_table = f'<div style="height: 300px; overflow-y: scroll;">{html_table}</div>'

        # Serialise explicitly: binding a datetime object relies on sqlite3's
        # default adapter, deprecated since Python 3.12. isoformat(" ") gives
        # the same text that adapter produced.
        timestamp = datetime.now().isoformat(" ")

        conn = sqlite3.connect("data_storage.db")
        try:
            conn.execute(
                "INSERT INTO user_data (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (username, country_name, sdg_number, scrollable_html_table, str(indicator), str(names), input_first, input_2nd, input_3rd, input_4th, user_id, timestamp),
            )
            conn.commit()
        finally:
            # Always release the connection, even when the insert fails.
            conn.close()
        return scrollable_html_table
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"An error occurred: {e}", []
def gradio_app(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp):
    """Forward every argument, unchanged and in order, to ``filter_and_store_data``.

    Returns whatever ``filter_and_store_data`` returns.
    """
    forwarded = (
        username, country_name, sdg_number, result, indicator, names,
        input_first, input_2nd, input_3rd, input_4th, user_id, timestamp,
    )
    return filter_and_store_data(*forwarded)
def gradios(username, country_name, sdg_number, indicator, names, input_first, input_2nd, input_3rd, input_4th):
    """Evaluate an indicator's formula on four inputs and log the result.

    Args:
        username, country_name, sdg_number: Stored with the row.
        indicator: Key into ``sdg_formulas_mapping``.
        names: Names of the inputs; stored as ``str(names)``.
        input_first, input_2nd, input_3rd, input_4th: Passed to the formula
            in this order and stored with the row.

    Returns:
        ``"Result: <value>"`` with the formula's return value.

    Raises:
        KeyError: If *indicator* has no entry in ``sdg_formulas_mapping``.
    """
    formula = sdg_formulas_mapping[indicator]
    result = formula(input_first, input_2nd, input_3rd, input_4th)

    conn = sqlite3.connect("data_storage.db")
    try:
        # Eleven columns need eleven placeholders and eleven values; the
        # timestamp (epoch seconds, matching the REAL column) was missing.
        # names is stringified because a list cannot be bound as a parameter.
        conn.execute("""
            INSERT INTO user_data (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, timestamp)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (username, country_name, sdg_number, result, indicator, str(names), input_first, input_2nd, input_3rd, input_4th, time.time()))
        conn.commit()
    finally:
        # Release the connection even when the insert fails.
        conn.close()
    return f"Result: {result}"
def user_latest_data():
    """Return the username of the most recently inserted ``user_data`` row.

    Returns:
        The username as a string, or ``"No username available"`` when the
        table holds no rows.
    """
    conn = sqlite3.connect("data_storage.db")
    try:
        # Order by rowid (insertion order): ordering by username returned the
        # alphabetically last user rather than the latest one.
        df = pd.read_sql_query("SELECT username FROM user_data ORDER BY rowid DESC LIMIT 1", conn)
    finally:
        conn.close()
    if df.empty:
        return "No username available"
    return str(df['username'].values[0])
def user_late_data():
    """Return the username of the most recently inserted ``user_data`` row.

    Returns:
        The username as a string, or ``"No username available"`` when the
        table holds no rows.
    """
    conn = sqlite3.connect("data_storage.db")
    try:
        # rowid follows insertion order; sorting by username picked the
        # alphabetically last user instead of the newest row.
        df = pd.read_sql_query("SELECT username FROM user_data ORDER BY rowid DESC LIMIT 1", conn)
    finally:
        conn.close()
    # The query selects the username column itself, so an empty frame is the
    # only "nothing to return" case.
    if df.empty:
        return "No username available"
    return str(df['username'].values[0])
def get_nam(indicator):
    """Collect the ``name`` field of each input entry registered for *indicator*.

    Returns the names as a list, or ``None`` when *indicator* is not a key of
    ``sdg_input_mapping``.
    """
    if indicator not in sdg_input_mapping:
        return None
    collected = []
    for entry in sdg_input_mapping[indicator]:
        collected.append(entry['name'])
    return collected
def update_nam_dropdown(indicator):
    """Return the input names for *indicator*, exactly as ``get_nam`` reports them."""
    input_names = get_nam(indicator)
    return input_names
def add_indicator(sdg_number):
    """Build one indicator-entry panel and return its ``gr.Row``.

    The panel holds an indicator dropdown, a dropdown showing the input names
    for the chosen indicator, a slider choosing how many of the five number
    fields are visible, the number fields themselves and a Save button that
    persists the entries through ``save_inputs``.

    Args:
        sdg_number: Not used by this function.

    Returns:
        The ``gr.Row`` that contains the widgets.
    """
    max_inputs = 5

    def variable_output2(k):
        # Show the first k number fields and hide the remaining ones.
        k = int(k)
        return [gr.Number(visible=True)]*k + [gr.Number(visible=False)]*(max_inputs-k)

    def save_inputs_wrapper(indicator, names, *values):
        # Event handlers receive the components' current values (numbers or
        # None for an empty field), not the components, so there is no
        # ``.value`` to read; stringify each value directly.
        # No session information is available here, so user_id is None;
        # save_inputs stamps the row with the current time itself.
        return save_inputs(indicator, names, [str(v) for v in values], None, None)

    # Derive the initial names from the mapping rather than from the
    # dropdown's ``choices`` attribute, whose element type differs between
    # Gradio versions, and pass them as a flat list of choices.
    indicator_choices = list(sdg_input_mapping.keys())
    initial_names = get_nam(indicator_choices[0]) if indicator_choices else None

    # NOTE(review): the Row/Column nesting below was reconstructed from a
    # copy of the file that had lost its indentation; confirm the layout
    # against the rendered UI.
    with gr.Row(variant='panel') as row:
        indicator_dropdown2 = gr.Dropdown(choices=indicator_choices, interactive=True, type="value", label="Please choose an Indicator")
        names_dropdown = gr.Dropdown(choices=initial_names or [], allow_custom_value=True, multiselect=True, interactive=True, type="value", label="The indicators Inputs needed respectively")
        indicator_dropdown2.change(fn=update_nam_dropdown, inputs=indicator_dropdown2, outputs=names_dropdown)
        with gr.Column(variant='panel'):
            u = gr.Slider(1, max_inputs, value=max_inputs, step=1, label="How many inputs to fill:")
            textboxes = [
                gr.Number(label=f"Please fill with the input number {i}", interactive=True)
                for i in range(max_inputs)
            ]
            u.change(variable_output2, u, textboxes)
            save_button = gr.Button("Save")
            save_button.click(save_inputs_wrapper, inputs=[indicator_dropdown2, names_dropdown, *textboxes], outputs=[])
    return row
# Number of textbox slots that variable_outputs always returns.
max_indicators = 15


def variable_outputs(k):
    """Return visibility updates for ``max_indicators`` textboxes.

    Args:
        k: SDG number (anything ``int()`` accepts) used to look up the number
            of visible rows in ``sdg_progress_mapping``; unknown numbers map
            to zero visible rows.

    Returns:
        A list of exactly ``max_indicators`` ``gr.Textbox`` objects: the
        visible ones first, followed by hidden ones.
    """
    k = int(k)
    num_rows = sdg_progress_mapping.get(k, 0)
    # Clamp to the available slots: a mapping value above max_indicators (or
    # below zero) would otherwise produce a list of the wrong length.
    num_rows = max(0, min(num_rows, max_indicators))
    return [gr.Textbox(visible=True)]*num_rows + [gr.Textbox(visible=False)]*(max_indicators-num_rows)
def get_new_ind(sdg_number):
    """Return the indicators mapped to *sdg_number* as a new list.

    Returns ``None`` when *sdg_number* is not a key of
    ``sdg_indicators_mapping``.
    """
    if sdg_number not in sdg_indicators_mapping:
        return None
    indicators = sdg_indicators_mapping.get(sdg_number, [])
    return list(indicators)
def update_indicator_dropdown(sdg_number):
    """Return the indicator list for *sdg_number*, exactly as ``get_new_ind`` reports it."""
    indicators = get_new_ind(sdg_number)
    return indicators
def fetch_all_data():
    """Return the newest entries of ``user_info``, ``user_data`` and ``inputs``.

    Returns:
        A list of seven strings, in this order: username (from
        ``user_info``); country_name, sdg_number, result (from
        ``user_data``); indicator, names, inputs (from ``inputs``). The
        fields of a table that holds no rows are returned as empty strings.
    """
    def latest_values(conn, table, columns):
        # Ask SQLite for just the newest row (highest rowid) instead of
        # loading the whole table to read its last line. ``table`` is always
        # one of the literal names used below, never user input.
        frame = pd.read_sql_query(f"SELECT * FROM {table} ORDER BY rowid DESC LIMIT 1", conn)
        if frame.empty:
            return [""] * len(columns)
        return [str(frame[column].values[0]) for column in columns]

    conn = sqlite3.connect("data_storage.db")
    try:
        user_data = latest_values(conn, "user_info", ["username"])
        user_data_values = latest_values(conn, "user_data", ["country_name", "sdg_number", "result"])
        input_data = latest_values(conn, "inputs", ["indicator", "names", "inputs"])
    finally:
        # Release the connection even when one of the queries fails.
        conn.close()
    return user_data + user_data_values + input_data
def calculate_indicator(user_id, timestamp):
    """Recompute an indicator from the ``user_data`` row of a user and time.

    Args:
        user_id: Value matched against the ``user_id`` column.
        timestamp: Value matched against the ``timestamp`` column.

    Returns:
        ``"Result for <indicator>: <value>"`` on success, otherwise a message
        saying that no row matched, that the indicator has no formula in
        ``sdg_formulas_mapping``, or which error occurred.
    """
    try:
        conn = sqlite3.connect("data_storage.db")
        try:
            user_data_df = pd.read_sql_query("SELECT * FROM user_data WHERE user_id=? AND timestamp=?", conn, params=(user_id, timestamp))
        finally:
            # The connection is only needed for this one query; close it on
            # every path instead of leaving it open.
            conn.close()

        if user_data_df.empty:
            return "No data found for the specified user_id and timestamp"

        indicator = user_data_df['indicator'].values[0]
        input_first = user_data_df['input_first'].values[0]

        if indicator not in sdg_formulas_mapping:
            return f"Indicator {indicator} not found in the dictionary"

        # NOTE(review): gradios calls the same formulas with four inputs;
        # confirm that passing only the first input is intended here.
        formula = sdg_formulas_mapping[indicator]
        result = formula(input_first)
        return f"Result for {indicator}: {result}"
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"An error occurred: {e}"