# CSVBot-Llama2 / app.py
# Uploaded by hmrizal -- commit 81f0d23 (verified)
import gradio as gr
import os
import uuid
import threading
import pandas as pd
import numpy as np
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
# Global model cache: the model and tokenizer are stored here after the first
# load so later calls can reuse them. "init_lock" is held while loading so
# that concurrent callers do not trigger the load twice.
MODEL_CACHE = {
    "model": None,
    "tokenizer": None,
    "init_lock": threading.Lock(),
}

# Create the root directory for per-session user data
os.makedirs("user_data", exist_ok=True)
def initialize_model_once():
    """Load the Phi-4-mini model and tokenizer on first use and cache them.

    The load runs while holding ``MODEL_CACHE["init_lock"]``; later calls
    find the cached model and return immediately.

    Returns:
        tuple: ``(model, tokenizer)`` as stored in ``MODEL_CACHE``.
    """
    with MODEL_CACHE["init_lock"]:
        if MODEL_CACHE["model"] is None:
            # Load Phi-4-mini model; tokenizer and weights must come from the
            # same checkpoint.
            model_id = "microsoft/Phi-4-mini-instruct"
            MODEL_CACHE["tokenizer"] = AutoTokenizer.from_pretrained(model_id)
            # NOTE(review): only the checkpoint id could be recovered for this
            # call; confirm whether extra loading options (dtype, device
            # placement) are required for the deployment target.
            MODEL_CACHE["model"] = AutoModelForCausalLM.from_pretrained(model_id)
    return MODEL_CACHE["model"], MODEL_CACHE["tokenizer"]
def generate_pandas_code(prompt, max_new_tokens=512):
    """Generate Python code for *prompt* using the cached Phi-4-mini model.

    Args:
        prompt: Full instruction text sent to the model.
        max_new_tokens: Upper bound on the number of tokens generated.

    Returns:
        str: The contents of the first ```python fenced block in the model's
        reply if there is one, otherwise the whole reply, stripped.
    """
    import re

    model, tokenizer = initialize_model_once()
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        # NOTE(review): only the token budget is passed; confirm whether
        # sampling options (temperature, do_sample, ...) are wanted here.
        outputs = model.generate(**inputs, max_new_tokens=max_new_tokens)

    # Keep only the newly generated tokens. Slicing by token count is exact,
    # whereas cutting the decoded text by the decoded prompt's character
    # length breaks whenever decode(encode(prompt)) is not identical to the
    # text embedded in the decoded output.
    prompt_token_count = inputs.input_ids.shape[1]
    generated_text = tokenizer.decode(
        outputs[0][prompt_token_count:], skip_special_tokens=True
    )

    # Extract code between ```python and ``` if present
    code_match = re.search(r'```python\s*(.*?)\s*```', generated_text, re.DOTALL)
    if code_match:
        return code_match.group(1).strip()
    # Return the raw generated text as fallback
    return generated_text.strip()
class ChatBot:
    """Per-session assistant that answers questions about one uploaded CSV.

    Instance fields:
      session_id   -- identifier used to name the session's data directory
      csv_info     -- dict with filename/rows/columns/column_names, or None
      df           -- the loaded pandas DataFrame, or None before an upload
      chat_history -- list of (speaker_or_question, text) tuples
      user_dir     -- directory where a copy of the uploaded CSV is stored
    """

    def __init__(self, session_id):
        """Create an empty session and its data directory on disk."""
        self.session_id = session_id
        self.csv_info = None
        self.df = None
        self.chat_history = []
        self.user_dir = f"user_data/{session_id}"
        os.makedirs(self.user_dir, exist_ok=True)

    def process_file(self, file):
        """Load an uploaded CSV into this session.

        Args:
            file: Upload object exposing a ``name`` path attribute, or any
                value whose ``str()`` is the path of the CSV file.

        Returns:
            str: A user-facing status message (success or error text).
        """
        if file is None:
            return "Mohon upload file CSV terlebih dahulu."

        try:
            # Handle file from Gradio
            file_path = file.name if hasattr(file, 'name') else str(file)
            file_name = os.path.basename(file_path)

            # Load and save CSV directly with pandas. Work on locals first so
            # a failure half-way leaves the previous session state intact.
            try:
                df = pd.read_csv(file_path)
                user_file_path = f"{self.user_dir}/uploaded.csv"
                df.to_csv(user_file_path, index=False)
                column_names = [str(col) for col in df.columns]
                csv_info = {
                    "filename": file_name,
                    "rows": df.shape[0],
                    "columns": df.shape[1],
                    "column_names": column_names,
                }
                print(f"CSV verified: {df.shape[0]} rows, {len(df.columns)} columns")
            except Exception as e:
                return f"Error membaca CSV: {str(e)}"

            self.df = df
            self.csv_info = csv_info

            # Add file info to chat history
            file_info = f"CSV berhasil dimuat: {file_name} dengan {df.shape[0]} baris dan {len(df.columns)} kolom. Kolom: {', '.join(column_names)}"
            self.chat_history.append(("System", file_info))
            return f"File CSV '{file_name}' berhasil diproses! Anda dapat mulai mengajukan pertanyaan tentang data."
        except Exception as e:
            import traceback
            # Keep the full traceback in the server log; the user only sees
            # the short message.
            traceback.print_exc()
            return f"Error pemrosesan file: {str(e)}"

    def execute_query(self, code):
        """Execute generated pandas code against the session DataFrame.

        SECURITY: ``code`` comes from a language model and is run with
        ``exec`` without sandboxing or a timeout. Do not expose this to
        untrusted users without isolating the process.

        Args:
            code: Python source that may use ``df``, ``pd`` and ``np``.

        Returns:
            The value of ``result`` if the code defines it; otherwise the
            last non-None variable the code created; otherwise the DataFrame.

        Raises:
            Exception: If the code fails; the original error is chained.
        """
        # A single namespace serves as both globals and locals so that
        # lambdas and functions inside the generated code can still see
        # ``df`` (nested scopes resolve free names through globals).
        namespace = {"df": self.df, "pd": pd, "np": np}
        try:
            exec(code, namespace)

            # Get result
            if "result" in namespace:
                return namespace["result"]

            # If no result variable, find the last variable created.
            # exec() injects "__builtins__" into the namespace; skip it.
            last_var = None
            for var_name, var_value in namespace.items():
                if var_name not in ("df", "pd", "np", "__builtins__"):
                    last_var = var_value
            if last_var is not None:
                return last_var
            return self.df  # Return the dataframe as default
        except Exception as e:
            raise Exception(f"Gagal menjalankan kode: {str(e)}") from e

    def _build_prompt(self, message):
        """Return the LLM prompt describing the DataFrame and the question."""
        # Get sample data for context
        sample_str = self.df.head(5).to_string()
        data_types = {col: str(dtype) for col, dtype in self.df.dtypes.items()}
        return "\n".join([
            "",
            "You are a data analyst that translates natural language questions into Python pandas code.",
            "DataFrame information:",
            f"- Column names: {', '.join(self.csv_info['column_names'])}",
            f"- Data types: {data_types}",
            f"- Number of rows: {self.csv_info['rows']}",
            "- Sample data:",
            sample_str,
            f"User question: {message}",
            "Write a short Python code using pandas to answer the user's question.",
            "The code must use the 'df' variable as the DataFrame name.",
            "The code should assign the final result to a variable named 'result'.",
            "Only return the Python code without any explanation.",
            "",
        ])

    @staticmethod
    def _format_result(result):
        """Render a query result as text, truncating long tabular output."""
        if isinstance(result, pd.DataFrame):
            if len(result) > 5:
                return result.head(5).to_string() + f"\n\n[Total {len(result)} baris]"
            return result.to_string()
        if isinstance(result, (pd.Series, np.ndarray)):
            if len(result) > 10:
                return str(result[:10]) + f"\n\n[Total {len(result)} item]"
            return str(result)
        if hasattr(result, "__len__") and not isinstance(result, (str, int, float)):
            text = str(result)
            if len(result) > 0:
                text += f"\n\n[Total {len(result)} item]"
            return text
        return str(result)

    def chat(self, message, history):
        """Answer one user question about the loaded CSV.

        Simple metadata questions are answered directly from ``csv_info``.
        Anything else is turned into pandas code by ``generate_pandas_code``
        and executed; if generation fails, a keyword-based fallback query is
        tried.

        Args:
            message: The user's question.
            history: Previous chat turns (not used by this method).

        Returns:
            str: The reply text shown to the user.
        """
        if self.df is None:
            return "Mohon upload file CSV terlebih dahulu."

        try:
            # Handle common metadata questions directly to save resources
            message_lower = message.lower()
            if "nama file" in message_lower:
                return f"Nama file CSV adalah: {self.csv_info['filename']}"
            elif "nama kolom" in message_lower:
                return f"Kolom dalam CSV: {', '.join(self.csv_info['column_names'])}"
            elif "jumlah baris" in message_lower or "berapa baris" in message_lower:
                return f"Jumlah baris dalam CSV: {self.csv_info['rows']}"
            elif "jumlah kolom" in message_lower or "berapa kolom" in message_lower:
                return f"Jumlah kolom dalam CSV: {self.csv_info['columns']}"

            # Create prompt for LLM
            prompt = self._build_prompt(message)

            try:
                # Generate code with Phi-4
                code = generate_pandas_code(prompt)
                # Add result variable if not present
                has_result_line = any(
                    line.strip().startswith("result =") for line in code.split("\n")
                )
                if not has_result_line and (
                    code.startswith("df.") or "result" not in code
                ):
                    code = "result = " + code
            except Exception as e:
                print(f"Error generating code: {str(e)}")
                # Fallback for basic questions
                if "rata-rata" in message_lower or "mean" in message_lower:
                    code = "result = df.describe()"
                elif "jumlah" in message_lower or "count" in message_lower:
                    code = "result = df.count()"
                else:
                    return f"Maaf, saya tidak dapat menghasilkan kode untuk pertanyaan ini. Error: {str(e)}"

            try:
                # Execute the code and get result
                print(f"Executing code: {code}")
                result = self.execute_query(code)

                # Check if result is relevant to the question
                if result is None or (isinstance(result, pd.DataFrame) and result.empty):
                    return "Maaf, kita tidak bisa mendapatkan informasi terkait pertanyaan anda di dalam file CSV anda."

                # Format result based on its type
                result_str = self._format_result(result)

                # Format response
                response = f"Hasil analisis:\n\n{result_str}\n\nKode yang dijalankan:\n```python\n{code}\n```"
                self.chat_history.append((message, response))
                return response
            except Exception as e:
                return f"Error saat menganalisis data: {str(e)}\n\nKode yang dicoba:\n```python\n{code}\n```"
        except Exception as e:
            import traceback
            # Keep the full traceback in the server log; the user only sees
            # the short message.
            traceback.print_exc()
            return f"Error: {str(e)}"
# UI code (same as before)
def create_gradio_interface():
    """Build the Gradio Blocks UI and bind its event handlers.

    Layout: a left column with the CSV upload, a process button and example
    questions; a right column with the chat view, the question box and the
    send/clear buttons. Per-session state holds a session id and the
    session's ``ChatBot`` instance.

    Returns:
        The assembled ``gr.Blocks`` interface (not launched).
    """
    with gr.Blocks(title="CSV Data Analyzer") as interface:
        session_id = gr.State(lambda: str(uuid.uuid4()))
        chatbot_state = gr.State(lambda: None)

        gr.HTML("<h1 style='text-align: center;'>CSV Data Analyzer</h1>")
        gr.HTML("<h3 style='text-align: center;'>Ajukan pertanyaan tentang data CSV Anda</h3>")

        with gr.Row():
            with gr.Column(scale=1):
                # NOTE(review): only the label arguments of the components
                # below were recoverable; confirm whether further options
                # (file type filter, chat height, textbox lines) are wanted.
                file_input = gr.File(
                    label="Upload CSV Anda",
                )
                process_button = gr.Button("Proses CSV")
                with gr.Accordion("Contoh Pertanyaan", open=False):
                    gr.Markdown("\n".join([
                        '- "Berapa jumlah data yang memiliki nilai Glucose di atas 150?"',
                        '- "Hitung nilai rata-rata setiap kolom numerik"',
                        '- "Berapa banyak data untuk setiap kelompok dalam kolom Outcome?"',
                        '- "Berapa jumlah baris dalam dataset ini?"',
                        '- "Berapa jumlah kolom dalam dataset ini?"',
                    ]))

            with gr.Column(scale=2):
                chatbot_interface = gr.Chatbot(
                    label="Riwayat Chat",
                )
                message_input = gr.Textbox(
                    label="Ketik pertanyaan Anda",
                    placeholder="Contoh: Berapa jumlah data yang memiliki nilai Glucose di atas 150?",
                )
                submit_button = gr.Button("Kirim")
                clear_button = gr.Button("Bersihkan Chat")

        # Handler functions
        def handle_process_file(file, sess_id):
            """Create a fresh ChatBot for the session and load the upload."""
            chatbot = ChatBot(sess_id)
            result = chatbot.process_file(file)
            return chatbot, [(None, result)]

        process_button.click(
            fn=handle_process_file,
            inputs=[file_input, session_id],
            outputs=[chatbot_state, chatbot_interface]
        )

        def user_message_submitted(message, history, chatbot, sess_id):
            """Append the question to the chat view and clear the textbox."""
            history = history + [(message, None)]
            return history, "", chatbot, sess_id

        def bot_response(history, chatbot, sess_id):
            """Fill in the reply for the most recent question in the chat."""
            if not history:
                # Nothing pending to answer.
                return chatbot, history
            if chatbot is None:
                chatbot = ChatBot(sess_id)
                history[-1] = (history[-1][0], "Mohon upload file CSV terlebih dahulu.")
                return chatbot, history
            user_message = history[-1][0]
            response = chatbot.chat(user_message, history[:-1])
            history[-1] = (user_message, response)
            return chatbot, history

        # Both pressing Enter in the textbox and clicking "Kirim" first echo
        # the question, then compute the reply.
        message_input.submit(
            fn=user_message_submitted,
            inputs=[message_input, chatbot_interface, chatbot_state, session_id],
            outputs=[chatbot_interface, message_input, chatbot_state, session_id]
        ).then(
            fn=bot_response,
            inputs=[chatbot_interface, chatbot_state, session_id],
            outputs=[chatbot_state, chatbot_interface]
        )

        submit_button.click(
            fn=user_message_submitted,
            inputs=[message_input, chatbot_interface, chatbot_state, session_id],
            outputs=[chatbot_interface, message_input, chatbot_state, session_id]
        ).then(
            fn=bot_response,
            inputs=[chatbot_interface, chatbot_state, session_id],
            outputs=[chatbot_state, chatbot_interface]
        )

        def handle_clear_chat(chatbot):
            """Reset the stored history and empty the chat view."""
            if chatbot is not None:
                chatbot.chat_history = []
            return chatbot, []

        clear_button.click(
            fn=handle_clear_chat,
            inputs=[chatbot_state],
            outputs=[chatbot_state, chatbot_interface]
        )

    return interface
# Launch the interface
if __name__ == "__main__":
    demo = create_gradio_interface()
    # NOTE(review): no launch call was visible after building the interface;
    # confirm the desired launch options (e.g. sharing) for deployment.
    demo.launch()