File size: 3,702 Bytes
1c1e321 4e23fb0 08ec6fc 1c1e321 8a4825d 1c1e321 4b7d88e 8a4825d 1c1e321 6343730 1c1e321 8a4825d 6d15d9b 8a4825d aeb8823 8a4825d a913311 1c1e321 08ec6fc 3765e28 1c1e321 a913311 1c1e321 abb1a7f 1c1e321 cee6d60 1c1e321 abb1a7f 1c1e321 ecbe9a9 aeb8823 d5d0324 1c1e321 c0d6ce0 d3069fb 8a4825d abb1a7f c0d6ce0 1c1e321 6343730 1c1e321 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
from celery import Celery, chain
import os, shutil, subprocess
import uuid
from urllib.parse import urlparse
from App import celery_config, bot
from typing import List
from App.Editor.Schema import EditorRequest, LinkInfo, Assets
from celery.signals import worker_process_init
from asgiref.sync import async_to_sync
import json
import os
# Celery application shared by every task defined in this module.
celery = Celery()
celery.config_from_object(celery_config)
# Applied on top of the loaded config: worker log level set to DEBUG.
celery.conf.update(CELERYD_LOG_LEVEL="DEBUG")
@worker_process_init.connect
def worker_process_init_handler(**kwargs):
    """Start the shared ``bot`` when a worker process is initialised.

    Connected to Celery's ``worker_process_init`` signal; the keyword
    arguments delivered with the signal are accepted and ignored.
    """
    bot.start()
@celery.task
def create_json_file(assets: List[Assets], asset_dir: str):
    """Write each asset's sequence to a JSON file inside *asset_dir*.

    For every asset a file named ``<Type>Sequences.json`` is written, where
    ``<Type>`` is ``asset.type.capitalize()``. An existing file with the same
    name is overwritten.

    Args:
        assets: Objects exposing ``type`` (a string) and ``sequence``
            (a JSON-serialisable value).
        asset_dir: Directory that receives the files; created if missing
            (including when *assets* is empty).
    """
    # Loop-invariant: the directory only needs to be created once.
    os.makedirs(asset_dir, exist_ok=True)

    for asset in assets:
        target_path = os.path.join(asset_dir, f"{asset.type.capitalize()}Sequences.json")
        # Serialise before opening so a serialisation error cannot leave a
        # truncated file behind.
        json_string = json.dumps(asset.sequence)
        print(target_path)
        with open(target_path, "w", encoding="utf-8") as f:
            f.write(json_string)
def create_symlink(source_dir, target_dir, symlink_name):
    """Create ``target_dir/symlink_name`` as a symlink to ``source_dir/symlink_name``.

    A message is printed on success. If the link path already exists, the
    ``FileExistsError`` is caught and a message is printed instead.
    """
    link_target = os.path.join(source_dir, symlink_name)
    link_location = os.path.join(target_dir, symlink_name)
    try:
        os.symlink(link_target, link_location)
    except FileExistsError:
        print(f"Symlink '{symlink_name}' already exists.")
    else:
        print(f"Symlink '{symlink_name}' created successfully.")
def download_with_wget(link, download_dir, filename):
    """Download *link* into *download_dir* as *filename* using ``aria2c``.

    Despite the function name, the transfer is performed by the ``aria2c``
    command-line tool, not ``wget``.

    Args:
        link: URL to fetch; converted with ``str()`` before use.
        download_dir: Directory passed to aria2c's ``-d`` option.
        filename: Output file name passed to aria2c's ``-o`` option.

    Raises:
        ValueError: If *link* is empty or starts with ``-``. Such a value
            would be parsed by aria2c as a command-line option rather than
            as a URL.
    """
    url = "" if link is None else str(link)
    if not url or url.startswith("-"):
        raise ValueError(f"Refusing to download from invalid link: {url!r}")

    completed = subprocess.run(["aria2c", url, "-d", download_dir, "-o", filename])
    # A failed download is reported but not raised, matching the previous
    # best-effort behaviour while no longer being silent.
    if completed.returncode != 0:
        print(f"aria2c exited with status {completed.returncode} for {url!r}")
@celery.task
def copy_remotion_app(src: str, dest: str):
    """Recursively copy the directory tree at *src* to *dest*.

    Uses ``shutil.copytree`` with default options.
    """
    # NOTE: symlinking the dependency folder from ``src`` (to avoid repeated
    # installs) was sketched here previously but is currently disabled.
    shutil.copytree(src, dest)
@celery.task
def install_dependencies(directory: str):
    """Run ``npm install`` inside *directory*.

    The command runs with *directory* as its working directory via the
    ``cwd`` argument, so the worker process's own working directory is left
    untouched (``os.chdir`` would change it for every later task in the same
    process).

    Args:
        directory: Project directory in which to run ``npm install``.
    """
    completed = subprocess.run(["npm", "install"], cwd=directory)
    # Reported but not raised, so a failed install does not abort the caller.
    if completed.returncode != 0:
        print(f"npm install exited with status {completed.returncode} in {directory}")
@celery.task
def download_assets(links: List[LinkInfo], temp_dir: str):
    """Download every entry of *links* into the ``public`` folder of *temp_dir*.

    Each entry's ``link`` is fetched and stored under its ``file_name``.
    """
    destination = f"{temp_dir}/public"
    for entry in links:
        download_with_wget(entry.link, destination, entry.file_name)
@celery.task
def render_video(directory: str, output_directory: str):
    """Run ``npm run build --output <output_directory>`` inside *directory*.

    The command is executed as an argument list with ``cwd=directory``: the
    worker's own working directory is not changed, and *output_directory* is
    passed as a single argument even if it contains spaces or shell
    metacharacters.

    Args:
        directory: Project directory containing the npm ``build`` script.
        output_directory: Value passed after ``--output``.
    """
    # NOTE(review): npm may consume ``--output`` itself instead of forwarding
    # it to the build script unless it is preceded by ``--`` — TODO confirm
    # against the project's package.json.
    completed = subprocess.run(
        ["npm", "run", "build", "--output", output_directory],
        cwd=directory,
    )
    # Reported but not raised, so a failed build does not abort the caller.
    if completed.returncode != 0:
        print(f"npm run build exited with status {completed.returncode} in {directory}")
@celery.task
def cleanup_temp_directory(temp_dir: str, output_dir: str, chat_id: int = -1002069945904):
    """Send the file at *output_dir* through ``bot``, then delete *temp_dir*.

    The temporary directory is removed whether or not sending succeeds;
    errors raised while removing it are ignored.

    Args:
        temp_dir: Directory tree to delete.
        output_dir: Path handed to ``bot.send_file`` as the file to send.
        chat_id: Destination passed to ``bot.send_file``.
    """
    try:
        bot.send_file(chat_id, file=output_dir, caption="Your video caption")
    finally:
        # Always clean up, even if the upload raised.
        shutil.rmtree(temp_dir, ignore_errors=True)
@celery.task
def celery_task(video_task: EditorRequest):
    """Queue the full render pipeline for *video_task* as a Celery chain.

    A fresh working directory ``/tmp/<uuid4>`` is used for each call. The
    chain copies the Remotion app from ``/srv/Remotion-app``, installs its
    dependencies, writes the asset JSON files, downloads linked files (only
    when ``video_task.links`` is non-empty), renders the video and finally
    sends the result and removes the working directory.

    Args:
        video_task: Request object; its ``assets`` and ``links`` attributes
            are read.
    """
    remotion_app_dir = os.path.join("/srv", "Remotion-app")
    project_id = str(uuid.uuid4())
    temp_dir = f"/tmp/{project_id}"
    output_dir = f"/tmp/{project_id}/out/video.mp4"
    assets_dir = os.path.join(temp_dir, "src/HelloWorld/Assets")

    steps = [
        copy_remotion_app.si(remotion_app_dir, temp_dir),
        install_dependencies.si(temp_dir),
        create_json_file.si(video_task.assets, assets_dir),
    ]
    # Only add the download step when there is something to download; a
    # ``None`` placeholder is not a valid signature and must not be passed
    # to ``chain``.
    if video_task.links:
        steps.append(download_assets.si(video_task.links, temp_dir))
    steps.append(render_video.si(temp_dir, output_dir))
    steps.append(cleanup_temp_directory.si(temp_dir, output_dir))

    # No ``link_error`` callback is attached at present.
    chain(*steps).apply_async()
def handle_error(task_id, err, *args, **kwargs):
    """Print a one-line report that task *task_id* failed with *err*.

    Extra positional and keyword arguments are accepted and ignored.
    """
    message = f"Error in task {task_id}: {err}"
    print(message)
    # Extension point: further error handling can be added here.
|