qatch-demo / utilities.py
franceth's picture
Css, Prop vs Non-Prop, Metrics Update
dee3b00 verified
raw
history blame
4.69 kB
import csv
import re
import pandas as pd
import sqlite3
import gradio as gr
import os
from qatch.connectors.sqlite_connector import SqliteConnector
def extract_tables(file_path):
    """Return the user table names stored in a SQLite database file.

    Args:
        file_path: Path of the SQLite database to inspect.

    Returns:
        A list of 1-tuples ``(name,)`` as returned by ``fetchall()`` on
        ``sqlite_master``, with the ``sqlite_sequence`` bookkeeping table
        filtered out.
    """
    conn = sqlite3.connect(file_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table';"
        ).fetchall()
    finally:
        # Always release the database handle, even if the query fails.
        conn.close()
    return [row for row in rows if row[0] != 'sqlite_sequence']
def extract_dataframes(file_path):
    """Load every user table of a SQLite database into a pandas DataFrame.

    A single connection is used both for listing the tables and for reading
    them, and it is closed even when a read fails. Table names are quoted as
    SQL identifiers so that names containing spaces, keywords or quotes can
    be read. The ``sqlite_sequence`` bookkeeping table is skipped, as in
    ``extract_tables``.

    Args:
        file_path: Path of the SQLite database to read.

    Returns:
        A dict mapping each table name to a DataFrame with all of its rows.
    """
    conn = sqlite3.connect(file_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table';"
        ).fetchall()
        table_names = [row[0] for row in rows if row[0] != 'sqlite_sequence']
        dfs = {}
        for table_name in table_names:
            # Double any embedded quote and wrap the name in double quotes,
            # which is how SQLite escapes identifiers.
            quoted = '"' + table_name.replace('"', '""') + '"'
            dfs[table_name] = pd.read_sql_query(f"SELECT * FROM {quoted}", conn)
    finally:
        conn.close()
    return dfs
def carica_sqlite(file_path, db_id):
    """Load a SQLite database as DataFrames plus a QATCH connector.

    Args:
        file_path: Path of the SQLite database file.
        db_id: Name handed to ``SqliteConnector`` as ``db_name``.

    Returns:
        A dict with ``'data_frames'`` (the result of ``extract_dataframes``)
        and ``'db'`` (a ``SqliteConnector`` built on the same file).
    """
    frames = extract_dataframes(file_path)
    connector = SqliteConnector(relative_db_path=file_path, db_name=db_id)
    return {'data_frames': frames, 'db': connector}
def load_csv(file):
    """Read a CSV file into a pandas DataFrame.

    Args:
        file: Anything accepted by ``pandas.read_csv`` (path or file-like).

    Returns:
        The parsed DataFrame.
    """
    return pd.read_csv(file)
def carica_excel(file):
    """Read every sheet of an Excel workbook into a pandas DataFrame.

    The workbook is opened through ``pd.ExcelFile`` used as a context
    manager, so it is closed once all sheets have been parsed.

    Args:
        file: Anything accepted by ``pandas.ExcelFile`` (path or file-like).

    Returns:
        A dict mapping each sheet name to the DataFrame parsed from it, in
        workbook order.
    """
    with pd.ExcelFile(file) as xls:
        return {sheet_name: xls.parse(sheet_name) for sheet_name in xls.sheet_names}
def load_data(data_path : str, db_name : str):
    """Load a SQLite, CSV or Excel file into the app's data structure.

    The file type is chosen from the path suffix, compared
    case-insensitively so that e.g. ``DATA.CSV`` is accepted like
    ``data.csv``.

    Args:
        data_path: Path of the file to load.
        db_name: Database name forwarded to ``carica_sqlite``; only used for
            ``.sqlite`` files.

    Returns:
        A dict with keys ``'data_frames'`` (table/sheet name -> DataFrame)
        and ``'db'`` (whatever ``carica_sqlite`` provides for SQLite files,
        ``None`` otherwise). A CSV file yields a single entry named
        ``"<file stem>_table"``.

    Raises:
        gr.Error: If the suffix is not ``.sqlite``, ``.csv`` or ``.xlsx``.
    """
    lowered_path = data_path.lower()
    if lowered_path.endswith(".sqlite"):
        return carica_sqlite(data_path, db_name)

    data_output = {'data_frames': {}, 'db': None}
    if lowered_path.endswith(".csv"):
        # The CSV has no table name of its own, so derive one from the file stem.
        table_name = os.path.splitext(os.path.basename(data_path))[0]
        data_output['data_frames'] = {f"{table_name}_table": load_csv(data_path)}
    elif lowered_path.endswith(".xlsx"):
        data_output['data_frames'] = carica_excel(data_path)
    else:
        raise gr.Error("Formato file non supportato. Carica un file SQLite, CSV o Excel.")
    return data_output
def read_api(api_key_path):
    """Read an API key from a UTF-8 text file.

    Leading and trailing whitespace is removed, so the newline that editors
    usually append to the file does not end up inside the key.

    Args:
        api_key_path: Path of the file holding the key.

    Returns:
        The file content with surrounding whitespace stripped.
    """
    with open(api_key_path, "r", encoding="utf-8") as file:
        return file.read().strip()
def read_models_csv(file_path):
    """Read a models CSV file into a list of row dictionaries.

    The file is decoded as UTF-8 explicitly, matching the other CSV readers
    in this module, instead of relying on the platform default encoding.

    Args:
        file_path: Path of the CSV file; it must have a ``price`` column.

    Returns:
        One dict per data row, keyed by the header names, with ``"price"``
        converted to ``float``.

    Raises:
        KeyError: If a row has no ``price`` field.
        ValueError: If a ``price`` value cannot be parsed as a float.
    """
    models = []
    with open(file_path, mode="r", newline="", encoding="utf-8") as file:
        for row in csv.DictReader(file):
            row["price"] = float(row["price"])
            models.append(row)
    return models
def csv_to_dict(file_path):
    """Read a UTF-8 CSV file into a list of row dictionaries.

    The file is opened with ``newline=""`` as the ``csv`` module requires,
    so line breaks embedded in quoted fields are preserved unchanged.

    Args:
        file_path: Path of the CSV file.

    Returns:
        One dict per data row, keyed by the header names. When a row has a
        ``price`` field it is converted to ``float``.

    Raises:
        ValueError: If a ``price`` value cannot be parsed as a float.
    """
    data = []
    with open(file_path, mode='r', encoding='utf-8', newline='') as file:
        for row in csv.DictReader(file):
            if "price" in row:
                row["price"] = float(row["price"])
            data.append(row)
    return data
def increment_filename(filename):
    """Return ``filename`` with the largest number in its base name bumped by one.

    Only the final path component is inspected, so digits that appear in
    parent directories are never rewritten. Every occurrence of the largest
    number in the base name is replaced by that number plus one; if the base
    name contains no digits, ``1`` is appended to it. The extension is kept
    as is.

    Args:
        filename: File name, optionally preceded by a directory path.

    Returns:
        The new file name, with the same directory prefix and extension.
    """
    name = os.path.basename(filename)
    # Keep the directory prefix verbatim (separators included).
    prefix = filename[:len(filename) - len(name)]
    base, ext = os.path.splitext(name)

    numbers = [int(digits) for digits in re.findall(r'\d+', base)]
    if numbers:
        # Compute the maximum once instead of once per regex match.
        current = max(numbers)
        bumped = str(current + 1)
        new_base = re.sub(
            r'\d+',
            lambda m: bumped if int(m.group()) == current else m.group(),
            base,
        )
    else:
        new_base = base + '1'
    return prefix + new_base + ext
def prepare_prompt(prompt, question, schema, samples):
    """Fill a prompt template and append the sample rows.

    Args:
        prompt: Template text that may contain ``{schema}`` and ``{question}``.
        question: Text substituted for ``{question}``.
        schema: Text substituted for ``{schema}``; replaced first.
        samples: Object whose string form is appended after the template.

    Returns:
        The filled template followed by ``" Some istanze: <samples>"``.
    """
    with_schema = prompt.replace("{schema}", schema)
    filled = with_schema.replace("{question}", question)
    return filled + f" Some istanze: {samples}"
def generate_some_samples(connector, tbl_name):
    """Fetch up to three rows of a table, rendered as text.

    Args:
        connector: Object exposing ``execute_query(query)``.
        tbl_name: Table name interpolated into the ``SELECT`` statement.

    Returns:
        A one-element list holding either ``str()`` of the query result or,
        if anything raised, the text ``"Error: <exception>"``.
    """
    try:
        rows = connector.execute_query(f"SELECT * FROM {tbl_name} LIMIT 3")
        rendered = str(rows)
    except Exception as exc:
        # Best effort: report the failure as the sample instead of raising.
        rendered = f"Error: {exc}"
    return [rendered]
def extract_tables_dict(pnp_path):
    """Build a preview (first five rows) of every table listed in a CSV file.

    The CSV is expected to have ``tbl_name`` and ``db_path`` columns; rows
    where either one is missing or empty are ignored and duplicate pairs are
    read only once. Each SQLite connection is closed after its table has
    been read.

    Args:
        pnp_path: Path of the UTF-8 CSV file listing the tables.

    Returns:
        A dict mapping each table name to a DataFrame with up to five rows.
        If reading a table fails, its entry is a one-row DataFrame with a
        single ``"Error"`` column holding the exception message.
    """
    with open(pnp_path, mode='r', encoding='utf-8') as file:
        # A set avoids querying the same (table, database) pair twice.
        tbl_db_pairs = {
            (row.get("tbl_name"), row.get("db_path"))
            for row in csv.DictReader(file)
            if row.get("tbl_name") and row.get("db_path")
        }

    tables_dict = {}
    for tbl_name, db_path in tbl_db_pairs:
        connector = sqlite3.connect(db_path)
        try:
            tables_dict[tbl_name] = pd.read_sql_query(
                f"SELECT * FROM {tbl_name} LIMIT 5", connector
            )
        except Exception as e:
            # Keep going: surface the failure as a DataFrame for this table.
            tables_dict[tbl_name] = pd.DataFrame({"Error": [str(e)]})
        finally:
            connector.close()
    return tables_dict