SYSTEM_PROMPT = """ 1. Air Quality Data (df): - Columns: 'Timestamp', 'station', 'PM2.5', 'PM10', 'address', 'city', 'latitude', 'longitude', 'state' - Example row: ['2023-01-01', 'StationA', 45.67, 78.9, '123 Main St', 'Mumbai', 19.07, 72.87, 'Maharashtra'] - Frequency: daily - 'pollution' generally means 'PM2.5'. - PM2.5 guidelines: India: 60, WHO: 15. PM10 guidelines: India: 100, WHO: 50. 2. NCAP Funding Data (ncap_data): - Columns: 'city', 'state', 'funding_received', 'year', 'project', 'status' - Example row: ['Mumbai', 'Maharashtra', 10000000, 2022, 'Clean Air Project', 'Ongoing'] 3. State Population Data (states_data): - Columns: 'state', 'population', 'year', 'urban_population', 'rural_population' - Example row: ['Maharashtra', 123000000, 2021, 60000000, 63000000] You already have these dataframes loaded as df, ncap_data, and states_data. Do not read any files. Use these dataframes to answer questions about air quality, funding, or population. When aggregating, report standard deviation, standard error, and number of data points. Always report units. If a plot is required, follow the previous instructions for saving and reporting plots. If a question is about funding or population, use the relevant dataframe. 
"""

# --- Imports: stdlib, then third-party -------------------------------------
import os
import json
import uuid
from datetime import datetime
from typing import Tuple

import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from dotenv import load_dotenv
from pandasai import Agent, SmartDataframe
from pandasai.llm import HuggingFaceTextGen
from langchain_groq import ChatGroq
from langchain_google_genai import ChatGoogleGenerativeAI

# FORCE reload environment variables (override any stale shell values).
# NOTE: the original imported and called load_dotenv twice; once is enough.
load_dotenv(override=True)
Groq_Token = os.getenv("GROQ_API_KEY")
hf_token = os.getenv("HF_TOKEN")
gemini_token = os.getenv("GEMINI_TOKEN")

# Friendly alias -> provider model identifier.
models = {
    "gpt-oss-20b": "openai/gpt-oss-20b",
    "gpt-oss-120b": "openai/gpt-oss-120b",
    "llama3.1": "llama-3.1-8b-instant",
    "llama3.3": "llama-3.3-70b-versatile",
    "deepseek-R1": "deepseek-r1-distill-llama-70b",
    "llama4 maverik": "meta-llama/llama-4-maverick-17b-128e-instruct",
    "llama4 scout": "meta-llama/llama-4-scout-17b-16e-instruct",
    "gemini-pro": "gemini-1.5-pro",
}


def log_interaction(user_query, model_name, response_content, generated_code,
                    execution_time, error_message=None, is_image=False):
    """Log user interactions to Hugging Face dataset.

    Best-effort: any failure is printed and swallowed so that logging can
    never break the main request path. Skips entirely when HF_TOKEN is
    missing or blank.

    NOTE(review): as written this serializes the entry to a parquet file
    under /tmp and immediately deletes it — the actual upload-to-HF step
    appears to be missing from this file; confirm against deployment code.
    """
    try:
        if not hf_token or hf_token.strip() == "":
            print("Warning: HF_TOKEN not available, skipping logging")
            return

        # Create log entry
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "session_id": str(uuid.uuid4()),
            "user_query": user_query,
            "model_name": model_name,
            "response_content": str(response_content),
            "generated_code": generated_code or "",
            "execution_time_seconds": execution_time,
            "error_message": error_message or "",
            "is_image_output": is_image,
            "success": error_message is None,
        }

        # One-row DataFrame so the entry can be written as parquet directly.
        df = pd.DataFrame([log_entry])

        # Unique filename: timestamp plus a short random suffix.
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        random_id = str(uuid.uuid4())[:8]
        filename = f"interaction_log_{timestamp_str}_{random_id}.parquet"

        # BUG FIX: the path and the print below had lost their {filename}
        # interpolation and pointed at a literal placeholder.
        local_path = f"/tmp/{filename}"
        df.to_parquet(local_path, index=False)

        # Clean up local file
        if os.path.exists(local_path):
            os.remove(local_path)
        print(f"Successfully logged interaction locally: {filename}")
    except Exception as e:
        print(f"Error logging interaction: {e}")


def preprocess_and_load_df(path: str) -> pd.DataFrame:
    """Load and preprocess the dataframe.

    Reads the CSV at `path` and parses its 'Timestamp' column to datetime.

    Raises:
        Exception: wrapping whatever pd.read_csv / pd.to_datetime raised.
    """
    try:
        df = pd.read_csv(path)
        df["Timestamp"] = pd.to_datetime(df["Timestamp"])
        return df
    except Exception as e:
        raise Exception(f"Error loading dataframe: {e}")


def load_smart_df(df: pd.DataFrame, inference_server: str, name="mistral") -> SmartDataframe:
    """Load smart dataframe with error handling.

    Wraps `df` in a pandasai SmartDataframe backed by a Gemini or Groq chat
    model selected via `name` (a key of the module-level `models` dict).

    NOTE(review): `inference_server` is unused here, and the default
    name "mistral" is not a key of `models` (it would raise KeyError) —
    confirm the intended default with the callers.
    """
    try:
        if name == "gemini-pro":
            if not gemini_token or gemini_token.strip() == "":
                raise ValueError("Gemini API token not available or empty")
            llm = ChatGoogleGenerativeAI(
                model=models[name],
                google_api_key=gemini_token,
                temperature=0.1,
            )
        else:
            if not Groq_Token or Groq_Token.strip() == "":
                raise ValueError("Groq API token not available or empty")
            llm = ChatGroq(
                model=models[name],
                api_key=Groq_Token,
                temperature=0.1,
            )
        smart_df = SmartDataframe(
            df,
            config={"llm": llm, "max_retries": 5, "enable_cache": False},
        )
        return smart_df
    except Exception as e:
        raise Exception(f"Error loading smart dataframe: {e}")


def ask_agent(agent, prompt):
    """Send `prompt` to a pandasai Agent and return a chat-message dict.

    BUG FIX / NOTE(review): in the original file this try/except sat at
    module level and referenced undefined names (`agent`, `prompt`,
    `start_time`), which would raise NameError at import time — a function
    header was evidently lost in editing. Reconstructed here as a function;
    confirm the intended name and signature against callers.
    """
    start_time = datetime.now()
    try:
        response = agent.chat(prompt)
        execution_time = (datetime.now() - start_time).total_seconds()
        gen_code = getattr(agent, 'last_code_generated', '')
        ex_code = getattr(agent, 'last_code_executed', '')
        last_prompt = getattr(agent, 'last_prompt', prompt)

        # Log the interaction
        log_interaction(
            user_query=prompt,
            model_name="pandas_ai_agent",
            response_content=response,
            generated_code=gen_code,
            execution_time=execution_time,
            error_message=None,
            is_image=isinstance(response, str) and any(response.endswith(ext) for ext in ['.png', '.jpg', '.jpeg'])
        )

        return {
            "role": "assistant",
            "content": response,
            "gen_code": gen_code,
            "ex_code": ex_code,
            "last_prompt": last_prompt,
            "error": None
        }
    except Exception as e:
        execution_time = (datetime.now() - start_time).total_seconds()
        error_msg = str(e)

        # Log the failed interaction
        log_interaction(
            user_query=prompt,
            model_name="pandas_ai_agent",
            response_content=f"Error: {error_msg}",
            generated_code="",
            execution_time=execution_time,
            error_message=error_msg,
            is_image=False
        )

        return {
            "role": "assistant",
            "content": f"Error: {error_msg}",
            "gen_code": "",
            "ex_code": "",
            "last_prompt": prompt,
            "error": error_msg
        }


def decorate_with_code(response: dict) -> str:
    """Decorate response with code details.

    Returns markdown showing the generated code in a python fence followed
    by the prompt that produced it.
    """
    gen_code = response.get("gen_code", "No code generated")
    last_prompt = response.get("last_prompt", "No prompt")
    # The ``` fence markers must sit on their own lines for markdown to
    # render the code block correctly.
    return f"""
Generated Code
```python
{gen_code}
```
Prompt
{last_prompt}
"""


def show_response(st, response):
    """Display response with error handling.

    If response["content"] is a path to an image file it is rendered with
    st.image (preceded by the generated code, when present); otherwise the
    content is rendered as markdown. Returns {"is_image": bool}.
    """
    try:
        with st.chat_message(response["role"]):
            content = response.get("content", "No content")
            try:
                # Try to open as image
                image = Image.open(content)
                if response.get("gen_code"):
                    st.markdown(decorate_with_code(response), unsafe_allow_html=True)
                st.image(image)
                return {"is_image": True}
            except Exception:
                # BUG FIX: was a bare `except:`, which would also swallow
                # KeyboardInterrupt/SystemExit; Exception keeps the same
                # "not an image -> fall back to text" behavior.
                # Not an image, display as text
                if response.get("gen_code"):
                    display_content = decorate_with_code(response) + f"""
{content}"""
                else:
                    display_content = content
                st.markdown(display_content, unsafe_allow_html=True)
                return {"is_image": False}
    except Exception as e:
        st.error(f"Error displaying response: {e}")
        return {"is_image": False}


def ask_question(model_name, question):
    """Ask question with comprehensive error handling and logging.

    Flow: refresh API tokens -> validate the chosen provider's key ->
    build a code-generation prompt around Data.csv -> ask the LLM for
    Python code -> execute that code -> return whatever it stored in the
    `answer` variable.

    Always returns a chat-message dict with keys role/content/gen_code/
    ex_code/last_prompt/error; every path (success or failure) is recorded
    via log_interaction.
    """
    start_time = datetime.now()
    try:
        # Reload environment variables to get fresh values
        load_dotenv(override=True)
        fresh_groq_token = os.getenv("GROQ_API_KEY")
        fresh_gemini_token = os.getenv("GEMINI_TOKEN")

        print(f"ask_question - Fresh Groq Token: {'Present' if fresh_groq_token else 'Missing'}")

        # Check API availability with fresh tokens
        if model_name == "gemini-pro":
            if not fresh_gemini_token or fresh_gemini_token.strip() == "":
                execution_time = (datetime.now() - start_time).total_seconds()
                error_msg = "Missing or empty API token"

                # Log the failed interaction
                log_interaction(
                    user_query=question,
                    model_name=model_name,
                    response_content="❌ Gemini API token not available or empty",
                    generated_code="",
                    execution_time=execution_time,
                    error_message=error_msg,
                    is_image=False
                )

                return {
                    "role": "assistant",
                    "content": "❌ Gemini API token not available or empty. Please set GEMINI_TOKEN in your environment variables.",
                    "gen_code": "",
                    "ex_code": "",
                    "last_prompt": question,
                    "error": error_msg
                }

            llm = ChatGoogleGenerativeAI(
                model=models[model_name],
                google_api_key=fresh_gemini_token,
                temperature=0
            )
        else:
            if not fresh_groq_token or fresh_groq_token.strip() == "":
                execution_time = (datetime.now() - start_time).total_seconds()
                error_msg = "Missing or empty API token"

                # Log the failed interaction
                log_interaction(
                    user_query=question,
                    model_name=model_name,
                    response_content="❌ Groq API token not available or empty",
                    generated_code="",
                    execution_time=execution_time,
                    error_message=error_msg,
                    is_image=False
                )

                return {
                    "role": "assistant",
                    "content": "❌ Groq API token not available or empty. Please set GROQ_API_KEY in your environment variables and restart the application.",
                    "gen_code": "",
                    "ex_code": "",
                    "last_prompt": question,
                    "error": error_msg
                }

            # Test the API key by trying to create the client
            try:
                llm = ChatGroq(
                    model=models[model_name],
                    api_key=fresh_groq_token,
                    temperature=0.1
                )
                # Test with a simple call to verify the API key works
                test_response = llm.invoke("Test")
                print("API key test successful")
            except Exception as api_error:
                execution_time = (datetime.now() - start_time).total_seconds()
                error_msg = str(api_error)

                if "organization_restricted" in error_msg.lower() or "unauthorized" in error_msg.lower():
                    response_content = "❌ API Key Error: Your Groq API key appears to be invalid, expired, or restricted. Please check your API key in the .env file."
                    log_error_msg = f"API key validation failed: {error_msg}"
                else:
                    response_content = f"❌ API Connection Error: {error_msg}"
                    log_error_msg = error_msg

                # Log the failed interaction
                log_interaction(
                    user_query=question,
                    model_name=model_name,
                    response_content=response_content,
                    generated_code="",
                    execution_time=execution_time,
                    error_message=log_error_msg,
                    is_image=False
                )

                return {
                    "role": "assistant",
                    "content": response_content,
                    "gen_code": "",
                    "ex_code": "",
                    "last_prompt": question,
                    "error": log_error_msg
                }

        # Check if data file exists
        if not os.path.exists("Data.csv"):
            execution_time = (datetime.now() - start_time).total_seconds()
            error_msg = "Data file not found"

            # Log the failed interaction
            log_interaction(
                user_query=question,
                model_name=model_name,
                response_content="❌ Data.csv file not found",
                generated_code="",
                execution_time=execution_time,
                error_message=error_msg,
                is_image=False
            )

            return {
                "role": "assistant",
                "content": "❌ Data.csv file not found. Please ensure the data file is in the correct location.",
                "gen_code": "",
                "ex_code": "",
                "last_prompt": question,
                "error": error_msg
            }

        # Small sample so the prompt can show real column names and dtypes.
        df_check = pd.read_csv("Data.csv")
        df_check["Timestamp"] = pd.to_datetime(df_check["Timestamp"])
        df_check = df_check.head(5)

        new_line = "\n"
        parameters = {"font.size": 12, "figure.dpi": 600}

        # {parameters} interpolates the dict literal directly into the
        # generated code, so the executed script gets the same rcParams.
        template = f"""```python
import pandas as pd
import matplotlib.pyplot as plt
import uuid
plt.rcParams.update({parameters})
df = pd.read_csv("Data.csv")
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
# Available columns and data types:
{new_line.join(map(lambda x: '# '+x, str(df_check.dtypes).split(new_line)))}
# Question: {question.strip()}
# Generate code to answer the question and save result in 'answer' variable
# If creating a plot, save it with a unique filename and store the filename in 'answer'
# If returning text/numbers, store the result directly in 'answer'
```"""

        system_prompt = """You are a helpful assistant that generates Python code for data analysis.
Rules:
1. Always save your final result in a variable called 'answer'
2. If creating a plot, save it with plt.savefig() and store the filename in 'answer'
3. If returning text/numbers, store the result directly in 'answer'
4. Use descriptive variable names and add comments
5. Handle potential errors gracefully
6. For plots, use unique filenames to avoid conflicts
"""

        query = f"""{system_prompt}
Complete the following code to answer the user's question:
{template}
"""

        # Make API call. Both providers expose the same .invoke()/.content
        # interface; the original duplicated this call in identical
        # if/else branches, which is collapsed here.
        response = llm.invoke(query)
        answer = response.content

        # Extract and execute code
        try:
            if "```python" in answer:
                code_part = answer.split("```python")[1].split("```")[0]
            else:
                code_part = answer

            # Prepend the template's setup code to the model's code so the
            # executed script is self-contained.
            full_code = f"""
{template.split("```python")[1].split("```")[0]}
{code_part}
"""

            # SECURITY: exec() runs LLM-generated code with no sandboxing.
            # Acceptable only in a trusted deployment; do not expose this
            # path to untrusted users as-is.
            local_vars = {}
            global_vars = {
                'pd': pd,
                'plt': plt,
                'os': os,
                'uuid': uuid  # was __import__('uuid'); uuid is already imported
            }
            exec(full_code, global_vars, local_vars)

            # Get the answer
            if 'answer' in local_vars:
                answer_result = local_vars['answer']
            else:
                answer_result = "No answer variable found in generated code"

            execution_time = (datetime.now() - start_time).total_seconds()

            # Determine if output is an image
            is_image = isinstance(answer_result, str) and any(answer_result.endswith(ext) for ext in ['.png', '.jpg', '.jpeg'])

            # Log successful interaction
            log_interaction(
                user_query=question,
                model_name=model_name,
                response_content=str(answer_result),
                generated_code=full_code,
                execution_time=execution_time,
                error_message=None,
                is_image=is_image
            )

            return {
                "role": "assistant",
                "content": answer_result,
                "gen_code": full_code,
                "ex_code": full_code,
                "last_prompt": question,
                "error": None
            }
        except Exception as code_error:
            execution_time = (datetime.now() - start_time).total_seconds()
            error_msg = str(code_error)

            # Log the failed code execution
            log_interaction(
                user_query=question,
                model_name=model_name,
                response_content=f"❌ Error executing generated code: {error_msg}",
                generated_code=full_code if 'full_code' in locals() else "",
                execution_time=execution_time,
                error_message=error_msg,
                is_image=False
            )

            return {
                "role": "assistant",
                "content": f"❌ Error executing generated code: {error_msg}",
                "gen_code": full_code if 'full_code' in locals() else "",
                "ex_code": full_code if 'full_code' in locals() else "",
                "last_prompt": question,
                "error": error_msg
            }

    except Exception as e:
        execution_time = (datetime.now() - start_time).total_seconds()
        error_msg = str(e)

        # Handle specific API errors with friendlier messages.
        if "organization_restricted" in error_msg:
            response_content = "❌ API Organization Restricted: Your API key access has been restricted. Please check your Groq API key or try generating a new one."
            log_error_msg = "API access restricted"
        elif "rate_limit" in error_msg.lower():
            response_content = "❌ Rate limit exceeded. Please wait a moment and try again."
            log_error_msg = "Rate limit exceeded"
        else:
            response_content = f"❌ Error: {error_msg}"
            log_error_msg = error_msg

        # Log the failed interaction
        log_interaction(
            user_query=question,
            model_name=model_name,
            response_content=response_content,
            generated_code="",
            execution_time=execution_time,
            error_message=log_error_msg,
            is_image=False
        )

        return {
            "role": "assistant",
            "content": response_content,
            "gen_code": "",
            "ex_code": "",
            "last_prompt": question,
            "error": log_error_msg
        }