Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from copy import deepcopy | |
| from datetime import datetime, timezone | |
| from uuid import uuid4 | |
| class InMemoryDatabase: | |
| def __init__(self) -> None: | |
| self.reset() | |
| def reset(self) -> None: | |
| self.owners: dict[str, dict] = {} | |
| self.workflows: dict[str, list[dict]] = {} | |
| self.data_files: dict[str, list[dict]] = {} | |
| self.execution_logs: dict[str, list[dict]] = {} | |
| self.escalations: dict[str, list[dict]] = {} | |
| self.sheets: dict[str, dict[str, list[dict]]] = {} | |
| self.outbound_emails: dict[str, list[dict]] = {} | |
| def ensure_owner(self, owner_id: str, email: str) -> dict: | |
| owner = self.owners.get(owner_id) | |
| if owner: | |
| owner["email"] = email | |
| return owner | |
| owner = { | |
| "id": owner_id, | |
| "email": email, | |
| "business_description": "", | |
| "business_analysis": {}, | |
| "spreadsheet_id": "demo-sheet", | |
| "spreadsheet_config": {"connected": True, "inventory_sheet": "Inventory", "orders_sheet": "Orders"}, | |
| "preferred_tone": "friendly", | |
| "created_at": datetime.now(timezone.utc).isoformat(), | |
| "state": "onboarding", | |
| } | |
| self.owners[owner_id] = owner | |
| self.sheets[owner_id] = { | |
| "Inventory": [ | |
| {"item": "Honeycrisp apples", "stock": 120}, | |
| {"item": "Fuji apples", "stock": 90}, | |
| ], | |
| "Orders": [], | |
| } | |
| return owner | |
| def save_owner(self, owner: dict) -> dict: | |
| self.owners[owner["id"]] = owner | |
| return owner | |
| def get_owner(self, owner_id: str) -> dict: | |
| if owner_id not in self.owners: | |
| raise KeyError(f"owner {owner_id} not found") | |
| return deepcopy(self.owners[owner_id]) | |
| def save_workflow(self, owner_id: str, workflow: dict) -> dict: | |
| record = deepcopy(workflow) | |
| record["id"] = str(uuid4()) | |
| record["status"] = "active" | |
| self.workflows.setdefault(owner_id, []).append(record) | |
| return deepcopy(record) | |
| def list_workflows(self, owner_id: str) -> list[dict]: | |
| return deepcopy(self.workflows.get(owner_id, [])) | |
| def deactivate_workflows(self, owner_id: str) -> list[dict]: | |
| workflows = self.workflows.get(owner_id, []) | |
| for workflow in workflows: | |
| workflow["status"] = "inactive" | |
| return deepcopy(workflows) | |
| def get_workflow(self, owner_id: str, workflow_id: str) -> dict: | |
| for workflow in self.workflows.get(owner_id, []): | |
| if workflow["id"] == workflow_id: | |
| return deepcopy(workflow) | |
| raise KeyError(f"workflow {workflow_id} not found") | |
| def save_data_file(self, owner_id: str, filename: str, file_type: str, purpose: str, parsed_data: dict) -> dict: | |
| record = { | |
| "id": str(uuid4()), | |
| "filename": filename, | |
| "file_type": file_type, | |
| "purpose": purpose, | |
| "parsed_data": parsed_data, | |
| "uploaded_at": datetime.now(timezone.utc).isoformat(), | |
| } | |
| self.data_files.setdefault(owner_id, []).append(record) | |
| return deepcopy(record) | |
| def list_data_files(self, owner_id: str) -> list[dict]: | |
| return deepcopy(self.data_files.get(owner_id, [])) | |
| def save_execution_log(self, owner_id: str, workflow_id: str, trigger_data: dict, steps_executed: list, outcome: str, error_message: str | None) -> dict: | |
| record = { | |
| "id": str(uuid4()), | |
| "workflow_id": workflow_id, | |
| "trigger_data": trigger_data, | |
| "steps_executed": steps_executed, | |
| "outcome": outcome, | |
| "error_message": error_message, | |
| "executed_at": datetime.now(timezone.utc).isoformat(), | |
| } | |
| self.execution_logs.setdefault(owner_id, []).append(record) | |
| return deepcopy(record) | |
| def list_execution_logs(self, owner_id: str) -> list[dict]: | |
| return deepcopy(self.execution_logs.get(owner_id, [])) | |
| def create_escalation(self, owner_id: str, workflow_id: str, execution_id: str, payload: dict) -> dict: | |
| record = { | |
| "id": str(uuid4()), | |
| "workflow_id": workflow_id, | |
| "execution_id": execution_id, | |
| "reason": payload["message"], | |
| "context": payload, | |
| "options": payload.get("options", []), | |
| "owner_response": None, | |
| "status": "pending", | |
| "created_at": datetime.now(timezone.utc).isoformat(), | |
| } | |
| self.escalations.setdefault(owner_id, []).append(record) | |
| return deepcopy(record) | |
| def list_escalations(self, owner_id: str) -> list[dict]: | |
| return deepcopy(self.escalations.get(owner_id, [])) | |
| def resolve_escalation(self, escalation_id: str, response: str) -> dict: | |
| for owner_escalations in self.escalations.values(): | |
| for item in owner_escalations: | |
| if item["id"] == escalation_id: | |
| item["owner_response"] = response | |
| item["status"] = "resolved" | |
| return deepcopy(item) | |
| raise KeyError(f"escalation {escalation_id} not found") | |
| def read_sheet(self, owner_id: str, sheet_name: str) -> list[dict]: | |
| return deepcopy(self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, [])) | |
| def append_sheet_row(self, owner_id: str, sheet_name: str, row: dict) -> None: | |
| self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, []).append(deepcopy(row)) | |
| def update_sheet_row(self, owner_id: str, sheet_name: str, lookup_column: str, lookup_value, update_column: str, update_value) -> None: | |
| for row in self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, []): | |
| if row.get(lookup_column) == lookup_value: | |
| row[update_column] = update_value | |
| def record_outbound_email(self, owner_id: str, message: dict) -> None: | |
| self.outbound_emails.setdefault(owner_id, []).append(deepcopy(message)) | |
| db = InMemoryDatabase() | |