| | |
| | """ |
| | END-TO-END TEST - Harmonic Stack |
| | Ghost in the Machine Labs |
| | |
| | Full flow: User → Operator → Director → Coordinator → Result |
| | """ |
| | import asyncio |
| | import json |
| | from datetime import datetime |
| | import httpx |
| | import redis.asyncio as redis |
| |
|
| | OLLAMA_URL = "http://localhost:11434" |
| | REDIS_URL = "redis://localhost:6379" |
| |
|
| | async def e2e_test(): |
| | """Run full end-to-end test""" |
| | print("=" * 60) |
| | print("HARMONIC STACK END-TO-END TEST") |
| | print("Ghost in the Machine Labs") |
| | print("=" * 60) |
| | print(f"Time: {datetime.now().isoformat()}\n") |
| | |
| | r = redis.from_url(REDIS_URL) |
| | http = httpx.AsyncClient(timeout=120.0) |
| | |
| | |
| | user_request = "Write a Python function to calculate factorial" |
| | print(f"USER REQUEST: {user_request}\n") |
| | |
| | |
| | print("-" * 60) |
| | print("STEP 1: OPERATOR ROUTING") |
| | print("-" * 60) |
| | |
| | route_prompt = f"""Route this message to the appropriate node. |
| | |
| | MESSAGE: {user_request} |
| | |
| | AVAILABLE NODES: |
| | - executive: High-level planning |
| | - technical_director: Code, architecture |
| | - creative_director: Writing, design |
| | - research_director: Investigation, analysis |
| | - operations_director: Process, scheduling |
| | - council: Ethical deliberation |
| | |
| | Respond with JSON: {{"route_to": "node", "priority": 1-5, "reason": "why"}}""" |
| |
|
| | resp = await http.post( |
| | f"{OLLAMA_URL}/api/generate", |
| | json={"model": "operator", "prompt": route_prompt, "stream": False} |
| | ) |
| | |
| | route_result = resp.json().get("response", "") |
| | print(f"Operator: {route_result[:200]}") |
| | |
| | |
| | try: |
| | start = route_result.find("{") |
| | end = route_result.rfind("}") + 1 |
| | decision = json.loads(route_result[start:end]) |
| | target = decision.get("route_to", "technical_director") |
| | print(f"\n→ Routed to: {target}") |
| | except: |
| | target = "technical_director" |
| | print(f"\n→ Default route: {target}") |
| | |
| | |
| | print("\n" + "-" * 60) |
| | print("STEP 2: DIRECTOR → COORDINATOR") |
| | print("-" * 60) |
| | |
| | |
| | pubsub = r.pubsub() |
| | await pubsub.subscribe("msgbus:director:technical_director") |
| | |
| | |
| | domain_map = { |
| | "technical_director": "code", |
| | "creative_director": "writing", |
| | "research_director": "research", |
| | "operations_director": "system" |
| | } |
| | coordinator_domain = domain_map.get(target, "code") |
| | |
| | |
| | task = { |
| | "type": "task", |
| | "task_id": f"e2e-{datetime.now().strftime('%H%M%S')}", |
| | "from": "technical_director", |
| | "specification": user_request, |
| | "constraints": ["Clean code", "Include docstring"], |
| | "quality_criteria": ["Working code", "Proper error handling"], |
| | "context": {"user": "joe", "priority": "normal"}, |
| | "priority": 2 |
| | } |
| | |
| | await r.publish(f"msgbus:coordinator:{coordinator_domain}", json.dumps(task)) |
| | print(f"Task {task['task_id']} sent to coordinator_{coordinator_domain}") |
| | |
| | |
| | print("\n" + "-" * 60) |
| | print("STEP 3: COORDINATOR EXECUTION") |
| | print("-" * 60) |
| | print("Waiting for result...") |
| | |
| | result = None |
| | timeout = 60 |
| | start_time = datetime.now() |
| | |
| | while (datetime.now() - start_time).total_seconds() < timeout: |
| | msg = await pubsub.get_message(timeout=1.0) |
| | if msg and msg["type"] == "message": |
| | data = msg["data"] |
| | if isinstance(data, bytes): |
| | data = data.decode() |
| | try: |
| | result = json.loads(data) |
| | if result.get("type") == "task_result": |
| | break |
| | except: |
| | pass |
| | |
| | if result: |
| | print(f"\n✓ Result received!") |
| | print(f" Task ID: {result.get('task_id')}") |
| | print(f" From: {result.get('from')}") |
| | print(f" Status: {result.get('status')}") |
| | print(f" Quality: {result.get('quality')}") |
| | print(f" Time: {result.get('ms')}ms") |
| | print(f"\n Output preview:") |
| | output = result.get("output", "") |
| | print(f" {output[:500]}..." if len(output) > 500 else f" {output}") |
| | else: |
| | print("\n✗ Timeout waiting for result") |
| | print(" (Is coordinators.py running?)") |
| | |
| | |
| | await pubsub.close() |
| | await r.aclose() |
| | await http.aclose() |
| | |
| | |
| | print("\n" + "=" * 60) |
| | print("SUMMARY") |
| | print("=" * 60) |
| | print(f" User Request: {user_request}") |
| | print(f" Routed To: {target}") |
| | print(f" Coordinator: {coordinator_domain}") |
| | print(f" Result: {'SUCCESS' if result else 'TIMEOUT'}") |
| | |
| | if result: |
| | print("\n🎉 END-TO-END TEST PASSED!") |
| | else: |
| | print("\n⚠️ Test incomplete - start coordinators.py first") |
| | |
| | return result is not None |
| |
|
| |
|
| | if __name__ == "__main__": |
| | asyncio.run(e2e_test()) |
| |
|