# PolicySummarizer / crew.py
# Nadasr's picture
# Upload 4 files
# 7fae465 verified
"""
CrewAI Configuration - Policy Summarizer
"""
import os
import time

from crewai import Agent, Task, Crew, Process

from tools.web_scraper import web_scraper_tool
from tools.text_analyzer import text_analyzer_tool
from utils.logger import log_agent_action, clear_logs
def create_agents():
    """Build the three CrewAI agents used by the policy pipeline.

    Returns:
        A 3-tuple ``(orchestrator, scraper, analyzer)`` of ``Agent`` objects,
        constructed in that order.
    """
    # Settings shared by the two tool-using agents: neither may delegate.
    worker_settings = {"verbose": True, "allow_delegation": False}

    # The coordinator is the only agent allowed to delegate and has no tools.
    coordinator = Agent(
        role="Policy Analysis Orchestrator",
        goal="Coordinate the policy analysis and create a user-friendly summary",
        backstory=(
            "You are an expert at analyzing legal documents and presenting\n"
            "complex information in simple terms. You coordinate the analysis workflow."
        ),
        verbose=True,
        allow_delegation=True,
    )

    # Fetches the policy text through the web scraper tool.
    extractor = Agent(
        role="Web Content Scraper",
        goal="Extract clean policy text from web URLs",
        backstory=(
            "You specialize in web scraping and content extraction.\n"
            "You can extract policy text while filtering out irrelevant content."
        ),
        tools=[web_scraper_tool],
        **worker_settings,
    )

    # Examines the scraped text through the text analyzer tool.
    reviewer = Agent(
        role="Policy Analyzer",
        goal="Analyze policies to identify key points, rights, and concerns",
        backstory=(
            "You are a legal expert who analyzes terms of service and\n"
            "privacy policies. You identify user rights and potential red flags."
        ),
        tools=[text_analyzer_tool],
        **worker_settings,
    )

    return coordinator, extractor, reviewer
def create_tasks(orchestrator, scraper, analyzer, url: str):
    """Create the scrape, analyze and summarize tasks for one policy URL.

    Args:
        orchestrator: Agent assigned to the final summary task.
        scraper: Agent assigned to the scraping task.
        analyzer: Agent assigned to the analysis task.
        url: Address of the policy page; embedded verbatim in the scrape
            task's description.

    Returns:
        ``[scrape_task, analyze_task, summary_task]`` in execution order.
    """
    scrape_task = Task(
        description=(
            "\n"
            f"Scrape the policy content from: {url}\n"
            "Use the web_scraper_tool to fetch and extract the text.\n"
            "Return the full policy text content.\n"
        ),
        expected_output="The extracted policy text content",
        agent=scraper,
    )

    # The analysis task receives the scrape task as context.
    analyze_task = Task(
        description=(
            "\n"
            "Analyze the scraped policy content:\n"
            "1. Use text_analyzer_tool to identify key sections\n"
            "2. Find user rights (deletion, access, opt-out, etc.)\n"
            "3. Identify concerns and red flags\n"
            "4. Note data collection and sharing practices\n"
        ),
        expected_output="Structured analysis with sections, rights, and concerns",
        agent=analyzer,
        context=[scrape_task],
    )

    # The emoji below were previously stored as mis-decoded UTF-8 bytes
    # (e.g. "πŸ“„" for the page emoji), which put garbage into the prompt and
    # therefore into the requested headings; they are restored to the
    # intended code points here.
    summary_task = Task(
        description=(
            "\n"
            "Create a user-friendly summary with these sections:\n"
            "## 📄 Policy Summary\n"
            "[3-5 key points about this policy]\n"
            "## ✅ Your Rights\n"
            "[List user rights with brief explanations]\n"
            "## ⚠️ Concerns & Warnings\n"
            "[List red flags with severity: 🔴 High, 🟡 Medium, 🟢 Low]\n"
            "## 💡 Recommendation\n"
            "[Overall assessment and advice]\n"
            "Use simple language, avoid legal jargon.\n"
        ),
        expected_output="A formatted, user-friendly policy summary",
        agent=orchestrator,
        context=[scrape_task, analyze_task],
    )

    return [scrape_task, analyze_task, summary_task]
def run_policy_analysis(url: str) -> str:
    """Run the scrape -> analyze -> summarize crew for one policy URL.

    Calls ``clear_logs()``, records a "Starting Analysis" entry, then builds
    the agents and tasks and runs them with ``Process.sequential``.

    Args:
        url: Address of the policy page to analyze.

    Returns:
        The crew result converted to ``str`` on success. If ``url`` is blank
        or not a string, or if building or running the crew raises, a string
        starting with ``"❌ Error: "`` is returned instead.
    """
    clear_logs()

    # A blank or non-string URL can never be scraped; reject it before any
    # agents are built instead of letting len(url) raise or the crew run.
    if not isinstance(url, str) or not url.strip():
        message = "No policy URL provided"
        log_agent_action(
            agent_name="System",
            action="Analysis Failed",
            input_summary="URL missing or blank",
            output_summary=message,
            duration_seconds=0,
            success=False,
        )
        return f"❌ Error: {message}"

    log_agent_action(
        agent_name="System",
        action="Starting Analysis",
        input_summary=f"URL length: {len(url)}",
        output_summary="Initializing agents...",
        duration_seconds=0,
        success=True,
    )

    started = time.perf_counter()
    try:
        orchestrator, scraper, analyzer = create_agents()
        tasks = create_tasks(orchestrator, scraper, analyzer, url)
        crew = Crew(
            agents=[orchestrator, scraper, analyzer],
            tasks=tasks,
            process=Process.sequential,
            verbose=True,
        )
        result = crew.kickoff()
        return str(result)
    except Exception as e:
        # Top-level boundary: failures are reported as a string, not raised.
        # Record the failure so the action log does not end on the successful
        # "Starting Analysis" entry.
        # NOTE(review): assumes log_agent_action accepts fractional seconds;
        # the only value passed elsewhere in this file is 0 - confirm.
        log_agent_action(
            agent_name="System",
            action="Analysis Failed",
            input_summary=f"URL length: {len(url)}",
            output_summary=f"{type(e).__name__}: {e}",
            duration_seconds=round(time.perf_counter() - started, 2),
            success=False,
        )
        return f"❌ Error: {e}"