Rectifier / App /Editor /editorRoutes.py
Mbonea's picture
partially tested code for the swarm
a00f760
raw
history blame
No virus
2.86 kB
from fastapi import APIRouter, HTTPException, status, BackgroundTasks, UploadFile, Query
from .Schema import EditorRequest, TaskInfo
from App.Worker import celery_task, concatenate_videos
from celery.result import AsyncResult
import aiofiles, os, uuid, aiohttp
from App import SERVER_STATE, Task
videditor_router = APIRouter(tags=["vidEditor"])
@videditor_router.post("/create-video")
async def create_video(videoRequest: EditorRequest, background_task: BackgroundTasks):
background_task.add_task(celery_task, videoRequest)
return {"task_id": "started"}
@videditor_router.post("/create-chunks")
async def create_chunks(videoRequest: EditorRequest, background_task: BackgroundTasks):
video_duration = videoRequest.constants.duration
task_id = uuid.uuid4()
new_task = Task(TASK_ID=task_id)
active_nodes = [
node
for node in SERVER_STATE.NODES
if await new_task._check_node_online(node.SPACE_HOST)
]
number_of_nodes = len(active_nodes)
ranges = [
[i, i + number_of_nodes] for i in range(0, video_duration, number_of_nodes)
]
for i, node in enumerate(active_nodes):
await new_task.add_node(node, i)
SERVER_STATE.TASKS[task_id] = new_task
async with aiohttp.ClientSession() as session:
for i, node in enumerate(active_nodes):
videoRequest.constants.frames = ranges[i]
if node.SPACE_HOST == SERVER_STATE.SPACE_HOST:
background_task.add_task(celery_task, videoRequest)
async with session.post(
"node.SPACE_HOST/create-video", json=videoRequest
) as response:
if response.status != 200:
raise HTTPException(
status_code=response.status,
detail="Failed to post request to node",
)
return {"task_id": "started"}
@videditor_router.post("/uploadfile/")
async def create_file(
background_tasks: BackgroundTasks,
file: UploadFile,
node: str,
chunk: int,
task: str,
):
chunk_directory = f"/tmp/Video/{task}"
file_name = f"{chunk_directory}/{chunk}.mp4"
# Create the directory if it does not exist
os.makedirs(chunk_directory, exist_ok=True)
try:
async with aiofiles.open(file_name, "wb") as f:
while contents := await file.read(1024 * 1):
await f.write(contents)
except Exception as e:
return {
"message": f"There was an error uploading the file, error message {str(e)} "
}
finally:
await file.close()
running_task = SERVER_STATE.TASKS[task]
running_task.mark_node_completed(node)
if running_task.is_completed():
background_tasks.add_task(concatenate_videos, chunk_directory)
return {"message": "File uploaded successfully"}