Spaces:
Running
Running
from __future__ import annotations | |
import asyncio | |
import logging | |
import os | |
# from lmnr.sdk.decorators import observe | |
from browser_use.agent.gif import create_history_gif | |
from browser_use.agent.service import Agent, AgentHookFunc | |
from browser_use.agent.views import ( | |
ActionResult, | |
AgentHistory, | |
AgentHistoryList, | |
AgentStepInfo, | |
ToolCallingMethod, | |
) | |
from browser_use.browser.views import BrowserStateHistory | |
from browser_use.utils import time_execution_async | |
from dotenv import load_dotenv | |
from browser_use.agent.message_manager.utils import is_model_without_tool_support | |
load_dotenv() | |
logger = logging.getLogger(__name__) | |
SKIP_LLM_API_KEY_VERIFICATION = ( | |
os.environ.get("SKIP_LLM_API_KEY_VERIFICATION", "false").lower()[0] in "ty1" | |
) | |
class BrowserUseAgent(Agent): | |
def _set_tool_calling_method(self) -> ToolCallingMethod | None: | |
tool_calling_method = self.settings.tool_calling_method | |
if tool_calling_method == 'auto': | |
if is_model_without_tool_support(self.model_name): | |
return 'raw' | |
elif self.chat_model_library == 'ChatGoogleGenerativeAI': | |
return None | |
elif self.chat_model_library == 'ChatOpenAI': | |
return 'function_calling' | |
elif self.chat_model_library == 'AzureChatOpenAI': | |
return 'function_calling' | |
else: | |
return None | |
else: | |
return tool_calling_method | |
async def run( | |
self, max_steps: int = 100, on_step_start: AgentHookFunc | None = None, | |
on_step_end: AgentHookFunc | None = None | |
) -> AgentHistoryList: | |
"""Execute the task with maximum number of steps""" | |
loop = asyncio.get_event_loop() | |
# Set up the Ctrl+C signal handler with callbacks specific to this agent | |
from browser_use.utils import SignalHandler | |
signal_handler = SignalHandler( | |
loop=loop, | |
pause_callback=self.pause, | |
resume_callback=self.resume, | |
custom_exit_callback=None, # No special cleanup needed on forced exit | |
exit_on_second_int=True, | |
) | |
signal_handler.register() | |
try: | |
self._log_agent_run() | |
# Execute initial actions if provided | |
if self.initial_actions: | |
result = await self.multi_act(self.initial_actions, check_for_new_elements=False) | |
self.state.last_result = result | |
for step in range(max_steps): | |
# Check if waiting for user input after Ctrl+C | |
if self.state.paused: | |
signal_handler.wait_for_resume() | |
signal_handler.reset() | |
# Check if we should stop due to too many failures | |
if self.state.consecutive_failures >= self.settings.max_failures: | |
logger.error(f'❌ Stopping due to {self.settings.max_failures} consecutive failures') | |
break | |
# Check control flags before each step | |
if self.state.stopped: | |
logger.info('Agent stopped') | |
break | |
while self.state.paused: | |
await asyncio.sleep(0.2) # Small delay to prevent CPU spinning | |
if self.state.stopped: # Allow stopping while paused | |
break | |
if on_step_start is not None: | |
await on_step_start(self) | |
step_info = AgentStepInfo(step_number=step, max_steps=max_steps) | |
await self.step(step_info) | |
if on_step_end is not None: | |
await on_step_end(self) | |
if self.state.history.is_done(): | |
if self.settings.validate_output and step < max_steps - 1: | |
if not await self._validate_output(): | |
continue | |
await self.log_completion() | |
break | |
else: | |
error_message = 'Failed to complete task in maximum steps' | |
self.state.history.history.append( | |
AgentHistory( | |
model_output=None, | |
result=[ActionResult(error=error_message, include_in_memory=True)], | |
state=BrowserStateHistory( | |
url='', | |
title='', | |
tabs=[], | |
interacted_element=[], | |
screenshot=None, | |
), | |
metadata=None, | |
) | |
) | |
logger.info(f'❌ {error_message}') | |
return self.state.history | |
except KeyboardInterrupt: | |
# Already handled by our signal handler, but catch any direct KeyboardInterrupt as well | |
logger.info('Got KeyboardInterrupt during execution, returning current history') | |
return self.state.history | |
finally: | |
# Unregister signal handlers before cleanup | |
signal_handler.unregister() | |
if self.settings.save_playwright_script_path: | |
logger.info( | |
f'Agent run finished. Attempting to save Playwright script to: {self.settings.save_playwright_script_path}' | |
) | |
try: | |
# Extract sensitive data keys if sensitive_data is provided | |
keys = list(self.sensitive_data.keys()) if self.sensitive_data else None | |
# Pass browser and context config to the saving method | |
self.state.history.save_as_playwright_script( | |
self.settings.save_playwright_script_path, | |
sensitive_data_keys=keys, | |
browser_config=self.browser.config, | |
context_config=self.browser_context.config, | |
) | |
except Exception as script_gen_err: | |
# Log any error during script generation/saving | |
logger.error(f'Failed to save Playwright script: {script_gen_err}', exc_info=True) | |
await self.close() | |
if self.settings.generate_gif: | |
output_path: str = 'agent_history.gif' | |
if isinstance(self.settings.generate_gif, str): | |
output_path = self.settings.generate_gif | |
create_history_gif(task=self.task, history=self.state.history, output_path=output_path) | |