# causal-agent/tests/process_qrdata_benchmark.py
import csv
import json
import os
import importlib

from auto_causal.agent import run_causal_analysis
import auto_causal.components.output_formatter as cs_output_formatter
# auto_causal.tools.method_executor_tool is not imported at module level
# because the direct import can cause issues; main() loads it lazily via
# importlib instead.
# --- Configuration ---
# Path of the output log file (note: a relative path, resolved from the
# directory the script is launched in)
OUTPUT_LOG_FILE = "Project/fork_/causalscientist/tests/output/qr_data_4o-mini_latest"
# Relative path to the input CSV file from the workspace root
INPUT_CSV_PATH = "benchmark/qr_revised.csv"
# Base directory prepended to each file name in the CSV's 'data_files' column
DATA_FILES_BASE_DIR = "benchmark/all_data_1/"
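# Expected columns in INPUT_CSV_PATH (validated in main() below):
#   data_description        free-text description of the dataset
#   natural_language_query  the causal question to analyse
#   data_files              comma-separated file names under DATA_FILES_BASE_DIR
#   answer                  optional reference answer, copied into the log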
# --- Wrapper around the core analysis function ---
def benchmark_causal_analysis(natural_language_query: str, dataset_path: str, data_description: str):
    """Thin wrapper around run_causal_analysis that logs its inputs before delegating."""
    print("[INFO] run_causal_analysis called with:")
    print(f"  Natural Language Query: '{natural_language_query}'")
    print(f"  Dataset Path: '{dataset_path}'")
    result = run_causal_analysis(natural_language_query, dataset_path, data_description)
    print(f"[INFO] run_causal_analysis for '{dataset_path}' completed successfully.")
    return result
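# Both append-to-log sites in main() share the same open/write pattern; this
# small helper (an addition, not part of the original script) keeps them in
# sync. IOErrors are left to propagate so the callers' handlers still apply.
def append_log_line(text: str) -> None:
    """Append one line to OUTPUT_LOG_FILE, separated by blank lines."""
    with open(OUTPUT_LOG_FILE, mode='a', encoding='utf-8') as log_file:
        log_file.write('\n' + text + '\n')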
def main():
# Set the log file path for the output_formatter module
cs_output_formatter.CURRENT_OUTPUT_LOG_FILE = OUTPUT_LOG_FILE
# Set the log file path for the method_executor_tool module using importlib
try:
method_executor_module_name = "auto_causal.tools.method_executor_tool"
cs_method_executor_module = importlib.import_module(method_executor_module_name)
cs_method_executor_module.CURRENT_OUTPUT_LOG_FILE = OUTPUT_LOG_FILE
print(f"[INFO] Successfully set CURRENT_OUTPUT_LOG_FILE for {method_executor_module_name} to: {OUTPUT_LOG_FILE}")
except Exception as e:
print(f"[ERROR] Failed to set CURRENT_OUTPUT_LOG_FILE for method_executor_tool: {e}")
        # Abort: the rest of the run depends on this module-level setting.
        return
    # Ensure the output directory for the log file exists
    output_log_dir = os.path.dirname(OUTPUT_LOG_FILE)
    if output_log_dir and not os.path.exists(output_log_dir):
        try:
            os.makedirs(output_log_dir, exist_ok=True)
            print(f"Created directory: {output_log_dir}")
        except OSError as e:
            print(f"[ERROR] Failed to create directory '{output_log_dir}': {e}")
            return  # Stop if we can't create the log directory
current_query_sequence_number = 0
processed_csv_rows = 0
print(f"Starting processing of CSV: {INPUT_CSV_PATH}")
print(f"Output log will be written to: {OUTPUT_LOG_FILE}")
try:
with open(INPUT_CSV_PATH, mode='r', newline='', encoding='utf-8') as csv_file:
csv_reader = csv.DictReader(csv_file)
if not csv_reader.fieldnames:
print(f"[ERROR] CSV file '{INPUT_CSV_PATH}' is empty or has no header.")
return
required_columns = ['data_description', 'natural_language_query', 'data_files']
missing_cols = [col for col in required_columns if col not in csv_reader.fieldnames]
if missing_cols:
print(f"[ERROR] Missing required columns in CSV file '{INPUT_CSV_PATH}': {', '.join(missing_cols)}")
print(f"Available columns: {csv_reader.fieldnames}")
return
for row_number, row in enumerate(csv_reader, 1):
processed_csv_rows += 1
data_description = row.get('data_description', '').strip()
natural_language_query = row.get('natural_language_query', '').strip()
data_files_string = row.get('data_files', '').strip()
answer = row.get('answer', '').strip()
if not data_files_string:
print(f"[WARN] CSV Row {row_number}: 'data_files' field is empty. Skipping.")
continue
individual_files = [f.strip() for f in data_files_string.split(',') if f.strip()]
if not individual_files:
print(f"[WARN] CSV Row {row_number}: 'data_files' contained only separators or was effectively empty after stripping. Original: '{data_files_string}'. Skipping.")
continue
for file_name in individual_files:
current_query_sequence_number += 1
dataset_path = os.path.join(DATA_FILES_BASE_DIR, file_name)
log_data = {
"query_number": current_query_sequence_number,
"natural_language_query": natural_language_query,
"dataset_path": dataset_path,
"answer": answer
}
                    # Write the query metadata to the log before the analysis runs
                    try:
                        append_log_line(json.dumps(log_data))
                    except IOError as e:
                        print(f"[ERROR] Failed to write pre-analysis log for query #{current_query_sequence_number} to '{OUTPUT_LOG_FILE}': {e}")
                        continue  # Skip this file if the log cannot be written
                    for attempt in range(2):  # attempt 0 = first try, attempt 1 = retry
try:
print(f"[INFO] --- Starting Analysis (Attempt {attempt + 1}/2) ---")
print(f"[INFO] Query Sequence #: {current_query_sequence_number}")
print(f"[INFO] CSV Row: {row_number}, File: '{file_name}'")
benchmark_causal_analysis(
natural_language_query=natural_language_query,
dataset_path=dataset_path,
data_description=data_description
)
print(f"[INFO] --- Analysis Successful (Attempt {attempt + 1}/2) ---")
break
except Exception as e:
print(f"[ERROR] run_causal_analysis failed on attempt {attempt + 1}/2 for query #{current_query_sequence_number}: {e}")
if attempt == 1: # This was the retry, and it also failed
print(f"[INFO] Both attempts failed for query #{current_query_sequence_number}.")
                                try:
                                    append_log_line(f"{current_query_sequence_number}:Failed")
                                except IOError as ioe_fail:
                                    print(f"[ERROR] Failed to write failure status for query #{current_query_sequence_number} to '{OUTPUT_LOG_FILE}': {ioe_fail}")
else:
print(f"[INFO] Will retry query #{current_query_sequence_number}.")
                                # Optional: pause briefly before retrying.
except FileNotFoundError:
print(f"[ERROR] Input CSV file not found: '{INPUT_CSV_PATH}'")
except Exception as e:
print(f"[ERROR] An unexpected error occurred during script execution: {e}")
import traceback
traceback.print_exc()
finally:
print(f"--- Script finished ---")
print(f"Total CSV rows processed: {processed_csv_rows}")
print(f"Total analysis calls attempted (query_number): {current_query_sequence_number}")
print(f"Log file: {OUTPUT_LOG_FILE}")
if __name__ == "__main__":
main()
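# Usage: run from the workspace root so the relative paths above resolve, e.g.
#   python tests/process_qrdata_benchmark.py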