|
|
""" |
|
|
In-Memory Storage for Workflow Engine. |
|
|
|
|
|
Provides thread-safe storage for graphs and execution runs. |
|
|
Can be easily replaced with a database implementation. |
|
|
""" |
|
|
|
|
|
import asyncio
import copy
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
|
|
@dataclass
class StoredGraph:
    """Record of one persisted graph definition together with its timestamps."""

    graph_id: str
    name: str
    definition: Dict[str, Any]
    # Both timestamps default to ``datetime.now()`` at construction time
    # (called without a timezone, so the values are naive).
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view with timestamps rendered as ISO 8601 text.

        The ``definition`` value is the stored object itself, not a copy.
        """
        payload: Dict[str, Any] = dict(
            graph_id=self.graph_id,
            name=self.name,
            definition=self.definition,
        )
        for stamp in ("created_at", "updated_at"):
            payload[stamp] = getattr(self, stamp).isoformat()
        return payload
|
|
|
|
|
|
|
|
@dataclass
class StoredRun:
    """Record of one execution run of a graph: inputs, progress and outcome."""

    run_id: str
    graph_id: str
    status: str
    initial_state: Dict[str, Any]
    # Mutable defaults use factories so each run gets its own dict/list.
    current_state: Dict[str, Any] = field(default_factory=dict)
    final_state: Optional[Dict[str, Any]] = None
    execution_log: List[Dict[str, Any]] = field(default_factory=list)
    current_node: Optional[str] = None
    iteration: int = 0
    started_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view with timestamps rendered as ISO 8601 text.

        ``completed_at`` is ``None`` in the result while the run has no
        completion timestamp. State dicts and the execution log are the
        stored objects themselves, not copies.
        """
        passthrough = (
            "run_id",
            "graph_id",
            "status",
            "initial_state",
            "current_state",
            "final_state",
            "execution_log",
            "current_node",
            "iteration",
        )
        payload: Dict[str, Any] = {key: getattr(self, key) for key in passthrough}
        payload["started_at"] = self.started_at.isoformat()
        if self.completed_at:
            payload["completed_at"] = self.completed_at.isoformat()
        else:
            payload["completed_at"] = None
        payload["error"] = self.error
        return payload
|
|
|
|
|
|
|
|
class GraphStorage:
    """
    In-memory store of workflow graphs, keyed by graph ID.

    Supports creating, fetching, updating, deleting and listing graph
    definitions. Every coroutine method acquires one shared ``asyncio.Lock``
    before touching the underlying dict.

    NOTE: ``asyncio.Lock`` coordinates coroutines; it is not a ``threading``
    lock, so this class is coroutine-safe rather than thread-safe.
    """

    def __init__(self):
        self._lock = asyncio.Lock()
        self._graphs: Dict[str, StoredGraph] = {}

    async def save(self, graph_id: str, name: str, definition: Dict[str, Any]) -> StoredGraph:
        """
        Store a graph definition under ``graph_id``.

        A fresh ``StoredGraph`` is always created, so an existing entry with
        the same ID is replaced (including its timestamps).

        Args:
            graph_id: Unique graph identifier
            name: Graph name
            definition: Graph definition dict

        Returns:
            The newly stored graph record
        """
        async with self._lock:
            record = StoredGraph(graph_id=graph_id, name=name, definition=definition)
            self._graphs[graph_id] = record
            return record

    async def get(self, graph_id: str) -> Optional[StoredGraph]:
        """Return the graph stored under ``graph_id``, or ``None`` if absent."""
        async with self._lock:
            try:
                return self._graphs[graph_id]
            except KeyError:
                return None

    async def update(self, graph_id: str, definition: Dict[str, Any]) -> Optional[StoredGraph]:
        """Replace a stored graph's definition and refresh ``updated_at``.

        Returns the mutated record, or ``None`` when the ID is unknown.
        """
        async with self._lock:
            try:
                record = self._graphs[graph_id]
            except KeyError:
                return None
            record.definition = definition
            record.updated_at = datetime.now()
            return record

    async def delete(self, graph_id: str) -> bool:
        """Remove a graph; return ``True`` if it existed, ``False`` otherwise."""
        async with self._lock:
            try:
                del self._graphs[graph_id]
            except KeyError:
                return False
            return True

    async def list_all(self) -> List[StoredGraph]:
        """Return a new list holding every stored graph record."""
        async with self._lock:
            return [*self._graphs.values()]

    async def exists(self, graph_id: str) -> bool:
        """Report whether a graph is stored under ``graph_id``."""
        async with self._lock:
            present = graph_id in self._graphs
            return present

    def __len__(self) -> int:
        """Return the number of stored graphs (read without taking the lock)."""
        return len(self._graphs)
|
|
|
|
|
|
|
|
class RunStorage:
    """
    In-memory store of execution runs, keyed by run ID.

    Holds each run's state so it can be updated while the run progresses and
    queried afterwards. Every coroutine method acquires one shared
    ``asyncio.Lock`` before touching the underlying dict.

    NOTE: ``asyncio.Lock`` coordinates coroutines; it is not a ``threading``
    lock, so this class is coroutine-safe rather than thread-safe.
    """

    def __init__(self):
        self._runs: Dict[str, StoredRun] = {}
        self._lock = asyncio.Lock()

    async def create(
        self,
        run_id: str,
        graph_id: str,
        initial_state: Dict[str, Any]
    ) -> StoredRun:
        """
        Create a new run in the ``"pending"`` status.

        An existing run with the same ``run_id`` is replaced.

        Args:
            run_id: Unique run identifier
            graph_id: Associated graph ID
            initial_state: Initial state data

        Returns:
            The stored run
        """
        # Take two independent deep copies. Storing the caller's dict directly
        # (or only shallow-copying it) would let later in-place mutation by the
        # caller, or nested mutation of the working state, silently rewrite the
        # recorded initial state.
        initial_snapshot = copy.deepcopy(initial_state)
        working_state = copy.deepcopy(initial_state)
        async with self._lock:
            stored = StoredRun(
                run_id=run_id,
                graph_id=graph_id,
                status="pending",
                initial_state=initial_snapshot,
                current_state=working_state,
            )
            self._runs[run_id] = stored
            return stored

    async def get(self, run_id: str) -> Optional[StoredRun]:
        """Return the run stored under ``run_id``, or ``None`` if absent."""
        async with self._lock:
            return self._runs.get(run_id)

    async def update_state(
        self,
        run_id: str,
        current_state: Dict[str, Any],
        current_node: Optional[str] = None,
        iteration: Optional[int] = None
    ) -> Optional[StoredRun]:
        """
        Replace a run's current state and mark it ``"running"``.

        The status is set to ``"running"`` unconditionally, whatever it was
        before. ``current_node`` and ``iteration`` are only overwritten when a
        non-``None`` value is supplied.

        Returns:
            The updated run, or ``None`` when the ID is unknown
        """
        async with self._lock:
            stored = self._runs.get(run_id)
            if stored is None:
                return None
            stored.current_state = current_state
            stored.status = "running"
            if current_node is not None:
                stored.current_node = current_node
            if iteration is not None:
                stored.iteration = iteration
            return stored

    async def add_log_entry(
        self,
        run_id: str,
        entry: Dict[str, Any]
    ) -> Optional[StoredRun]:
        """Append ``entry`` to a run's execution log.

        Returns the run, or ``None`` when the ID is unknown.
        """
        async with self._lock:
            stored = self._runs.get(run_id)
            if stored is None:
                return None
            stored.execution_log.append(entry)
            return stored

    async def complete(
        self,
        run_id: str,
        final_state: Dict[str, Any],
        execution_log: List[Dict[str, Any]]
    ) -> Optional[StoredRun]:
        """
        Mark a run as ``"completed"`` and stamp ``completed_at``.

        The supplied ``execution_log`` replaces whatever log the run had
        accumulated so far.

        Returns:
            The updated run, or ``None`` when the ID is unknown
        """
        async with self._lock:
            stored = self._runs.get(run_id)
            if stored is None:
                return None
            stored.status = "completed"
            stored.final_state = final_state
            stored.execution_log = execution_log
            stored.completed_at = datetime.now()
            return stored

    async def fail(
        self,
        run_id: str,
        error: str,
        final_state: Optional[Dict[str, Any]] = None
    ) -> Optional[StoredRun]:
        """
        Mark a run as ``"failed"``, record the error and stamp ``completed_at``.

        ``final_state`` is assigned as given, so omitting it sets the run's
        final state to ``None``.

        Returns:
            The updated run, or ``None`` when the ID is unknown
        """
        async with self._lock:
            stored = self._runs.get(run_id)
            if stored is None:
                return None
            stored.status = "failed"
            stored.error = error
            stored.final_state = final_state
            stored.completed_at = datetime.now()
            return stored

    async def list_all(self) -> List[StoredRun]:
        """Return a new list holding every stored run."""
        async with self._lock:
            return list(self._runs.values())

    async def list_by_graph(self, graph_id: str) -> List[StoredRun]:
        """Return every stored run whose ``graph_id`` matches."""
        async with self._lock:
            return [r for r in self._runs.values() if r.graph_id == graph_id]

    async def delete(self, run_id: str) -> bool:
        """Remove a run; return ``True`` if it existed, ``False`` otherwise."""
        async with self._lock:
            if run_id in self._runs:
                del self._runs[run_id]
                return True
            return False

    def __len__(self) -> int:
        """Return the number of stored runs (read without taking the lock)."""
        return len(self._runs)
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level instances, constructed once when this module is imported.
graph_storage: GraphStorage = GraphStorage()
run_storage: RunStorage = RunStorage()
|
|
|