|
from __future__ import annotations |
|
|
|
import dataclasses |
|
import secrets |
|
from abc import ABC, abstractmethod |
|
from dataclasses import dataclass |
|
from typing import Any, Dict, List, Literal, Optional, Tuple, Union |
|
|
|
from taskweaver.plugin.context import ArtifactType |
|
|
|
|
|
@dataclass
class EnvPlugin:
    """Record describing one plugin known to an execution environment.

    Every field is required and positional, in the order declared below.
    NOTE(review): the field meanings are inferred from their names and
    types only -- confirm against the code that populates this record.
    """

    # Identifier of the plugin.
    name: str
    # Plugin implementation (presumably its source text -- TODO confirm).
    impl: str
    # Optional string-to-string settings; ``None`` when there are none.
    config: Optional[Dict[str, str]]
    # Flag recording whether the plugin has been loaded.
    loaded: bool
|
|
|
|
|
def get_id(length: int = 6, prefix: Optional[str] = None) -> str:
    """Return a random hexadecimal id, optionally prefixed.

    Args:
        length: Number of random bytes to draw. Each byte is rendered as
            two hex digits, so the random part of the id is ``2 * length``
            characters long (12 characters with the default of 6).
        prefix: When not ``None``, it is prepended to the id and joined
            with ``-``. An empty string still produces a leading ``-``.

    Returns:
        The hex token alone, or ``"<prefix>-<token>"`` when a prefix is given.
    """
    # Named ``token`` rather than ``id`` so the builtin ``id`` is not shadowed.
    token = secrets.token_hex(length)
    return token if prefix is None else f"{prefix}-{token}"
|
|
|
|
|
@dataclass
class ExecutionArtifact:
    """Description of a single artifact attached to an execution result.

    Every field has a default, so ``ExecutionArtifact()`` is valid and the
    instance can be populated afterwards (the dataclass is not frozen).
    """

    name: str = ""
    type: ArtifactType = "file"
    mime_type: str = ""
    original_name: str = ""
    file_name: str = ""
    file_content: str = ""
    # Declares how ``file_content`` is encoded: plain text or base64.
    file_content_encoding: Literal["str", "base64"] = "str"
    preview: str = ""

    @staticmethod
    def from_dict(d: Dict[str, str]) -> ExecutionArtifact:
        """Build an artifact from a plain dictionary.

        Args:
            d: Mapping with the keys ``name``, ``type``, ``mime_type``,
                ``original_name``, ``file_name``, ``file_content`` and
                ``preview``. ``file_content_encoding`` is optional and
                falls back to ``"str"`` when absent.

        Returns:
            A new ``ExecutionArtifact`` populated from ``d``.

        Raises:
            KeyError: If any of the required keys is missing.
        """
        return ExecutionArtifact(
            name=d["name"],
            # Passed through as-is; the value is not validated here.
            type=d["type"],
            mime_type=d["mime_type"],
            original_name=d["original_name"],
            file_name=d["file_name"],
            file_content=d["file_content"],
            # ``to_dict`` emits this key, so it must be read back; otherwise
            # a base64 artifact would silently round-trip as "str". The
            # fallback keeps dictionaries without the key working.
            file_content_encoding=d.get("file_content_encoding", "str"),
            preview=d["preview"],
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields of this artifact as a plain dictionary."""
        return dataclasses.asdict(self)
|
|
|
|
|
@dataclass
class ExecutionResult:
    """Outcome of executing one piece of code.

    ``execution_id`` and ``code`` are required; every other field has a
    default. List-valued fields use ``default_factory`` so each instance
    gets its own fresh list.
    """

    # Required identification of the run and the code associated with it.
    execution_id: str
    code: str

    # Success flag (defaults to False) and an optional error description.
    is_success: bool = False
    error: Optional[str] = None

    # Either a single string or a list of string pairs.
    # NOTE(review): the meaning of each pair is not visible here -- confirm.
    output: Union[str, List[Tuple[str, str]]] = ""
    stdout: List[str] = dataclasses.field(default_factory=list)
    stderr: List[str] = dataclasses.field(default_factory=list)

    # Log entries as string triples and the artifacts attached to the result.
    log: List[Tuple[str, str, str]] = dataclasses.field(default_factory=list)
    artifact: List[ExecutionArtifact] = dataclasses.field(default_factory=list)
|
|
|
|
|
class Client(ABC):
    """Abstract interface that every execution client must implement.

    All methods are abstract; the base implementations do nothing and
    return ``None``.
    """

    @abstractmethod
    def start(self) -> None:
        """Start the client. Concrete subclasses define what that entails."""

    @abstractmethod
    def stop(self) -> None:
        """Stop the client. Concrete subclasses define what that entails."""

    @abstractmethod
    def load_plugin(
        self,
        plugin_name: str,
        plugin_code: str,
        plugin_config: Dict[str, str],
    ) -> None:
        """Load a plugin into the client.

        Args:
            plugin_name: Name of the plugin.
            plugin_code: Code of the plugin, given as a string.
            plugin_config: String-to-string configuration for the plugin.
        """

    @abstractmethod
    def test_plugin(self, plugin_name: str) -> None:
        """Test the plugin named ``plugin_name``."""

    @abstractmethod
    def update_session_var(self, session_var_dict: Dict[str, str]) -> None:
        """Update session variables from a string-to-string mapping."""

    @abstractmethod
    def execute_code(self, exec_id: str, code: str) -> ExecutionResult:
        """Execute ``code`` under the id ``exec_id``.

        Returns:
            An ``ExecutionResult`` for the run, as produced by the
            concrete implementation.
        """
|
|
|
|
|
class Manager(ABC):
    """Abstract interface that every execution manager must implement.

    All methods are abstract; the base implementations do nothing and
    return ``None``.
    """

    @abstractmethod
    def initialize(self) -> None:
        """Initialize the manager. Concrete subclasses define the details."""

    @abstractmethod
    def clean_up(self) -> None:
        """Clean up the manager. Concrete subclasses define the details."""

    @abstractmethod
    def get_session_client(
        self,
        session_id: str,
        env_id: Optional[str] = None,
        session_dir: Optional[str] = None,
        cwd: Optional[str] = None,
    ) -> Client:
        """Return the ``Client`` for a session.

        Args:
            session_id: Identifier of the session.
            env_id: Optional environment identifier; defaults to ``None``.
            session_dir: Optional session directory; defaults to ``None``.
            cwd: Optional working directory; defaults to ``None``.

        Returns:
            A ``Client`` supplied by the concrete implementation.
        """
|
|