|
|
"""
|
|
|
Durable Objects integration for OpenManus
|
|
|
Provides interface to Cloudflare Durable Objects operations
|
|
|
"""
|
|
|
|
|
|
import json
|
|
|
import time
|
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
from app.logger import logger
|
|
|
|
|
|
from .client import CloudflareClient, CloudflareError
|
|
|
|
|
|
|
|
|
class DurableObjects:
|
|
|
"""Cloudflare Durable Objects client"""
|
|
|
|
|
|
def __init__(self, client: CloudflareClient):
|
|
|
self.client = client
|
|
|
|
|
|
async def create_agent_session(
|
|
|
self, session_id: str, user_id: str, metadata: Optional[Dict[str, Any]] = None
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Create a new agent session"""
|
|
|
|
|
|
session_data = {
|
|
|
"sessionId": session_id,
|
|
|
"userId": user_id,
|
|
|
"metadata": metadata or {},
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
response = await self.client.post(
|
|
|
f"do/agent/{session_id}/start", data=session_data, use_worker=True
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"success": True,
|
|
|
"session_id": session_id,
|
|
|
"user_id": user_id,
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to create agent session: {e}")
|
|
|
raise
|
|
|
|
|
|
async def get_agent_session_status(self, session_id: str) -> Dict[str, Any]:
|
|
|
"""Get agent session status"""
|
|
|
|
|
|
try:
|
|
|
response = await self.client.get(
|
|
|
f"do/agent/{session_id}/status?sessionId={session_id}", use_worker=True
|
|
|
)
|
|
|
|
|
|
return response
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to get agent session status: {e}")
|
|
|
raise
|
|
|
|
|
|
async def update_agent_session(
|
|
|
self, session_id: str, updates: Dict[str, Any]
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Update agent session"""
|
|
|
|
|
|
update_data = {"sessionId": session_id, "updates": updates}
|
|
|
|
|
|
try:
|
|
|
response = await self.client.post(
|
|
|
f"do/agent/{session_id}/update", data=update_data, use_worker=True
|
|
|
)
|
|
|
|
|
|
return {"success": True, "session_id": session_id, **response}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to update agent session: {e}")
|
|
|
raise
|
|
|
|
|
|
async def stop_agent_session(self, session_id: str) -> Dict[str, Any]:
|
|
|
"""Stop agent session"""
|
|
|
|
|
|
try:
|
|
|
response = await self.client.post(
|
|
|
f"do/agent/{session_id}/stop",
|
|
|
data={"sessionId": session_id},
|
|
|
use_worker=True,
|
|
|
)
|
|
|
|
|
|
return {"success": True, "session_id": session_id, **response}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to stop agent session: {e}")
|
|
|
raise
|
|
|
|
|
|
async def add_agent_message(
|
|
|
self, session_id: str, message: Dict[str, Any]
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Add a message to agent session"""
|
|
|
|
|
|
message_data = {
|
|
|
"sessionId": session_id,
|
|
|
"message": {"timestamp": int(time.time()), **message},
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
response = await self.client.post(
|
|
|
f"do/agent/{session_id}/messages", data=message_data, use_worker=True
|
|
|
)
|
|
|
|
|
|
return {"success": True, "session_id": session_id, **response}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to add agent message: {e}")
|
|
|
raise
|
|
|
|
|
|
async def get_agent_messages(
|
|
|
self, session_id: str, limit: int = 50, offset: int = 0
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Get agent session messages"""
|
|
|
|
|
|
try:
|
|
|
response = await self.client.get(
|
|
|
f"do/agent/{session_id}/messages?sessionId={session_id}&limit={limit}&offset={offset}",
|
|
|
use_worker=True,
|
|
|
)
|
|
|
|
|
|
return response
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to get agent messages: {e}")
|
|
|
raise
|
|
|
|
|
|
|
|
|
async def join_chat_room(
|
|
|
self,
|
|
|
room_id: str,
|
|
|
user_id: str,
|
|
|
username: str,
|
|
|
room_config: Optional[Dict[str, Any]] = None,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Join a chat room"""
|
|
|
|
|
|
join_data = {
|
|
|
"userId": user_id,
|
|
|
"username": username,
|
|
|
"roomConfig": room_config or {},
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
response = await self.client.post(
|
|
|
f"do/chat/{room_id}/join", data=join_data, use_worker=True
|
|
|
)
|
|
|
|
|
|
return {"success": True, "room_id": room_id, "user_id": user_id, **response}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to join chat room: {e}")
|
|
|
raise
|
|
|
|
|
|
async def leave_chat_room(self, room_id: str, user_id: str) -> Dict[str, Any]:
|
|
|
"""Leave a chat room"""
|
|
|
|
|
|
leave_data = {"userId": user_id}
|
|
|
|
|
|
try:
|
|
|
response = await self.client.post(
|
|
|
f"do/chat/{room_id}/leave", data=leave_data, use_worker=True
|
|
|
)
|
|
|
|
|
|
return {"success": True, "room_id": room_id, "user_id": user_id, **response}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to leave chat room: {e}")
|
|
|
raise
|
|
|
|
|
|
async def get_chat_room_info(self, room_id: str) -> Dict[str, Any]:
|
|
|
"""Get chat room information"""
|
|
|
|
|
|
try:
|
|
|
response = await self.client.get(f"do/chat/{room_id}/info", use_worker=True)
|
|
|
|
|
|
return response
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to get chat room info: {e}")
|
|
|
raise
|
|
|
|
|
|
async def send_chat_message(
|
|
|
self,
|
|
|
room_id: str,
|
|
|
user_id: str,
|
|
|
username: str,
|
|
|
content: str,
|
|
|
message_type: str = "text",
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Send a message to chat room"""
|
|
|
|
|
|
message_data = {
|
|
|
"userId": user_id,
|
|
|
"username": username,
|
|
|
"content": content,
|
|
|
"messageType": message_type,
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
response = await self.client.post(
|
|
|
f"do/chat/{room_id}/messages", data=message_data, use_worker=True
|
|
|
)
|
|
|
|
|
|
return {"success": True, "room_id": room_id, **response}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to send chat message: {e}")
|
|
|
raise
|
|
|
|
|
|
async def get_chat_messages(
|
|
|
self, room_id: str, limit: int = 50, offset: int = 0
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Get chat room messages"""
|
|
|
|
|
|
try:
|
|
|
response = await self.client.get(
|
|
|
f"do/chat/{room_id}/messages?limit={limit}&offset={offset}",
|
|
|
use_worker=True,
|
|
|
)
|
|
|
|
|
|
return response
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to get chat messages: {e}")
|
|
|
raise
|
|
|
|
|
|
async def get_chat_participants(self, room_id: str) -> Dict[str, Any]:
|
|
|
"""Get chat room participants"""
|
|
|
|
|
|
try:
|
|
|
response = await self.client.get(
|
|
|
f"do/chat/{room_id}/participants", use_worker=True
|
|
|
)
|
|
|
|
|
|
return response
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"Failed to get chat participants: {e}")
|
|
|
raise
|
|
|
|
|
|
|
|
|
def get_agent_websocket_url(self, session_id: str, user_id: str) -> str:
|
|
|
"""Get WebSocket URL for agent session"""
|
|
|
|
|
|
if not self.client.worker_url:
|
|
|
raise CloudflareError("Worker URL not configured")
|
|
|
|
|
|
base_url = self.client.worker_url.replace("https://", "wss://").replace(
|
|
|
"http://", "ws://"
|
|
|
)
|
|
|
return (
|
|
|
f"{base_url}/do/agent/{session_id}?sessionId={session_id}&userId={user_id}"
|
|
|
)
|
|
|
|
|
|
def get_chat_websocket_url(self, room_id: str, user_id: str, username: str) -> str:
|
|
|
"""Get WebSocket URL for chat room"""
|
|
|
|
|
|
if not self.client.worker_url:
|
|
|
raise CloudflareError("Worker URL not configured")
|
|
|
|
|
|
base_url = self.client.worker_url.replace("https://", "wss://").replace(
|
|
|
"http://", "ws://"
|
|
|
)
|
|
|
return f"{base_url}/do/chat/{room_id}?userId={user_id}&username={username}"
|
|
|
|
|
|
|
|
|
class DurableObjectsWebSocket:
|
|
|
"""Helper class for WebSocket connections to Durable Objects"""
|
|
|
|
|
|
def __init__(self, url: str):
|
|
|
self.url = url
|
|
|
self.websocket = None
|
|
|
self.connected = False
|
|
|
self.message_handlers = {}
|
|
|
|
|
|
async def connect(self):
|
|
|
"""Connect to WebSocket"""
|
|
|
try:
|
|
|
import websockets
|
|
|
|
|
|
self.websocket = await websockets.connect(self.url)
|
|
|
self.connected = True
|
|
|
logger.info(f"Connected to Durable Object WebSocket: {self.url}")
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
|
asyncio.create_task(self._message_loop())
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to connect to WebSocket: {e}")
|
|
|
raise CloudflareError(f"WebSocket connection failed: {e}")
|
|
|
|
|
|
async def disconnect(self):
|
|
|
"""Disconnect from WebSocket"""
|
|
|
if self.websocket and self.connected:
|
|
|
await self.websocket.close()
|
|
|
self.connected = False
|
|
|
logger.info("Disconnected from Durable Object WebSocket")
|
|
|
|
|
|
async def send_message(self, message_type: str, payload: Dict[str, Any]):
|
|
|
"""Send message via WebSocket"""
|
|
|
if not self.connected or not self.websocket:
|
|
|
raise CloudflareError("WebSocket not connected")
|
|
|
|
|
|
message = {
|
|
|
"type": message_type,
|
|
|
"payload": payload,
|
|
|
"timestamp": int(time.time()),
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
await self.websocket.send(json.dumps(message))
|
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to send WebSocket message: {e}")
|
|
|
raise CloudflareError(f"Failed to send message: {e}")
|
|
|
|
|
|
def add_message_handler(self, message_type: str, handler):
|
|
|
"""Add a message handler for specific message types"""
|
|
|
if message_type not in self.message_handlers:
|
|
|
self.message_handlers[message_type] = []
|
|
|
self.message_handlers[message_type].append(handler)
|
|
|
|
|
|
async def _message_loop(self):
|
|
|
"""Handle incoming WebSocket messages"""
|
|
|
try:
|
|
|
async for message in self.websocket:
|
|
|
try:
|
|
|
data = json.loads(message)
|
|
|
message_type = data.get("type")
|
|
|
|
|
|
if message_type in self.message_handlers:
|
|
|
for handler in self.message_handlers[message_type]:
|
|
|
try:
|
|
|
if callable(handler):
|
|
|
if asyncio.iscoroutinefunction(handler):
|
|
|
await handler(data)
|
|
|
else:
|
|
|
handler(data)
|
|
|
except Exception as e:
|
|
|
logger.error(f"Message handler error: {e}")
|
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
|
logger.error(f"Failed to parse WebSocket message: {e}")
|
|
|
except Exception as e:
|
|
|
logger.error(f"WebSocket message processing error: {e}")
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"WebSocket message loop error: {e}")
|
|
|
self.connected = False
|
|
|
|
|
|
|
|
|
async def __aenter__(self):
|
|
|
await self.connect()
|
|
|
return self
|
|
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
|
await self.disconnect()
|
|
|
|