Spaces:
Sleeping
Sleeping
""" | |
Core data processing and analysis logic for the PharmaCircle AI Data Analyst. | |
This module orchestrates the main analysis workflow: | |
1. Takes a user's natural language query. | |
2. Uses the LLM to generate a structured analysis plan. | |
3. Executes parallel queries against Solr for quantitative and qualitative data. | |
4. Generates a data visualization using the LLM. | |
5. Synthesizes the findings into a comprehensive, user-facing report. | |
""" | |
import json | |
import re | |
import datetime | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import os | |
import concurrent.futures | |
import copy | |
import google.generativeai as genai | |
import urllib | |
from llm_prompts import ( | |
get_analysis_plan_prompt, | |
get_synthesis_report_prompt, | |
get_visualization_code_prompt | |
) | |
from extract_results import get_search_list_params | |
def parse_suggestions_from_report(report_text): | |
"""Extracts numbered suggestions from the report's markdown text.""" | |
suggestions_match = re.search(r"### (?:Deeper Dive: Suggested Follow-up Analyses|Suggestions for Further Exploration)\s*\n(.*?)$", report_text, re.DOTALL | re.IGNORECASE) | |
if not suggestions_match: return [] | |
suggestions_text = suggestions_match.group(1) | |
suggestions = re.findall(r"^\s*\d+\.\s*(.*)", suggestions_text, re.MULTILINE) | |
return [s.strip() for s in suggestions] | |
def llm_generate_analysis_plan_with_history(llm_model, natural_language_query, chat_history): | |
""" | |
Generates a complete analysis plan from a user query, considering chat history | |
and dynamic field suggestions from an external API. | |
""" | |
search_fields, search_name, field_mappings = [], "", {} | |
try: | |
# Call the external API to get dynamic fields, core name, and mappings | |
search_fields, search_name, field_mappings = get_search_list_params(natural_language_query) | |
print(f"API returned core: '{search_name}' with {len(search_fields)} fields and {len(field_mappings)} mappings.") | |
except Exception as e: | |
print(f"Warning: Could not retrieve dynamic search fields. Proceeding without them. Error: {e}") | |
# Determine the core name, default to 'news' if not provided by the API | |
core_name = search_name if search_name else 'news' | |
# Apply the field mappings to the suggestions before sending them to the LLM | |
mapped_search_fields = [] | |
if search_fields and field_mappings: | |
for field in search_fields: | |
original_name = field.get('field_name') | |
# Create a new dict to avoid modifying the original | |
mapped_field = field.copy() | |
if original_name in field_mappings: | |
mapped_field['field_name'] = field_mappings[original_name] | |
print(f"Mapped field '{original_name}' to '{mapped_field['field_name']}'") | |
mapped_search_fields.append(mapped_field) | |
else: | |
mapped_search_fields = search_fields | |
# Generate the prompt, passing the mapped fields and the dynamic core name | |
prompt = get_analysis_plan_prompt(natural_language_query, chat_history, mapped_search_fields, core_name) | |
try: | |
response = llm_model.generate_content(prompt) | |
cleaned_text = re.sub(r'```json\s*|\s*```', '', response.text, flags=re.MULTILINE | re.DOTALL).strip() | |
plan = json.loads(cleaned_text) | |
# Return the plan, the mapped fields for UI display, and the core name | |
return plan, mapped_search_fields, core_name | |
except Exception as e: | |
raw_response_text = response.text if 'response' in locals() else 'N/A' | |
print(f"Error in llm_generate_analysis_plan_with_history: {e}\nRaw Response:\n{raw_response_text}") | |
# Return None for the plan but still return other data for debugging | |
return None, mapped_search_fields, core_name | |
def execute_quantitative_query(solr_client, plan): | |
"""Executes the facet query to get aggregate data.""" | |
if not plan or 'quantitative_request' not in plan or 'json.facet' not in plan.get('quantitative_request', {}): | |
return None, None | |
try: | |
params = { | |
"q": plan.get('query_filter', '*_*'), | |
"rows": 0, | |
"json.facet": json.dumps(plan['quantitative_request']['json.facet']) | |
} | |
# Build the full Solr URL manually (for logging) from the client's current URL | |
base_url = f"{solr_client.url}/select" | |
query_string = urllib.parse.urlencode(params) | |
full_url = f"{base_url}?{query_string}" | |
print(f"[DEBUG] Solr QUANTITATIVE query URL: {full_url}") | |
results = solr_client.search(**params) | |
return results.raw_response.get("facets", {}), full_url | |
except Exception as e: | |
print(f"Error in quantitative query on core specified in client ({solr_client.url}): {e}") | |
return None, None | |
def execute_qualitative_query(solr_client, plan): | |
"""Executes the grouping query to get the best example docs.""" | |
if not plan or 'qualitative_request' not in plan: | |
return None, None | |
try: | |
qual_request = copy.deepcopy(plan['qualitative_request']) | |
params = { | |
"q": plan.get('query_filter', '*_*'), | |
"rows": 5, # Get a few examples per group | |
"fl": "*,score", | |
**qual_request | |
} | |
# Build the full Solr URL manually (for logging) from the client's current URL | |
base_url = f"{solr_client.url}/select" | |
query_string = urllib.parse.urlencode(params) | |
full_url = f"{base_url}?{query_string}" | |
print(f"[DEBUG] Solr QUALITATIVE query URL: {full_url}") | |
results = solr_client.search(**params) | |
return results.grouped, full_url | |
except Exception as e: | |
print(f"Error in qualitative query on core specified in client ({solr_client.url}): {e}") | |
return None, None | |
def llm_synthesize_enriched_report_stream(llm_model, query, quantitative_data, qualitative_data, plan): | |
""" | |
Generates an enriched report by synthesizing quantitative aggregates | |
and qualitative examples, and streams the result. | |
""" | |
prompt = get_synthesis_report_prompt(query, quantitative_data, qualitative_data, plan) | |
try: | |
response_stream = llm_model.generate_content(prompt, stream=True) | |
for chunk in response_stream: | |
yield chunk.text | |
except Exception as e: | |
print(f"Error in llm_synthesize_enriched_report_stream: {e}") | |
yield "Sorry, I was unable to generate a report for this data." | |
def llm_generate_visualization_code(llm_model, query_context, facet_data): | |
"""Generates Python code for visualization based on query and data.""" | |
prompt = get_visualization_code_prompt(query_context, facet_data) | |
try: | |
generation_config = genai.types.GenerationConfig(temperature=0) | |
response = llm_model.generate_content(prompt, generation_config=generation_config) | |
code = re.sub(r'^```python\s*|```$', '', response.text, flags=re.MULTILINE) | |
return code | |
except Exception as e: | |
print(f"Error in llm_generate_visualization_code: {e}\nRaw response: {response.text}") | |
return None | |
def execute_viz_code_and_get_path(viz_code, facet_data): | |
"""Executes visualization code and returns the path to the saved plot image.""" | |
if not viz_code: return None | |
try: | |
if not os.path.exists('/tmp/plots'): os.makedirs('/tmp/plots') | |
plot_path = f"/tmp/plots/plot_{datetime.datetime.now().timestamp()}.png" | |
exec_globals = {'facet_data': facet_data, 'plt': plt, 'sns': sns, 'pd': pd} | |
exec(viz_code, exec_globals) | |
fig = exec_globals.get('fig') | |
if fig: | |
fig.savefig(plot_path, bbox_inches='tight') | |
plt.close(fig) | |
return plot_path | |
return None | |
except Exception as e: | |
print(f"ERROR executing visualization code: {e}\n---Code---\n{viz_code}") | |
return None |