Spaces:
Running
Running
#!/usr/bin/env python3
"""
Client for multi-threaded telco agent.

This client routes user messages between two LangGraph threads:
- Main thread: handles long-running operations (package changes, contract
  closures, etc.). In interactive mode, runs on this thread are streamed in
  a background task so the user can keep typing.
- Secondary thread: handles interim queries while the main thread is busy.
  In interactive mode a message is sent here when the latest tool status in
  the shared store namespace ``(user_id, "tools_updates")`` reports
  ``status == "running"``.

The main thread id is persisted to ``--thread-file`` and reused on the next
start. The secondary thread is created fresh every time.

Usage:
    # Interactive mode
    python telco_client.py --interactive

    # Single message (sends a default greeting unless --message/-m is given)
    python telco_client.py
    python telco_client.py --message "What's my current package?"

    # Custom server URL
    python telco_client.py --url http://localhost:2024 --interactive
"""
| import argparse | |
| import asyncio | |
| import sys | |
| import time | |
| import uuid | |
| from pathlib import Path | |
| import contextlib | |
| from langgraph_sdk import get_client | |
| from langgraph_sdk.schema import StreamPart | |
| import httpx | |
| from typing import Any, Optional | |
# Terminal colors: ANSI SGR escape sequences of the form ESC "[<n>m".
# "\x1b" is the same ESC character as "\033".
RESET = "\x1b[0m"  # reset all attributes
BOLD = "\x1b[1m"
DIM = "\x1b[2m"
FG_GREEN = "\x1b[32m"
FG_YELLOW = "\x1b[33m"
FG_BLUE = "\x1b[34m"
FG_MAGENTA = "\x1b[35m"
FG_CYAN = "\x1b[36m"
FG_GRAY = "\x1b[90m"  # "bright black"
# Bold "> " shown where the user types.
PROMPT_STR = BOLD + "> " + RESET
def _show_prompt() -> None:
    """Draw the input prompt (no trailing newline) and flush it to the terminal."""
    out = sys.stdout
    out.write(PROMPT_STR)
    out.flush()
def _write_line(s: str) -> None:
    """Overwrite the current terminal line with *s*, then redraw the prompt below it."""
    _write_line_no_prompt(s)
    _show_prompt()
def _write_line_no_prompt(s: str) -> None:
    """Overwrite the current terminal line with *s* and end it, without a new prompt."""
    # "\r" returns to column 0 and ESC[2K erases the whole line, so a half-drawn
    # prompt is replaced rather than appended to.
    clear_line = "\r\x1b[2K"
    stream = sys.stdout
    stream.write(clear_line + s + "\n")
    stream.flush()
def _log(msg: str) -> None:
    """Print *msg* as a gray diagnostic line, then redraw the prompt."""
    _write_line("{}{}{}".format(FG_GRAY, msg, RESET))
def _user(msg: str) -> None:
    """Echo the user's message as a blue-labelled line; no prompt is redrawn."""
    _write_line_no_prompt("{}User{}: {}".format(FG_BLUE, RESET, msg))
def _assistant(msg: str) -> None:
    """Print assistant text behind a green label, then redraw the prompt."""
    _write_line("{}Assistant{}: {}".format(FG_GREEN, RESET, msg))
def _event(label: str, text: str) -> None:
    """Print a status line: ``[label]`` in yellow, then *text* dimmed; redraw the prompt."""
    _write_line(
        "{yellow}[{label}]{reset} {dim}{text}{reset}".format(
            yellow=FG_YELLOW, label=label, reset=RESET, dim=DIM, text=text
        )
    )
def _extract_text_from_messages(messages: list[Any]) -> Optional[str]:
    """Return the text of the LAST message in *messages*, or None.

    Only dict messages are understood:
    - a string ``content`` is returned as-is (even if empty);
    - a list ``content`` is treated as segments, and the non-empty string
      ``text`` (or, failing that, ``content``) of each dict segment is
      joined with newlines.
    Anything else, including an empty or non-list *messages*, yields None.

    NOTE(review): the message's type/role is not checked, so a snapshot that
    ends with the user's own message returns the user's text. Callers that
    print this as assistant output may want to skip ``{"type": "human"}``.
    """
    if not (isinstance(messages, list) and messages):
        return None
    final = messages[-1]
    if not isinstance(final, dict):
        return None

    body = final.get("content")
    if isinstance(body, str):
        return body
    if not isinstance(body, list):
        return None

    candidates = (
        seg.get("text") or seg.get("content") or ""
        for seg in body
        if isinstance(seg, dict)
    )
    texts = [chunk for chunk in candidates if isinstance(chunk, str) and chunk]
    return "\n".join(texts) if texts else None
| def _extract_text(payload: Any, *, graph_key: str | None = None) -> Optional[str]: | |
| """Extract assistant text from various payload shapes.""" | |
| # Direct string | |
| if isinstance(payload, str): | |
| return payload | |
| # List of messages or mixed | |
| if isinstance(payload, list): | |
| text = _extract_text_from_messages(payload) | |
| if text: | |
| return text | |
| # Fallback: any string entries | |
| for v in payload: | |
| t = _extract_text(v, graph_key=graph_key) | |
| if t: | |
| return t | |
| return None | |
| # Dict payloads | |
| if isinstance(payload, dict): | |
| # Graph-level direct string | |
| if graph_key and isinstance(payload.get(graph_key), str): | |
| return payload[graph_key] | |
| # Common shapes | |
| if isinstance(payload.get("value"), (str, list, dict)): | |
| t = _extract_text(payload.get("value"), graph_key=graph_key) | |
| if t: | |
| return t | |
| if isinstance(payload.get("messages"), list): | |
| t = _extract_text_from_messages(payload.get("messages", [])) | |
| if t: | |
| return t | |
| if isinstance(payload.get("content"), str): | |
| return payload.get("content") | |
| # Search nested values | |
| for v in payload.values(): | |
| t = _extract_text(v, graph_key=graph_key) | |
| if t: | |
| return t | |
| return None | |
async def stream_run(
    client,
    thread_id: str,
    graph: str,
    message: dict,
    label: str,
    *,
    namespace_for_memory: tuple[str, ...],
    global_last_text: dict[str, str],  # Shared across runs for deduplication
) -> int:
    """Stream one run of *graph* on *thread_id* and print new assistant text.

    Both ``values`` (state snapshots) and ``custom`` stream events are passed
    through ``_extract_text``. A text is printed only when it differs from the
    last one printed by this run, so repeated snapshots of the same message
    appear once.

    Args:
        client: LangGraph SDK client (as returned by ``get_client``).
        thread_id: Server thread to run on.
        graph: Assistant/graph id. It is also passed to ``_extract_text`` as
            ``graph_key``, the dict key checked first for direct text.
        message: Run input sent to the graph.
        label: Prefix for the status lines this run prints.
        namespace_for_memory: Store namespace forwarded to the graph via
            ``config["configurable"]["namespace_for_memory"]``.
        global_last_text: Mutable cache shared between runs. ``"last"`` holds
            the most recently printed text; it seeds this run's
            de-duplication and is updated on every print.

    Returns:
        0 when the stream ends normally (or an ``end`` event arrives), or 1
        if the server sent an ``error`` event.

    NOTE: interrupts are neither detected nor resumed here. An interrupted
    run presumably just ends this stream.
    """
    config = {
        "configurable": {
            "thread_id": thread_id,
            "namespace_for_memory": list(namespace_for_memory),
        }
    }
    # Seed de-duplication from the shared cache so text another run just
    # printed is not repeated at the start of this one.
    last_text: Optional[str] = global_last_text.get("last")
    stream = client.runs.stream(
        thread_id=thread_id,
        assistant_id=graph,
        input=message,
        stream_mode=["values", "custom"],
        config=config,
    )
    async for part in stream:
        assert isinstance(part, StreamPart)
        event = part.event
        if event == "metadata":
            data = part.data or {}
            run_id = (data.get("run_id") if isinstance(data, dict) else None) or "?"
            _event(label, f"run started (run_id={run_id}, thread_id={thread_id})")
        elif event in ("custom", "values"):
            text = _extract_text(part.data, graph_key=graph)
            if text and text != last_text:
                _assistant(text)
                last_text = text
                global_last_text["last"] = text
        elif event == "error":
            # Previously ignored, so failed runs looked like silent successes.
            _event(label, f"error: {part.data}")
            return 1
        elif event == "end":
            return 0
    return 0
async def ainput(prompt: str = "") -> str:
    """Read one line from stdin without blocking the event loop.

    The blocking ``input(prompt)`` call runs in a worker thread from the
    loop's default executor, while other tasks keep running.
    """
    return await asyncio.to_thread(input, prompt)
async def read_latest_status(client, namespace_for_memory: tuple[str, ...]) -> dict:
    """Return the most recent tool-status record from the shared store namespace.

    Searches *namespace_for_memory* with ``client.store.search_items``. It scans
    the results from the end and returns the first item value that is a dict
    with a ``"status"`` key. If no item has one, it returns the last item's
    value when that is a dict. It returns ``{}`` when the search raises or
    yields nothing usable.

    NOTE(review): "most recent" assumes the server lists items oldest-first,
    and only the results of a single default search call are examined.
    Confirm both against the store API.
    """
    namespace = list(namespace_for_memory)
    try:
        response = await client.store.search_items(namespace)
    except Exception:  # best-effort: an unreachable store just means "no status"
        return {}

    # The SDK may return {"items": [...]} or a bare list.
    if isinstance(response, dict):
        maybe_items = response.get("items")
        records = maybe_items if isinstance(maybe_items, list) else None
    elif isinstance(response, list):
        records = response
    else:
        records = None
    if not records:
        return {}

    def _value_of(record: Any) -> Any:
        # Items may expose `.value` as an attribute or be plain dicts.
        found = getattr(record, "value", None)
        if found is None and isinstance(record, dict):
            found = record.get("value")
        return found

    with_status = next(
        (
            value
            for value in map(_value_of, reversed(records))
            if isinstance(value, dict) and "status" in value
        ),
        None,
    )
    if with_status is not None:
        return with_status

    newest = _value_of(records[-1])
    return newest if isinstance(newest, dict) else {}
async def check_completion_flag(client, namespace_for_memory: tuple[str, ...]) -> bool:
    """Return True if the store marks the main operation as finished.

    Looks under *namespace_for_memory* for an item keyed
    ``"main_operation_complete"`` whose value is a dict with a truthy
    ``"ready_for_new_operation"`` field. A failed search, or an empty or
    unrecognised response, yields False.
    """
    namespace = list(namespace_for_memory)
    try:
        response = await client.store.search_items(namespace)
    except Exception:  # best-effort: an unreachable store means "not complete"
        return False

    # The SDK may return a bare list or {"items": [...]}.
    records: list[Any] | None = None
    if isinstance(response, list):
        records = response
    elif isinstance(response, dict) and isinstance(response.get("items"), list):
        records = response["items"]
    if not records:
        return False

    def _is_ready_flag(record: Any) -> bool:
        key = getattr(record, "key", None) or (
            record.get("key") if isinstance(record, dict) else None
        )
        if key != "main_operation_complete":
            return False
        value = getattr(record, "value", None)
        if value is None and isinstance(record, dict):
            value = record.get("value")
        return isinstance(value, dict) and bool(value.get("ready_for_new_operation"))

    return any(_is_ready_flag(record) for record in reversed(records))
async def _create_thread_id(client) -> str:
    """Create a new thread on the server and return its id.

    The SDK response is read either as an object with a ``thread_id``
    attribute or as a dict. If it is neither, a random UUID is returned.
    NOTE(review): that fallback id is not registered on the server.
    """
    created = await client.threads.create()
    return getattr(created, "thread_id", None) or (
        created["thread_id"] if isinstance(created, dict) else str(uuid.uuid4())
    )


def _read_thread_id(thread_path: Path | None) -> str | None:
    """Return the first line of *thread_path*, or None if absent, unreadable or empty."""
    if not thread_path or not thread_path.exists():
        return None
    try:
        lines = thread_path.read_text().strip().splitlines()
    except (OSError, UnicodeDecodeError):
        return None
    return lines[0] if lines else None


def _persist_thread_id(thread_path: Path | None, thread_id: str) -> None:
    """Best-effort save of *thread_id* to *thread_path*; write errors are ignored."""
    if thread_path is None:
        return
    with contextlib.suppress(OSError):
        thread_path.write_text(thread_id + "\n")


async def _resolve_main_thread_id(client, thread_path: Path | None) -> str:
    """Return the main thread id: reuse the persisted one, or create and persist a new one."""
    thread_id = _read_thread_id(thread_path)
    if thread_id:
        # Make sure the persisted thread also exists on this server. An
        # existing thread is left alone, and a 409 Conflict is tolerated.
        try:
            await client.threads.create(thread_id=thread_id, if_exists="do_nothing")
        except httpx.HTTPStatusError as e:
            if getattr(e, "response", None) is None or e.response.status_code != 409:
                raise
        return thread_id
    thread_id = await _create_thread_id(client)
    _persist_thread_id(thread_path, thread_id)
    return thread_id


async def _clear_stale_flags(client, namespace_for_memory: tuple[str, ...]) -> None:
    """Delete coordination flags that a previous session may have left in the store.

    Each key is deleted independently. Previously all deletes shared one
    ``try``, so the first failure (e.g. a flag that does not exist) skipped
    every remaining key.
    """
    ns_list = list(namespace_for_memory)
    for key in (
        "main_operation_complete",
        "working-tool-status-update",
        "secondary_status",
        "secondary_abort",
        "secondary_interim_messages",
    ):
        with contextlib.suppress(Exception):
            await client.store.delete_item(ns_list, key)


def _report_background_failure(task: asyncio.Task) -> None:
    """Done-callback that prints any exception raised by a background main-thread run.

    Calling ``task.exception()`` also marks the exception as retrieved, which
    stops asyncio from logging "Task exception was never retrieved".
    """
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        _event("main", f"background run failed: {exc!r}")


async def _interactive_loop(
    client,
    graph: str,
    thread_id_main: str,
    thread_id_updates: str,
    namespace_for_memory: tuple[str, ...],
) -> int:
    """Read user lines until an exit command and route each to the main or secondary thread.

    Routing relies ONLY on the server-side store. The client-side task
    finishes (or not) independently of the server operation, so it is not
    used. If ``read_latest_status`` reports ``status == "running"`` and no
    completion flag is set, the message goes to the secondary thread and is
    awaited inline. Otherwise a new main-thread run starts as a background
    task.
    """
    ns_list = list(namespace_for_memory)
    # Latest main-thread task; awaited briefly on exit.
    main_job: asyncio.Task[int] | None = None
    # asyncio keeps only weak references to tasks. Hold strong ones here so a
    # run is not garbage-collected when `main_job` is reset or replaced.
    background_tasks: set[asyncio.Task[int]] = set()
    global_last_text: dict[str, str] = {}  # de-duplication cache shared by all runs
    cooldown_until: float = 0

    while True:
        try:
            user_text = await ainput("")
        except (KeyboardInterrupt, EOFError):
            user_text = "/exit"
        user_text = (user_text or "").strip()
        if not user_text:
            continue
        if user_text.lower() in {"exit", "quit", "/exit"}:
            break
        _user(user_text)

        # NOTE(review): the cooldown is armed when a completion is detected
        # while handling an EARLIER message. It delays this message, not the
        # one that discovered the completion. Confirm this is the intended timing.
        current_time = time.time()
        if current_time < cooldown_until:
            wait_time = int(cooldown_until - current_time)
            _event("cooldown", f"Operation just completed, waiting {wait_time}s before starting new operation...")
            await asyncio.sleep(cooldown_until - current_time)
            cooldown_until = 0
            with contextlib.suppress(Exception):  # flag may not exist
                await client.store.delete_item(ns_list, "main_operation_complete")

        long_info = await read_latest_status(client, namespace_for_memory)
        long_running = long_info.get("status") == "running"
        just_completed = await check_completion_flag(client, namespace_for_memory)

        if just_completed:
            # The flag is deleted right away, so each completion is announced once.
            _event("status", f"{FG_MAGENTA}Operation complete! Ready for new requests.{RESET}")
            cooldown_until = time.time() + 2.0  # 2 second cooldown
            global_last_text.clear()
            main_job = None
            with contextlib.suppress(Exception):
                await client.store.delete_item(ns_list, "main_operation_complete")

        if long_running and not just_completed:
            # Secondary thread: answer interim queries during a long operation.
            progress = long_info.get("progress", "?")
            _event("routing", f"Operation in progress ({progress}%), routing to secondary thread")
            payload = {
                "messages": [{"type": "human", "content": user_text}],
                "thread_type": "secondary",
                "interim_messages_reset": False,
            }
            try:
                await stream_run(
                    client,
                    thread_id_updates,
                    graph,
                    payload,
                    label=f"secondary [{progress}%]",
                    namespace_for_memory=namespace_for_memory,
                    global_last_text=global_last_text,
                )
            except httpx.HTTPError as exc:
                # Keep the session alive; a failed side query should not end the REPL.
                _event("secondary", f"request failed: {exc!r}")
        else:
            # Main thread: start a new operation without awaiting it.
            _event("routing", "Starting new operation on main thread (background)")
            global_last_text.clear()  # fresh de-dup state for the new operation
            payload = {
                "messages": [{"type": "human", "content": user_text}],
                "thread_type": "main",
                "interim_messages_reset": True,
            }
            # `payload` is passed as an argument (not captured by a closure),
            # so the task is bound to THIS message even after `payload` is
            # reassigned on a later iteration.
            main_job = asyncio.create_task(
                stream_run(
                    client,
                    thread_id_main,
                    graph,
                    payload,
                    label="main",
                    namespace_for_memory=namespace_for_memory,
                    global_last_text=global_last_text,
                )
            )
            background_tasks.add(main_job)
            main_job.add_done_callback(background_tasks.discard)
            main_job.add_done_callback(_report_background_failure)

    # On exit, best-effort wait for the latest background run. wait_for cancels it after 10s.
    if main_job is not None:
        print(f"\n{FG_GRAY}Waiting for background task to complete...{RESET}")
        with contextlib.suppress(Exception):
            await asyncio.wait_for(main_job, timeout=10)
    return 0


async def run_client(
    base_url: str,
    graph: str,
    user_id: str,
    interactive: bool,
    thread_file: str | None,
    initial_message: str | None,
) -> int:
    """Connect to the LangGraph server and either chat interactively or send one message.

    Args:
        base_url: LangGraph server URL.
        graph: Assistant/graph id to run.
        user_id: First element of the store namespace ``(user_id, "tools_updates")``
            shared with the server agent's tools.
        interactive: Run the chat loop if True; otherwise send one message
            to the main thread.
        thread_file: Where the main thread id is loaded from and saved to
            (None disables persistence).
        initial_message: Message for non-interactive mode. A default greeting
            is used when it is empty.

    Returns:
        The exit code: 0 from the interactive loop, otherwise the result of
        ``stream_run`` for the single message.
    """
    client = get_client(url=base_url)
    thread_path = Path(thread_file) if thread_file else None

    thread_id_main = await _resolve_main_thread_id(client, thread_path)
    # Secondary thread: always created fresh on the server (not persisted).
    thread_id_updates = await _create_thread_id(client)
    # Shared namespace used by server agent's tools
    namespace_for_memory = (user_id, "tools_updates")

    print(f"{FG_MAGENTA}Telco Agent Multi-Threaded Client{RESET}")
    print(f"Main Thread ID: {FG_CYAN}{thread_id_main}{RESET}")
    print(f"Secondary Thread ID: {FG_CYAN}{thread_id_updates}{RESET}")
    print(f"Namespace: {FG_CYAN}{namespace_for_memory}{RESET}")
    print()

    if interactive:
        print(f"{FG_CYAN}Interactive Mode: Type your message. Use /exit to quit.{RESET}")
        print(f"{FG_GRAY}Long operations will run in background. You can ask questions while they run.{RESET}")
        print()
        await _clear_stale_flags(client, namespace_for_memory)
        _show_prompt()
        return await _interactive_loop(
            client, graph, thread_id_main, thread_id_updates, namespace_for_memory
        )

    # Non-interactive: single message to main thread
    msg = initial_message or "Hello, I need help with my mobile account"
    print(f"{FG_BLUE}Sending:{RESET} {msg}\n")
    payload = {
        "messages": [{"type": "human", "content": msg}],
        "thread_type": "main",
        "interim_messages_reset": True,
    }
    return await stream_run(
        client,
        thread_id_main,
        graph,
        payload,
        label="single",
        namespace_for_memory=namespace_for_memory,
        global_last_text={},
    )
def main(argv: list[str]) -> int:
    """Parse *argv* (without the program name), run the client, and return its exit code.

    argparse raises ``SystemExit`` itself for ``--help`` and invalid arguments.
    """
    examples = """
Examples:
# Interactive mode (recommended)
python telco_client.py --interactive
# Single message
python telco_client.py --message "What's my current package?"
# Custom server and user
python telco_client.py --url http://localhost:8000 --user john_doe --interactive
# Use different thread file
python telco_client.py --thread-file .telco_thread --interactive
"""
    parser = argparse.ArgumentParser(
        description="Client for multi-threaded telco agent",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    # (option strings, keyword options), in the order they appear in --help.
    option_table = (
        (("--url",), dict(
            default="http://127.0.0.1:2024",
            help="LangGraph server base URL (default: http://127.0.0.1:2024)",
        )),
        (("--graph",), dict(
            default="telco-agent",
            help="Graph name as defined in langgraph.json (default: telco-agent)",
        )),
        (("--user",), dict(
            default="fciannella",
            help="User ID for namespace (default: fciannella)",
        )),
        (("--interactive",), dict(
            action="store_true",
            help="Interactive mode (chat continuously)",
        )),
        (("--thread-file",), dict(
            default=".telco_thread_id",
            help="Path to persist/load main thread ID (default: .telco_thread_id)",
        )),
        (("--message", "-m"), dict(
            help="Single message to send (non-interactive mode)",
        )),
    )
    for flags, options in option_table:
        parser.add_argument(*flags, **options)

    opts = parser.parse_args(argv)
    client_coro = run_client(
        base_url=opts.url,
        graph=opts.graph,
        user_id=opts.user,
        interactive=opts.interactive,
        thread_file=opts.thread_file,
        initial_message=opts.message,
    )
    return asyncio.run(client_coro)
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main(sys.argv[1:]))