File size: 8,620 Bytes
193db9d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# %%
from typing import Any, Literal, Optional

from pydantic import BaseModel, Field, model_validator

"""
Core data structures for defining workflows and their components.

This module defines the primary classes used to model workflows, steps, and their
input/output fields. These data structures serve as the foundation for workflow
definition, validation, and execution throughout the workflows package.

The primary components are:
- InputField: Represents an input to a model step with name and source variable
- OutputField: Represents an output from a model step with name and type
- ModelStep: Represents a single step in a workflow with inputs and outputs
- Workflow: A collection of interconnected steps with defined inputs and outputs

All classes use Pydantic's BaseModel for validation and serialization support.
"""
# Selector naming which of a step's two field lists an operation targets.
FieldType = Literal[
    "input",
    "output",
]

SUPPORTED_TYPES = Literal[
    "str",
    "int",
    "float",
    "bool",
    "list[str]",
    "list[int]",
    "list[float]",
    "list[bool]",
]
"""Supported field types for input and output fields"""


class InputField(BaseModel):
    """
    Describes one named input consumed by a model step.

    Each input field records the name the step uses for the value, a
    description of it, the variable the value is read from, and an optional
    pre-processing function.

    Attributes:
        name: Identifier of the input inside the step.
        description: Human-readable explanation of what the input is for.
        variable: Reference to the source of the value, written either as
            "{step_id}.{field_name}" or as the name of an external input.
        func: Name of an optional function applied to the value before it is
            passed to the model; None (the default) when no function is set.
    """

    name: str
    description: str
    variable: str
    func: Optional[str] = None  # pre-processing hook run on the input value


class OutputField(BaseModel):
    """
    Describes one named output emitted by a model step.

    Each output field records the name of the produced value, its declared
    data type, a description, and an optional post-processing function.

    Attributes:
        name: Identifier of the output inside the step.
        type: Declared data type of the value; must be one of SUPPORTED_TYPES
            and falls back to "str" when omitted.
        description: Human-readable explanation of what the output is for.
        func: Name of an optional function applied to the output string from
            the model; None (the default) when no function is set.
    """

    name: str
    type: SUPPORTED_TYPES = "str"
    description: str
    func: Optional[str] = None  # post-processing hook run on the model's output string


class ModelStep(BaseModel):
    """
    Represents a single step in a workflow.

    A model step encapsulates the details of a specific operation within a workflow,
    including what model to use, what inputs it requires, and what outputs it produces.

    The editing helpers (`update`, `update_property`, `update_field`, `add_field`,
    `delete_field`) all return a new ModelStep and leave this instance, including
    its field lists, unmodified.

    Attributes:
        id: Unique identifier for this step within a workflow
        name: Name of the step (kept separately from `id`)
        model: The model to use for this step (e.g., "gpt-4")
        provider: The provider of the model (e.g., "openai")
        call_type: The type of operation (e.g., "llm", "search"); defaults to "llm"
        temperature: Optional temperature value; None when not set
        system_prompt: Instructions for the model
        input_fields: List of input fields required by this step
        output_fields: List of output fields produced by this step
    """

    id: str
    name: str
    model: str
    provider: str
    call_type: str = "llm"  # llm, search, etc # TODO: make this enum or provide explicit options using Literal

    # TODO: Validate that this is not None for call_type = llm
    temperature: Optional[float] = None

    system_prompt: str
    input_fields: list[InputField]
    output_fields: list[OutputField]

    def fields(self, field_type: FieldType) -> list[InputField | OutputField]:
        """Return the step's input fields for "input", otherwise its output fields.

        The returned list is the step's own list object, not a copy.
        """
        return self.input_fields if field_type == "input" else self.output_fields

    def get_full_model_name(self) -> str:
        """Return the provider and model joined by a single space."""
        return f"{self.provider} {self.model}"

    def get_produced_variables(self) -> list[str]:
        """Return "{step_id}.{field_name}" for every output field with a non-empty name."""
        return [f"{self.id}.{field.name}" for field in self.output_fields if field.name]

    def update(self, update: dict[str, Any]) -> "ModelStep":
        """Return a copy of this step with the attributes in `update` replaced.

        The values are applied through `model_copy(update=...)`.
        """
        return self.model_copy(update=update)

    def update_property(self, field: str, value: Any) -> "ModelStep":
        "Update the `field` key of the model step with `value`."
        return self.update({field: value})

    def update_field(self, field_type: FieldType, index: int, key: str, value: str) -> "ModelStep":
        """Update a specific field of an input or output field at the given index.

        Args:
            field_type: Which field list to edit ('input' or 'output').
            index: Position of the field to edit. An index at or beyond the end
                of the list leaves the fields unchanged.
            key: Attribute of the field to replace (e.g. "name").
            value: New value for that attribute.

        Returns:
            A new ModelStep with the updated field; this step is not modified.

        Raises:
            ValueError: If `field_type` is neither 'input' nor 'output'.
        """
        if field_type == "input":
            attr = "input_fields"
        elif field_type == "output":
            attr = "output_fields"
        else:
            raise ValueError(f"Invalid field type: {field_type}")

        # model_copy() is shallow, so editing the existing list would also
        # change this step. Edit a copy of the list and attach it to the clone.
        fields = list(getattr(self, attr))
        if index < len(fields):
            fields[index] = fields[index].model_copy(update={key: value})
        return self.model_copy(update={attr: fields})

    @staticmethod
    def create_new_field(field_type: FieldType, input_var: str | None = None) -> InputField | OutputField:
        """Create a blank input or output field.

        Args:
            field_type: Kind of field to create ('input' or 'output').
            input_var: Source variable for a new input field. When None, an
                empty string is used so that the blank field still validates
                (`InputField.variable` is a required string). Ignored for
                output fields.

        Returns:
            A new InputField or OutputField with empty name and description.

        Raises:
            ValueError: If `field_type` is neither 'input' nor 'output'.
        """
        if field_type == "input":
            variable = input_var if input_var is not None else ""
            return InputField(name="", description="", variable=variable)
        elif field_type == "output":
            return OutputField(name="", description="")
        else:
            raise ValueError(f"Invalid field type: {field_type}")

    def add_field(self, field_type: FieldType, index: int = -1, input_var: str | None = None) -> "ModelStep":
        """Add a new blank field to the step.

        Args:
            field_type: Type of field to add ('input' or 'output').
            index: Index of the existing field after which the new field is
                inserted; -1 appends the new field at the end.
            input_var: Source variable to assign to a new input field.
        Returns:
            A new ModelStep with the updated fields; this step is not modified.

        Raises:
            ValueError: If `field_type` is neither 'input' nor 'output'.
        """
        new_field = ModelStep.create_new_field(field_type, input_var)
        attr = "input_fields" if field_type == "input" else "output_fields"
        # Copy the list first: a shallow model_copy() would share it with self.
        fields = list(getattr(self, attr))
        if index == -1:
            fields.append(new_field)
        else:
            fields.insert(index + 1, new_field)
        return self.model_copy(update={attr: fields})

    def delete_field(self, field_type: FieldType, index: int) -> "ModelStep":
        """
        Delete an input or output field from the step.

        Args:
            field_type: Type of field to delete ('input' or 'output').
            index: Index of the field to delete. [-1 to delete the last field]

        Returns:
            A new ModelStep with the updated fields; this step is not modified.

        Raises:
            IndexError: If `index` is out of range for the selected field list.
        """
        attr = "input_fields" if field_type == "input" else "output_fields"
        # Copy the list first: a shallow model_copy() would share it with self.
        fields = list(getattr(self, attr))
        fields.pop(index)
        return self.model_copy(update={attr: fields})


class Workflow(BaseModel):
    """
    Represents a complete workflow composed of interconnected steps.

    A workflow is intended to define a directed acyclic graph of model steps,
    where outputs from earlier steps can be used as inputs to later steps.
    (This class does not itself check the graph for cycles.)

    Attributes:
        inputs: List of input variables required by the workflow
        outputs: Dictionary whose values are variable references of the form
            "{step_id}.{field_name}", or None
            (NOTE(review): keys are presumably the workflow's output names and
            None presumably marks an unassigned output - confirm with callers)
        steps: Dictionary mapping step IDs to ModelStep instances

    `steps` may be supplied as a list of steps; it is converted to a dictionary
    keyed by step ID during validation, and `model_dump` converts it back to a list.
    """

    # variables of form {node}.{field}
    inputs: list[str] = Field(default_factory=list)

    # values are variables of form {node}.{field}, or None
    outputs: dict[str, str | None] = Field(default_factory=dict)
    steps: dict[str, ModelStep] = Field(default_factory=dict)

    def model_dump(self, *args, **kwargs):
        """Dump the workflow as a dict, with `steps` flattened to a list of step dicts.

        Accepts the same arguments as `BaseModel.model_dump`. When `steps` is
        left out of the dump (e.g. via `exclude`), the result is returned as is.

        NOTE(review): `model_dump_json` is not overridden here, so it presumably
        still emits `steps` as a mapping - confirm whether that is intended.
        """
        data = super().model_dump(*args, **kwargs)
        if "steps" in data:
            data["steps"] = list(data["steps"].values())
        return data

    @model_validator(mode="before")
    @classmethod
    def dictify_steps(cls, data):
        """Convert a list of steps in the raw input into a dict keyed by step ID.

        Input that is not a dict, or whose `steps` entry is not a list, is
        returned unchanged. The caller's dict is not modified; a new dict is
        returned when a conversion takes place.

        Raises:
            ValueError: If two steps in the list share the same ID.
        """
        if not isinstance(data, dict) or not isinstance(data.get("steps"), list):
            return data

        steps_dict = {}
        for step in data["steps"]:
            # Steps may arrive as raw dicts or as already-built ModelStep objects.
            step_id = step["id"] if isinstance(step, dict) else step.id
            if step_id in steps_dict:
                raise ValueError(f"Duplicate step ID: {step_id}")
            steps_dict[step_id] = step
        # Build a new mapping rather than writing into the caller's dict.
        return {**data, "steps": steps_dict}

    def get_step_variables(self, step_id: str) -> list[str]:
        """Get all variables from a specific step.

        Returns "{step_id}.{field_name}" for each output field of the step
        that has a non-empty name.

        Raises:
            KeyError: If `step_id` is not a key of `steps`.
        """
        return self.steps[step_id].get_produced_variables()

    def get_available_variables(self) -> list[str]:
        """Get the workflow inputs plus all output variables from all steps.

        Duplicates are removed. The result lists the workflow inputs first,
        followed by each step's variables in the iteration order of `steps`.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order, unlike a set.
        ordered = dict.fromkeys(self.inputs)
        for step in self.steps.values():
            ordered.update(dict.fromkeys(step.get_produced_variables()))
        return list(ordered)

# %%