project67 / file_management.py
SmokeyBandit's picture
Update file_management.py
6f40526 verified
import os
import json
import uuid
import traceback
import logging
from pathlib import Path
import numpy as np
import pandas as pd
from typing import Dict, Any
# Import the base intelligent agent.
# Adjust this import as necessary if you move IntelligentAgent to a separate module.
logger = logging.getLogger(__name__)
from app.py import IntelligentAgent
class FileManagementAgent(IntelligentAgent):
def __init__(self, hub):
super().__init__("file_management", hub)
def process_task(self, task: str) -> Dict[str, Any]:
logger.info(f"FileManagementAgent processing: {task}")
task_lower = task.lower()
if any(word in task_lower for word in ["create", "make", "generate", "write"]):
operation = "create"
elif any(word in task_lower for word in ["read", "open", "show", "display", "content"]):
operation = "read"
elif any(word in task_lower for word in ["list", "find", "directory", "folder", "files in"]):
operation = "list"
elif any(word in task_lower for word in ["delete", "remove"]):
operation = "delete"
else:
operation = "unknown"
filename = None
file_extensions = ['.txt', '.json', '.csv', '.md', '.py', '.html', '.js', '.css']
words = task.split()
for word in words:
for ext in file_extensions:
if ext in word.lower():
filename = word.strip(':"\'.,;')
break
if filename:
break
if not filename:
file_keywords = ["file", "named", "called", "filename"]
for i, word in enumerate(words):
if word.lower() in file_keywords and i < len(words) - 1:
potential_name = words[i+1].strip(':"\'.,;')
if '.' not in potential_name:
if "json" in task_lower:
potential_name += ".json"
elif "csv" in task_lower:
potential_name += ".csv"
elif "python" in task_lower or "py" in task_lower:
potential_name += ".py"
else:
potential_name += ".txt"
filename = potential_name
break
if not filename:
if "json" in task_lower:
filename = f"data_{uuid.uuid4().hex[:6]}.json"
elif "csv" in task_lower:
filename = f"data_{uuid.uuid4().hex[:6]}.csv"
elif "python" in task_lower or "py" in task_lower:
filename = f"script_{uuid.uuid4().hex[:6]}.py"
elif "log" in task_lower:
filename = f"log_{uuid.uuid4().hex[:6]}.txt"
else:
filename = f"file_{uuid.uuid4().hex[:6]}.txt"
result = {}
if operation == "create":
if filename.endswith('.json'):
content = json.dumps({
"name": "Sample Data",
"description": task,
"created": pd.Timestamp.now().isoformat(),
"values": [1, 2, 3, 4, 5],
"metadata": {"source": "FileManagementAgent", "version": "1.0"}
}, indent=2)
elif filename.endswith('.csv'):
content = "id,name,value,timestamp\n"
for i in range(5):
content += f"{i+1},Item{i+1},{np.random.randint(1, 100)},{pd.Timestamp.now().isoformat()}\n"
elif filename.endswith('.py'):
content = f"""# Generated Python Script: {filename}
# Created: {pd.Timestamp.now().isoformat()}
# Description: {task}
def main():
print("Hello from the FileManagementAgent!")
data = [1, 2, 3, 4, 5]
result = sum(data)
print(f"Sample calculation: sum(data) = {{result}}")
return result
if __name__ == "__main__":
main()
"""
else:
content = f"File created by FileManagementAgent\nCreated: {pd.Timestamp.now().isoformat()}\nBased on request: {task}\n\nThis is sample content."
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write(content)
result = {
"text": f"Successfully created file: {filename}",
"operation": "create",
"filename": filename,
"size": len(content),
"preview": content[:200] + "..." if len(content) > 200 else content
}
self.memory.add_short_term({"operation": "create", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
self.memory.add_long_term(f"file:{filename}", {"operation": "create", "type": Path(filename).suffix, "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error creating file {filename}: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
elif operation == "read":
if not filename:
result = {"text": "Please specify a filename to read."}
elif not Path(filename).exists():
result = {"text": f"File '{filename}' not found."}
else:
try:
with open(filename, 'r', encoding='utf-8') as f:
content = f.read()
result = {"text": f"Content of {filename}:\n\n{content}", "operation": "read", "filename": filename, "content": content, "size": len(content)}
self.memory.add_short_term({"operation": "read", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error reading file {filename}: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
elif operation == "list":
try:
directory = "."
for term in ["directory", "folder", "in"]:
if term in task_lower:
parts = task_lower.split(term)
if len(parts) > 1:
potential_dir = parts[1].strip().split()[0].strip(':"\'.,;')
if Path(potential_dir).exists() and Path(potential_dir).is_dir():
directory = potential_dir
extension_filter = None
for ext in file_extensions:
if ext in task_lower:
extension_filter = ext
break
files = list(Path(directory).glob('*' + (extension_filter or '')))
file_groups = {}
for file in files:
file_groups.setdefault(file.suffix, []).append({
"name": file.name,
"size": file.stat().st_size,
"modified": pd.Timestamp(file.stat().st_mtime, unit='s').isoformat()
})
response_text = f"Found {len(files)} files" + (f" with extension {extension_filter}" if extension_filter else "") + f" in {directory}:\n\n"
for ext, group in file_groups.items():
response_text += f"{ext} files ({len(group)}):\n"
for file_info in sorted(group, key=lambda x: x["name"]):
size_kb = file_info["size"] / 1024
response_text += f"- {file_info['name']} ({size_kb:.1f} KB, modified: {file_info['modified']})\n"
response_text += "\n"
result = {"text": response_text, "operation": "list", "directory": directory, "file_count": len(files), "files": file_groups}
self.memory.add_short_term({"operation": "list", "directory": directory, "file_count": len(files), "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error listing files: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
elif operation == "delete":
if not filename:
result = {"text": "Please specify a filename to delete."}
elif not Path(filename).exists():
result = {"text": f"File '{filename}' not found."}
else:
try:
os.remove(filename)
result = {"text": f"Successfully deleted file: {filename}", "operation": "delete", "filename": filename}
self.memory.add_short_term({"operation": "delete", "filename": filename, "timestamp": pd.Timestamp.now().isoformat()})
self.memory.add_long_term(f"file:{filename}", {"operation": "delete", "timestamp": pd.Timestamp.now().isoformat()})
except Exception as e:
error_msg = f"Error deleting file {filename}: {str(e)}"
logger.error(error_msg)
result = {"text": error_msg, "error": str(e)}
else:
result = {"text": f"Unknown operation requested in task: {task}"}
return result