import re import signal from contextlib import contextmanager, redirect_stdout from dataclasses import dataclass from enum import Enum from io import StringIO from typing import Optional, Type from ..schema import ActionReturn, ActionStatusCode from .base_action import AsyncActionMixin, BaseAction, tool_api from .parser import BaseParser, JsonParser class Status(str, Enum): """Execution status.""" SUCCESS = 'success' FAILURE = 'failure' @dataclass class ExecutionResult: """Execution result.""" status: Status value: Optional[str] = None msg: Optional[str] = None @contextmanager def _raise_timeout(timeout): def _handler(signum, frame): raise TimeoutError() signal.signal(signal.SIGALRM, _handler) signal.alarm(timeout) try: yield finally: signal.alarm(0) class IPythonInteractive(BaseAction): """An interactive IPython shell for code execution. Args: timeout (int): Upper bound of waiting time for Python script execution. Defaults to ``20``. max_out_len (int): maximum output length. No truncation occurs if negative. Defaults to ``2048``. use_signals (bool): whether signals should be used for timing function out or the multiprocessing. Set to ``False`` when not running in the main thread, e.g. web applications. Defaults to ``True`` description (dict): The description of the action. Defaults to ``None``. parser (Type[BaseParser]): The parser class to process the action's inputs and outputs. Defaults to :class:`JsonParser`. """ def __init__( self, timeout: int = 30, max_out_len: int = 8192, use_signals: bool = True, description: Optional[dict] = None, parser: Type[BaseParser] = JsonParser, ): super().__init__(description, parser) self.timeout = timeout self._executor = self.create_shell() self._highlighting = re.compile( r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') self._max_out_len = max_out_len if max_out_len >= 0 else None self._use_signals = use_signals def reset(self): """Clear the context.""" self._executor.reset() @tool_api def run(self, command: str, timeout: Optional[int] = None) -> ActionReturn: """Launch an IPython Interactive Shell to execute code. Args: command (:class:`str`): Python code snippet timeout (:class:`Optional[int]`): timeout for execution. This argument only works in the main thread. Defaults to ``None``. """ from timeout_decorator import timeout as timer tool_return = ActionReturn(args={'text': command}, type=self.name) ret = ( timer(timeout or self.timeout)(self.exec)(command) if self._use_signals else self.exec(command)) if ret.status is Status.SUCCESS: tool_return.result = [{'type': 'text', 'content': ret.value}] tool_return.state = ActionStatusCode.SUCCESS else: tool_return.errmsg = ret.msg tool_return.state = ActionStatusCode.API_ERROR return tool_return def exec(self, code: str) -> ExecutionResult: """Run Python scripts in IPython shell. Args: code (:class:`str`): code block Returns: :py:class:`ExecutionResult`: execution result """ with StringIO() as io: with redirect_stdout(io): ret = self._executor.run_cell(self.extract_code(code)) result = ret.result if result is not None: return ExecutionResult(Status.SUCCESS, str(result)[:self._max_out_len]) outs = io.getvalue().strip().split('\n') if not outs: return ExecutionResult(Status.SUCCESS, '') for i, out in enumerate(outs): if re.search('Error|Traceback', out, re.S): if 'TimeoutError' in out: return ExecutionResult( Status.FAILURE, msg=('The code interpreter encountered ' 'a timeout error.')) err_idx = i break else: return ExecutionResult(Status.SUCCESS, outs[-1].strip()[:self._max_out_len]) return ExecutionResult( Status.FAILURE, msg=self._highlighting.sub( '', '\n'.join(outs[err_idx:])[:self._max_out_len]), ) @staticmethod def create_shell(): from IPython import InteractiveShell from traitlets.config import Config c = Config() c.HistoryManager.enabled = False c.HistoryManager.hist_file = ':memory:' return InteractiveShell( user_ns={'_raise_timeout': _raise_timeout}, config=c) @staticmethod def extract_code(text: str) -> str: """Extract Python code from markup languages. Args: text (:class:`str`): Markdown-formatted text Returns: :class:`str`: Python code """ import json5 # Match triple backtick blocks first triple_match = re.search(r'```[^\n]*\n(.+?)```', text, re.DOTALL) # Match single backtick blocks second single_match = re.search(r'`([^`]*)`', text, re.DOTALL) if triple_match: text = triple_match.group(1) elif single_match: text = single_match.group(1) else: try: text = json5.loads(text)['code'] except Exception: pass # If no code blocks found, return original text return text @staticmethod def wrap_code_with_timeout(code: str, timeout: int) -> str: if not code.strip(): return code code = code.strip('\n').rstrip() indent = len(code) - len(code.lstrip()) handle = ' ' * indent + f'with _raise_timeout({timeout}):\n' block = '\n'.join([' ' + line for line in code.split('\n')]) wrapped_code = handle + block last_line = code.split('\n')[-1] is_expression = True try: compile(last_line.lstrip(), '', 'eval') except SyntaxError: is_expression = False if is_expression: wrapped_code += '\n' * 5 + last_line return wrapped_code class AsyncIPythonInteractive(AsyncActionMixin, IPythonInteractive): """An interactive IPython shell for code execution. Args: timeout (int): Upper bound of waiting time for Python script execution. Defaults to ``20``. max_out_len (int): maximum output length. No truncation occurs if negative. Defaults to ``2048``. use_signals (bool): whether signals should be used for timing function out or the multiprocessing. Set to ``False`` when not running in the main thread, e.g. web applications. Defaults to ``True`` description (dict): The description of the action. Defaults to ``None``. parser (Type[BaseParser]): The parser class to process the action's inputs and outputs. Defaults to :class:`JsonParser`. """ @tool_api async def run(self, command: str, timeout: Optional[int] = None) -> ActionReturn: """Launch an IPython Interactive Shell to execute code. Args: command (:class:`str`): Python code snippet timeout (:class:`Optional[int]`): timeout for execution. This argument only works in the main thread. Defaults to ``None``. """ tool_return = ActionReturn(args={'text': command}, type=self.name) ret = await self.exec(command, timeout) if ret.status is Status.SUCCESS: tool_return.result = [{'type': 'text', 'content': ret.value}] tool_return.state = ActionStatusCode.SUCCESS else: tool_return.errmsg = ret.msg tool_return.state = ActionStatusCode.API_ERROR return tool_return async def exec(self, code: str, timeout: int = None) -> ExecutionResult: """Asynchronously run Python scripts in IPython shell. Args: code (:class:`str`): code block timeout (:class:`int`): max waiting time for code execution Returns: :py:class:`ExecutionResult`: execution result """ with StringIO() as io: with redirect_stdout(io): ret = await self._executor.run_cell_async( # ret = await self.create_shell().run_cell_async( self.wrap_code_with_timeout( self.extract_code(code), timeout or self.timeout)) result = ret.result if result is not None: return ExecutionResult(Status.SUCCESS, str(result)[:self._max_out_len]) outs = io.getvalue().strip().split('\n') if not outs: return ExecutionResult(Status.SUCCESS, '') for i, out in enumerate(outs): if re.search('Error|Traceback', out, re.S): if 'TimeoutError' in out: return ExecutionResult( Status.FAILURE, msg=('The code interpreter encountered a ' 'timeout error.')) err_idx = i break else: return ExecutionResult(Status.SUCCESS, outs[-1].strip()[:self._max_out_len]) return ExecutionResult( Status.FAILURE, msg=self._highlighting.sub( '', '\n'.join(outs[err_idx:])[:self._max_out_len]), )