File size: 8,620 Bytes
193db9d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# %%
from typing import Any, Literal, Optional
from pydantic import BaseModel, Field, model_validator
"""
Core data structures for defining workflows and their components.
This module defines the primary classes used to model workflows, steps, and their
input/output fields. These data structures serve as the foundation for workflow
definition, validation, and execution throughout the workflows package.
The primary components are:
- InputField: Represents an input to a model step with name and source variable
- OutputField: Represents an output from a model step with name and type
- ModelStep: Represents a single step in a workflow with inputs and outputs
- Workflow: A collection of interconnected steps with defined inputs and outputs
All classes use Pydantic's BaseModel for validation and serialization support.
"""
# Selects which of a step's two field lists an operation targets.
FieldType = Literal["input", "output"]

# Type names accepted for a field's declared ``type``: four scalar kinds
# followed by the homogeneous list form of each.
SUPPORTED_TYPES = Literal[
    "str",
    "int",
    "float",
    "bool",
    "list[str]",
    "list[int]",
    "list[float]",
    "list[bool]",
]
"""Supported field types for input and output fields"""
class InputField(BaseModel):
    """
    One named input consumed by a model step.

    Each input records the name the step uses for the value, a description
    of what it is for, the variable the value is read from, and an optional
    pre-processing function.

    Attributes:
        name: Name of the input inside the step's own context.
        description: Human-readable explanation of the input's purpose.
        variable: Source variable reference, written either as
            "{step_id}.{field_name}" or as an external input name.
        func: Name of a function used to transform the value before use;
            ``None`` when no transformation is requested.
    """

    name: str
    description: str
    variable: str
    # Applied to the input value before it is handed to the model.
    func: Optional[str] = None
class OutputField(BaseModel):
    """
    One named output produced by a model step.

    Each output records the name the value is published under, its declared
    data type, a description, and an optional post-processing function.

    Attributes:
        name: Name of the output inside the step's own context.
        type: Declared data type, one of SUPPORTED_TYPES; defaults to "str".
        description: Human-readable explanation of the output's purpose.
        func: Name of a function used to transform the raw output value;
            ``None`` when no transformation is requested.
    """

    name: str
    type: SUPPORTED_TYPES = "str"
    description: str
    # Applied to the output string returned by the model.
    func: Optional[str] = None
class ModelStep(BaseModel):
    """
    Represents a single step in a workflow.

    A model step encapsulates the details of a specific operation within a
    workflow, including what model to use, what inputs it requires, and what
    outputs it produces.

    The editing helpers (``update``, ``update_property``, ``update_field``,
    ``add_field``, ``delete_field``) never modify the instance they are called
    on; each returns a new ``ModelStep``. Copies are shallow: field objects
    that were not changed are shared between the old and the new step.

    Attributes:
        id: Unique identifier for this step within a workflow
        name: Name of the step
        model: The model to use for this step (e.g., "gpt-4")
        provider: The provider of the model (e.g., "openai")
        call_type: The type of operation (e.g., "llm", "search")
        temperature: Optional temperature setting; ``None`` when unset
        system_prompt: Instructions for the model
        input_fields: List of input fields required by this step
        output_fields: List of output fields produced by this step
    """

    id: str
    name: str
    model: str
    provider: str
    call_type: str = "llm"  # llm, search, etc # TODO: make this enum or provide explicit options using Literal
    # TODO: Validate that this is not None for call_type = llm
    temperature: Optional[float] = None
    system_prompt: str
    input_fields: list[InputField]
    output_fields: list[OutputField]

    def fields(self, field_type: FieldType) -> list[InputField | OutputField]:
        """Return the input fields for "input", otherwise the output fields."""
        return self.input_fields if field_type == "input" else self.output_fields

    def get_full_model_name(self):
        """Return the provider and model joined by a single space."""
        return f"{self.provider} {self.model}"

    def get_produced_variables(self) -> list[str]:
        """Return "{id}.{field name}" for every output field with a non-empty name."""
        return [f"{self.id}.{field.name}" for field in self.output_fields if field.name]

    def update(self, update: dict[str, Any]) -> "ModelStep":
        """Return a copy of this step with the attributes in `update` replaced."""
        return self.model_copy(update=update)

    def update_property(self, field: str, value: Any) -> "ModelStep":
        """Update the `field` key of the model step with `value`."""
        return self.update({field: value})

    def update_field(self, field_type: FieldType, index: int, key: str, value: str) -> "ModelStep":
        """Update a specific field of an input or output field at the given index.

        Args:
            field_type: Which list to edit ('input' or 'output').
            index: Position of the field to edit. An index at or beyond the end
                of the list leaves the fields unchanged.
            key: Attribute of the field to replace (e.g. "name").
            value: New value for that attribute.

        Returns:
            A new ModelStep; the step this is called on is left untouched.

        Raises:
            ValueError: If `field_type` is neither 'input' nor 'output'.
        """
        if field_type == "input":
            attr = "input_fields"
        elif field_type == "output":
            attr = "output_fields"
        else:
            raise ValueError(f"Invalid field type: {field_type}")

        # Work on a copy of the list: model_copy() is shallow, so assigning
        # into the original list would also change `self`.
        fields = list(getattr(self, attr))
        if index < len(fields):
            fields[index] = fields[index].model_copy(update={key: value})
        return self.model_copy(update={attr: fields})

    @staticmethod
    def create_new_field(field_type: FieldType, input_var: str | None = None) -> InputField | OutputField:
        """Create a blank field of the requested kind.

        Args:
            field_type: 'input' or 'output'.
            input_var: Source variable for a new input field. Ignored for
                output fields. ``None`` is stored as an empty string because
                ``InputField.variable`` must be a string.

        Returns:
            An InputField or OutputField with empty name and description.

        Raises:
            ValueError: If `field_type` is neither 'input' nor 'output'.
        """
        if field_type == "input":
            return InputField(name="", description="", variable=input_var or "")
        elif field_type == "output":
            return OutputField(name="", description="")
        else:
            raise ValueError(f"Invalid field type: {field_type}")

    def add_field(self, field_type: FieldType, index: int = -1, input_var: str | None = None) -> "ModelStep":
        """Add a new blank field to the step.

        Args:
            field_type: Type of field to add ('input' or 'output').
            index: Index of the existing field after which the new field is
                inserted (-1 to append at the end).
            input_var: Source variable for a new input field.

        Returns:
            A new ModelStep with the updated fields; the step this is called
            on is left untouched.

        Raises:
            ValueError: If `field_type` is neither 'input' nor 'output'.
        """
        attr = "input_fields" if field_type == "input" else "output_fields"
        new_field = ModelStep.create_new_field(field_type, input_var)

        # Copy the list first so the insertion cannot leak into `self`.
        fields = list(getattr(self, attr))
        if index == -1:
            fields.append(new_field)
        else:
            fields.insert(index + 1, new_field)
        return self.model_copy(update={attr: fields})

    def delete_field(self, field_type: FieldType, index: int) -> "ModelStep":
        """
        Delete an input or output field from the step.

        Args:
            field_type: Type of field to delete ('input' selects the input
                fields; any other value selects the output fields).
            index: Index of the field to delete. [-1 to delete the last field]

        Returns:
            A new ModelStep with the updated fields; the step this is called
            on is left untouched.

        Raises:
            IndexError: If `index` is out of range for the selected list.
        """
        attr = "input_fields" if field_type == "input" else "output_fields"

        # Copy the list first so the removal cannot leak into `self`.
        fields = list(getattr(self, attr))
        fields.pop(index)
        return self.model_copy(update={attr: fields})
class Workflow(BaseModel):
    """
    Represents a complete workflow composed of interconnected steps.

    A workflow defines a directed acyclic graph of model steps, where outputs
    from earlier steps can be used as inputs to later steps.

    Attributes:
        inputs: List of input variables required by the workflow
        outputs: Dictionary of workflow outputs. Values are variable
            references or ``None``.
            NOTE(review): keys are presumably output names and ``None``
            presumably marks an output not yet bound to a variable - confirm.
        steps: Dictionary mapping step IDs to ModelStep instances

    Step variables use the format "{step_id}.{field_name}" to uniquely
    identify them within the workflow.

    Steps are accepted either as an id-keyed dict or as a list; `model_dump`
    always emits them as a list.
    """

    # variables of form {node}.{field}
    inputs: list[str] = Field(default_factory=list)

    # variables of form {node}.{field}
    outputs: dict[str, str | None] = Field(default_factory=dict)
    steps: dict[str, ModelStep] = Field(default_factory=dict)

    def model_dump(self, *args, **kwargs):
        """Dump the workflow to a dict, with `steps` as a list of step dicts.

        Arguments are forwarded unchanged to ``BaseModel.model_dump``. When
        `steps` is absent from the result (for example because it was
        excluded), the dump is returned as-is.
        """
        data = super().model_dump(*args, **kwargs)
        steps = data.get("steps")
        if isinstance(steps, dict):
            data["steps"] = list(steps.values())
        return data

    @model_validator(mode="before")
    @classmethod
    def dictify_steps(cls, data):
        """Convert a list of steps into a dict keyed by step id before validation.

        Each list item may be a plain dict with an "id" key or an object with
        an ``id`` attribute. Any other input shape is passed through
        untouched. The caller's dict is not modified.

        Raises:
            ValueError: If two steps in the list share the same id.
        """
        if not isinstance(data, dict) or not isinstance(data.get("steps"), list):
            return data

        steps_dict = {}
        for step in data["steps"]:
            step_id = step["id"] if isinstance(step, dict) else step.id
            if step_id in steps_dict:
                raise ValueError(f"Duplicate step ID: {step_id}")
            steps_dict[step_id] = step

        # Build a new mapping rather than assigning into `data`, so the
        # caller's input is left as it was passed in.
        return {**data, "steps": steps_dict}

    def get_step_variables(self, step_id: str) -> list[str]:
        """Get all variables produced by a specific step.

        Output fields with an empty name are skipped.

        Raises:
            KeyError: If `step_id` is not a step of this workflow.
        """
        return self.steps[step_id].get_produced_variables()

    def get_available_variables(self) -> list[str]:
        """Get the workflow inputs plus all output variables from all steps.

        Duplicates are removed. The result keeps first-seen order: workflow
        inputs first, then each step's variables in step order.
        """
        # dict.fromkeys de-duplicates while keeping insertion order.
        ordered = dict.fromkeys(self.inputs)
        for step in self.steps.values():
            ordered.update(dict.fromkeys(step.get_produced_variables()))
        return list(ordered)
# %%
|