from enum import Enum from typing import Dict, List, Optional, Union from datetime import datetime from pydantic import BaseModel class ModuleType(str, Enum): docker = "docker" template = "template" flow = "flow" class DockerParams(BaseModel): docker_image: str docker_command: Optional[str] = "" docker_num_gpus: Optional[int] = 0 docker_env_vars: Optional[Dict] = None input_dir: Optional[str] = None input_ipfs_hash: Optional[str] = None docker_input_dir: Optional[str] = None docker_output_dir: Optional[str] = None save_location: str = "node" class Config: allow_mutation = True class Config: allow_mutation = True json_encoders = { datetime: lambda v: v.isoformat(), } def model_dict(self): model_dict = self.dict() for key, value in model_dict.items(): if isinstance(value, datetime): model_dict[key] = value.isoformat() return model_dict class ModuleRun(BaseModel): module_name: str module_type: ModuleType consumer_id: str status: str = "pending" error: bool = False id: Optional[str] = None results: list[str] = [] orchestrator_node: str worker_nodes: Optional[list[str]] = None error_message: Optional[str] = None created_time: Optional[str] = None start_processing_time: Optional[datetime] = None completed_time: Optional[datetime] = None duration: Optional[float] = None module_params: Optional[Union[Dict, DockerParams]] = None child_runs: List['ModuleRun'] = [] parent_runs: List['ModuleRun'] = [] input_schema_ipfs_hash: Optional[str] = None class Config: allow_mutation = True json_encoders = { datetime: lambda v: v.isoformat(), } def model_dict(self): model_dict = self.dict() for key, value in model_dict.items(): if isinstance(value, datetime): model_dict[key] = value.isoformat() elif isinstance(value, ModuleType): model_dict[key] = value.value for i, parent_run in enumerate(model_dict['parent_runs']): for key, value in parent_run.items(): if isinstance(value, datetime): model_dict['parent_runs'][i][key] = value.isoformat() for i, child_run in enumerate(model_dict['child_runs']): for key, value in child_run.items(): if isinstance(value, datetime): model_dict['child_runs'][i][key] = value.isoformat() return model_dict class ModuleRunInput(BaseModel): module_name: str consumer_id: str orchestrator_node: str worker_nodes: Optional[list[str]] = None module_params: Optional[Union[Dict, DockerParams]] = None module_type: Optional[ModuleType] = None parent_runs: List['ModuleRun'] = [] def model_dict(self): model_dict = self.dict() for i, parent_run in enumerate(model_dict['parent_runs']): for key, value in parent_run.items(): if isinstance(value, datetime): model_dict['parent_runs'][i][key] = value.isoformat() return model_dict