# flow-pilot/backend/engine/primitives.py
# DevelopedBy-Siva
# Set up the initial app and deploy
# 83fe4f9
from __future__ import annotations
from backend.ai.classifier import classify_email
from backend.ai.composer import compose_reply
from backend.ai.extractor import extract_email_data
def email_received(trigger: dict) -> dict:
    """Return the trigger payload unchanged.

    The very same dict object is handed back; nothing is copied or
    validated here.
    """
    return trigger
def schedule(trigger: dict) -> dict:
    """Return the trigger payload unchanged.

    The very same dict object is handed back; nothing is copied or
    validated here.
    """
    return trigger
def file_uploaded(trigger: dict) -> dict:
    """Return the trigger payload unchanged.

    The very same dict object is handed back; nothing is copied or
    validated here.
    """
    return trigger
def manual_trigger(trigger: dict) -> dict:
    """Return the trigger payload unchanged.

    The very same dict object is handed back; nothing is copied or
    validated here.
    """
    return trigger
def ai_extract(params: dict, runtime: dict) -> dict:
    """Run ``extract_email_data`` on the email that triggered the workflow.

    Args:
        params: Step parameters; not read by this primitive.
        runtime: Execution context. ``runtime["trigger"]["email"]`` must
            exist and contain a ``"from"`` key; ``"subject"`` and ``"body"``
            fall back to an empty string when absent.

    Returns:
        Whatever ``extract_email_data`` returns for the sender, subject and
        body.

    Raises:
        KeyError: If the trigger, the email, or its ``"from"`` field is
            missing.
    """
    message = runtime["trigger"]["email"]
    sender = message["from"]
    subject = message.get("subject", "")
    body = message.get("body", "")
    return extract_email_data(sender, subject, body)
def ai_classify(params: dict, runtime: dict) -> dict:
    """Run ``classify_email`` on the email that triggered the workflow.

    Args:
        params: Step parameters; not read by this primitive.
        runtime: Execution context. ``runtime["trigger"]["email"]`` must
            exist and contain a ``"from"`` key; ``"subject"`` and ``"body"``
            fall back to an empty string when absent. ``runtime["owner"]``
            supplies ``"business_description"`` (empty string when absent).

    Returns:
        Whatever ``classify_email`` returns for the sender, subject, body
        and business description.

    Raises:
        KeyError: If the trigger, the email, its ``"from"`` field, or the
            ``"owner"`` entry is missing.
    """
    message = runtime["trigger"]["email"]
    sender = message["from"]
    subject = message.get("subject", "")
    body = message.get("body", "")
    business_description = runtime["owner"].get("business_description", "")
    return classify_email(sender, subject, body, business_description)
def ai_compose(params: dict, runtime: dict) -> dict:
    """Compose a reply body from the resolved ``context`` parameter.

    Args:
        params: Step parameters. ``"context"`` is optional and may contain
            ``{{...}}`` placeholders, which are resolved against ``runtime``.
        runtime: Execution context used for placeholder resolution.

    Returns:
        ``{"reply_body": <result of compose_reply>}``.
    """
    context = _resolve_value(params.get("context"), runtime)
    if not isinstance(context, dict):
        # Anything that did not resolve to a dict (including a missing
        # context) is replaced by a generic greeting context.
        context = {"customer_name": "there"}
    return {"reply_body": compose_reply(context)}
def sheets_read(params: dict, runtime: dict) -> dict:
    """Read a sheet for the current owner through ``runtime["db"]``.

    Args:
        params: Step parameters; ``"sheet_name"`` is required.
        runtime: Execution context providing ``"db"`` (an object with a
            ``read_sheet(owner_id, sheet_name)`` method) and ``"owner_id"``.

    Returns:
        ``{"rows": <result of db.read_sheet>}``.

    Raises:
        KeyError: If ``"sheet_name"``, ``"db"`` or ``"owner_id"`` is missing.
    """
    sheet_name = params["sheet_name"]
    store = runtime["db"]
    rows = store.read_sheet(runtime["owner_id"], sheet_name)
    return {"rows": rows}
def sheets_write(params: dict, runtime: dict) -> dict:
    """Append a resolved row to a sheet through ``runtime["db"]``.

    Args:
        params: Step parameters. ``"data"`` (required) may contain
            ``{{...}}`` placeholders; ``"sheet_name"`` (required) is used
            verbatim.
        runtime: Execution context providing ``"db"`` (an object with an
            ``append_sheet_row(owner_id, sheet_name, data)`` method) and
            ``"owner_id"``.

    Returns:
        ``{"written": <resolved data>}``.

    Raises:
        KeyError: If a required parameter or runtime entry is missing.
    """
    row = _resolve_value(params["data"], runtime)
    target_sheet = params["sheet_name"]
    store = runtime["db"]
    store.append_sheet_row(runtime["owner_id"], target_sheet, row)
    return {"written": row}
def sheets_update(params: dict, runtime: dict) -> dict:
    """Update a sheet row through ``runtime["db"].update_sheet_row``.

    Args:
        params: Step parameters. ``"sheet_name"``, ``"lookup_column"`` and
            ``"update_column"`` are passed verbatim; ``"lookup_value"`` and
            ``"update_value"`` may contain ``{{...}}`` placeholders and are
            resolved against ``runtime`` first. All five are required.
        runtime: Execution context providing ``"db"`` and ``"owner_id"``.

    Returns:
        ``{"updated": True}`` — always, whatever the db call returns.

    Raises:
        KeyError: If a required parameter or runtime entry is missing.
    """
    update_row = runtime["db"].update_sheet_row
    owner_id = runtime["owner_id"]
    sheet_name = params["sheet_name"]
    lookup_column = params["lookup_column"]
    lookup_value = _resolve_value(params["lookup_value"], runtime)
    update_column = params["update_column"]
    update_value = _resolve_value(params["update_value"], runtime)
    update_row(
        owner_id,
        sheet_name,
        lookup_column,
        lookup_value,
        update_column,
        update_value,
    )
    return {"updated": True}
def send_email(params: dict, runtime: dict) -> dict:
    """Build an outbound email and record it through ``runtime["db"]``.

    Args:
        params: Step parameters. ``"to"`` and ``"body"`` may contain
            ``{{...}}`` placeholders and are resolved against ``runtime``;
            ``"subject"`` is used verbatim (no placeholder resolution).
            All three are required.
        runtime: Execution context providing ``"db"`` (an object with a
            ``record_outbound_email(owner_id, message)`` method) and
            ``"owner_id"``.

    Returns:
        The message dict with keys ``"to"``, ``"subject"`` and ``"body"``.

    Raises:
        KeyError: If a required parameter or runtime entry is missing.
    """
    recipient = _resolve_value(params["to"], runtime)
    subject = params["subject"]
    body = _resolve_value(params["body"], runtime)
    outbound = {"to": recipient, "subject": subject, "body": body}
    store = runtime["db"]
    store.record_outbound_email(runtime["owner_id"], outbound)
    return outbound
def notify_owner(params: dict, runtime: dict) -> dict:
    """Create an escalation for the owner through ``runtime["db"]``.

    Args:
        params: Step parameters. ``"message"`` is required; ``"severity"``
            defaults to ``"info"`` and ``"options"`` to an empty list.
        runtime: Execution context providing ``"db"`` (an object with a
            ``create_escalation`` method), ``"owner_id"`` and
            ``"workflow_id"``. ``"execution_id"`` is optional and falls
            back to ``"pending"``.

    Returns:
        The escalation payload that was passed to the db.

    Raises:
        KeyError: If ``"message"`` or a required runtime entry is missing.
    """
    escalation = dict(
        message=params["message"],
        severity=params.get("severity", "info"),
        options=params.get("options", []),
    )
    store = runtime["db"]
    store.create_escalation(
        runtime["owner_id"],
        runtime["workflow_id"],
        runtime.get("execution_id", "pending"),
        escalation,
    )
    return escalation
def condition(params: dict, runtime: dict) -> dict:
    """Pick the next step id for a branching step.

    Only checks whose text contains ``"needs_clarification"`` are actually
    evaluated; every other check unconditionally selects ``"if_true"``.

    Args:
        params: Step parameters. ``"check"`` is required; ``"if_true"`` /
            ``"if_false"`` are read only for the branch that is taken.
        runtime: Execution context. ``runtime["step_results"]`` is read
            (key ``"step_extract"``) only for a ``needs_clarification``
            check.

    Returns:
        ``{"next_step": <chosen step id>}``.
    """
    if "needs_clarification" not in params["check"]:
        return {"next_step": params["if_true"]}

    extracted = runtime["step_results"].get("step_extract", {})
    # NOTE(review): a truthy ``needs_clarification`` flag selects "if_false".
    # Presumably the check is phrased as a negation — confirm against the
    # workflow definitions.
    branch = "if_false" if extracted.get("needs_clarification") else "if_true"
    return {"next_step": params[branch]}
def loop(params: dict, runtime: dict) -> dict:
    """Resolve the ``items`` parameter and return it for iteration.

    Args:
        params: Step parameters; ``"items"`` is required and may contain
            ``{{...}}`` placeholders.
        runtime: Execution context used for placeholder resolution.

    Returns:
        ``{"items": <resolved items>}``. This function does not iterate.

    Raises:
        KeyError: If ``"items"`` is missing.
    """
    return {"items": _resolve_value(params["items"], runtime)}
def wait_for_input(params: dict, runtime: dict) -> dict:
    """Raise a ``"warning"`` escalation asking the owner to choose an option.

    Delegates to ``notify_owner`` with the prompt as the message.

    Args:
        params: Step parameters; ``"prompt"`` and ``"options"`` are both
            required.
        runtime: Execution context, forwarded to ``notify_owner``.

    Returns:
        Whatever ``notify_owner`` returns for the constructed request.

    Raises:
        KeyError: If ``"prompt"`` or ``"options"`` is missing.
    """
    request = {
        "message": params["prompt"],
        "severity": "warning",
        "options": params["options"],
    }
    return notify_owner(request, runtime)
def _resolve_value(value, runtime: dict):
if isinstance(value, dict):
return {key: _resolve_value(item, runtime) for key, item in value.items()}
if isinstance(value, list):
return [_resolve_value(item, runtime) for item in value]
if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
path = value[2:-2].strip().split(".")
current = runtime
for part in path:
if part in current:
current = current[part]
elif part == "trigger":
current = runtime["trigger"]
elif part in runtime["step_results"]:
current = runtime["step_results"][part]
elif isinstance(current, dict) and part in current:
current = current[part]
elif part == "config":
current = runtime["owner"]
else:
return value
return current
return value