Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import re | |
import json | |
import sys | |
import contextlib | |
from io import StringIO | |
import time | |
import logging | |
from src.utils.logger import Logger | |
import textwrap | |
logger = Logger(__name__, level="INFO", see_time=False, console_log=False) | |
def stdoutIO(stdout=None): | |
old = sys.stdout | |
if stdout is None: | |
stdout = StringIO() | |
sys.stdout = stdout | |
yield stdout | |
sys.stdout = old | |
def clean_print_statements(code_block): | |
""" | |
This function cleans up any `print()` statements that might contain unwanted `\n` characters. | |
It ensures print statements are properly formatted without unnecessary newlines. | |
""" | |
# This regex targets print statements, even if they have newlines inside | |
return re.sub(r'print\((.*?)(\\n.*?)(.*?)\)', r'print(\1\3)', code_block, flags=re.DOTALL) | |
def remove_code_block_from_summary(summary): | |
# use regex to remove code block from summary list | |
summary = re.sub(r'```python\n(.*?)\n```', '', summary) | |
return summary.split("\n") | |
def remove_main_block(code): | |
# Match the __main__ block | |
pattern = r'(?m)^if\s+__name__\s*==\s*["\']__main__["\']\s*:\s*\n((?:\s+.*\n?)*)' | |
match = re.search(pattern, code) | |
if match: | |
main_block = match.group(1) | |
# Dedent the code block inside __main__ | |
dedented_block = textwrap.dedent(main_block) | |
# Remove \n from any print statements in the block (also handling multiline print cases) | |
dedented_block = clean_print_statements(dedented_block) | |
# Replace the block in the code | |
cleaned_code = re.sub(pattern, dedented_block, code) | |
# Optional: Remove leading newlines if any | |
cleaned_code = cleaned_code.strip() | |
return cleaned_code | |
return code | |
def format_code_block(code_str): | |
code_clean = re.sub(r'^```python\n?', '', code_str, flags=re.MULTILINE) | |
code_clean = re.sub(r'\n```$', '', code_clean) | |
return f'\n{code_clean}\n' | |
def format_code_backticked_block(code_str): | |
code_clean = re.sub(r'^```python\n?', '', code_str, flags=re.MULTILINE) | |
code_clean = re.sub(r'\n```$', '', code_clean) | |
# Only match assignments at top level (not indented) | |
# 1. Remove 'df = pd.DataFrame()' if it's at the top level | |
# Remove reading the csv file if it's already in the context | |
modified_code = re.sub(r"df\s*=\s*pd\.read_csv\([\"\'].*?[\"\']\).*?(\n|$)", '', code_clean) | |
# Only match assignments at top level (not indented) | |
# 1. Remove 'df = pd.DataFrame()' if it's at the top level | |
modified_code = re.sub( | |
r"^df\s*=\s*pd\.DataFrame\(\s*\)\s*(#.*)?$", | |
'', | |
modified_code, | |
flags=re.MULTILINE | |
) | |
# # Remove sample dataframe lines with multiple array values | |
modified_code = re.sub(r"^# Sample DataFrames?.*?(\n|$)", '', modified_code, flags=re.MULTILINE | re.IGNORECASE) | |
# # Remove plt.show() statements | |
modified_code = re.sub(r"plt\.show\(\).*?(\n|$)", '', modified_code) | |
# remove main | |
code_clean = remove_main_block(modified_code) | |
return f'```python\n{code_clean}\n```' | |
# In format_response.py, modify the execute_code function: | |
def execute_code_from_markdown(code_str, dataframe=None): | |
import pandas as pd | |
import plotly.express as px | |
import plotly | |
import plotly.graph_objects as go | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import numpy as np | |
context = { | |
'pd': pd, | |
'px': px, | |
'go': go, | |
'plt': plt, | |
'plotly': plotly, | |
'__builtins__': __builtins__, | |
'__import__': __import__, | |
'sns': sns, | |
'np': np, | |
'json_outputs': [] # List to store multiple Plotly JSON outputs | |
} | |
# If a dataframe is provided, add it to the context | |
if dataframe is not None: | |
context['df'] = dataframe | |
# Modify code to store multiple JSON outputs | |
modified_code = re.sub( | |
r'(\w*_?)fig(\w*)\.show\(\)', | |
r'json_outputs.append(plotly.io.to_json(\1fig\2, pretty=True))', | |
code_str | |
) | |
modified_code = re.sub( | |
r'(\w*_?)fig(\w*)\.to_html\(.*?\)', | |
r'json_outputs.append(plotly.io.to_json(\1fig\2, pretty=True))', | |
modified_code | |
) | |
# Remove reading the csv file if it's already in the context | |
modified_code = re.sub(r"df\s*=\s*pd\.read_csv\([\"\'].*?[\"\']\).*?(\n|$)", '', modified_code) | |
# Only match assignments at top level (not indented) | |
# 1. Remove 'df = pd.DataFrame()' if it's at the top level | |
modified_code = re.sub( | |
r"^df\s*=\s*pd\.DataFrame\(\s*\)\s*(#.*)?$", | |
'', | |
modified_code, | |
flags=re.MULTILINE | |
) | |
# # Remove sample dataframe lines with multiple array values | |
modified_code = re.sub(r"^# Sample DataFrames?.*?(\n|$)", '', modified_code, flags=re.MULTILINE | re.IGNORECASE) | |
# # Remove plt.show() statements | |
modified_code = re.sub(r"plt\.show\(\).*?(\n|$)", '', modified_code) | |
# Only add df = pd.read_csv() if no dataframe was provided and the code contains pd.read_csv | |
if dataframe is None and 'pd.read_csv' not in modified_code: | |
modified_code = re.sub( | |
r'import pandas as pd', | |
r'import pandas as pd\n\n# Read Housing.csv\ndf = pd.read_csv("Housing.csv")', | |
modified_code | |
) | |
try: | |
with stdoutIO() as s: | |
exec(modified_code, context) # Execute the modified code | |
output = s.getvalue() | |
json_outputs = context.get('json_outputs', []) | |
return output, json_outputs | |
except Exception as e: | |
return "Error executing code: " + str(e), [] | |
def format_response_to_markdown(api_response, agent_name = None, dataframe=None): | |
try: | |
markdown = [] | |
logger.log_message(f"API response for {agent_name} at {time.strftime('%Y-%m-%d %H:%M:%S')}: {api_response}", level=logging.INFO) | |
if isinstance(api_response, dict): | |
for key in api_response: | |
if "error" in api_response[key]: | |
return f"**Error**: Rate limit exceeded. Please try switching models from the settings." | |
# You can add more checks here if needed for other keys | |
# Handle error responses | |
if isinstance(api_response, dict) and "error" in api_response: | |
return f"**Error**: {api_response['error']}" | |
if "response" in api_response and isinstance(api_response['response'], str): | |
if any(err in api_response['response'].lower() for err in ["auth", "api", "lm"]): | |
return "**Error**: Authentication failed. Please check your API key in settings and try again." | |
if "model" in api_response['response'].lower(): | |
return "**Error**: Model configuration error. Please verify your model selection in settings." | |
for agent, content in api_response.items(): | |
agent = agent.split("__")[0] if "__" in agent else agent | |
if "memory" in agent or not content: | |
continue | |
markdown.append(f"\n## {agent.replace('_', ' ').title()}\n") | |
if agent == "analytical_planner": | |
if 'plan_desc' in content: | |
markdown.append(f"### Reasoning\n{content['plan_desc']}\n") | |
else: | |
markdown.append(f"### Reasoning\n{content['rationale']}\n") | |
else: | |
if "rationale" in content: | |
markdown.append(f"### Reasoning\n{content['rationale']}\n") | |
if 'code' in content: | |
markdown.append(f"### Code Implementation\n{format_code_backticked_block(content['code'])}\n") | |
if agent_name is not None: | |
# execute the code | |
clean_code = format_code_block(content['code']) | |
output, json_outputs = execute_code_from_markdown(clean_code, dataframe) | |
if output: | |
markdown.append("### Execution Output\n") | |
markdown.append(f"```output\n{output}\n```\n") | |
if json_outputs: | |
markdown.append("### Plotly JSON Outputs\n") | |
for idx, json_output in enumerate(json_outputs): | |
if len(json_output) > 1000000: # If JSON is larger than 1MB | |
logger.log_message(f"Large JSON output detected: {len(json_output)} bytes", level=logging.WARNING) | |
markdown.append(f"```plotly\n{json_output}\n```\n") | |
if 'summary' in content: | |
# make the summary a bullet-point list | |
summary_lines = remove_code_block_from_summary(content['summary']) | |
summary_lines = content['summary'].split('\n') | |
# remove code block from summary | |
markdown.append("### Summary\n") | |
for line in summary_lines: | |
if line != "": | |
markdown.append(f"• {line.strip().replace('•', '').replace('-', '').replace('*', '') if line.strip().startswith('•') or line.strip().startswith('-') or line.strip().startswith('*') else line.strip()}\n") | |
if 'refined_complete_code' in content and 'summary' in content: | |
try: | |
if content['refined_complete_code'] is not None and content['refined_complete_code'] != "": | |
clean_code = format_code_block(content['refined_complete_code']) | |
markdown_code = format_code_backticked_block(content['refined_complete_code']) | |
output, json_outputs = execute_code_from_markdown(clean_code, dataframe) | |
elif "```python" in content['summary']: | |
clean_code = format_code_block(content['summary']) | |
markdown_code = format_code_backticked_block(content['summary']) | |
output, json_outputs = execute_code_from_markdown(clean_code, dataframe) | |
except Exception as e: | |
logger.log_message(f"Error in execute_code_from_markdown: {str(e)}", level=logging.ERROR) | |
markdown_code = f"**Error**: {str(e)}" | |
# continue | |
if markdown_code is not None: | |
markdown.append(f"### Refined Complete Code\n{markdown_code}\n") | |
if output: | |
markdown.append("### Execution Output\n") | |
markdown.append(f"```output\n{output}\n```\n") | |
if json_outputs: | |
markdown.append("### Plotly JSON Outputs\n") | |
for idx, json_output in enumerate(json_outputs): | |
markdown.append(f"```plotly\n{json_output}\n```\n") | |
# if agent_name is not None: | |
# if f"memory_{agent_name}" in api_response: | |
# markdown.append(f"### Memory\n{api_response[f'memory_{agent_name}']}\n") | |
except Exception as e: | |
logger.log_message(f"Error in format_response_to_markdown: {str(e)}", level=logging.ERROR) | |
return f"{str(e)}" | |
# logger.log_message(f"Generated markdown content for agent '{agent_name}' at {time.strftime('%Y-%m-%d %H:%M:%S')}: {markdown}, length: {len(markdown)}", level=logging.INFO) | |
if not markdown or len(markdown) <= 1: | |
logger.log_message(f"Generated markdown (ERROR) content for agent '{agent_name}' at {time.strftime('%Y-%m-%d %H:%M:%S')}: {markdown}, length: {len(markdown)}", level=logging.INFO) | |
return "Please provide a valid query..." | |
return '\n'.join(markdown) | |
# Example usage with dummy data | |
if __name__ == "__main__": | |
sample_response = { | |
"code_combiner_agent": { | |
"reasoning": "Sample reasoning for multiple charts.", | |
"refined_complete_code": """ | |
```python | |
import plotly.express as px | |
import pandas as pd | |
# Sample Data | |
df = pd.DataFrame({'Category': ['A', 'B', 'C'], 'Values': [10, 20, 30]}) | |
# First Chart | |
fig = px.bar(df, x='Category', y='Values', title='Bar Chart') | |
fig.show() | |
# Second Chart | |
fig2 = px.pie(df, values='Values', names='Category', title='Pie Chart') | |
fig2.show() | |
``` | |
""" | |
} | |
} | |
formatted_md = format_response_to_markdown(sample_response) |