| | import re |
| | import signal |
| | from contextlib import contextmanager, redirect_stdout |
| | from dataclasses import dataclass |
| | from enum import Enum |
| | from io import StringIO |
| | from typing import Optional, Type |
| |
|
| | from ..schema import ActionReturn, ActionStatusCode |
| | from .base_action import AsyncActionMixin, BaseAction, tool_api |
| | from .parser import BaseParser, JsonParser |
| |
|
| |
|
class Status(str, Enum):
    """Outcome of one code execution.

    The ``str`` mixin makes every member compare equal to its plain
    string value.
    """

    # The executed code finished without a detected error.
    SUCCESS = 'success'
    # The executed code produced an error or timed out.
    FAILURE = 'failure'
| |
|
| |
|
@dataclass
class ExecutionResult:
    """Container for the outcome of one code execution.

    Attributes:
        status (Status): whether the execution succeeded or failed.
        value (Optional[str]): textual output of a successful execution.
            Defaults to ``None``.
        msg (Optional[str]): error description of a failed execution.
            Defaults to ``None``.
    """

    status: Status
    value: Optional[str] = None
    msg: Optional[str] = None
| |
|
| |
|
@contextmanager
def _raise_timeout(timeout):
    """Raise :class:`TimeoutError` if the managed block runs too long.

    A ``SIGALRM`` handler that raises :class:`TimeoutError` is installed and
    an alarm is scheduled for ``timeout`` seconds. On exit the alarm is
    cancelled and the handler that was active before entering the block is
    put back, so the process-wide signal configuration is not left modified.

    Args:
        timeout (int): number of seconds before the alarm fires.

    Raises:
        TimeoutError: raised from the signal handler when the alarm fires
            while the block is still running.
    """

    def _handler(signum, frame):
        raise TimeoutError()

    # ``signal.signal`` returns the handler it replaces; keep it for restore.
    previous_handler = signal.signal(signal.SIGALRM, _handler)
    try:
        # Scheduled inside the ``try`` so that a failure here still restores
        # the previous handler.
        signal.alarm(timeout)
        yield
    finally:
        # Cancel any pending alarm before giving the handler back.
        signal.alarm(0)
        # ``None`` means the previous handler was not installed from Python
        # and cannot be passed back to ``signal.signal``.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
| |
|
| |
|
class IPythonInteractive(BaseAction):
    """An interactive IPython shell for code execution.

    Args:
        timeout (int): Upper bound of waiting time for Python script execution.
            Defaults to ``30``.
        max_out_len (int): maximum output length. No truncation occurs if negative.
            Defaults to ``8192``.
        use_signals (bool): whether signals should be used for timing function out
            or the multiprocessing. Set to ``False`` when not running in the main
            thread, e.g. web applications. Defaults to ``True``
        description (dict): The description of the action. Defaults to ``None``.
        parser (Type[BaseParser]): The parser class to process the
            action's inputs and outputs. Defaults to :class:`JsonParser`.
    """

    def __init__(
        self,
        timeout: int = 30,
        max_out_len: int = 8192,
        use_signals: bool = True,
        description: Optional[dict] = None,
        parser: Type[BaseParser] = JsonParser,
    ):
        super().__init__(description, parser)
        self.timeout = timeout
        self._executor = self.create_shell()
        # Pattern for ANSI escape sequences; used to strip them from
        # error output before it is returned.
        self._highlighting = re.compile(
            r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
        # ``None`` as a slice bound disables truncation.
        self._max_out_len = max_out_len if max_out_len >= 0 else None
        self._use_signals = use_signals

    def reset(self):
        """Clear the context."""
        self._executor.reset()

    @tool_api
    def run(self, command: str, timeout: Optional[int] = None) -> ActionReturn:
        """Launch an IPython Interactive Shell to execute code.

        Args:
            command (:class:`str`): Python code snippet
            timeout (:class:`Optional[int]`): timeout for execution.
                This argument only works in the main thread. Defaults to ``None``.
        """
        # NOTE(review): the docstring above is kept verbatim because
        # ``tool_api`` may read it to describe the tool -- confirm before
        # rewording it.
        tool_return = ActionReturn(args={'text': command}, type=self.name)
        if self._use_signals:
            # Imported only on the path that needs it, so the dependency is
            # not required when signals are disabled.
            from timeout_decorator import timeout as timer
            ret = timer(timeout or self.timeout)(self.exec)(command)
        else:
            # No timeout is applied on this path.
            ret = self.exec(command)
        if ret.status is Status.SUCCESS:
            tool_return.result = [{'type': 'text', 'content': ret.value}]
            tool_return.state = ActionStatusCode.SUCCESS
        else:
            tool_return.errmsg = ret.msg
            tool_return.state = ActionStatusCode.API_ERROR
        return tool_return

    def exec(self, code: str) -> ExecutionResult:
        """Run Python scripts in IPython shell.

        The value of the cell, when there is one, is returned as the result.
        Otherwise the captured standard output is inspected: if a line
        contains ``Error`` or ``Traceback`` the execution is reported as
        failed, else the last printed line is returned.

        Args:
            code (:class:`str`): code block

        Returns:
            :py:class:`ExecutionResult`: execution result
        """
        with StringIO() as buffer:
            with redirect_stdout(buffer):
                ret = self._executor.run_cell(self.extract_code(code))
                result = ret.result
                if result is not None:
                    return ExecutionResult(Status.SUCCESS,
                                           str(result)[:self._max_out_len])
            # Must be read before the buffer is closed by the ``with``.
            output = buffer.getvalue().strip()
        # Checked on the string: ``str.split`` never returns an empty list.
        if not output:
            return ExecutionResult(Status.SUCCESS, '')
        outs = output.split('\n')
        for err_idx, out in enumerate(outs):
            if re.search('Error|Traceback', out):
                break
        else:
            # No error marker found: report the last printed line.
            return ExecutionResult(Status.SUCCESS,
                                   outs[-1].strip()[:self._max_out_len])
        if 'TimeoutError' in outs[err_idx]:
            return ExecutionResult(
                Status.FAILURE,
                msg='The code interpreter encountered a timeout error.')
        # Strip escape sequences before truncating, so they neither consume
        # the length budget nor get cut in half by the slice.
        error_text = self._highlighting.sub('', '\n'.join(outs[err_idx:]))
        return ExecutionResult(
            Status.FAILURE, msg=error_text[:self._max_out_len])

    @staticmethod
    def create_shell():
        """Create the IPython shell used to execute code.

        History is disabled and ``_raise_timeout`` is placed in the user
        namespace so that executed code can refer to it.

        Returns:
            the configured ``InteractiveShell`` instance
        """
        from IPython import InteractiveShell
        from traitlets.config import Config

        c = Config()
        c.HistoryManager.enabled = False
        c.HistoryManager.hist_file = ':memory:'
        return InteractiveShell(
            user_ns={'_raise_timeout': _raise_timeout}, config=c)

    @staticmethod
    def extract_code(text: str) -> str:
        """Extract Python code from markup languages.

        The first triple-backtick fenced block wins, then the first
        single-backtick span. Failing both, the text is parsed with
        ``json5`` and its ``'code'`` entry is used; if that fails too, the
        text is returned unchanged.

        Args:
            text (:class:`str`): Markdown-formatted text

        Returns:
            :class:`str`: Python code
        """
        triple_match = re.search(r'```[^\n]*\n(.+?)```', text, re.DOTALL)
        if triple_match:
            return triple_match.group(1)
        single_match = re.search(r'`([^`]*)`', text, re.DOTALL)
        if single_match:
            return single_match.group(1)
        # Only the fallback needs json5, so import it here.
        import json5
        try:
            text = json5.loads(text)['code']
        except Exception:
            # Deliberate best effort: anything that is not a JSON object
            # with a 'code' entry is treated as raw code.
            pass
        return text

    @staticmethod
    def wrap_code_with_timeout(code: str, timeout: int) -> str:
        """Wrap code in a ``with _raise_timeout(timeout):`` block.

        When the last line is an expression written at the same indentation
        as the first line, it is repeated after the block, separated by
        blank lines (presumably so that the shell still reports its value
        as the cell result -- verify against IPython behaviour). Note that
        such an expression is therefore evaluated twice.

        Args:
            code (:class:`str`): code block
            timeout (:class:`int`): number of seconds passed to
                ``_raise_timeout``

        Returns:
            :class:`str`: the wrapped code; blank input is returned unchanged
        """
        if not code.strip():
            return code
        code = code.strip('\n').rstrip()
        lines = code.split('\n')
        indent = len(code) - len(code.lstrip())
        handle = ' ' * indent + f'with _raise_timeout({timeout}):\n'
        block = '\n'.join([' ' + line for line in lines])
        wrapped_code = handle + block
        last_line = lines[-1]
        last_indent = len(last_line) - len(last_line.lstrip())
        is_expression = True
        try:
            compile(last_line.lstrip(), '<stdin>', 'eval')
        except SyntaxError:
            is_expression = False
        # A more-indented last line belongs to a nested statement (or is a
        # continuation line). Repeating it after the block would either break
        # the indentation of the whole cell or run it a second time outside
        # its original context, so it is only repeated at top level.
        if is_expression and last_indent <= indent:
            wrapped_code += '\n' * 5 + last_line
        return wrapped_code
| |
|
| |
|
class AsyncIPythonInteractive(AsyncActionMixin, IPythonInteractive):
    """An interactive IPython shell for code execution.

    Args:
        timeout (int): Upper bound of waiting time for Python script execution.
            Defaults to ``30``.
        max_out_len (int): maximum output length. No truncation occurs if negative.
            Defaults to ``8192``.
        use_signals (bool): whether signals should be used for timing function out
            or the multiprocessing. Set to ``False`` when not running in the main
            thread, e.g. web applications. Defaults to ``True``
        description (dict): The description of the action. Defaults to ``None``.
        parser (Type[BaseParser]): The parser class to process the
            action's inputs and outputs. Defaults to :class:`JsonParser`.
    """

    @tool_api
    async def run(self,
                  command: str,
                  timeout: Optional[int] = None) -> ActionReturn:
        """Launch an IPython Interactive Shell to execute code.

        Args:
            command (:class:`str`): Python code snippet
            timeout (:class:`Optional[int]`): timeout for execution.
                This argument only works in the main thread. Defaults to ``None``.
        """
        # NOTE(review): the docstring above is kept verbatim because
        # ``tool_api`` may read it to describe the tool -- confirm before
        # rewording it.
        tool_return = ActionReturn(args={'text': command}, type=self.name)
        ret = await self.exec(command, timeout)
        if ret.status is Status.SUCCESS:
            tool_return.result = [{'type': 'text', 'content': ret.value}]
            tool_return.state = ActionStatusCode.SUCCESS
        else:
            tool_return.errmsg = ret.msg
            tool_return.state = ActionStatusCode.API_ERROR
        return tool_return

    async def exec(self,
                   code: str,
                   timeout: Optional[int] = None) -> ExecutionResult:
        """Asynchronously run Python scripts in IPython shell.

        The extracted code is wrapped with ``wrap_code_with_timeout`` before
        it is handed to the shell. The value of the cell, when there is one,
        is returned as the result. Otherwise the captured standard output is
        inspected: if a line contains ``Error`` or ``Traceback`` the
        execution is reported as failed, else the last printed line is
        returned.

        Args:
            code (:class:`str`): code block
            timeout (:class:`Optional[int]`): max waiting time for code
                execution. ``self.timeout`` is used when it is ``None`` (or
                ``0``). Defaults to ``None``.

        Returns:
            :py:class:`ExecutionResult`: execution result
        """
        with StringIO() as buffer:
            with redirect_stdout(buffer):
                wrapped = self.wrap_code_with_timeout(
                    self.extract_code(code), timeout or self.timeout)
                ret = await self._executor.run_cell_async(wrapped)
                result = ret.result
                if result is not None:
                    return ExecutionResult(Status.SUCCESS,
                                           str(result)[:self._max_out_len])
            # Must be read before the buffer is closed by the ``with``.
            output = buffer.getvalue().strip()
        # Checked on the string: ``str.split`` never returns an empty list.
        if not output:
            return ExecutionResult(Status.SUCCESS, '')
        outs = output.split('\n')
        for err_idx, out in enumerate(outs):
            if re.search('Error|Traceback', out):
                break
        else:
            # No error marker found: report the last printed line.
            return ExecutionResult(Status.SUCCESS,
                                   outs[-1].strip()[:self._max_out_len])
        if 'TimeoutError' in outs[err_idx]:
            return ExecutionResult(
                Status.FAILURE,
                msg='The code interpreter encountered a timeout error.')
        # Strip escape sequences before truncating, so they neither consume
        # the length budget nor get cut in half by the slice.
        error_text = self._highlighting.sub('', '\n'.join(outs[err_idx:]))
        return ExecutionResult(
            Status.FAILURE, msg=error_text[:self._max_out_len])
| |
|