Spaces:
Sleeping
Sleeping
File size: 11,624 Bytes
214d428 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
import gradio as gr
import pandas as pd
import os
import sqlite3
from filter import filter_data, sdg_column_mapping
from tar import sdg_indicators_mapping, sdg_input_mapping ,sdg_formulas_mapping, sdg_progress_mapping
from tqdm import tqdm
import time
import uuid
from datetime import datetime
# Load the Sustainable Development Report 2023 indicator CSV once, at import
# time. The file is resolved against the current working directory, so the
# app has to be started from the directory that contains it.
cwd = os.getcwd()
csv_file_path = os.path.join(cwd,'Sustainable_Development_Report_2023_(with_indicators)__-2086263501583264136.csv')
# Module-level DataFrame; filter_and_store_data filters it on its 'Name' column.
data = pd.read_csv(csv_file_path)
def sign_in(username, password):
    """Check credentials against ``user_info`` and record a new session.

    On a match, a fresh UUID and the current local time are inserted into the
    ``user_sessions`` table of ``data_storage.db``; the table is created on
    first use.

    Args:
        username: Name to look up in ``user_info``.
        password: Password compared verbatim with the stored value.

    Returns:
        ``"Welcome, <username>"`` on success, ``"Invalid username or
        password"`` when no row matches, or the text of any exception raised
        along the way (for example when ``user_info`` does not exist yet).
    """
    # NOTE(review): passwords are stored and compared in plain text. Hashing
    # them needs a coordinated change in sign_up plus a migration of the
    # existing rows, so it is only flagged here.
    try:
        conn = sqlite3.connect('data_storage.db')
        try:
            c = conn.cursor()
            c.execute("""
                CREATE TABLE IF NOT EXISTS user_sessions (
                    user_id TEXT PRIMARY KEY,
                    timestamp TEXT
                )
            """)
            c.execute(
                "SELECT * FROM user_info WHERE username=? AND password=?",
                (username, password),
            )
            if c.fetchone() is None:
                return "Invalid username or password"

            user_id = str(uuid.uuid4())
            # Bind an explicit string: the implicit datetime adapter of the
            # sqlite3 module is deprecated since Python 3.12. The format is
            # the one that adapter produced ("YYYY-MM-DD HH:MM:SS[.ffffff]").
            timestamp = datetime.now().isoformat(" ")
            c.execute(
                "INSERT INTO user_sessions (user_id, timestamp) VALUES (?, ?)",
                (user_id, timestamp),
            )
            conn.commit()
            return "Welcome, " + username
        finally:
            # Always release the connection, including on errors.
            conn.close()
    except Exception as e:
        return str(e)
def sign_up(username, password, email):
    """Register a new user in the ``user_info`` table of ``data_storage.db``.

    The table is created on first use. ``username`` is its primary key, so a
    second registration with the same name fails.

    Args:
        username: Unique user name.
        password: Password, stored as given.
        email: Contact address, stored as given.

    Returns:
        ``"Sign up successful"`` when the row was inserted, otherwise the
        text of the exception that was raised (e.g. the SQLite uniqueness
        error for a duplicate username).
    """
    # NOTE(review): the password is persisted in plain text. sign_in compares
    # it verbatim, so hashing must be introduced in both places together.
    try:
        conn = sqlite3.connect('data_storage.db')
        try:
            c = conn.cursor()
            c.execute("""
                CREATE TABLE IF NOT EXISTS user_info (
                    username TEXT PRIMARY KEY,
                    password TEXT,
                    email TEXT
                )
            """)
            c.execute(
                "INSERT INTO user_info (username, password, email) VALUES (?, ?, ?)",
                (username, password, email),
            )
            conn.commit()
            return "Sign up successful"
        finally:
            # Close even when the INSERT fails, which previously leaked the
            # connection.
            conn.close()
    except Exception as e:
        return str(e)
def save_inputs(indicator, names, inputs, user_id=None, timestamp=None):
    """Append one indicator/input record to the ``inputs`` table.

    The table is created in ``data_storage.db`` on first use. ``names`` and
    ``inputs`` are stored as their ``str()`` representation.

    Args:
        indicator: Identifier of the selected indicator.
        names: Names of the inputs belonging to the indicator.
        inputs: Values entered for those inputs.
        user_id: Optional identifier of the submitting user; stored as NULL
            when omitted.
        timestamp: Optional timestamp to store. When omitted, the current
            ``time.time()`` value is used.
    """
    # Honour a caller-supplied timestamp instead of silently overwriting it.
    if timestamp is None:
        timestamp = time.time()

    conn = sqlite3.connect('data_storage.db')
    try:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS inputs
                     (indicator text, names text, inputs text, user_id, timestamp)''')
        c.execute(
            "INSERT INTO inputs VALUES (?, ?, ?, ?,?)",
            (indicator, str(names), str(inputs), user_id, timestamp),
        )
        conn.commit()
    finally:
        # Release the connection even if one of the statements fails.
        conn.close()
def create_database():
    """Create the ``user_data`` table in ``data_storage.db`` if it is missing.

    Calling the function again is harmless: an existing table is left
    untouched.
    """
    conn = sqlite3.connect("data_storage.db")
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS user_data (
                user_id TEXT PRIMARY KEY,
                username TEXT,
                country_name TEXT,
                sdg_number INTEGER,
                result TEXT,
                indicator TEXT,
                names TEXT,
                input_first REAL,
                input_2nd REAL,
                input_3rd REAL,
                input_4th REAL,
                timestamp REAL
            )
        """)
        conn.commit()
    finally:
        # Close the connection even if the DDL statement raises.
        conn.close()
def filter_and_store_data(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp):
    """Render the SDG columns of one country as HTML and log the request.

    The module-level ``data`` frame is filtered to the rows whose ``Name``
    equals ``country_name`` and to the columns listed for ``sdg_number`` in
    ``sdg_column_mapping``. The resulting HTML is wrapped in a scrollable
    ``<div>`` and stored, together with the remaining arguments, as a new
    row of the ``user_data`` table.

    Args:
        username: Name stored with the row.
        country_name: Value matched against the ``Name`` column of ``data``.
        sdg_number: SDG number; anything ``int()`` accepts.
        result: Accepted for interface compatibility; not used. The rendered
            HTML is what gets stored in the ``result`` column.
        indicator: Stored as ``str(indicator)``.
        names: Stored as ``str(names)``.
        input_first, input_2nd, input_3rd, input_4th: Stored as given.
        user_id: Stored as given (primary key of ``user_data``).
        timestamp: Accepted for interface compatibility; not used. The
            current local time is stored instead.

    Returns:
        The scrollable HTML string on success. On failure a 2-tuple
        ``(message, [])``.
    """
    # NOTE(review): success returns a string while failures return a tuple;
    # confirm that the Gradio wiring expects both shapes.
    try:
        sdg_number = int(sdg_number)
        selected_columns = ['Name'] + sdg_column_mapping.get(sdg_number, [])
        selected_data = data[data['Name'] == country_name][selected_columns]

        html_table = selected_data.to_html(index=False)
        scrollable_html_table = f'<div style="height: 300px; overflow-y: scroll;">{html_table}</div>'

        # Same text the implicit sqlite3 datetime adapter used to write; that
        # adapter is deprecated since Python 3.12, so bind a string explicitly.
        timestamp = datetime.now().isoformat(" ")

        conn = sqlite3.connect("data_storage.db")
        try:
            conn.execute(
                "INSERT INTO user_data (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (username, country_name, sdg_number, str(scrollable_html_table), str(indicator), str(names), input_first, input_2nd, input_3rd, input_4th, user_id, timestamp),
            )
            conn.commit()
        finally:
            # Do not leak the connection when the INSERT fails.
            conn.close()
        return scrollable_html_table
    except ValueError:
        return "Please select a valid SDG number.", []
    except Exception as e:
        return f"An error occurred: {e}", []
def gradio_app(username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, user_id, timestamp):
    """Gradio entry point: forward every argument to ``filter_and_store_data``.

    Returns whatever ``filter_and_store_data`` returns for the same arguments.
    """
    forwarded = (
        username, country_name, sdg_number, result, indicator, names,
        input_first, input_2nd, input_3rd, input_4th, user_id, timestamp,
    )
    return filter_and_store_data(*forwarded)
def gradios(username, country_name, sdg_number, indicator, names, input_first, input_2nd, input_3rd, input_4th):
    """Evaluate the formula of ``indicator`` and store the outcome.

    The callable registered for ``indicator`` in ``sdg_formulas_mapping`` is
    invoked with the four inputs, and a row describing the calculation is
    appended to the ``user_data`` table of ``data_storage.db``.

    Args:
        username: Name stored with the row.
        country_name: Country stored with the row.
        sdg_number: SDG number stored with the row.
        indicator: Key into ``sdg_formulas_mapping``.
        names: Input names; stored as ``str(names)``.
        input_first, input_2nd, input_3rd, input_4th: Arguments passed to the
            formula, and stored as given.

    Returns:
        ``"Result: <value>"``, or a "not found" message when ``indicator``
        has no registered formula.
    """
    if indicator not in sdg_formulas_mapping:
        # Same wording as calculate_indicator uses for an unknown indicator.
        return f"Indicator {indicator} not found in the dictionary"

    result = sdg_formulas_mapping[indicator](input_first, input_2nd, input_3rd, input_4th)

    conn = sqlite3.connect("data_storage.db")
    try:
        # The statement previously listed 11 columns but supplied only 10
        # placeholders/values, so it failed on every call. The missing
        # timestamp value is now provided.
        conn.execute(
            """
            INSERT INTO user_data (username, country_name, sdg_number, result, indicator, names, input_first, input_2nd, input_3rd, input_4th, timestamp)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                username, country_name, sdg_number,
                # Stringified like filter_and_store_data does, so list-valued
                # names and arbitrary result types can be bound.
                str(result), indicator, str(names),
                input_first, input_2nd, input_3rd, input_4th,
                time.time(),
            ),
        )
        conn.commit()
    finally:
        conn.close()
    return f"Result: {result}"
def user_latest_data():
    """Return the username of the first ``user_data`` row in descending username order.

    Returns:
        The username as a string, or ``"No username available"`` when the
        table has no rows (the same fallback ``user_late_data`` uses).
    """
    # NOTE(review): despite the name, the query orders by username, not by
    # insertion time. Kept as is; confirm the intended ordering.
    conn = sqlite3.connect("data_storage.db")
    try:
        df = pd.read_sql_query("SELECT * FROM user_data ORDER BY username DESC LIMIT 1", conn)
    finally:
        conn.close()

    if df.empty:
        # Previously this case raised IndexError on .values[0].
        return "No username available"
    return str(df['username'].values[0])
def user_late_data():
    """Return the username of the first ``user_data`` row in descending username order.

    Returns:
        The username as a string, or ``"No username available"`` when the
        query yields no row or no ``username`` column.
    """
    conn = sqlite3.connect("data_storage.db")
    try:
        df = pd.read_sql_query("SELECT * FROM user_data ORDER BY username DESC LIMIT 1", conn)
    finally:
        # Close the connection even when the query itself raises.
        conn.close()

    if df.empty or 'username' not in df.columns:
        return "No username available"
    return str(df['username'].values[0])
def get_nam(indicator):
    """Return the ``'name'`` of every input registered for ``indicator``.

    Looks the indicator up in ``sdg_input_mapping``; each entry of the mapped
    sequence is indexed with ``'name'``. Returns ``None`` when the indicator
    is not a key of the mapping.
    """
    if indicator not in sdg_input_mapping:
        return None
    return [entry['name'] for entry in sdg_input_mapping[indicator]]
def update_nam_dropdown(indicator):
    """Change handler for the indicator dropdown: return ``get_nam(indicator)``."""
    input_names = get_nam(indicator)
    return input_names
def add_indicator(sdg_number):
    """Build one indicator-entry row of Gradio components and return it.

    The row holds an indicator dropdown, a dropdown with the names of the
    inputs of that indicator, a slider choosing how many of the numeric
    input boxes are visible, the boxes themselves and a "Save" button that
    persists the entered values through ``save_inputs``.

    Args:
        sdg_number: Not used by the current implementation.

    Returns:
        The ``gr.Row`` that contains the created components.
    """
    # NOTE(review): the component nesting below was reconstructed from a
    # flattened source; confirm it against the intended layout.
    max_inputs = 5

    def variable_output2(k):
        # Show the first k number boxes and hide the remaining ones.
        k = int(k)
        return [gr.Number(visible=True)]*k + [gr.Number(visible=False)]*(max_inputs-k)

    with gr.Row(variant='panel') as row:
        indicator_dropdown2 = gr.Dropdown(choices=list(sdg_input_mapping.keys()), interactive=True, type="value", label="Please choose an Indicator")
        # NOTE(review): choices is a one-element list wrapping the list of
        # names; verify that this is what the dropdown should offer.
        names_dropdown = gr.Dropdown(choices=[update_nam_dropdown(indicator_dropdown2.choices[0])], allow_custom_value=True,multiselect=True, interactive=True, type="value", label="The indicators Inputs needed respectively")
        indicator_dropdown2.change(fn=update_nam_dropdown, inputs=indicator_dropdown2, outputs=names_dropdown)

        with gr.Column(variant='panel'):
            u = gr.Slider(1, max_inputs, value=max_inputs, step=1, label="How many inputs to fill:")
            textboxes = [
                gr.Number(label=f"Please fill with the input number {i}", interactive=True)
                for i in range(max_inputs)
            ]
            u.change(variable_output2, u, textboxes)

        save_button = gr.Button("Save")

        def save_inputs_wrapper(indicator, names, *values):
            # Event handlers receive the components' current values (numbers
            # or None), not the components, so there is no `.value` to read.
            # save_inputs takes user_id and timestamp as well; none is known
            # here, so None is passed for both.
            return save_inputs(indicator, names, [str(v) for v in values], None, None)

        save_button.click(save_inputs_wrapper, inputs=[indicator_dropdown2, names_dropdown, *textboxes], outputs=[])
    return row
# Number of indicator textboxes toggled by variable_outputs.
max_indicators = 15


def variable_outputs(k):
    """Return visibility updates for the ``max_indicators`` indicator textboxes.

    Args:
        k: SDG number; anything ``int()`` accepts. The number of textboxes to
            show is read from ``sdg_progress_mapping`` (0 when ``k`` is not a
            key).

    Returns:
        A list of exactly ``max_indicators`` ``gr.Textbox`` updates: visible
        ones first, hidden ones after.
    """
    k = int(k)
    num_rows = sdg_progress_mapping.get(k, 0)
    # Clamp so the list always has max_indicators entries, even if the
    # mapping holds a count outside 0..max_indicators.
    num_rows = max(0, min(num_rows, max_indicators))
    return [gr.Textbox(visible=True)]*num_rows + [gr.Textbox(visible=False)]*(max_indicators-num_rows)
def get_new_ind(sdg_number):
    """Return the indicators mapped to ``sdg_number`` as a new list.

    The lookup goes through ``sdg_indicators_mapping``. Returns ``None`` when
    ``sdg_number`` is not a key of that mapping.
    """
    if sdg_number not in sdg_indicators_mapping:
        return None
    indicators = sdg_indicators_mapping.get(sdg_number, [])
    return [*indicators]
def update_indicator_dropdown(sdg_number):
    """Change handler for the SDG selector: return ``get_new_ind(sdg_number)``."""
    indicators = get_new_ind(sdg_number)
    return indicators
def _latest_row_as_strings(conn, table, columns, placeholder="No data available"):
    """Return ``columns`` of the most recently inserted row of ``table`` as strings.

    When the table has no rows, ``placeholder`` is returned once per column.
    """
    # table/columns are fixed identifiers supplied by fetch_all_data, never
    # user input, so building the statement with an f-string is safe here.
    query = f"SELECT {', '.join(columns)} FROM {table} ORDER BY rowid DESC LIMIT 1"
    df = pd.read_sql_query(query, conn)
    if df.empty:
        return [placeholder] * len(columns)
    return [str(df[column].values[0]) for column in columns]


def fetch_all_data():
    """Collect the latest user, user-data and input records for display.

    Returns:
        A list of seven strings: the latest ``user_info.username``; the
        latest ``user_data`` ``country_name``, ``sdg_number`` and ``result``;
        and the latest ``inputs`` ``indicator``, ``names`` and ``inputs``.
        Values of an empty table are reported as ``"No data available"``
        instead of raising ``IndexError``.
    """
    conn = sqlite3.connect("data_storage.db")
    try:
        # Only the newest row of each table is fetched, rather than loading
        # whole tables to read their last row.
        user = _latest_row_as_strings(conn, "user_info", ["username"])
        user_data = _latest_row_as_strings(
            conn, "user_data", ["country_name", "sdg_number", "result"]
        )
        inputs = _latest_row_as_strings(
            conn, "inputs", ["indicator", "names", "inputs"]
        )
    finally:
        conn.close()
    return user + user_data + inputs
def calculate_indicator(user_id, timestamp):
    """Recompute an indicator from a stored ``user_data`` row.

    The row matching ``user_id`` and ``timestamp`` is read from
    ``data_storage.db`` and the formula registered for its ``indicator`` in
    ``sdg_formulas_mapping`` is applied to its ``input_first`` value.

    Args:
        user_id: Value matched against the ``user_id`` column.
        timestamp: Value matched against the ``timestamp`` column.

    Returns:
        A human-readable message: the result, a "no data" or "not found"
        notice, or ``"An error occurred: ..."`` for any exception.
    """
    try:
        conn = sqlite3.connect("data_storage.db")
        try:
            user_data_df = pd.read_sql_query(
                "SELECT * FROM user_data WHERE user_id=? AND timestamp=?",
                conn,
                params=(user_id, timestamp),
            )
        finally:
            # The connection used to stay open on every path.
            conn.close()

        if user_data_df.empty:
            return "No data found for the specified user_id and timestamp"

        indicator = user_data_df['indicator'].values[0]
        input_first = user_data_df['input_first'].values[0]

        if indicator not in sdg_formulas_mapping:
            return f"Indicator {indicator} not found in the dictionary"

        # NOTE(review): gradios calls these formulas with four inputs, while
        # only input_first is passed here; confirm which arity is intended.
        result = sdg_formulas_mapping[indicator](input_first)
        return f"Result for {indicator}: {result}"
    except Exception as e:
        return f"An error occurred: {e}"