from crewai import Agent, Task, Crew import gradio as gr import asyncio from typing import List, Generator, Any, Dict, Union from langchain_openai import ChatOpenAI import queue import threading import os class AgentMessageQueue: def __init__(self): self.message_queue = queue.Queue() self.final_output = None def add_message(self, message: Dict): self.message_queue.put(message) def get_messages(self) -> List[Dict]: messages = [] while not self.message_queue.empty(): messages.append(self.message_queue.get()) return messages def set_final_output(self, output: str): self.final_output = output def get_final_output(self) -> str: return self.final_output class ArticleCrew: def __init__(self, api_key: str = None): self.api_key = api_key self.message_queue = AgentMessageQueue() self.planner = None self.writer = None self.editor = None def initialize_agents(self, topic: str): if not self.api_key: raise ValueError("OpenAI API key is required") os.environ["OPENAI_API_KEY"] = self.api_key llm = ChatOpenAI( temperature=0.7, model="gpt-4" ) self.planner = Agent( role="Content Planner", goal=f"Plan engaging and factually accurate content on {topic}", backstory=f"You're working on planning a blog article about the topic: {topic}. " "You collect information that helps the audience learn something " "and make informed decisions.", allow_delegation=False, verbose=True, llm=llm ) self.writer = Agent( role="Content Writer", goal=f"Write insightful and factually accurate opinion piece about the topic: {topic}", backstory=f"You're working on writing a new opinion piece about the topic: {topic}. " "You base your writing on the work of the Content Planner.", allow_delegation=False, verbose=True, llm=llm ) self.editor = Agent( role="Editor", goal="Edit a given blog post to align with the writing style", backstory="You are an editor who receives a blog post from the Content Writer.", allow_delegation=False, verbose=True, llm=llm ) def create_tasks(self, topic: str): if not self.planner or not self.writer or not self.editor: self.initialize_agents(topic) plan_task = Task( description=( f"1. Prioritize the latest trends, key players, and noteworthy news on {topic}.\n" f"2. Identify the target audience, considering their interests and pain points.\n" f"3. Develop a detailed content outline including introduction, key points, and call to action.\n" f"4. Include SEO keywords and relevant data or sources." ), expected_output="A comprehensive content plan document with an outline, audience analysis, SEO keywords, and resources.", agent=self.planner ) write_task = Task( description=( "1. Use the content plan to craft a compelling blog post.\n" "2. Incorporate SEO keywords naturally.\n" "3. Sections/Subtitles are properly named in an engaging manner.\n" "4. Ensure proper structure with introduction, body, and conclusion.\n" "5. Proofread for grammatical errors." ), expected_output="A well-written blog post in markdown format, ready for publication.", agent=self.writer ) edit_task = Task( description="Proofread the given blog post for grammatical errors and alignment with the brand's voice.", expected_output="A well-written blog post in markdown format, ready for publication.", agent=self.editor ) return [plan_task, write_task, edit_task] async def process_article(self, topic: str) -> Generator[List[Dict], None, None]: def step_callback(output: Any) -> None: try: output_str = str(output).strip() # Extract agent name if "# Agent:" in output_str: agent_name = output_str.split("# Agent:")[1].split("\n")[0].strip() else: agent_name = "Agent" # Extract task or final answer if "## Task:" in output_str: content = output_str.split("## Task:")[1].split("\n#")[0].strip() self.message_queue.add_message({ "role": "assistant", "content": content, "metadata": {"title": f"📋 {agent_name}'s Task"} }) elif "## Final Answer:" in output_str: content = output_str.split("## Final Answer:")[1].strip() if agent_name == "Editor": # For Editor's final answer, store it for later self.message_queue.set_final_output(content) self.message_queue.add_message({ "role": "assistant", "content": content, "metadata": {"title": f"✅ {agent_name}'s Output"} }) else: self.message_queue.add_message({ "role": "assistant", "content": output_str, "metadata": {"title": f"💭 {agent_name} thinking"} }) except Exception as e: print(f"Error in step_callback: {str(e)}") def task_callback(output: Any) -> None: try: content = str(output) if hasattr(output, 'agent'): agent_name = str(output.agent) else: agent_name = "Agent" self.message_queue.add_message({ "role": "assistant", "content": content.strip(), "metadata": {"title": f"✅ Task completed by {agent_name}"} }) # If this is the Editor's task completion, add the final article if agent_name == "Editor": final_content = self.message_queue.get_final_output() if final_content: self.message_queue.add_message({ "role": "assistant", "content": "Here's your completed article:", "metadata": {"title": "📝 Final Article"} }) self.message_queue.add_message({ "role": "assistant", "content": final_content }) self.message_queue.add_message({ "role": "assistant", "content": "Article generation completed!", "metadata": {"title": "✨ Complete"} }) except Exception as e: print(f"Error in task_callback: {str(e)}") self.initialize_agents(topic) crew = Crew( agents=[self.planner, self.writer, self.editor], tasks=self.create_tasks(topic), verbose=True, step_callback=step_callback, task_callback=task_callback ) # Start notification yield [{ "role": "assistant", "content": "Starting work on your article...", "metadata": {"title": "🚀 Process Started"} }] # Run crew in a separate thread result_container = [] def run_crew(): try: result = crew.kickoff(inputs={"topic": topic}) result_container.append(result) except Exception as e: result_container.append(e) print(f"Error occurred: {str(e)}") thread = threading.Thread(target=run_crew) thread.start() # Stream messages while crew is working while thread.is_alive() or not self.message_queue.message_queue.empty(): messages = self.message_queue.get_messages() if messages: yield messages await asyncio.sleep(0.1) def create_demo(): article_crew = None with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# 📝 AI Article Writing Crew") gr.Markdown("Watch as this AI Crew collaborates to create your article! This application utilizes [CrewAI](https://www.crewai.com/) agents: Content Planner, Content Writer, and Content Editor, to write an article on any topic you choose. To get started, enter your OpenAI API Key below and press Enter!") openai_api_key = gr.Textbox( label='OpenAI API Key', type='password', placeholder='Type your OpenAI API key and press Enter!', interactive=True ) chatbot = gr.Chatbot( label="Writing Process", avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"), height=700, type="messages", show_label=True, visible=False, value=[] ) with gr.Row(equal_height=True): topic = gr.Textbox( label="Article Topic", placeholder="Enter the topic you want an article about...", scale=4, visible=False ) async def process_input(topic, history, api_key): nonlocal article_crew if not api_key: history.append({ "role": "assistant", "content": "Please provide an OpenAI API key first.", "metadata": {"title": "❌ Error"} }) yield history # Changed from return to yield return # Early return without value # Initialize or update ArticleCrew with API key if article_crew is None: article_crew = ArticleCrew(api_key=api_key) else: article_crew.api_key = api_key # Add user message history.append({ "role": "user", "content": f"Write an article about: {topic}" }) yield history try: async for messages in article_crew.process_article(topic): history.extend(messages) yield history except Exception as e: history.append({ "role": "assistant", "content": f"An error occurred: {str(e)}", "metadata": {"title": "❌ Error"} }) yield history btn = gr.Button("Write Article", variant="primary", scale=1, visible=False) def show_interface(): return { openai_api_key: gr.Textbox(visible=False), chatbot: gr.Chatbot(visible=True), topic: gr.Textbox(visible=True), btn: gr.Button(visible=True) } openai_api_key.submit( show_interface, None, [openai_api_key, chatbot, topic, btn] ) btn.click( process_input, inputs=[topic, chatbot, openai_api_key], # Added openai_api_key back as input outputs=[chatbot] ) return demo if __name__ == "__main__": demo = create_demo() demo.queue() demo.launch(debug=True)