Ken Sang Tang commited on
Commit
c61d271
·
verified ·
1 Parent(s): 2dfebf6

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +220 -0
app.py ADDED
@@ -0,0 +1,220 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app.py
2
+
3
+ import asyncio
4
+ import json
5
+ import os
6
+ import uuid
7
+ from datetime import datetime
8
+ from json import dumps
9
+
10
+ from fastapi import Body, FastAPI, HTTPException, Request
11
+ from fastapi.middleware.cors import CORSMiddleware
12
+ from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse, FileResponse
13
+ from fastapi.staticfiles import StaticFiles
14
+ from fastapi.templating import Jinja2Templates
15
+ from pydantic import BaseModel
16
+ from loguru import logger
17
+ import uvicorn
18
+ import aiohttp
19
+
20
+ app = FastAPI()
21
+
22
+ OPENMANUS_ENDPOINT_URL = os.getenv("OPENMANUS_ENDPOINT_URL")
23
+ if not OPENMANUS_ENDPOINT_URL:
24
+ raise EnvironmentError("OPENMANUS_ENDPOINT_URL environment variable must be set")
25
+
26
+ app.mount("/static", StaticFiles(directory="static"), name="static")
27
+ templates = Jinja2Templates(directory="templates")
28
+
29
+ app.add_middleware(
30
+ CORSMiddleware,
31
+ allow_origins=["*"],
32
+ allow_credentials=True,
33
+ allow_methods=["*"],
34
+ allow_headers=["*"],
35
+ )
36
+
37
+ class Task(BaseModel):
38
+ id: str
39
+ prompt: str
40
+ created_at: datetime
41
+ status: str
42
+ steps: list = []
43
+
44
+ def model_dump(self, *args, **kwargs):
45
+ data = super().model_dump(*args, **kwargs)
46
+ data["created_at"] = self.created_at.isoformat()
47
+ return data
48
+
49
+ class TaskManager:
50
+ def __init__(self):
51
+ self.tasks = {}
52
+ self.queues = {}
53
+
54
+ def create_task(self, prompt: str) -> Task:
55
+ task_id = str(uuid.uuid4())
56
+ task = Task(
57
+ id=task_id, prompt=prompt, created_at=datetime.now(), status="pending"
58
+ )
59
+ self.tasks[task_id] = task
60
+ self.queues[task_id] = asyncio.Queue()
61
+ return task
62
+
63
+ async def update_task_step(
64
+ self, task_id: str, step: int, result: str, step_type: str = "step"
65
+ ):
66
+ if task_id in self.tasks:
67
+ task = self.tasks[task_id]
68
+ task.steps.append({"step": step, "result": result, "type": step_type})
69
+ await self.queues[task_id].put(
70
+ {"type": step_type, "step": step, "result": result}
71
+ )
72
+ await self.queues[task_id].put(
73
+ {"type": "status", "status": task.status, "steps": task.steps}
74
+ )
75
+
76
+ async def complete_task(self, task_id: str):
77
+ if task_id in self.tasks:
78
+ task = self.tasks[task_id]
79
+ task.status = "completed"
80
+ await self.queues[task_id].put(
81
+ {"type": "status", "status": task.status, "steps": task.steps}
82
+ )
83
+ await self.queues[task_id].put({"type": "complete"})
84
+
85
+ async def fail_task(self, task_id: str, error: str):
86
+ if task_id in self.tasks:
87
+ self.tasks[task_id].status = f"failed: {error}"
88
+ await self.queues[task_id].put({"type": "error", "message": error})
89
+
90
+ task_manager = TaskManager()
91
+
92
+ @app.get("/", response_class=HTMLResponse)
93
+ async def index(request: Request):
94
+ return templates.TemplateResponse("index.html", {"request": request})
95
+
96
+ @app.get("/chat", response_class=HTMLResponse)
97
+ async def chat(request: Request):
98
+ return templates.TemplateResponse("chat.html", {"request": request})
99
+
100
+ @app.post("/tasks")
101
+ async def create_task(prompt: str = Body(..., embed=True)):
102
+ task = task_manager.create_task(prompt)
103
+ asyncio.create_task(run_task(task.id, prompt))
104
+ return {"task_id": task.id}
105
+
106
+ @app.get("/tasks")
107
+ async def get_tasks():
108
+ sorted_tasks = sorted(
109
+ task_manager.tasks.values(), key=lambda task: task.created_at, reverse=True
110
+ )
111
+ return JSONResponse(
112
+ content=[task.model_dump() for task in sorted_tasks],
113
+ headers={"Content-Type": "application/json"},
114
+ )
115
+
116
+ @app.get("/tasks/{task_id}")
117
+ async def get_task(task_id: str):
118
+ if task_id not in task_manager.tasks:
119
+ raise HTTPException(status_code=404, detail="Task not found")
120
+ return task_manager.tasks[task_id]
121
+
122
+ @app.get("/tasks/{task_id}/events")
123
+ async def task_events(task_id: str):
124
+ async def event_generator():
125
+ if task_id not in task_manager.queues:
126
+ yield f"event: error\ndata: {dumps({'message': 'Task not found'})}\n\n"
127
+ return
128
+
129
+ queue = task_manager.queues[task_id]
130
+ task = task_manager.tasks.get(task_id)
131
+ if task:
132
+ yield f"event: status\ndata: {dumps({'type': 'status', 'status': task.status, 'steps': task.steps})}\n\n"
133
+
134
+ while True:
135
+ try:
136
+ event = await queue.get()
137
+ formatted_event = dumps(event)
138
+ yield ": heartbeat\n\n"
139
+
140
+ if event["type"] == "complete":
141
+ yield f"event: complete\ndata: {formatted_event}\n\n"
142
+ break
143
+ elif event["type"] == "error":
144
+ yield f"event: error\ndata: {formatted_event}\n\n"
145
+ break
146
+ elif event["type"] in ["step", "think", "tool", "act", "run"]:
147
+ yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
148
+ else:
149
+ yield f"event: {event['type']}\ndata: {formatted_event}\n\n"
150
+ except asyncio.CancelledError:
151
+ break
152
+ except Exception as e:
153
+ yield f"event: error\ndata: {dumps({'message': str(e)})}\n\n"
154
+ break
155
+
156
+ return StreamingResponse(
157
+ event_generator(),
158
+ media_type="text/event-stream",
159
+ headers={
160
+ "Cache-Control": "no-cache",
161
+ "Connection": "keep-alive",
162
+ "X-Accel-Buffering": "no",
163
+ },
164
+ )
165
+
166
+ @app.get("/download")
167
+ async def download_file(file_path: str):
168
+ if not os.path.exists(file_path):
169
+ raise HTTPException(status_code=404, detail="File not found")
170
+ return FileResponse(file_path, filename=os.path.basename(file_path))
171
+
172
+ async def run_task(task_id: str, prompt: str):
173
+ try:
174
+ task_manager.tasks[task_id].status = "running"
175
+
176
+ class SSELogHandler:
177
+ def __init__(self, task_id):
178
+ self.task_id = task_id
179
+
180
+ async def __call__(self, message):
181
+ import re
182
+ cleaned_message = re.sub(r"^.*? - ", "", message)
183
+ event_type = "log"
184
+ if "Manus result:" in cleaned_message:
185
+ event_type = "result"
186
+ cleaned_message = cleaned_message.replace("Manus result:", "")
187
+ await task_manager.update_task_step(
188
+ self.task_id, 1, cleaned_message, event_type
189
+ )
190
+ return
191
+ await task_manager.update_task_step(
192
+ self.task_id, 0, cleaned_message, event_type
193
+ )
194
+
195
+ sse_handler = SSELogHandler(task_id)
196
+ logger.add(sse_handler)
197
+
198
+ async def call_manus(url: str, prompt: str):
199
+ generate_kwargs = {"prompt": prompt}
200
+ async with aiohttp.ClientSession() as session:
201
+ async with session.post(
202
+ url=url,
203
+ json=generate_kwargs,
204
+ timeout=aiohttp.ClientTimeout(total=3600)
205
+ ) as response:
206
+ buffer = ""
207
+ async for line in response.content:
208
+ decode_line = line.decode('utf-8')
209
+ buffer += decode_line
210
+ if buffer:
211
+ logger.info(buffer)
212
+
213
+ await call_manus(OPENMANUS_ENDPOINT_URL, prompt)
214
+ await task_manager.update_task_step(task_id, 1, "", "result")
215
+ await task_manager.complete_task(task_id)
216
+ except Exception as e:
217
+ await task_manager.fail_task(task_id, str(e))
218
+
219
+ if __name__ == "__main__":
220
+ uvicorn.run(app, host="0.0.0.0", port=7860)