SYSTEM_PROMPT = """
1. Air Quality Data (df):
- Columns: 'Timestamp', 'station', 'PM2.5', 'PM10', 'address', 'city', 'latitude', 'longitude', 'state'
- Example row: ['2023-01-01', 'StationA', 45.67, 78.9, '123 Main St', 'Mumbai', 19.07, 72.87, 'Maharashtra']
- Frequency: daily
- 'pollution' generally means 'PM2.5'.
- PM2.5 guidelines: India: 60, WHO: 15. PM10 guidelines: India: 100, WHO: 50.
2. NCAP Funding Data (ncap_data):
- Columns: 'city', 'state', 'funding_received', 'year', 'project', 'status'
- Example row: ['Mumbai', 'Maharashtra', 10000000, 2022, 'Clean Air Project', 'Ongoing']
3. State Population Data (states_data):
- Columns: 'state', 'population', 'year', 'urban_population', 'rural_population'
- Example row: ['Maharashtra', 123000000, 2021, 60000000, 63000000]
You already have these dataframes loaded as df, ncap_data, and states_data. Do not read any files. Use these dataframes to answer questions about air quality, funding, or population. When aggregating, report standard deviation, standard error, and number of data points. Always report units. If a plot is required, follow the previous instructions for saving and reporting plots. If a question is about funding or population, use the relevant dataframe.
"""
import os
import pandas as pd
from pandasai import Agent, SmartDataframe
from typing import Tuple
from PIL import Image
from pandasai.llm import HuggingFaceTextGen
from dotenv import load_dotenv
from langchain_groq import ChatGroq
from langchain_google_genai import ChatGoogleGenerativeAI
import matplotlib.pyplot as plt
import json
from datetime import datetime
from dotenv import load_dotenv
# FORCE reload environment variables
load_dotenv(override=True)
Groq_Token = os.getenv("GROQ_API_KEY")
hf_token = os.getenv("HF_TOKEN")
gemini_token = os.getenv("GEMINI_TOKEN")
import uuid
# FORCE reload environment variables
models = {
"gpt-oss-20b": "openai/gpt-oss-20b",
"gpt-oss-120b": "openai/gpt-oss-120b",
"llama3.1": "llama-3.1-8b-instant",
"llama3.3": "llama-3.3-70b-versatile",
"deepseek-R1": "deepseek-r1-distill-llama-70b",
"llama4 maverik":"meta-llama/llama-4-maverick-17b-128e-instruct",
"llama4 scout":"meta-llama/llama-4-scout-17b-16e-instruct",
"gemini-pro": "gemini-1.5-pro"
}
def log_interaction(user_query, model_name, response_content, generated_code, execution_time, error_message=None, is_image=False):
"""Log user interactions to Hugging Face dataset"""
try:
if not hf_token or hf_token.strip() == "":
print("Warning: HF_TOKEN not available, skipping logging")
return
# Create log entry
log_entry = {
"timestamp": datetime.now().isoformat(),
"session_id": str(uuid.uuid4()),
"user_query": user_query,
"model_name": model_name,
"response_content": str(response_content),
"generated_code": generated_code or "",
"execution_time_seconds": execution_time,
"error_message": error_message or "",
"is_image_output": is_image,
"success": error_message is None
}
# Create DataFrame
df = pd.DataFrame([log_entry])
# Create unique filename with timestamp
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
random_id = str(uuid.uuid4())[:8]
filename = f"interaction_log_{timestamp_str}_{random_id}.parquet"
# Save locally first
local_path = f"/tmp/{filename}"
df.to_parquet(local_path, index=False)
# Clean up local file
if os.path.exists(local_path):
os.remove(local_path)
print(f"Successfully logged interaction locally: {filename}")
except Exception as e:
print(f"Error logging interaction: {e}")
def preprocess_and_load_df(path: str) -> pd.DataFrame:
"""Load and preprocess the dataframe"""
try:
df = pd.read_csv(path)
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
return df
except Exception as e:
raise Exception(f"Error loading dataframe: {e}")
def load_smart_df(df: pd.DataFrame, inference_server: str, name="mistral") -> SmartDataframe:
"""Load smart dataframe with error handling"""
try:
if name == "gemini-pro":
if not gemini_token or gemini_token.strip() == "":
raise ValueError("Gemini API token not available or empty")
llm = ChatGoogleGenerativeAI(
model=models[name],
google_api_key=gemini_token,
temperature=0.1
)
else:
if not Groq_Token or Groq_Token.strip() == "":
raise ValueError("Groq API token not available or empty")
llm = ChatGroq(
model=models[name],
api_key=Groq_Token,
temperature=0.1
)
smart_df = SmartDataframe(df, config={"llm": llm, "max_retries": 5, "enable_cache": False})
return smart_df
except Exception as e:
raise Exception(f"Error loading smart dataframe: {e}")
try:
response = agent.chat(prompt)
execution_time = (datetime.now() - start_time).total_seconds()
gen_code = getattr(agent, 'last_code_generated', '')
ex_code = getattr(agent, 'last_code_executed', '')
last_prompt = getattr(agent, 'last_prompt', prompt)
# Log the interaction
log_interaction(
user_query=prompt,
model_name="pandas_ai_agent",
response_content=response,
generated_code=gen_code,
execution_time=execution_time,
error_message=None,
is_image=isinstance(response, str) and any(response.endswith(ext) for ext in ['.png', '.jpg', '.jpeg'])
)
return {
"role": "assistant",
"content": response,
"gen_code": gen_code,
"ex_code": ex_code,
"last_prompt": last_prompt,
"error": None
}
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
# Log the failed interaction
log_interaction(
user_query=prompt,
model_name="pandas_ai_agent",
response_content=f"Error: {error_msg}",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": f"Error: {error_msg}",
"gen_code": "",
"ex_code": "",
"last_prompt": prompt,
"error": error_msg
}
def decorate_with_code(response: dict) -> str:
"""Decorate response with code details"""
gen_code = response.get("gen_code", "No code generated")
last_prompt = response.get("last_prompt", "No prompt")
return f"""
Generated Code
```python
{gen_code}
```
Prompt
{last_prompt}
"""
def show_response(st, response):
"""Display response with error handling"""
try:
with st.chat_message(response["role"]):
content = response.get("content", "No content")
try:
# Try to open as image
image = Image.open(content)
if response.get("gen_code"):
st.markdown(decorate_with_code(response), unsafe_allow_html=True)
st.image(image)
return {"is_image": True}
except:
# Not an image, display as text
if response.get("gen_code"):
display_content = decorate_with_code(response) + f"""
{content}"""
else:
display_content = content
st.markdown(display_content, unsafe_allow_html=True)
return {"is_image": False}
except Exception as e:
st.error(f"Error displaying response: {e}")
return {"is_image": False}
def ask_question(model_name, question):
"""Ask question with comprehensive error handling and logging"""
start_time = datetime.now()
try:
# Reload environment variables to get fresh values
load_dotenv(override=True)
fresh_groq_token = os.getenv("GROQ_API_KEY")
fresh_gemini_token = os.getenv("GEMINI_TOKEN")
print(f"ask_question - Fresh Groq Token: {'Present' if fresh_groq_token else 'Missing'}")
# Check API availability with fresh tokens
if model_name == "gemini-pro":
if not fresh_gemini_token or fresh_gemini_token.strip() == "":
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Missing or empty API token"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="❌ Gemini API token not available or empty",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "❌ Gemini API token not available or empty. Please set GEMINI_TOKEN in your environment variables.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
llm = ChatGoogleGenerativeAI(
model=models[model_name],
google_api_key=fresh_gemini_token,
temperature=0
)
else:
if not fresh_groq_token or fresh_groq_token.strip() == "":
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Missing or empty API token"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="❌ Groq API token not available or empty",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "❌ Groq API token not available or empty. Please set GROQ_API_KEY in your environment variables and restart the application.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
# Test the API key by trying to create the client
try:
llm = ChatGroq(
model=models[model_name],
api_key=fresh_groq_token,
temperature=0.1
)
# Test with a simple call to verify the API key works
test_response = llm.invoke("Test")
print("API key test successful")
except Exception as api_error:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(api_error)
if "organization_restricted" in error_msg.lower() or "unauthorized" in error_msg.lower():
response_content = "❌ API Key Error: Your Groq API key appears to be invalid, expired, or restricted. Please check your API key in the .env file."
log_error_msg = f"API key validation failed: {error_msg}"
else:
response_content = f"❌ API Connection Error: {error_msg}"
log_error_msg = error_msg
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=response_content,
generated_code="",
execution_time=execution_time,
error_message=log_error_msg,
is_image=False
)
return {
"role": "assistant",
"content": response_content,
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": log_error_msg
}
# Check if data file exists
if not os.path.exists("Data.csv"):
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Data file not found"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="❌ Data.csv file not found",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "❌ Data.csv file not found. Please ensure the data file is in the correct location.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
df_check = pd.read_csv("Data.csv")
df_check["Timestamp"] = pd.to_datetime(df_check["Timestamp"])
df_check = df_check.head(5)
new_line = "\n"
parameters = {"font.size": 12, "figure.dpi": 600}
template = f"""```python
import pandas as pd
import matplotlib.pyplot as plt
import uuid
plt.rcParams.update({parameters})
df = pd.read_csv("Data.csv")
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
# Available columns and data types:
{new_line.join(map(lambda x: '# '+x, str(df_check.dtypes).split(new_line)))}
# Question: {question.strip()}
# Generate code to answer the question and save result in 'answer' variable
# If creating a plot, save it with a unique filename and store the filename in 'answer'
# If returning text/numbers, store the result directly in 'answer'
```"""
system_prompt = """You are a helpful assistant that generates Python code for data analysis.
Rules:
1. Always save your final result in a variable called 'answer'
2. If creating a plot, save it with plt.savefig() and store the filename in 'answer'
3. If returning text/numbers, store the result directly in 'answer'
4. Use descriptive variable names and add comments
5. Handle potential errors gracefully
6. For plots, use unique filenames to avoid conflicts
"""
query = f"""{system_prompt}
Complete the following code to answer the user's question:
{template}
"""
# Make API call
if model_name == "gemini-pro":
response = llm.invoke(query)
answer = response.content
else:
response = llm.invoke(query)
answer = response.content
# Extract and execute code
try:
if "```python" in answer:
code_part = answer.split("```python")[1].split("```")[0]
else:
code_part = answer
full_code = f"""
{template.split("```python")[1].split("```")[0]}
{code_part}
"""
# Execute code in a controlled environment
local_vars = {}
global_vars = {
'pd': pd,
'plt': plt,
'os': os,
'uuid': __import__('uuid')
}
exec(full_code, global_vars, local_vars)
# Get the answer
if 'answer' in local_vars:
answer_result = local_vars['answer']
else:
answer_result = "No answer variable found in generated code"
execution_time = (datetime.now() - start_time).total_seconds()
# Determine if output is an image
is_image = isinstance(answer_result, str) and any(answer_result.endswith(ext) for ext in ['.png', '.jpg', '.jpeg'])
# Log successful interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=str(answer_result),
generated_code=full_code,
execution_time=execution_time,
error_message=None,
is_image=is_image
)
return {
"role": "assistant",
"content": answer_result,
"gen_code": full_code,
"ex_code": full_code,
"last_prompt": question,
"error": None
}
except Exception as code_error:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(code_error)
# Log the failed code execution
log_interaction(
user_query=question,
model_name=model_name,
response_content=f"❌ Error executing generated code: {error_msg}",
generated_code=full_code if 'full_code' in locals() else "",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": f"❌ Error executing generated code: {error_msg}",
"gen_code": full_code if 'full_code' in locals() else "",
"ex_code": full_code if 'full_code' in locals() else "",
"last_prompt": question,
"error": error_msg
}
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
# Handle specific API errors
if "organization_restricted" in error_msg:
response_content = "❌ API Organization Restricted: Your API key access has been restricted. Please check your Groq API key or try generating a new one."
log_error_msg = "API access restricted"
elif "rate_limit" in error_msg.lower():
response_content = "❌ Rate limit exceeded. Please wait a moment and try again."
log_error_msg = "Rate limit exceeded"
else:
response_content = f"❌ Error: {error_msg}"
log_error_msg = error_msg
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=response_content,
generated_code="",
execution_time=execution_time,
error_message=log_error_msg,
is_image=False
)
return {
"role": "assistant",
"content": response_content,
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": log_error_msg
}