File size: 3,215 Bytes
1c1e321 4e23fb0 08ec6fc 1c1e321 abb1a7f 1c1e321 4b7d88e 1c1e321 2454dee 1c1e321 a913311 1c1e321 08ec6fc 3765e28 1c1e321 a913311 1c1e321 abb1a7f 1c1e321 447977d 4b7d88e 1c1e321 4b7d88e 1c1e321 abb1a7f 1c1e321 ecbe9a9 1c1e321 c0d6ce0 d3069fb abb1a7f c0d6ce0 1c1e321 ef37607 1c1e321 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
from celery import Celery, chain
import os, shutil, subprocess
import uuid
from urllib.parse import urlparse
from App import celery_config, bot
from typing import List
from App.Editor.Schema import EditorRequest, LinkInfo
from celery.signals import worker_process_init
from asgiref.sync import async_to_sync
# Celery application used by the task decorators in this module. Its settings
# are loaded from the project's ``celery_config`` object.
celery = Celery()
celery.config_from_object(celery_config)
# The update() call below currently passes no overrides; it is kept as a
# placeholder for settings that should differ from ``celery_config``.
celery.conf.update(
    # Other Celery configuration settings
    # CELERYD_LOG_LEVEL="DEBUG",  # Set log level to DEBUG for the worker
)
@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    """Start the shared ``bot`` when a worker process is initialised.

    Connected to Celery's ``worker_process_init`` signal. The keyword
    arguments delivered with the signal are accepted but not used.
    """
    bot.start()
def create_symlink(source_dir, target_dir, symlink_name):
    """Create ``target_dir/symlink_name`` as a symlink to ``source_dir/symlink_name``.

    Args:
        source_dir: Directory containing the entry the link should point to.
        target_dir: Directory in which the link is created.
        symlink_name: Name shared by the link and the entry it points to.

    An already-existing path at the link location is reported on stdout and
    otherwise ignored; any other ``OSError`` propagates to the caller.
    """
    link_target = os.path.join(source_dir, symlink_name)
    link_location = os.path.join(target_dir, symlink_name)
    try:
        os.symlink(link_target, link_location)
    except FileExistsError:
        print(f"Symlink '{symlink_name}' already exists.")
    else:
        print(f"Symlink '{symlink_name}' created successfully.")
def download_with_wget(link, download_dir, filename):
    """Download ``link`` to ``download_dir/filename`` with the ``aria2c`` CLI.

    Despite the function name, the executable invoked is ``aria2c``, not wget.
    The process exit status is not inspected, so a failed download does not
    raise here.

    Args:
        link: URL handed to aria2c.
        download_dir: Value passed to aria2c's ``-d`` option.
        filename: Value passed to aria2c's ``-o`` option.
    """
    command = ["aria2c", link, "-d", download_dir, "-o", filename]
    subprocess.run(command)
@celery.task
def copy_remotion_app(src: str, dest: str):
    """Recursively copy the directory tree at ``src`` to ``dest``.

    Args:
        src: Directory to copy from.
        dest: Directory to create; ``shutil.copytree`` raises
            ``FileExistsError`` if it already exists.
    """
    shutil.copytree(src=src, dst=dest)
    # Disabled idea: create a symbolic link to prevent multiple installs.
    # source_dir = os.path.join(src, "node_module")
    # create_symlink(source_dir, target_dir=dest, symlink_name="node_module")
@celery.task
def install_dependencies(directory: str):
    """Run ``npm install`` inside ``directory``.

    Args:
        directory: Project directory in which the dependencies are installed.

    Raises:
        FileNotFoundError: If ``directory`` does not exist or the ``npm``
            executable cannot be found.

    A non-zero exit status from npm is reported on stdout but does not raise,
    so the remaining tasks of a chain still run.
    """
    # Run npm with ``cwd=`` instead of calling os.chdir(): chdir() changes the
    # working directory of the whole worker process, and this directory is
    # removed again by SendVideo, which would leave the worker sitting in a
    # deleted directory.
    result = subprocess.run(["npm", "install"], cwd=directory)
    if result.returncode != 0:
        print(
            f"'npm install' exited with status {result.returncode} "
            f"in '{directory}'."
        )
@celery.task
def download_assets(links: List[LinkInfo], temp_dir: str):
    """Download every entry of ``links`` into ``<temp_dir>/public``.

    Args:
        links: Items exposing ``link`` (the URL) and ``file_name`` (the name
            the download is saved under).
        temp_dir: Project directory; files are placed in its ``public``
            sub-directory.
    """
    public_dir = f"{temp_dir}/public"
    for asset in links:
        download_with_wget(asset.link, public_dir, asset.file_name)
@celery.task
def render_video(directory: str, output_directory: str):
    """Run the project's ``npm run build`` script inside ``directory``.

    Args:
        directory: Project directory containing the npm ``build`` script.
        output_directory: Value passed after ``--output`` on the command line.

    Raises:
        FileNotFoundError: If ``directory`` does not exist or the ``npm``
            executable cannot be found.

    A non-zero exit status from npm is reported on stdout but does not raise,
    so the remaining tasks of a chain still run.
    """
    # NOTE(review): npm documents that "--"-prefixed options placed after the
    # script name without a separating "--" are parsed by npm itself rather
    # than forwarded to the script. Confirm that "--output" reaches the build
    # script as intended before changing the argument list.
    #
    # An argument list plus ``cwd=`` replaces os.chdir() + os.system():
    # chdir() changes the working directory of the whole worker process (and
    # that directory is deleted by SendVideo), and the list form keeps a path
    # containing spaces or shell metacharacters as a single argument.
    result = subprocess.run(
        ["npm", "run", "build", "--output", output_directory],
        cwd=directory,
    )
    if result.returncode != 0:
        print(
            f"'npm run build' exited with status {result.returncode} "
            f"in '{directory}'."
        )
@celery.task
def cleanup_temp_directory(
    temp_dir: str, output_dir: str, chat_id: int = -1002069945904
):
    """Send the rendered video to ``chat_id`` and delete ``temp_dir``.

    Synchronous Celery entry point that drives the ``SendVideo`` coroutine
    through ``async_to_sync``.

    Args:
        temp_dir: Temporary project directory to remove afterwards.
        output_dir: Path of the video file to send.
        chat_id: Destination chat for the video.
    """
    # Forward the caller's chat_id instead of a hard-coded value so the
    # parameter actually takes effect.
    async_to_sync(SendVideo)(temp_dir, output_dir, chat_id=chat_id)


async def SendVideo(temp_dir: str, output_dir: str, chat_id: int = -1002069945904):
    """Send the video at ``output_dir`` via ``bot`` and then remove ``temp_dir``.

    Args:
        temp_dir: Temporary project directory; removed whether or not the
            send succeeds.
        output_dir: Path of the video file passed to ``bot.send_video``.
        chat_id: Destination chat for the video.

    Any exception raised by ``bot.send_video`` propagates after the cleanup
    has run.
    """
    try:
        await bot.send_video(
            chat_id, video=output_dir, caption="Your video caption"
        )
    finally:
        # Cleanup: remove the temporary directory. Errors during removal are
        # ignored so they cannot mask a failure from the send above.
        shutil.rmtree(temp_dir, ignore_errors=True)
@celery.task
def celery_task(video_task: EditorRequest):
    """Queue the render pipeline for one editor request.

    A fresh project directory ``/tmp/<uuid4>`` is used for the run. The
    pipeline is a Celery chain of immutable signatures executed in order:
    copy the Remotion app from ``/srv/Remotion-app``, install dependencies,
    download the request's assets (only when ``video_task.links`` is
    non-empty), render the video, then send it and clean up.

    Args:
        video_task: Request object; only its ``links`` attribute is read here.
    """
    remotion_app_dir = os.path.join("/srv", "Remotion-app")
    project_id = str(uuid.uuid4())
    temp_dir = f"/tmp/{project_id}"
    # Despite the name, this is the path of the rendered video file.
    output_dir = f"/tmp/{project_id}/out/video.mp4"

    steps = [
        copy_remotion_app.si(remotion_app_dir, temp_dir),
        install_dependencies.si(temp_dir),
    ]
    # chain() expects task signatures, so the optional download step is
    # appended conditionally rather than passing None as a chain element.
    if video_task.links:
        steps.append(download_assets.si(video_task.links, temp_dir))
    steps.append(render_video.si(temp_dir, output_dir))
    steps.append(cleanup_temp_directory.si(temp_dir, output_dir))

    chain(*steps).apply_async(
        # link_error=handle_error
    )  # Link the tasks and handle errors
def handle_error(task_id, err, *args, **kwargs):
    """Report a failed task on stdout.

    Args:
        task_id: Identifier of the task that failed.
        err: The error to report; formatted with ``str()``.
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored.
    """
    message = f"Error in task {task_id}: {err}"
    print(message)
    # You can add additional error handling logic here
|