# Spaces: Running
import ast
import os
import re
import threading
import traceback
import uuid

import gradio as gr
import numpy as np
import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
# Process-wide cache for the language model. Both slots start empty; the lock
# is the one held by initialize_model_once() while it fills them in.
MODEL_CACHE = dict(
    model=None,
    tokenizer=None,
    init_lock=threading.Lock(),
)

# Root folder for per-session user data, created as soon as the module loads.
os.makedirs("user_data", exist_ok=True)
def initialize_model_once():
    """Return the shared ``(model, tokenizer)`` pair, loading it on first use.

    The first caller loads the Phi-4-mini instruct checkpoint and stores both
    objects in ``MODEL_CACHE``; later callers receive the cached objects. The
    emptiness check and the load both happen while holding
    ``MODEL_CACHE["init_lock"]``.
    """
    checkpoint = "microsoft/Phi-4-mini-instruct"
    cache = MODEL_CACHE

    with cache["init_lock"]:
        already_loaded = cache["model"] is not None
        if not already_loaded:
            # Tokenizer first, then the weights in half precision with
            # automatic device placement.
            cache["tokenizer"] = AutoTokenizer.from_pretrained(checkpoint)
            cache["model"] = AutoModelForCausalLM.from_pretrained(
                checkpoint,
                torch_dtype=torch.float16,
                device_map="auto",
            )

    return cache["model"], cache["tokenizer"]
def _extract_code(generated_text):
    """Return the Python snippet contained in raw model output.

    Three output shapes are handled, in this order:

    1. a complete ```python ... ``` block anywhere in the text;
    2. text that opens with a fence (any or no language tag), whether the
       fence is closed or the output was cut off before the closing fence;
    3. bare code, optionally followed by a closing fence. This is the usual
       case when the prompt itself already ends with an opening fence, so the
       completion only contains the closing one.
    """
    tagged_block = re.search(r'```python\s*(.*?)\s*```', generated_text, re.DOTALL)
    if tagged_block:
        return tagged_block.group(1).strip()

    leading_fence = re.match(r'\s*```[^\n]*\n(.*?)(?:```|\Z)', generated_text, re.DOTALL)
    if leading_fence:
        return leading_fence.group(1).strip()

    # Keep only what precedes the first fence so a dangling "```" (and any
    # commentary after it) never reaches the code executor.
    return generated_text.split("```", 1)[0].strip()


def generate_pandas_code(prompt, max_new_tokens=512):
    """Generate Python code for ``prompt`` using the Phi-4-mini model.

    Args:
        prompt: Full text prompt sent to the model.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The generated code as a string with Markdown fences removed. When the
        output contains no recognizable fence, the stripped raw completion is
        returned.
    """
    model, tokenizer = initialize_model_once()
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.2,
            top_p=0.9,
        )

    # For a causal LM the output sequence is prompt tokens + new tokens.
    # Slice by token count rather than by decoded-string length: decoding the
    # prompt does not always reproduce it character for character, which
    # would shift a string-based cut.
    prompt_length = inputs["input_ids"].shape[1]
    generated_text = tokenizer.decode(
        outputs[0][prompt_length:], skip_special_tokens=True
    )

    return _extract_code(generated_text)
class ChatBot:
    """Per-session assistant that answers questions about one uploaded CSV.

    A question is either answered directly from stored metadata (file name,
    column names, row/column counts) or turned into pandas code by
    ``generate_pandas_code`` and executed against the loaded DataFrame.
    """

    # Names seeded into the execution namespace; never reported as a result.
    _RESERVED_NAMES = frozenset({"df", "pd", "np", "__builtins__"})

    def __init__(self, session_id):
        """Create an empty session and its ``user_data/<session_id>`` folder."""
        self.session_id = session_id
        self.csv_info = None      # dict: filename, rows, columns, column_names
        self.df = None            # DataFrame loaded by process_file()
        self.chat_history = []    # list of (speaker_or_question, text) tuples
        self.user_dir = f"user_data/{session_id}"
        os.makedirs(self.user_dir, exist_ok=True)

    def process_file(self, file):
        """Load an uploaded CSV and make it the session's active dataset.

        Args:
            file: Upload object exposing ``.name`` (a path), a plain path, or
                ``None``.

        Returns:
            A user-facing status message (success or error description).
        """
        if file is None:
            return "Mohon upload file CSV terlebih dahulu."

        try:
            # Handle file from Gradio
            file_path = file.name if hasattr(file, 'name') else str(file)
            file_name = os.path.basename(file_path)

            # Read and persist into locals first: session state is only
            # replaced once both steps succeed, so a failed upload can never
            # leave `df` set while `csv_info` is still missing or stale.
            try:
                loaded = pd.read_csv(file_path)
                user_file_path = f"{self.user_dir}/uploaded.csv"
                loaded.to_csv(user_file_path, index=False)
            except Exception as e:
                return f"Error membaca CSV: {str(e)}"

            row_count, column_count = loaded.shape
            column_names = loaded.columns.tolist()

            self.df = loaded
            self.csv_info = {
                "filename": file_name,
                "rows": row_count,
                "columns": column_count,
                "column_names": column_names,
            }
            print(f"CSV verified: {row_count} rows, {column_count} columns")

            # Add file info to chat history
            file_info = f"CSV berhasil dimuat: {file_name} dengan {row_count} baris dan {column_count} kolom. Kolom: {', '.join(map(str, column_names))}"
            self.chat_history.append(("System", file_info))

            return f"File CSV '{file_name}' berhasil diproses! Anda dapat mulai mengajukan pertanyaan tentang data."
        except Exception as e:
            print(traceback.format_exc())
            return f"Error pemrosesan file: {str(e)}"

    def execute_query(self, code):
        """Execute generated pandas code and return what it computed.

        SECURITY NOTE: ``code`` comes from a language model driven by user
        input and is run with ``exec`` without sandboxing or a timeout. Only
        expose this to trusted users, or run it in an isolated process.

        Args:
            code: Python source that may use ``df``, ``pd`` and ``np``.

        Returns:
            The value of ``result`` if the code assigned it; otherwise the
            last new variable the code created (when it is not ``None``);
            otherwise the working DataFrame.

        Raises:
            Exception: Any failure while running the code, wrapped with a
                "Gagal menjalankan kode" message and chained to the cause.
        """
        try:
            # Work on a copy so in-place operations in generated code (drop,
            # fillna(inplace=True), ...) cannot corrupt the session's data.
            working_df = self.df.copy() if self.df is not None else None

            # A single namespace serves as both globals and locals. With two
            # separate dicts, names such as `df` are invisible inside
            # lambdas and functions defined by the generated code.
            namespace = {"df": working_df, "pd": pd, "np": np}
            exec(code, namespace)

            if "result" in namespace:
                return namespace["result"]

            # No explicit result: fall back to the most recently created name.
            created = [
                value
                for name, value in namespace.items()
                if name not in self._RESERVED_NAMES
            ]
            if created and created[-1] is not None:
                return created[-1]

            # Nothing was produced; hand back the (possibly modified) frame.
            return namespace.get("df", self.df)
        except Exception as e:
            raise Exception(f"Gagal menjalankan kode: {str(e)}") from e

    def chat(self, message, history):
        """Answer one user question about the loaded CSV.

        Args:
            message: The user's question.
            history: Previous chat turns (accepted for interface
                compatibility; not used).

        Returns:
            A user-facing answer or error message. This method does not raise.
        """
        if self.df is None:
            return "Mohon upload file CSV terlebih dahulu."

        try:
            message_lower = message.lower()

            # Handle common metadata questions directly to save resources
            quick_answer = self._answer_metadata_question(message_lower)
            if quick_answer is not None:
                return quick_answer

            prompt = self._build_prompt(message)

            # Generate code with Phi-4
            try:
                code = self._ensure_result_assignment(generate_pandas_code(prompt))
            except Exception as e:
                print(f"Error generating code: {str(e)}")
                # Fallback for basic questions
                if "rata-rata" in message_lower or "mean" in message_lower:
                    code = "result = df.describe()"
                elif "jumlah" in message_lower or "count" in message_lower:
                    code = "result = df.count()"
                else:
                    return f"Maaf, saya tidak dapat menghasilkan kode untuk pertanyaan ini. Error: {str(e)}"

            # Execute the code and get result
            try:
                print(f"Executing code: {code}")
                result = self.execute_query(code)

                # Check if result is relevant to the question
                if result is None or (isinstance(result, pd.DataFrame) and result.empty):
                    return "Maaf, kita tidak bisa mendapatkan informasi terkait pertanyaan anda di dalam file CSV anda."

                result_str = self._format_result(result)
                response = f"Hasil analisis:\n\n{result_str}\n\nKode yang dijalankan:\n```python\n{code}\n```"
                self.chat_history.append((message, response))
                return response
            except Exception as e:
                return f"Error saat menganalisis data: {str(e)}\n\nKode yang dicoba:\n```python\n{code}\n```"
        except Exception as e:
            print(traceback.format_exc())
            return f"Error: {str(e)}"

    def _answer_metadata_question(self, message_lower):
        """Return a canned answer for simple metadata questions, else None."""
        info = self.csv_info
        if "nama file" in message_lower:
            return f"Nama file CSV adalah: {info['filename']}"
        if "nama kolom" in message_lower:
            return f"Kolom dalam CSV: {', '.join(map(str, info['column_names']))}"
        if "jumlah baris" in message_lower or "berapa baris" in message_lower:
            return f"Jumlah baris dalam CSV: {info['rows']}"
        if "jumlah kolom" in message_lower or "berapa kolom" in message_lower:
            return f"Jumlah kolom dalam CSV: {info['columns']}"
        return None

    def _build_prompt(self, message):
        """Compose the LLM prompt: schema, five sample rows and the question."""
        sample_str = self.df.head(5).to_string()
        data_types = {col: str(dtype) for col, dtype in self.df.dtypes.items()}
        column_list = ', '.join(map(str, self.csv_info['column_names']))

        # The prompt deliberately ends with an opening code fence so the
        # model continues with code straight away.
        return "\n".join([
            "",
            "You are a data analyst that translates natural language questions into Python pandas code.",
            "DataFrame information:",
            f"- Column names: {column_list}",
            f"- Data types: {data_types}",
            f"- Number of rows: {self.csv_info['rows']}",
            "- Sample data:",
            f"{sample_str}",
            f"User question: {message}",
            "Write a short Python code using pandas to answer the user's question.",
            "The code must use the 'df' variable as the DataFrame name.",
            "The code should assign the final result to a variable named 'result'.",
            "Only return the Python code without any explanation.",
            "```python",
            "",
        ])

    @staticmethod
    def _ensure_result_assignment(code):
        """Make the final bare expression assign to ``result`` when needed.

        The code is returned unchanged when it does not parse (the executor
        reports the error), when it already assigns ``result`` somewhere, or
        when its last statement is not a bare expression (the executor then
        falls back to the last variable created).
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError):
            return code

        assigns_result = any(
            isinstance(node, ast.Name)
            and node.id == "result"
            and isinstance(node.ctx, ast.Store)
            for node in ast.walk(tree)
        )
        if assigns_result or not tree.body:
            return code

        last_statement = tree.body[-1]
        if not isinstance(last_statement, ast.Expr):
            return code

        lines = code.split("\n")
        row = last_statement.lineno - 1
        if row >= len(lines):
            return code

        # col_offset is a UTF-8 byte offset, so splice on the encoded line.
        raw_line = lines[row].encode("utf-8")
        offset = last_statement.col_offset
        lines[row] = (raw_line[:offset] + b"result = " + raw_line[offset:]).decode("utf-8")
        return "\n".join(lines)

    @staticmethod
    def _format_result(result):
        """Render a query result as text, truncating long tables/sequences."""
        if isinstance(result, pd.DataFrame):
            if len(result) > 5:
                return result.head(5).to_string() + f"\n\n[Total {len(result)} baris]"
            return result.to_string()

        if isinstance(result, pd.Series):
            if len(result) > 10:
                # iloc keeps the cut positional regardless of the index type.
                return str(result.iloc[:10]) + f"\n\n[Total {len(result)} item]"
            return str(result)

        if isinstance(result, np.ndarray):
            # 0-d arrays have no len(); print them like scalars.
            if result.ndim > 0 and len(result) > 10:
                return str(result[:10]) + f"\n\n[Total {len(result)} item]"
            return str(result)

        result_str = str(result)
        if hasattr(result, "__len__") and not isinstance(result, str):
            try:
                size = len(result)
            except TypeError:
                # e.g. a class object: has __len__ but is not itself sized.
                return result_str
            if size > 0:
                result_str += f"\n\n[Total {size} item]"
        return result_str
# UI code (same as in the previous version)
def create_gradio_interface():
    """Build and return the Gradio Blocks app for the CSV analyzer.

    Layout: a left column with the CSV upload, a process button and example
    questions; a right column with the chat display, the question box and the
    send / clear buttons. Each browser session gets its own session id and
    its own ``ChatBot`` held in ``gr.State``.
    """
    example_questions = (
        "\n"
        '- "Berapa jumlah data yang memiliki nilai Glucose di atas 150?"\n'
        '- "Hitung nilai rata-rata setiap kolom numerik"\n'
        '- "Berapa banyak data untuk setiap kelompok dalam kolom Outcome?"\n'
        '- "Berapa jumlah baris dalam dataset ini?"\n'
        '- "Berapa jumlah kolom dalam dataset ini?"\n'
    )

    # --- Event handlers -----------------------------------------------------

    def handle_process_file(file, sess_id):
        """Start a fresh ChatBot for the upload and show its status message."""
        bot = ChatBot(sess_id)
        status = bot.process_file(file)
        return bot, [(None, status)]

    def user_message_submitted(message, history, chatbot, sess_id):
        """Append the question as a pending turn and clear the input box."""
        return history + [(message, None)], "", chatbot, sess_id

    def bot_response(history, chatbot, sess_id):
        """Fill in the answer for the pending (last) turn of the history."""
        pending_message = history[-1][0]
        if chatbot is None:
            # No CSV processed yet: create the session bot and ask for a file.
            chatbot = ChatBot(sess_id)
            reply = "Mohon upload file CSV terlebih dahulu."
        else:
            reply = chatbot.chat(pending_message, history[:-1])
        history[-1] = (pending_message, reply)
        return chatbot, history

    def handle_clear_chat(chatbot):
        """Empty both the bot's stored history and the visible chat."""
        if chatbot is not None:
            chatbot.chat_history = []
        return chatbot, []

    # --- Layout and wiring --------------------------------------------------

    with gr.Blocks(title="CSV Data Analyzer") as interface:
        session_id = gr.State(lambda: str(uuid.uuid4()))
        chatbot_state = gr.State(lambda: None)

        gr.HTML("<h1 style='text-align: center;'>CSV Data Analyzer</h1>")
        gr.HTML("<h3 style='text-align: center;'>Ajukan pertanyaan tentang data CSV Anda</h3>")

        with gr.Row():
            with gr.Column(scale=1):
                file_input = gr.File(label="Upload CSV Anda", file_types=[".csv"])
                process_button = gr.Button("Proses CSV")
                with gr.Accordion("Contoh Pertanyaan", open=False):
                    gr.Markdown(example_questions)

            with gr.Column(scale=2):
                chatbot_interface = gr.Chatbot(label="Riwayat Chat", height=400)
                message_input = gr.Textbox(
                    label="Ketik pertanyaan Anda",
                    placeholder="Contoh: Berapa jumlah data yang memiliki nilai Glucose di atas 150?",
                    lines=2,
                )
                submit_button = gr.Button("Kirim")
                clear_button = gr.Button("Bersihkan Chat")

        process_button.click(
            fn=handle_process_file,
            inputs=[file_input, session_id],
            outputs=[chatbot_state, chatbot_interface],
        )

        # The send button and pressing Enter in the textbox run the same
        # two-step chain: echo the question first, then compute the answer.
        for register in (submit_button.click, message_input.submit):
            register(
                fn=user_message_submitted,
                inputs=[message_input, chatbot_interface, chatbot_state, session_id],
                outputs=[chatbot_interface, message_input, chatbot_state, session_id],
            ).then(
                fn=bot_response,
                inputs=[chatbot_interface, chatbot_state, session_id],
                outputs=[chatbot_state, chatbot_interface],
            )

        clear_button.click(
            fn=handle_clear_chat,
            inputs=[chatbot_state],
            outputs=[chatbot_state, chatbot_interface],
        )

    return interface
# Script entry point: build the UI and serve it with a public share link.
if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(share=True)