|
from typing import Dict, List, Any, Optional, Callable |
|
import pandas as pd |
|
import numpy as np |
|
from pathlib import Path |
|
|
|
class PandasDataTools:
    """Tools for data analysis operations on CSV files.

    Every public tool loads a CSV from ``csv_directory`` (cached per requested
    filename) and returns a plain ``dict``. Problems with the requested
    operation (unknown column, unsupported condition, non-numeric value, ...)
    are reported as ``{"error": "<message>"}``. A CSV file that cannot be found
    raises ``ValueError`` from ``_load_dataframe``.
    """

    # ``filter_data`` condition -> name of the Series comparison method.
    _COMPARISONS: Dict[str, str] = {
        "==": "eq",
        "!=": "ne",
        ">": "gt",
        "<": "lt",
        ">=": "ge",
        "<=": "le",
    }

    # Ordering comparisons always compare against ``float(value)``.
    _ORDERING_CONDITIONS = (">", "<", ">=", "<=")

    # ``group_and_aggregate`` function name -> pandas aggregation name.
    # String names are used instead of NumPy callables so that pandas applies
    # its own NaN-skipping group reductions. "count" maps to "size" because the
    # group length (rows per group, missing values included) is what is
    # reported.
    _AGGREGATIONS: Dict[str, str] = {
        "mean": "mean",
        "sum": "sum",
        "min": "min",
        "max": "max",
        "count": "size",
        "median": "median",
    }

    def __init__(self, csv_directory: str):
        """Initialize with directory containing CSV files.

        Args:
            csv_directory: Directory that CSV filenames are resolved against.
        """
        self.csv_directory = csv_directory
        # Cache of loaded frames, keyed by the filename exactly as requested.
        self.dataframes: Dict[str, pd.DataFrame] = {}

    def _load_dataframe(self, filename: str) -> pd.DataFrame:
        """Load a CSV file as DataFrame, with caching.

        Args:
            filename: File name relative to ``csv_directory``. The ``.csv``
                extension may be omitted.

        Returns:
            The cached DataFrame for ``filename``.

        Raises:
            ValueError: If no matching file exists.
        """
        if filename not in self.dataframes:
            file_path = Path(self.csv_directory) / filename
            # ``is_file`` (not ``exists``) so that a directory with the same
            # name does not shadow ``<filename>.csv``.
            if not file_path.is_file() and not filename.endswith(".csv"):
                file_path = Path(self.csv_directory) / f"{filename}.csv"

            if not file_path.is_file():
                raise ValueError(f"CSV file not found: {filename}")

            self.dataframes[filename] = pd.read_csv(file_path)

        return self.dataframes[filename]

    @staticmethod
    def _missing_column_error(df: pd.DataFrame, *column_specs: Any) -> Optional[Dict[str, Any]]:
        """Return an error dict naming requested columns absent from ``df``.

        Each spec may be a single column name or a list/tuple of names. Only
        string names are checked; anything else is left for pandas to handle.
        Returns ``None`` when nothing is missing.
        """
        missing: List[str] = []
        for spec in column_specs:
            names = list(spec) if isinstance(spec, (list, tuple)) else [spec]
            missing.extend(
                name for name in names
                if isinstance(name, str) and name not in df.columns
            )
        if missing:
            return {"error": f"Column(s) not found: {', '.join(missing)}"}
        return None

    def get_tools(self) -> List[Dict[str, Any]]:
        """Get all available data tools.

        Returns:
            One dict per tool with its ``name``, ``description`` and the bound
            method under ``function``.
        """
        return [
            {
                "name": "describe_csv",
                "description": "Get statistical description of a CSV file",
                "function": self.describe_csv,
            },
            {
                "name": "filter_data",
                "description": "Filter CSV data based on conditions",
                "function": self.filter_data,
            },
            {
                "name": "group_and_aggregate",
                "description": "Group data and calculate aggregate statistics",
                "function": self.group_and_aggregate,
            },
            {
                "name": "sort_data",
                "description": "Sort data by specified columns",
                "function": self.sort_data,
            },
            {
                "name": "calculate_correlation",
                "description": "Calculate correlation between columns",
                "function": self.calculate_correlation,
            },
        ]

    def describe_csv(self, filename: str) -> Dict[str, Any]:
        """Get statistical description of CSV data.

        Returns:
            A dict with ``statistics`` (``DataFrame.describe()`` as a dict),
            ``shape``, ``columns`` and ``dtypes`` (dtype names as strings).
        """
        df = self._load_dataframe(filename)

        return {
            "statistics": df.describe().to_dict(),
            "shape": df.shape,
            "columns": df.columns.tolist(),
            "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
        }

    def filter_data(self, filename: str, column: str, condition: str, value: Any) -> Dict[str, Any]:
        """Filter data based on condition (==, >, <, >=, <=, !=, contains).

        Args:
            filename: CSV file to load.
            column: Column the condition is applied to.
            condition: One of ``==``, ``!=``, ``>``, ``<``, ``>=``, ``<=`` or
                ``contains`` (case-insensitive keyword).
            value: Value to compare against. Ordering comparisons convert it
                with ``float``; ``contains`` converts it with ``str`` and
                matches it as a literal substring.

        Returns:
            ``result_count`` (matching rows), ``results`` (first 10 matching
            rows as records) and ``total_count`` (rows in the file), or an
            ``{"error": ...}`` dict when the request cannot be evaluated.
        """
        df = self._load_dataframe(filename)

        error = self._missing_column_error(df, column)
        if error is not None:
            return error
        series = df[column]

        if condition in self._COMPARISONS:
            operand = value
            if condition in self._ORDERING_CONDITIONS:
                try:
                    operand = float(value)
                except (TypeError, ValueError):
                    return {
                        "error": f"Value for condition '{condition}' must be numeric: {value!r}"
                    }
            elif isinstance(value, str) and pd.api.types.is_numeric_dtype(series):
                # Callers frequently pass numbers as text; without coercion
                # "3" would never equal 3 in a numeric column.
                try:
                    operand = float(value)
                except ValueError:
                    operand = value

            try:
                mask = getattr(series, self._COMPARISONS[condition])(operand)
            except TypeError as exc:
                return {
                    "error": f"Could not compare column '{column}' with {value!r}: {exc}"
                }
        elif condition.lower() == "contains":
            # regex=False: treat the value as literal text so characters such
            # as "." or "(" neither over-match nor raise a regex error.
            mask = series.astype(str).str.contains(str(value), regex=False)
        else:
            return {"error": f"Unsupported condition: {condition}"}

        filtered = df[mask]

        return {
            "result_count": len(filtered),
            "results": filtered.head(10).to_dict(orient="records"),
            "total_count": len(df),
        }

    def group_and_aggregate(self, filename: str, group_by: str, agg_column: str,
                            agg_function: str = "mean") -> Dict[str, Any]:
        """Group by column and calculate aggregate statistic.

        Args:
            filename: CSV file to load.
            group_by: Column to group on.
            agg_column: Column whose values are aggregated per group.
            agg_function: One of ``mean``, ``sum``, ``min``, ``max``,
                ``count`` (rows per group) or ``median``.

        Returns:
            ``group_by``, ``aggregated_column``, ``aggregation`` and
            ``results`` (group key -> aggregated value), or an
            ``{"error": ...}`` dict when the request cannot be evaluated.
        """
        df = self._load_dataframe(filename)

        if agg_function not in self._AGGREGATIONS:
            return {"error": f"Unsupported aggregation function: {agg_function}"}

        error = self._missing_column_error(df, group_by, agg_column)
        if error is not None:
            return error

        try:
            grouped = df.groupby(group_by)[agg_column].agg(self._AGGREGATIONS[agg_function])
        except (TypeError, ValueError) as exc:
            # e.g. asking for the mean of a text column.
            return {"error": f"Could not aggregate column '{agg_column}': {exc}"}

        return {
            "group_by": group_by,
            "aggregated_column": agg_column,
            "aggregation": agg_function,
            "results": grouped.to_dict(),
        }

    def sort_data(self, filename: str, sort_by: str, ascending: bool = True) -> Dict[str, Any]:
        """Sort data by column.

        Args:
            filename: CSV file to load.
            sort_by: Column to sort on.
            ascending: Sort direction; ascending when true.

        Returns:
            ``sorted_by``, ``ascending`` and ``results`` (first 10 rows of the
            sorted data as records), or an ``{"error": ...}`` dict when the
            column does not exist.
        """
        df = self._load_dataframe(filename)

        error = self._missing_column_error(df, sort_by)
        if error is not None:
            return error

        sorted_df = df.sort_values(by=sort_by, ascending=ascending)

        return {
            "sorted_by": sort_by,
            "ascending": ascending,
            "results": sorted_df.head(10).to_dict(orient="records"),
        }

    def calculate_correlation(self, filename: str, column1: str, column2: str) -> Dict[str, Any]:
        """Calculate correlation between two columns.

        Returns:
            ``correlation`` (result of ``Series.corr``), ``column1`` and
            ``column2``, or an ``{"error": ...}`` dict when a column is
            missing or the values cannot be correlated.
        """
        df = self._load_dataframe(filename)

        try:
            correlation = df[column1].corr(df[column2])
        except (KeyError, TypeError, ValueError) as e:
            return {"error": f"Could not calculate correlation: {str(e)}"}

        return {
            "correlation": correlation,
            "column1": column1,
            "column2": column2,
        }
|
|