Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from backend.ai.classifier import classify_email | |
| from backend.ai.composer import compose_reply | |
| from backend.ai.extractor import extract_email_data | |
| def email_received(trigger: dict) -> dict: | |
| return trigger | |
| def schedule(trigger: dict) -> dict: | |
| return trigger | |
| def file_uploaded(trigger: dict) -> dict: | |
| return trigger | |
| def manual_trigger(trigger: dict) -> dict: | |
| return trigger | |
| def ai_extract(params: dict, runtime: dict) -> dict: | |
| email = runtime["trigger"]["email"] | |
| return extract_email_data(email["from"], email.get("subject", ""), email.get("body", "")) | |
| def ai_classify(params: dict, runtime: dict) -> dict: | |
| email = runtime["trigger"]["email"] | |
| return classify_email(email["from"], email.get("subject", ""), email.get("body", ""), runtime["owner"].get("business_description", "")) | |
| def ai_compose(params: dict, runtime: dict) -> dict: | |
| context = _resolve_value(params.get("context"), runtime) | |
| return {"reply_body": compose_reply(context if isinstance(context, dict) else {"customer_name": "there"})} | |
| def sheets_read(params: dict, runtime: dict) -> dict: | |
| sheet_name = params["sheet_name"] | |
| return {"rows": runtime["db"].read_sheet(runtime["owner_id"], sheet_name)} | |
| def sheets_write(params: dict, runtime: dict) -> dict: | |
| data = _resolve_value(params["data"], runtime) | |
| sheet_name = params["sheet_name"] | |
| runtime["db"].append_sheet_row(runtime["owner_id"], sheet_name, data) | |
| return {"written": data} | |
| def sheets_update(params: dict, runtime: dict) -> dict: | |
| runtime["db"].update_sheet_row( | |
| runtime["owner_id"], | |
| params["sheet_name"], | |
| params["lookup_column"], | |
| _resolve_value(params["lookup_value"], runtime), | |
| params["update_column"], | |
| _resolve_value(params["update_value"], runtime), | |
| ) | |
| return {"updated": True} | |
| def send_email(params: dict, runtime: dict) -> dict: | |
| message = { | |
| "to": _resolve_value(params["to"], runtime), | |
| "subject": params["subject"], | |
| "body": _resolve_value(params["body"], runtime), | |
| } | |
| runtime["db"].record_outbound_email(runtime["owner_id"], message) | |
| return message | |
| def notify_owner(params: dict, runtime: dict) -> dict: | |
| payload = { | |
| "message": params["message"], | |
| "severity": params.get("severity", "info"), | |
| "options": params.get("options", []), | |
| } | |
| runtime["db"].create_escalation(runtime["owner_id"], runtime["workflow_id"], runtime.get("execution_id", "pending"), payload) | |
| return payload | |
| def condition(params: dict, runtime: dict) -> dict: | |
| check = params["check"] | |
| if "needs_clarification" in check: | |
| extracted = runtime["step_results"].get("step_extract", {}) | |
| return {"next_step": params["if_false"] if extracted.get("needs_clarification") else params["if_true"]} | |
| return {"next_step": params["if_true"]} | |
| def loop(params: dict, runtime: dict) -> dict: | |
| items = _resolve_value(params["items"], runtime) | |
| return {"items": items} | |
| def wait_for_input(params: dict, runtime: dict) -> dict: | |
| return notify_owner( | |
| { | |
| "message": params["prompt"], | |
| "severity": "warning", | |
| "options": params["options"], | |
| }, | |
| runtime, | |
| ) | |
| def _resolve_value(value, runtime: dict): | |
| if isinstance(value, dict): | |
| return {key: _resolve_value(item, runtime) for key, item in value.items()} | |
| if isinstance(value, list): | |
| return [_resolve_value(item, runtime) for item in value] | |
| if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"): | |
| path = value[2:-2].strip().split(".") | |
| current = runtime | |
| for part in path: | |
| if part in current: | |
| current = current[part] | |
| elif part == "trigger": | |
| current = runtime["trigger"] | |
| elif part in runtime["step_results"]: | |
| current = runtime["step_results"][part] | |
| elif isinstance(current, dict) and part in current: | |
| current = current[part] | |
| elif part == "config": | |
| current = runtime["owner"] | |
| else: | |
| return value | |
| return current | |
| return value | |