Rectifier / App /Worker.py
Mbonea's picture
minor fix
75f05a6
from celery import Celery, chain
import os, shutil, subprocess
import uuid
from urllib.parse import urlparse
from subprocess import run
from App import celery_config, bot, SERVER_STATE
from typing import List
from App.Editor.Schema import EditorRequest, LinkInfo, Assets, Constants
from celery.signals import worker_process_init
from asgiref.sync import async_to_sync
import json
import os
from pydantic import BaseModel, Field
from App.Generate.utils.GroqInstruct import tagger
from App.utilis import upload_file
import subprocess
def concatenate_videos(input_dir):
# Get a list of all the mp4 files in the input directory
files = sorted([f for f in os.listdir(input_dir) if f.endswith(".mp4")])
# Generate the input file list for ffmpeg
input_files = "|".join([f"file '{os.path.join(input_dir, f)}'" for f in files])
output_file = f"{input_dir}/final.mp4"
# Run the ffmpeg command to concatenate the videos
subprocess.run(
[
"ffmpeg",
"-f",
"concat",
"-safe",
"0",
"-i",
f"concat:{input_files}",
"-c",
"copy",
output_file,
]
)
bot.start()
bot.send_file(-1002069945904, file=output_file, caption="finally done!")
return output_file
class YouTubeUploadTask(BaseModel):
filename: str
title: str = Field(
...,
min_length=100,
max_length=500,
description="A good title for the video",
)
description: str = Field(
...,
min_length=100,
max_length=500,
description="A brief summary of the video's content",
)
category_id: str = "22" # Default to a generic category, update as needed
privacy: str = "private"
tags: str = Field(
...,
min_length=100,
max_length=500,
description="Best seo tags for youtube based on the story",
)
thumbnail: str = Field(
...,
min_length=100,
max_length=500,
description="""Best image prompt based on the image description: here is an """,
)
# celery = Celery()
# celery.config_from_object(celery_config)
# celery.conf.update(
# # Other Celery configuration settings
# CELERYD_LOG_LEVEL="DEBUG", # Set log level to DEBUG for the worker
# )
# @celery.task(name="CreateFile")
def create_json_file(assets: List[Assets], asset_dir: str):
for asset in assets:
filename = f"{asset.type.capitalize()}Sequences.json"
# Convert dictionary to JSON string
json_string = json.dumps(asset.sequence)
# Create directory if it doesn't exist
os.makedirs(asset_dir, exist_ok=True)
print(os.path.join(asset_dir, filename))
# Write JSON string to file
with open(os.path.join(asset_dir, filename), "w") as f:
f.write(json_string)
# @celery.task(name="Constants")
def create_constants_json_file(constants: Constants, asset_dir: str):
temp_dir = asset_dir.replace("src/HelloWorld/Assets", "")
instrunction_file = os.path.join(temp_dir, "ServerInstructions.json")
filename = "Constants.json"
if constants:
json_string = json.dumps(constants.dict())
else:
json_string = json.dumps({})
os.makedirs(asset_dir, exist_ok=True)
with open(instrunction_file, "w") as f:
if constants.instructions:
f.write(json.dumps({"frames": constants.frames}))
else:
f.write(json.dumps({"frames": [0, constants.duration]}))
with open(os.path.join(asset_dir, filename), "w") as f:
f.write(json_string)
def create_symlink(source_dir, target_dir, symlink_name):
source_path = os.path.join(source_dir, symlink_name)
target_path = os.path.join(target_dir, symlink_name)
try:
os.symlink(source_path, target_path)
print(f"Symlink '{symlink_name}' created successfully.")
except FileExistsError:
print(f"Symlink '{symlink_name}' already exists.")
def change_playback_speed(input_path, speed_factor):
"""
Change the playback speed of a video and overwrite the original file.
:param input_path: Path to the input video file.
:param speed_factor: Factor by which to increase the speed. (e.g., 2.0 for double speed)
"""
# Create a temporary output file
temp_output_path = input_path + ".temp.mp4"
# Construct the ffmpeg command
command = [
"ffmpeg",
"-i",
input_path,
"-filter_complex",
f"[0:v]setpts={1/speed_factor}*PTS[v];[0:a]atempo={speed_factor}[a]",
"-map",
"[v]",
"-map",
"[a]",
"-y", # Overwrite the output file if it exists
temp_output_path,
]
# Run the command
subprocess.run(command, check=True)
# Replace the original file with the new one
os.replace(temp_output_path, input_path)
def download_with_wget(
link=None,
download_dir=None,
filename=None,
links_file_path=None,
):
headers = [
"--header",
"Cookie: __Host-session=63EQahvTpHuoFSkEW75hC",
"--header",
"Cookie: __cf_bm=CDGicP5OErYjDI85UmQSRKlppJLlbcgCXlWcODoIQAI-1716296320-1.0.1.1-4Rm5_wdxupmrDWgddOQjEV01TMFC4UJ479GRIAKKGHNgXu3N8ZkASEZXGwCWaRyUYazsUaLMALk.4frWWJzHQ",
]
# Construct the full command
if links_file_path:
command = ["aria2c", "-i", links_file_path, "--dir", download_dir]
else:
command = ["aria2c"] + headers + [link, "-d", download_dir, "-o", filename]
# Run the command
subprocess.run(command)
# @celery.task(name="CopyRemotion")
def copy_remotion_app(src: str, dest: str):
shutil.copytree(src, dest)
# @celery.task(name="Unsilence")
def unsilence(directory: str):
output_dir = os.path.join(directory, "out/video.mp4")
shortered_dir = os.path.join(directory, "out/temp.mp4")
os.system(f"pipx run unsilence {output_dir} {shortered_dir} -y")
os.remove(output_dir)
os.rename(shortered_dir, output_dir)
# @celery.task(name="InstallDependency")
def install_dependencies(directory: str):
os.chdir(directory)
os.system("npm install")
# @celery.task(name="uploadTime")
def upload_video_to_youtube(task: YouTubeUploadTask):
# Convert dict to Pydantic model
# Build the command
command = [
"/srv/youtube/youtubeuploader", # Adjust the path as needed
"-filename",
task.filename,
"-title",
task.title,
"-description",
task.description,
"-categoryId",
task.category_id,
"-privacy",
task.privacy,
"-tags",
task.tags,
]
if task.thumbnail:
command.extend(["-thumbnail", task.thumbnail])
# Execute the command
result = run(command, capture_output=True, text=True)
return result.stdout
# @celery.task(name="DownloadAssets")
def download_assets(links: List[LinkInfo], temp_dir: str):
public_dir = f"{temp_dir}/public"
os.makedirs(public_dir, exist_ok=True)
links_file_path = os.path.join(temp_dir, "download_links.txt")
with open(links_file_path, "w") as links_file:
for link in links:
file_link = link.link
file_name = link.file_name
# Write each link to the file in the format required by aria2c
links_file.write(f"{file_link}\n out={file_name}\n")
download_with_wget(links_file_path=links_file_path, download_dir=public_dir)
# @celery.task(name="RenderFile")
def render_video(directory: str, output_directory: str):
os.chdir(directory)
os.system(
f"npm run build --enable-multiprocess-on-linux --output {output_directory}"
)
print("complete")
# @celery.task(name="send")
async def cleanup_temp_directory(
temp_dir: str,
output_dir: str,
video_task: EditorRequest,
chat_id: int = -1002069945904,
):
video_folder_dir = f"/tmp/Video/{video_task.constants.task}"
try:
print("sending...")
# bot.send_video(chat_id=chat_id,caption="Your Video Caption",file_name=output_dir)
await bot.send_file(chat_id, file=output_dir, caption="Your video caption")
finally:
# remotion_app_dir = os.path.join("/srv", "Remotion-app")
# shutil.rmtree(remotion_app_dir)
# use the cache
# shutil.copytree(temp_dir, remotion_app_dir)
# if not SERVER_STATE.CACHED:
# SERVER_STATE.CACHED = True
# Cleanup: Remove the temporary directory
shutil.rmtree(temp_dir, ignore_errors=True)
# @celery.task(name="All")
async def celery_task(video_task: EditorRequest):
remotion_app_dir = os.path.join("/srv", "Remotion-app")
project_id = str(uuid.uuid4())
temp_dir = f"/tmp/{project_id}"
output_dir = f"/tmp/{project_id}/out/video.mp4"
assets_dir = os.path.join(temp_dir, "src/HelloWorld/Assets")
copy_remotion_app(remotion_app_dir, temp_dir)
# # use the cached stuff
# if not SERVER_STATE.CACHED:
#
install_dependencies(temp_dir)
create_constants_json_file(video_task.constants, assets_dir)
create_json_file(video_task.assets, assets_dir)
download_assets(video_task.links, temp_dir)
render_video(temp_dir, output_dir)
change_playback_speed(output_dir, 1.2)
# unsilence(temp_dir)
response: YouTubeUploadTask = tagger(narration="", response_model=YouTubeUploadTask)
response.filename = output_dir
upload_video_to_youtube(task=response)
await cleanup_temp_directory(temp_dir, output_dir, video_task)
# chain(
# copy_remotion_app.si(remotion_app_dir, temp_dir),
# install_dependencies.si(temp_dir),
# create_constants_json_file.si(video_task.constants, assets_dir),
# create_json_file.si(video_task.assets, assets_dir),
# download_assets.si(video_task.links, temp_dir) if video_task.links else None,
# render_video.si(temp_dir, output_dir),
# # unsilence.si(temp_dir),
# cleanup_temp_directory.si(temp_dir, output_dir),
# ).apply_async(
# # link_error=handle_error
# ) # Link the tasks and handle errors
def handle_error(task_id, err, *args, **kwargs):
print(f"Error in task {task_id}: {err}")
# You can add additional error handling logic here