Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import uuid | |
| import aiohttp | |
| import json | |
| import httpx | |
| from typing import Annotated | |
| from typing import TypedDict, List, Optional, Literal | |
| from typing_extensions import TypedDict | |
| from pydantic import BaseModel, Field | |
| from trafilatura import extract | |
| from langchain_core.messages import AIMessage, HumanMessage, AnyMessage, ToolCall, SystemMessage, ToolMessage | |
| from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| from langchain_core.tools import tool | |
| from langchain_community.tools import TavilySearchResults | |
| from langgraph.graph.state import CompiledStateGraph | |
| from langgraph.graph import StateGraph, START, END, add_messages | |
| from langgraph.prebuilt import ToolNode | |
| from langgraph.prebuilt import ToolNode, tools_condition | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from langgraph.types import Command, interrupt | |
| from langchain_anthropic import ChatAnthropic | |
| from langchain_openai import ChatOpenAI | |
| from mistralai import Mistral | |
| from langchain.chat_models import init_chat_model | |
| from langchain_core.messages.utils import convert_to_openai_messages | |
| class State(TypedDict): | |
| messages: Annotated[list, add_messages] | |
| class DebugToolNode(ToolNode): | |
| async def invoke(self, state, config=None): | |
| print("🛠️ ToolNode activated") | |
| print(f"Available tools: {[tool.name for tool in self.tool_map.values()]}") | |
| print(f"Tool calls in last message: {state.messages[-1].tool_calls}") | |
| return await super().invoke(state, config) | |
| logger = logging.getLogger(__name__) | |
| ASSISTANT_SYSTEM_PROMPT_BASE = """""" | |
| search_enabled = bool(os.environ.get("TAVILY_API_KEY")) | |
| try: | |
| with open('brainstorming_system_prompt.txt', 'r') as file: | |
| brainstorming_system_prompt = file.read() | |
| except FileNotFoundError: | |
| print("File 'system_prompt.txt' not found!") | |
| except Exception as e: | |
| print(f"Error reading file: {e}") | |
| def evaluate_idea_completion(response) -> bool: | |
| """ | |
| Evaluates whether the assistant's response indicates a complete DIY project idea. | |
| You can customize the logic based on your specific criteria. | |
| """ | |
| # Example logic: Check if the response contains certain keywords | |
| required_keywords = ["materials", "dimensions", "tools", "steps"] | |
| # Determine the type of response and extract text accordingly | |
| if isinstance(response, dict): | |
| # If response is a dictionary, extract values and join them into a single string | |
| response_text = ' '.join(str(value).lower() for value in response.values()) | |
| elif isinstance(response, str): | |
| # If response is a string, convert it to lowercase | |
| response_text = response.lower() | |
| else: | |
| # If response is of an unexpected type, convert it to string and lowercase | |
| response_text = str(response).lower() | |
| return all(keyword in response_text for keyword in required_keywords) | |
| async def human_assistance(query: str) -> str: | |
| """Request assistance from a human.""" | |
| human_response = await interrupt({"query": query}) # async wait | |
| return human_response["data"] | |
| async def download_website_text(url: str) -> str: | |
| """Download the text from a website""" | |
| try: | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(url) as response: | |
| response.raise_for_status() | |
| downloaded = await response.text() | |
| result = extract(downloaded, include_formatting=True, include_links=True, output_format='json', with_metadata=True) | |
| return result or "No text found on the website" | |
| except Exception as e: | |
| logger.error(f"Failed to download {url}: {str(e)}") | |
| return f"Error retrieving website content: {str(e)}" | |
| async def finalize_idea() -> str: | |
| """Marks the brainstorming phase as complete. This function does nothing else.""" | |
| return "Brainstorming finalized." | |
| tools = [download_website_text, human_assistance,finalize_idea] | |
| memory = MemorySaver() | |
| if search_enabled: | |
| tavily_search_tool = TavilySearchResults( | |
| max_results=5, | |
| search_depth="advanced", | |
| include_answer=True, | |
| include_raw_content=True, | |
| ) | |
| tools.append(tavily_search_tool) | |
| else: | |
| print("TAVILY_API_KEY environment variable not found. Websearch disabled") | |
| weak_model = ChatOpenAI( | |
| model="gpt-4o", | |
| temperature=0, | |
| max_tokens=None, | |
| timeout=None, | |
| max_retries=2, | |
| # api_key="...", # if you prefer to pass api key in directly instaed of using env vars | |
| # base_url="...", | |
| # organization="...", | |
| # other params... | |
| ) | |
| api_key = os.environ["MISTRAL_API_KEY"] | |
| model = "mistral-large-latest" | |
| client = Mistral(api_key=api_key) | |
| # ChatAnthropic( | |
| # model="claude-3-5-sonnet-20240620", | |
| # temperature=0, | |
| # max_tokens=1024, | |
| # timeout=None, | |
| # max_retries=2, | |
| # # other params... | |
| # ) | |
| search_enabled = bool(os.environ.get("TAVILY_API_KEY")) | |
| if not os.environ.get("OPENAI_API_KEY"): | |
| print('Open API key not found') | |
| prompt_planning_model = ChatOpenAI( | |
| model="gpt-4o", | |
| temperature=0, | |
| max_tokens=None, | |
| timeout=None, | |
| max_retries=2, | |
| # api_key="...", # if you prefer to pass api key in directly instaed of using env vars | |
| # base_url="...", | |
| # organization="...", | |
| # other params... | |
| ) | |
| threed_object_gen_model = ChatOpenAI( | |
| model="gpt-4o", | |
| temperature=0, | |
| max_tokens=None, | |
| timeout=None, | |
| max_retries=2, | |
| # api_key="...", # if you prefer to pass api key in directly instaed of using env vars | |
| # base_url="...", | |
| # organization="...", | |
| # other params... | |
| ) | |
| model = weak_model | |
| assistant_model = weak_model | |
| class GraphProcessingState(BaseModel): | |
| # user_input: str = Field(default_factory=str, description="The original user input") | |
| messages: Annotated[list[AnyMessage], add_messages] = Field(default_factory=list) | |
| prompt: str = Field(default_factory=str, description="The prompt to be used for the model") | |
| tools_enabled: dict = Field(default_factory=dict, description="The tools enabled for the assistant") | |
| search_enabled: bool = Field(default=True, description="Whether to enable search tools") | |
| next_stage: str = Field(default="", description="The next stage to execute, decided by the guidance node.") | |
| tool_call_required: bool = Field(default=False, description="Whether a tool should be called from brainstorming.") | |
| loop_brainstorming: bool = Field(default=False, description="Whether to loop back to brainstorming for further iteration.") | |
| # Completion flags for each stage | |
| idea_complete: bool = Field(default=False) | |
| brainstorming_complete: bool = Field(default=False) | |
| planning_complete: bool = Field(default=False) | |
| drawing_complete: bool = Field(default=False) | |
| product_searching_complete: bool = Field(default=False) | |
| purchasing_complete: bool = Field(default=False) | |
| generated_image_url_from_dalle: str = Field(default="", description="The generated_image_url_from_dalle.") | |
| async def guidance_node(state: GraphProcessingState, config=None): | |
| # print(f"Prompt: {state.prompt}") | |
| # print(f"Prompt: {state.prompt}") | |
| # # print(f"Message: {state.messages}") | |
| # print(f"Tools Enabled: {state.tools_enabled}") | |
| # print(f"Search Enabled: {state.search_enabled}") | |
| # for message in state.messages: | |
| # print(f'\ncomplete message', message) | |
| # if isinstance(message, HumanMessage): | |
| # print(f"Human: {message.content}\n") | |
| # elif isinstance(message, AIMessage): | |
| # # Check if content is non-empty | |
| # if message.content: | |
| # # If content is a list (e.g., list of dicts), extract text | |
| # if isinstance(message.content, list): | |
| # texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] | |
| # if texts: | |
| # print(f"AI: {' '.join(texts)}\n") | |
| # elif isinstance(message.content, str): | |
| # print(f"AI: {message.content}") | |
| # elif isinstance(message, SystemMessage): | |
| # print(f"System: {message.content}\n") | |
| # elif isinstance(message, ToolMessage): | |
| # print(f"Tool: {message.content}\n") | |
| print("\n🕵️♀️🕵️♀️ | start | progress checking nodee \n") # Added a newline for clarity | |
| # print(f"Prompt: {state.prompt}\n") | |
| if state.messages: | |
| last_message = state.messages[-1] | |
| if isinstance(last_message, HumanMessage): | |
| print(f"🧑 Human: {last_message.content}\n") | |
| elif isinstance(last_message, AIMessage): | |
| if last_message.content: | |
| if isinstance(last_message.content, list): | |
| texts = [item.get('text', '') for item in last_message.content if isinstance(item, dict) and 'text' in item] | |
| if texts: | |
| print(f"🤖 AI: {' '.join(texts)}\n") | |
| elif isinstance(last_message.content, str): | |
| print(f"🤖 AI: {last_message.content}\n") | |
| elif isinstance(last_message, SystemMessage): | |
| print(f"⚙️ System: {last_message.content}\n") | |
| elif isinstance(last_message, ToolMessage): | |
| print(f"🛠️ Tool: {last_message.content}\n") | |
| else: | |
| print("\n(No messages found.)") | |
| # Log boolean completion flags | |
| # Define the order of stages | |
| stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] | |
| # Identify completed and incomplete stages | |
| completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] | |
| incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] | |
| # Determine the next stage | |
| if not incomplete: | |
| # All stages are complete | |
| return { | |
| "messages": [AIMessage(content="All DIY project stages are complete!")], | |
| "next_stage": "end_project", | |
| "pending_approval_stage": None, | |
| } | |
| else: | |
| # Set the next stage to the first incomplete stage | |
| next_stage = incomplete[0] | |
| print(f"Next Stage: {state.next_stage}") | |
| print("\n🕵️♀️🕵️♀️ | end | progress checking nodee \n") # Added a newline for clarity | |
| return { | |
| "messages": [], | |
| "next_stage": next_stage, | |
| "pending_approval_stage": None, | |
| } | |
| def guidance_routing(state: GraphProcessingState) -> str: | |
| print("\n🔀🔀 Routing checkpoint 🔀🔀\n") | |
| print(f"Next Stage: {state.next_stage}\n") | |
| print(f"Brainstorming complete: {state.brainstorming_complete}") | |
| print(f"Prompt planing: {state.planning_complete}") | |
| print(f"Drwaing 3d model: {state.drawing_complete}") | |
| print(f"Finding products: {state.product_searching_complete}\n") | |
| next_stage = state.next_stage | |
| if next_stage == "brainstorming": | |
| return "brainstorming_node" | |
| elif next_stage == "planning": | |
| # return "generate_3d_node" | |
| return "prompt_planning_node" | |
| elif next_stage == "drawing": | |
| return "generate_3d_node" | |
| elif next_stage == "product_searching": | |
| print('\n may day may day may day may day may day') | |
| print(f"Prompt: {state.prompt}") | |
| print(f"Prompt: {state.prompt}") | |
| # print(f"Message: {state.messages}") | |
| print(f"Tools Enabled: {state.tools_enabled}") | |
| print(f"Search Enabled: {state.search_enabled}") | |
| for message in state.messages: | |
| print(f'\ncomplete message', message) | |
| if isinstance(message, HumanMessage): | |
| print(f"Human: {message.content}\n") | |
| elif isinstance(message, AIMessage): | |
| # Check if content is non-empty | |
| if message.content: | |
| # If content is a list (e.g., list of dicts), extract text | |
| if isinstance(message.content, list): | |
| texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] | |
| if texts: | |
| print(f"AI: {' '.join(texts)}\n") | |
| elif isinstance(message.content, str): | |
| print(f"AI: {message.content}") | |
| elif isinstance(message, SystemMessage): | |
| print(f"System: {message.content}\n") | |
| elif isinstance(message, ToolMessage): | |
| print(f"Tool: {message.content}\n") | |
| # return "drawing_node" | |
| # elif next_stage == "product_searching": | |
| # return "product_searching" | |
| # elif next_stage == "purchasing": | |
| # return "purchasing_node" | |
| return END | |
| async def brainstorming_node(state: GraphProcessingState, config=None): | |
| print("\n🧠🧠 | start | brainstorming Node \n") # Added a newline for clarity | |
| # Check if model is available | |
| if not model: | |
| return {"messages": [AIMessage(content="Model not available for brainstorming.")]} | |
| # Filter out messages with empty content | |
| filtered_messages = [ | |
| message for message in state.messages | |
| if isinstance(message, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and message.content | |
| ] | |
| # Ensure there is at least one message with content | |
| if not filtered_messages: | |
| filtered_messages.append(AIMessage(content="No valid messages provided.")) | |
| stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] | |
| completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] | |
| incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] | |
| if not incomplete: | |
| print("All stages complete!") | |
| # Handle case where all stages are complete | |
| # You might want to return a message and end, or set proposed_next_stage to a special value | |
| ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!") | |
| return { | |
| "messages": current_messages + [ai_all_complete_msg], | |
| "next_stage": "end_project", # Or None, or a final summary node | |
| "pending_approval_stage": None, | |
| } | |
| else: | |
| # THIS LINE DEFINES THE VARIABLE | |
| proposed_next_stage = incomplete[0] | |
| guidance_prompt_text = ( | |
| """ | |
| You are a warm, encouraging, and knowledgeable AI assistant, acting as a **Creative DIY Collaborator**. Your primary goal is to guide the user through a friendly and inspiring conversation to finalize **ONE specific, viable DIY project idea**. While we want to be efficient, the top priority is making the user feel heard, understood, and confident in their final choice. | |
| ⚠️ Your core directive remains speed and convergence: If you identify an idea that clearly meets ALL **Critical Criteria** and the user seems positive or neutral, you must suggest finalizing it **immediately**. Do NOT delay by offering too many alternatives once a solid candidate emerges. Your goal is to converge on a "good enough" idea the user is happy with, not to explore every possibility. | |
| **Your Conversational Style & Strategy:** | |
| 1. **Be an Active Listener:** Start by acknowledging and validating the user's input. Show you understand their core desire (e.g., "That sounds like a fun goal! Creating a custom piece for your living room is always rewarding."). | |
| 2. **Ask Inspiring, Open-Ended Questions:** Instead of generic questions, make them feel personal and insightful. | |
| * *Instead of:* "What do you want to build?" | |
| * *Try:* "What part of your home are you dreaming of improving?" or "Are you thinking of a gift for someone special, or a project just for you?" | |
| 3. **Act as a Knowledgeable Guide:** When a user is unsure, proactively suggest appealing ideas based on their subtle clues. Connect their interests to tangible projects. | |
| * *Example:* If the user mentions liking plants and having a small balcony, you could suggest: "That's great! We could think about a vertical herb garden to save space, or maybe some simple, stylish hanging macrame planters. Does either of those spark your interest?" | |
| 4. **Guide, Don't Just Gatekeep:** When an idea *almost* meets the criteria, don't just reject it. Gently guide it towards feasibility. | |
| * *Example:* "A full-sized dining table might require some specialized tools. How about we adapt that idea into a beautiful, buildable coffee table or a set of side tables using similar techniques?" | |
| **Critical Criteria for the Final DIY Project Idea (Your non-negotiable checklist):** | |
| 1. **Buildable:** Achievable by an average person with basic DIY skills. | |
| 2. **Common Materials/Tools:** Uses only materials (e.g., wood, screws, glue, paint, fabric, cardboard) and basic hand tools (e.g., screwdrivers, hammers, saws, drills) commonly available in general hardware stores, craft stores, or supermarkets worldwide. | |
| 3. **Avoid Specializations:** Explicitly AVOID projects requiring specialized electronic components, 3D printing, specific brand items not universally available, or complex machinery. | |
| 4. **Tangible Product:** The final result must be a physical, tangible item. | |
| **Your Internal Process (How you think on each turn):** | |
| 1. **THOUGHT:** | |
| * Clearly state your understanding of the user’s current input and conversational state. | |
| * Outline your plan: Engage with their latest input using your **Conversational Style**. Propose or refine an idea to meet the **Critical Criteria**. | |
| * **Tool Identification (`human_assistance`):** Decide if you need to ask a question. The question should be formulated according to the "Inspiring, Open-Ended Questions" principle. Clearly state your intention to use the `human_assistance` tool with the exact friendly and natural-sounding question as the `query`. | |
| * **Idea Finalization Check:** Check if the current idea satisfies ALL **Critical Criteria**. If yes, and the user shows no objection, move to finalize immediately. Remember: **good enough is final enough**. | |
| 2. **TOOL USE (`human_assistance` - If Needed):** | |
| * Invoke `human_assistance` with your well-formulated, friendly query. | |
| 3. **RESPONSE SYNTHESIS / IDEA FINALIZATION:** | |
| * **If an idea is finalized:** Respond *only* with the exact phrase: | |
| `IDEA FINALIZED: [Name of the Idea]` | |
| (e.g., `IDEA FINALIZED: Simple Wooden Spice Rack`) | |
| * **If brainstorming continues:** | |
| * Provide your engaging suggestions or refinements based on your **Conversational Style**. | |
| * Await the user response. | |
| **General Guidelines (Your core principles):** | |
| * **Empathy Over Pure Efficiency:** A positive, collaborative experience is the primary goal. Don't rush the user if they are still exploring. | |
| * **Criteria Focused:** Always gently guide ideas toward the **Critical Criteria**. | |
| * **One Main Idea at a Time:** Focus the conversation on a single project idea to avoid confusion. | |
| * **Rapid Convergence:** Despite the friendly tone, always be looking for the fastest path to a final, viable idea. | |
| """ | |
| ) | |
| if state.prompt: | |
| final_prompt = "\n".join([ guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
| else: | |
| final_prompt = "\n".join([ guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
| prompt = ChatPromptTemplate.from_messages( | |
| [ | |
| ("system", final_prompt), | |
| MessagesPlaceholder(variable_name="messages"), | |
| ] | |
| ) | |
| # Tools allowed for brainstorming | |
| node_tools = [human_assistance] | |
| if state.search_enabled and tavily_search_tool: # only add search tool if enabled and initialized | |
| node_tools.append(tavily_search_tool) | |
| mistraltools = [ | |
| { | |
| "type": "function", | |
| "function": { | |
| "name": "human_assistance", | |
| "description": "Ask a question from the user", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "query": { | |
| "type": "string", | |
| "query": "The transaction id.", | |
| } | |
| }, | |
| "required": ["query"], | |
| }, | |
| }, | |
| }, | |
| { | |
| "type": "function", | |
| "function": { | |
| "name": "finalize_idea", | |
| "description": "Handles finalized ideas. Saves or dispatches the confirmed idea for the next steps. but make sure you give your response with key word IDEA FINALIZED", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "idea_name": { | |
| "type": "string", | |
| "description": "The name of the finalized DIY idea.", | |
| } | |
| }, | |
| "required": ["idea_name"] | |
| } | |
| } | |
| } | |
| ] | |
| llm = init_chat_model("mistral-large-latest", model_provider="mistralai") | |
| llm_with_tools = llm.bind_tools(mistraltools) | |
| chain = prompt | llm_with_tools | |
| openai_messages = convert_to_openai_messages(state.messages) | |
| openai_messages_with_prompt = [ | |
| {"role": "system", "content": final_prompt}, # your guidance prompt | |
| *openai_messages # history you’ve already converted | |
| ] | |
| print('open ai formatted', openai_messages_with_prompt[-1]) | |
| for msg in openai_messages_with_prompt: | |
| print(msg) | |
| mistralmodel = "mistral-saba-2502" | |
| # Pass filtered messages to the chain | |
| try: | |
| # response = await chain.ainvoke({"messages": filtered_messages}, config=config) | |
| response = client.chat.complete( | |
| model = mistralmodel, | |
| messages = openai_messages_with_prompt, | |
| tools = mistraltools, | |
| tool_choice = "any", | |
| parallel_tool_calls = False, | |
| ) | |
| mistral_message = response.choices[0].message | |
| tool_call = response.choices[0].message.tool_calls[0] | |
| function_name = tool_call.function.name | |
| function_params = json.loads(tool_call.function.arguments) | |
| ai_message = AIMessage( | |
| content=mistral_message.content or "", # Use empty string if blank | |
| additional_kwargs={ | |
| "tool_calls": [ | |
| { | |
| "id": tool_call.id, | |
| "function": { | |
| "name": tool_call.function.name, | |
| "arguments": tool_call.function.arguments, | |
| }, | |
| "type": "function", # Add this if your chain expects it | |
| } | |
| ] | |
| } | |
| ) | |
| updates = { | |
| "messages": [ai_message], | |
| "tool_calls": [ | |
| { | |
| "name": function_name, | |
| "arguments": function_params, | |
| } | |
| ], | |
| "next": function_name, | |
| } | |
| print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params) | |
| print('\n🔍 response from brainstorm\n', updates) | |
| if function_name == "finalize_idea": | |
| print('finalazing idea') | |
| state.brainstorming_complete = True | |
| updates["brainstorming_complete"] = True | |
| if isinstance(response, AIMessage) and response.content: | |
| print(' Identified last AI message', response) | |
| if isinstance(response.content, str): | |
| content = response.content.strip() | |
| elif isinstance(response.content, list): | |
| texts = [item.get("text", "") for item in response.content if isinstance(item, dict)] | |
| content = " ".join(texts).strip() | |
| else: | |
| content = str(response.content).strip() | |
| print('content for idea finalizing:', content) | |
| if "finalize_idea:" in content: # Use 'in' instead of 'startswith' | |
| print('✅ final idea') | |
| updates.update({ | |
| "brainstorming_complete": True, | |
| "tool_call_required": False, | |
| "loop_brainstorming": False, | |
| }) | |
| return updates | |
| else: | |
| # tool_calls = getattr(response, "tool_calls", None) | |
| if tool_call: | |
| print('🛠️ tool call requested at brainstorming node') | |
| updates.update({ | |
| "tool_call_required": True, | |
| "loop_brainstorming": False, | |
| }) | |
| if tool_call: | |
| tool_call = response.choices[0].message.tool_calls[0] | |
| function_name = tool_call.function.name | |
| function_params = json.loads(tool_call.function.arguments) | |
| print("\nfunction_name: ", function_name, "\nfunction_params: ", function_params) | |
| # for tool_call in response.tool_calls: | |
| # tool_name = tool_call['name'] | |
| # if tool_name == "human_assistance": | |
| # query = tool_call['args']['query'] | |
| # print(f"Human input needed: {query}") | |
| # for tool_call in tool_calls: | |
| # if isinstance(tool_call, dict) and 'name' in tool_call and 'args' in tool_call: | |
| # print(f"🔧 Tool Call (Dict): {tool_call.get('name')}, Args: {tool_call.get('args')}") | |
| # else: | |
| # print(f"🔧 Unknown tool_call format: {tool_call}") | |
| else: | |
| print('💬 decided tp keep brainstorming') | |
| updates.update({ | |
| "tool_call_required": False, | |
| "loop_brainstorming": True, | |
| }) | |
| print(f"Brainstorming continues: {content}") | |
| else: | |
| # If no proper response, keep looping brainstorming | |
| updates["tool_call_required"] = False | |
| updates["loop_brainstorming"] = True | |
| print("\n🧠🧠 | end | brainstorming Node \n") | |
| return updates | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| return { | |
| "messages": [AIMessage(content="Error.")], | |
| "next_stage": "brainstorming" | |
| } | |
| async def prompt_planning_node(state: GraphProcessingState, config=None): | |
| print("\n🚩🚩 | start | prompt planing Node \n") | |
| # Ensure we have a model | |
| if not model: | |
| return {"messages": [AIMessage(content="Model not available for planning.")]} | |
| filtered_messages = state.messages | |
| # Filter out empty messages | |
| # filtered_messages = [ | |
| # msg for msg in state.messages | |
| # if isinstance(msg, (HumanMessage, AIMessage, SystemMessage, ToolMessage)) and msg.content | |
| # ] | |
| # filtered_messages = [] | |
| # for msg in state.messages: | |
| # if isinstance(msg, ToolMessage): | |
| # # 🛠️ ToolMessage needs to be paired with a prior assistant message that called the tool | |
| # tool_name = msg.name or "unknown_tool" | |
| # tool_call_id = msg.tool_call_id or "tool_call_id_missing" | |
| # # Simulated assistant message that initiated the tool call | |
| # fake_assistant_msg = AIMessage( | |
| # content="", | |
| # additional_kwargs={ | |
| # "tool_calls": [ | |
| # { | |
| # "id": tool_call_id, | |
| # "type": "function", | |
| # "function": { | |
| # "name": tool_name, | |
| # "arguments": json.dumps({"content": msg.content or ""}), | |
| # } | |
| # } | |
| # ] | |
| # } | |
| # ) | |
| # # Append both in correct sequence | |
| # filtered_messages.append(fake_assistant_msg) | |
| # filtered_messages.append(msg) | |
| # elif isinstance(msg, (HumanMessage, AIMessage, SystemMessage)) and msg.content: | |
| # filtered_messages.append(msg) | |
| # Fallback if list ends up empty | |
| if not filtered_messages: | |
| filtered_messages.append(AIMessage(content="No valid messages provided.")) | |
| # Define the system prompt for planning | |
| guidance_prompt_text = """ | |
| You are a creative and helpful AI assistant acting as a **DIY Project Brainstorming & 3D-Prompt Generator**. Your mission is to collaborate with the user to: | |
| 1. Brainstorm and refine one specific, viable DIY project idea. | |
| 2. Identify the single key component from that idea that should be 3D-modeled. | |
| 3. Produce a final, precise text prompt for an OpenAI 3D-generation endpoint. | |
| --- | |
| **Critical Criteria for the DIY Project** (must be met): | |
| • Buildable by an average person with only basic DIY skills. | |
| • Uses common materials/tools (e.g., wood, screws, glue, paint; hammer, saw, drill). | |
| • No specialized electronics, 3D printers, or proprietary parts. | |
| • Results in a tangible, physical item. | |
| --- | |
| **Available Tools** | |
| • human_assistance – ask the user clarifying questions. | |
| • (optional) your project-specific search tool – look up inspiration or standard dimensions if needed. | |
| --- | |
| **When the DIY idea is fully detailed and meets all criteria, output exactly and only:** | |
| ACCURATE PROMPT FOR MODEL GENERATING: [Your final single-paragraph prompt here] | |
| """ | |
| # Build final prompt | |
| if state.prompt: | |
| final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
| else: | |
| final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
| prompt = ChatPromptTemplate.from_messages([ | |
| ("system", final_prompt), | |
| MessagesPlaceholder(variable_name="messages"), | |
| ]) | |
| # Bind tools | |
| node_tools = [human_assistance] | |
| if state.search_enabled and tavily_search_tool: | |
| node_tools.append(tavily_search_tool) | |
| llm_with_tools = prompt_planning_model.bind_tools(node_tools) | |
| chain = prompt | llm_with_tools | |
| # print(' 👾👾👾👾Debugging the request going in to prompt planing model') | |
| # print("Prompt: ", prompt) | |
| # print("chain: ", chain) | |
| for msg in filtered_messages: | |
| print('✨msg : ',msg) | |
| print('\n') | |
| try: | |
| response = await chain.ainvoke({"messages": filtered_messages}, config=config) | |
| print('\nresponse ->: ', response) | |
| # Log any required human assistance query | |
| if hasattr(response, "tool_calls"): | |
| for call in response.tool_calls: | |
| if call.get("name") == "human_assistance": | |
| print(f"Human input needed: {call['args']['query']}") | |
| updates = {"messages": [response]} | |
| # Extract response text | |
| content = "" | |
| if isinstance(response.content, str): | |
| content = response.content.strip() | |
| elif isinstance(response.content, list): | |
| content = " ".join(item.get("text","") for item in response.content if isinstance(item, dict)).strip() | |
| # Check for finalization signalif "finalize_idea:" in content: | |
| if "ACCURATE PROMPT FOR MODEL GENERATING" in content: | |
| dalle_prompt_text = content.replace("ACCURATE PROMPT FOR MODEL GENERATING:", "").strip() | |
| print(f"\n🤖🤖🤖🤖Extracted DALL-E prompt: {dalle_prompt_text}") | |
| generated_image_url = None | |
| generated_3d_model_url = None # This will store the final 3D model URL | |
| # --- START: New code for DALL-E and Trellis API calls --- | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| if not OPENAI_API_KEY: | |
| print("Error: OPENAI_API_KEY environment variable not set.") | |
| updates["messages"].append(AIMessage(content="OpenAI API key not configured. Cannot generate image.")) | |
| else: | |
| # 1. Call DALL-E API | |
| dalle_api_url = "https://api.openai.com/v1/images/generations" | |
| dalle_headers = { | |
| "Content-Type": "application/json", | |
| "Authorization": f"Bearer {OPENAI_API_KEY}" | |
| } | |
| _model_to_use_for_dalle_call = "dall-e-2" # <<< IMPORTANT: Set this to "dall-e-2" or "dall-e-3" | |
| _processed_prompt_text = dalle_prompt_text # Start with the original prompt | |
| _prompt_was_trimmed_or_issue_found = False | |
| _warning_or_error_message_for_updates = None | |
| max_prompt_lengths = { | |
| "dall-e-2": 1000, | |
| "dall-e-3": 4000, | |
| "gpt-image-1": 32000 # Included for completeness, though payload is for DALL-E | |
| } | |
| if not _processed_prompt_text: # Check for empty prompt | |
| _message = f"Error: The DALL-E prompt for model '{_model_to_use_for_dalle_call}' cannot be empty. API call will likely fail." | |
| print(f"\n🛑🛑🛑🛑 {_message}") | |
| _warning_or_error_message_for_updates = _message | |
| _prompt_was_trimmed_or_issue_found = True | |
| # NOTE: OpenAI API will return an error for an empty prompt. | |
| # If you want to prevent the call entirely here, you could add: | |
| # updates["messages"].append(AIMessage(content=_message)) | |
| # return # or raise an exception | |
| elif _model_to_use_for_dalle_call in max_prompt_lengths: | |
| _max_len = max_prompt_lengths[_model_to_use_for_dalle_call] | |
| _original_len = len(_processed_prompt_text) | |
| if _original_len > _max_len: | |
| _processed_prompt_text = _processed_prompt_text[:_max_len] | |
| _message = ( | |
| f"Warning: Prompt for DALL-E ({_model_to_use_for_dalle_call}) was {_original_len} characters. " | |
| f"It has been TRUNCATED to the maximum of {_max_len} characters." | |
| ) | |
| print(f"\n⚠️⚠️⚠️⚠️ {_message}") | |
| _warning_or_error_message_for_updates = _message | |
| _prompt_was_trimmed_or_issue_found = True | |
| else: | |
| # Model specified in _model_to_use_for_dalle_call is not in our length check dictionary | |
| _message = ( | |
| f"Notice: Model '{_model_to_use_for_dalle_call}' not found in pre-defined prompt length limits. " | |
| "Proceeding with the original prompt. API may reject if prompt is too long for this model." | |
| ) | |
| print(f"\nℹ️ℹ️ℹ️ℹ️ {_message}") | |
| # You might not want to add this specific notice to 'updates["messages"]' unless it's critical | |
| # _warning_or_error_message_for_updates = _message | |
| # _prompt_was_trimmed_or_issue_found = True # Or not, depending on how you view this | |
| # Add warning/error to updates if one was generated | |
| if _warning_or_error_message_for_updates: | |
| # Check if 'updates' and 'AIMessage' are available in the current scope to avoid errors | |
| if 'updates' in locals() and isinstance(updates, dict) and 'messages' in updates and 'AIMessage' in globals(): | |
| updates["messages"].append(AIMessage(content=_warning_or_error_message_for_updates)) | |
| elif 'updates' in globals() and isinstance(updates, dict) and 'messages' in updates: # If AIMessage isn't defined, just append string | |
| updates["messages"].append(_warning_or_error_message_for_updates) | |
| # --- Prompt Trimming Logic END --- | |
| dalle_payload = { | |
| "model": _model_to_use_for_dalle_call, # Use the model determined above | |
| "prompt": _processed_prompt_text, # Use the processed (potentially trimmed) prompt | |
| "n": 1, | |
| "size": "1024x1024" | |
| # You can add other DALL-E 3 specific params if _model_to_use_for_dalle_call is "dall-e-3" | |
| # e.g., "quality": "hd", "style": "vivid" | |
| } | |
| print(f"\n🤖🤖🤖🤖Calling DALL-E with prompt: {dalle_prompt_text}") | |
| async with aiohttp.ClientSession() as session: | |
| try: | |
| async with session.post(dalle_api_url, headers=dalle_headers, json=dalle_payload) as dalle_response: | |
| dalle_response.raise_for_status() # Raise an exception for HTTP errors | |
| dalle_data = await dalle_response.json() | |
| if dalle_data.get("data") and len(dalle_data["data"]) > 0: | |
| generated_image_url = dalle_data["data"][0].get("url") | |
| print(f"DALL-E generated image URL: {generated_image_url}") | |
| updates["messages"].append(AIMessage(content=f"Image generated by DALL-E: {generated_image_url}")) | |
| else: | |
| print("Error: DALL-E API did not return image data.") | |
| updates["messages"].append(AIMessage(content="Failed to get image from DALL-E.")) | |
| except aiohttp.ClientError as e: | |
| print(f"DALL-E API call error: {e}") | |
| updates["messages"].append(AIMessage(content=f"Error calling DALL-E: {e}")) | |
| except json.JSONDecodeError as e: | |
| print(f"DALL-E API JSON decode error: {e}. Response: {await dalle_response.text()}") | |
| updates["messages"].append(AIMessage(content=f"Error decoding DALL-E response: {e}")) | |
| except Exception as e: | |
| print(f"Unexpected error during DALL-E processing: {e}") | |
| updates["messages"].append(AIMessage(content=f"Unexpected error with DALL-E: {e}")) | |
| updates.update({ | |
| "generated_image_url_from_dalle": generated_image_url, | |
| "planning_complete": True, | |
| "tool_call_required": False, | |
| "loop_planning": False, | |
| }) | |
| else: | |
| # Check if a tool call was requested | |
| if getattr(response, "tool_calls", None): | |
| updates.update({ | |
| "tool_call_required": True, | |
| "loop_planning": False, | |
| }) | |
| else: | |
| updates.update({ | |
| "tool_call_required": False, | |
| "loop_planning": True, | |
| }) | |
| print("\n🚩🚩 | end | prompt planing Node \n") | |
| return updates | |
| except Exception as e: | |
| print(f"Error in prompt_planning node: {e}") | |
| return { | |
| "messages": [AIMessage(content="Error in prompt_planning node.")], | |
| "next_stage": state.next_stage or "planning" | |
| } | |
| async def generate_3d_node(state: GraphProcessingState, config=None): | |
| print("\n🚀🚀🚀 | start | Generate 3D Node 🚀🚀🚀\n") | |
| # 1. Get the image URL | |
| # For now, using a hardcoded URL as requested for testing. | |
| # In a real scenario, you might get this from the state: | |
| # image_url = state.get("image_url_for_3d") | |
| # if not image_url: | |
| # print("No image_url_for_3d found in state.") | |
| # return {"messages": [AIMessage(content="No image URL found for 3D generation.")]} | |
| hardcoded_image_url = state.generated_image_url_from_dalle | |
| print(f"Using hardcoded image_url: {hardcoded_image_url}") | |
| # 2. Define API endpoint and parameters | |
| api_base_url = "https://wishwa-code--trellis-3d-model-generate-dev.modal.run/" | |
| params = { | |
| "image_url": hardcoded_image_url, | |
| "simplify": "0.95", | |
| "texture_size": "1024", | |
| "sparse_sampling_steps": "12", | |
| "sparse_sampling_cfg": "7.5", | |
| "slat_sampling_steps": "12", | |
| "slat_sampling_cfg": "3", | |
| "seed": "42", | |
| "output_format": "glb" | |
| } | |
| # Create a directory to store generated models if it doesn't exist | |
| output_dir = "generated_3d_models" | |
| os.makedirs(output_dir, exist_ok=True) | |
| # 3. Attempt generation with retries | |
| for attempt in range(1, 2): | |
| print(f"Attempt {attempt} to call 3D generation API...") | |
| try: | |
| # Note: The API call can take a long time (1.5 mins in your curl example) | |
| # Ensure your HTTP client timeout is sufficient. | |
| # httpx default timeout is 5 seconds, which is too short. | |
| async with httpx.AsyncClient(timeout=120.0) as client: # Timeout set to 120 seconds | |
| response = await client.get(api_base_url, params=params) | |
| response.raise_for_status() # Raises an HTTPStatusError for 4XX/5XX responses | |
| # Successfully got a response | |
| if response.status_code == 200: | |
| # Assuming the response body is the .glb file content | |
| file_name = f"model_{uuid.uuid4()}.glb" | |
| file_path = os.path.join(output_dir, file_name) | |
| with open(file_path, "wb") as f: | |
| f.write(response.content) | |
| print(f"Success: 3D model saved to {file_path}") | |
| return { | |
| "messages": [AIMessage(content=f"3D object generation successful: {file_path}")], | |
| "generate_3d_complete": True, | |
| "three_d_model_path": file_path, | |
| "next_stage": state.get("next_stage") or 'end' # Use .get for safer access | |
| } | |
| else: | |
| # This case might not be reached if raise_for_status() is used effectively, | |
| # but good for explicit handling. | |
| error_message = f"API returned status {response.status_code}: {response.text}" | |
| print(error_message) | |
| if attempt == 3: # Last attempt | |
| return {"messages": [AIMessage(content=f"Failed to generate 3D object. Last error: {error_message}")]} | |
| except httpx.HTTPStatusError as e: | |
| error_message = f"HTTP error occurred: {e.response.status_code} - {e.response.text}" | |
| print(error_message) | |
| if attempt == 3: | |
| return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last HTTP error: {error_message}")]} | |
| except httpx.RequestError as e: # Catches network errors, timeout errors etc. | |
| error_message = f"Request error occurred: {str(e)}" | |
| print(error_message) | |
| if attempt == 3: | |
| return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last request error: {error_message}")]} | |
| except Exception as e: | |
| error_message = f"An unexpected error occurred: {str(e)}" | |
| print(error_message) | |
| if attempt == 3: | |
| return {"messages": [AIMessage(content=f"Failed to generate 3D object after 3 attempts. Last unexpected error: {error_message}")]} | |
| if attempt < 2: | |
| print("Retrying...") | |
| else: | |
| print("Max retries reached.") | |
| # Failed after retries (this path should ideally be covered by returns in the loop) | |
| return {"messages": [AIMessage(content="Failed to generate a valid 3D object after 3 attempts.")]} | |
| def define_workflow() -> CompiledStateGraph: | |
| """Defines the workflow graph""" | |
| # Initialize the graph | |
| workflow = StateGraph(GraphProcessingState) | |
| # Add nodes | |
| workflow.add_node("tools", DebugToolNode(tools)) | |
| workflow.add_node("guidance_node", guidance_node) | |
| workflow.add_node("brainstorming_node", brainstorming_node) | |
| workflow.add_node("prompt_planning_node", prompt_planning_node) | |
| workflow.add_node("generate_3d_node", generate_3d_node) | |
| # workflow.add_node("planning_node", planning_node) | |
| # Edges | |
| workflow.add_conditional_edges( | |
| "guidance_node", | |
| guidance_routing, | |
| { | |
| "brainstorming_node" : "brainstorming_node", | |
| "prompt_planning_node" : "prompt_planning_node", | |
| "generate_3d_node" : "generate_3d_node" | |
| } | |
| ) | |
| workflow.add_conditional_edges( | |
| "brainstorming_node", | |
| tools_condition, | |
| ) | |
| workflow.add_conditional_edges( | |
| "prompt_planning_node", | |
| tools_condition, | |
| ) | |
| workflow.add_edge("tools", "guidance_node") | |
| workflow.add_edge("brainstorming_node", "guidance_node") | |
| workflow.add_edge("prompt_planning_node", "guidance_node") | |
| workflow.add_edge("generate_3d_node", "guidance_node") | |
| # workflow.add_conditional_edges( | |
| # "guidance_node", # The source node | |
| # custom_route_after_guidance, # Your custom condition function | |
| # { | |
| # # "Path name": "Destination node name" | |
| # "execute_tools": "tools", # If function returns "execute_tools" | |
| # "proceed_to_next_stage": "planning_node" # If function returns "proceed_to_next_stage" | |
| # # Or this could be another router, or END | |
| # } | |
| # ) | |
| # workflow.add_conditional_edges("guidance_node", guidance_routing) | |
| # workflow.add_conditional_edges("brainstorming_node", brainstorming_routing) | |
| # # Set end nodes | |
| workflow.set_entry_point("guidance_node") | |
| # workflow.set_finish_point("assistant_node") | |
| compiled_graph = workflow.compile(checkpointer=memory) | |
| try: | |
| img_bytes = compiled_graph.get_graph().draw_mermaid_png() | |
| with open("graph.png", "wb") as f: | |
| f.write(img_bytes) | |
| print("Graph image saved as graph.png") | |
| except Exception as e: | |
| print("Can't print the graph:") | |
| print(e) | |
| return compiled_graph | |
| graph = define_workflow() | |
| # async def assistant_node(state: GraphProcessingState, config=None): | |
| # print("\n--- Assistance Node (Debug via print) ---") # Added a newline for clarity | |
| # print(f"Prompt: {state.prompt}") | |
| # print(f"Tools Enabled: {state.tools_enabled}") | |
| # print(f"Search Enabled: {state.search_enabled}") | |
| # print(f"Next Stage: {state.next_stage}") | |
| # # Log boolean completion flags | |
| # print(f"Idea Complete: {state.idea_complete}") | |
| # print(f"Brainstorming Complete: {state.brainstorming_complete}") | |
| # print(f"Planning Complete: {state.planning_complete}") | |
| # print(f"Drawing Complete: {state.drawing_complete}") | |
| # print(f"Product Searching Complete: {state.product_searching_complete}") | |
| # print(f"Purchasing Complete: {state.purchasing_complete}") | |
| # print("--- End Guidance Node Debug ---") # Added for clarity | |
| # print(f"\nMessage: {state.messages}") | |
| # assistant_tools = [] | |
| # if state.tools_enabled.get("download_website_text", True): | |
| # assistant_tools.append(download_website_text) | |
| # if search_enabled and state.tools_enabled.get("tavily_search_results_json", True): | |
| # assistant_tools.append(tavily_search_tool) | |
| # assistant_model = model.bind_tools(assistant_tools) | |
| # if state.prompt: | |
| # final_prompt = "\n".join([state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
| # else: | |
| # final_prompt = ASSISTANT_SYSTEM_PROMPT_BASE | |
| # prompt = ChatPromptTemplate.from_messages( | |
| # [ | |
| # ("system", final_prompt), | |
| # MessagesPlaceholder(variable_name="messages"), | |
| # ] | |
| # ) | |
| # chain = prompt | assistant_model | |
| # response = await chain.ainvoke({"messages": state.messages}, config=config) | |
| # for msg in response: | |
| # if isinstance(msg, HumanMessage): | |
| # print("Human:", msg.content) | |
| # elif isinstance(msg, AIMessage): | |
| # if isinstance(msg.content, list): | |
| # ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)] | |
| # print("AI:", " ".join(ai_texts)) | |
| # else: | |
| # print("AI:", msg.content) | |
| # idea_complete = evaluate_idea_completion(response) | |
| # return { | |
| # "messages": response, | |
| # "idea_complete": idea_complete | |
| # } | |
| # # message = llm_with_tools.invoke(state["messages"]) | |
| # # Because we will be interrupting during tool execution, | |
| # # we disable parallel tool calling to avoid repeating any | |
| # # tool invocations when we resume. | |
| # assert len(response.tool_calls) <= 1 | |
| # idea_complete = evaluate_idea_completion(response) | |
| # return { | |
| # "messages": response, | |
| # "idea_complete": idea_complete | |
| # } | |
| # | |
| # async def planning_node(state: GraphProcessingState, config=None): | |
| # # Define the system prompt for planning | |
| # planning_prompt = "Based on the user's idea, create a detailed step-by-step plan to build the DIY product." | |
| # # Combine the planning prompt with any existing prompts | |
| # if state.prompt: | |
| # final_prompt = "\n".join([planning_prompt, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
| # else: | |
| # final_prompt = "\n".join([planning_prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
| # # Create the prompt template | |
| # prompt = ChatPromptTemplate.from_messages( | |
| # [ | |
| # ("system", final_prompt), | |
| # MessagesPlaceholder(variable_name="messages"), | |
| # ] | |
| # ) | |
| # # Bind tools if necessary | |
| # assistant_tools = [] | |
| # if state.tools_enabled.get("download_website_text", True): | |
| # assistant_tools.append(download_website_text) | |
| # if search_enabled and state.tools_enabled.get("tavily_search_results_json", True): | |
| # assistant_tools.append(tavily_search_tool) | |
| # assistant_model = model.bind_tools(assistant_tools) | |
| # # Create the chain and invoke it | |
| # chain = prompt | assistant_model | |
| # response = await chain.ainvoke({"messages": state.messages}, config=config) | |
| # return { | |
| # "messages": response | |
| # } | |
| # async def guidance_node(state: GraphProcessingState, config=None): | |
| # print("\n--- Guidance Node (Debug via print) ---") | |
| # print(f"Prompt: {state.prompt}") | |
| # for message in state.messages: | |
| # if isinstance(message, HumanMessage): | |
| # print(f"Human: {message.content}") | |
| # elif isinstance(message, AIMessage): | |
| # if message.content: | |
| # if isinstance(message.content, list): | |
| # texts = [item.get('text', '') for item in message.content if isinstance(item, dict) and 'text' in item] | |
| # if texts: | |
| # print(f"AI: {' '.join(texts)}") | |
| # elif isinstance(message.content, str): | |
| # print(f"AI: {message.content}") | |
| # elif isinstance(message, SystemMessage): | |
| # print(f"System: {message.content}") | |
| # elif isinstance(message, ToolMessage): | |
| # print(f"Tool: {message.content}") | |
| # print(f"Tools Enabled: {state.tools_enabled}") | |
| # print(f"Search Enabled: {state.search_enabled}") | |
| # print(f"Next Stage: {state.next_stage}") | |
| # print(f"Brainstorming Complete: {state.brainstorming_complete}") | |
| # guidance_node.count = getattr(guidance_node, 'count', 0) + 1 | |
| # print('\nGuidance Node called count', guidance_node.count) | |
| # print("\n--- End Guidance Node Debug ---") | |
| # stage_order = ["brainstorming", "planning", "drawing", "product_searching", "purchasing"] | |
| # completed = [stage for stage in stage_order if getattr(state, f"{stage}_complete", False)] | |
| # incomplete = [stage for stage in stage_order if not getattr(state, f"{stage}_complete", False)] | |
| # if not incomplete: | |
| # print("All stages complete!") | |
| # # Handle case where all stages are complete | |
| # # You might want to return a message and end, or set proposed_next_stage to a special value | |
| # ai_all_complete_msg = AIMessage(content="All DIY project stages are complete!") | |
| # return { | |
| # "messages": current_messages + [ai_all_complete_msg], | |
| # "next_stage": "end_project", # Or None, or a final summary node | |
| # "pending_approval_stage": None, | |
| # } | |
| # else: | |
| # # THIS LINE DEFINES THE VARIABLE | |
| # proposed_next_stage = incomplete[0] | |
| # print(f"Proposed next stage: {proposed_next_stage}") | |
| # status_summary = f"Completed stages: {completed}\nIncomplete stages: {incomplete}" | |
| # guidance_prompt_text = ( | |
| # "You are the Guiding Assistant for a DIY project. Your primary responsibility is to determine the next logical step " | |
| # "and then **obtain the user's explicit approval** before proceeding.\n\n" | |
| # f"CURRENT PROJECT STATUS:\n{status_summary}\n\n" | |
| # f"Based on the status, the most logical next stage appears to be: **'{proposed_next_stage}'**.\n\n" | |
| # "YOUR TASK:\n" | |
| # f"1. Formulate a clear and concise question for the user, asking if they agree to proceed to the **'{proposed_next_stage}'** stage. For example: 'It looks like '{proposed_next_stage}' is next. Shall we proceed with that?' or 'Are you ready to move on to {proposed_next_stage}?'\n" | |
| # "2. **You MUST use the 'human_assistance' tool to ask this question.** Do not answer directly. Invoke the tool with your question.\n" | |
| # "Example of tool usage (though you don't write this, you *call* the tool):\n" | |
| # "Tool Call: human_assistance(query='The next stage is planning. Do you want to proceed with planning?')\n\n" | |
| # "Consider the user's most recent message if it provides any preference." | |
| # ) | |
| # if state.prompt: | |
| # final_prompt = "\n".join([guidance_prompt_text, state.prompt, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
| # else: | |
| # final_prompt = "\n".join([guidance_prompt_text, ASSISTANT_SYSTEM_PROMPT_BASE]) | |
| # prompt = ChatPromptTemplate.from_messages( | |
| # [ | |
| # ("system", final_prompt), | |
| # MessagesPlaceholder(variable_name="messages"), | |
| # ] | |
| # ) | |
| # assistant_model = model.bind_tools([human_assistance]) | |
| # chain = prompt | assistant_model | |
| # try: | |
| # response = await chain.ainvoke({"messages": state.messages}, config=config) | |
| # for msg in response: | |
| # if isinstance(msg, HumanMessage): | |
| # print("Human:", msg.content) | |
| # elif isinstance(msg, AIMessage): | |
| # if isinstance(msg.content, list): | |
| # ai_texts = [part.get("text", "") for part in msg.content if isinstance(part, dict)] | |
| # print("AI:", " ".join(ai_texts)) | |
| # else: | |
| # print("AI:", msg.content) | |
| # # Check for tool calls in the response | |
| # if hasattr(response, "tool_calls"): | |
| # for tool_call in response.tool_calls: | |
| # tool_name = tool_call['name'] | |
| # if tool_name == "human_assistance": | |
| # query = tool_call['args']['query'] | |
| # print(f"Human input needed: {query}") | |
| # # Handle human assistance tool call | |
| # # You can pause execution and wait for user input here | |
| # return { | |
| # "messages": [response], | |
| # "next_stage": incomplete[0] if incomplete else "brainstorming" | |
| # } | |
| # except Exception as e: | |
| # print(f"Error in guidance node: {e}") | |
| # return { | |
| # "messages": [AIMessage(content="Error in guidance node.")], | |
| # "next_stage": "brainstorming" | |
| # } |