Yakova commited on
Commit
ba1ffc4
1 Parent(s): 8a40a50

Update App/Worker.py

Browse files
Files changed (1) hide show
  1. App/Worker.py +20 -139
App/Worker.py CHANGED
@@ -2,57 +2,13 @@ from celery import Celery, chain
2
  import os, shutil, subprocess
3
  import uuid
4
  from urllib.parse import urlparse
5
- from subprocess import run
6
- from App import celery_config, bot, SERVER_STATE
7
  from typing import List
8
- from App.Editor.Schema import EditorRequest, LinkInfo, Assets, Constants
9
  from celery.signals import worker_process_init
10
  from asgiref.sync import async_to_sync
11
  import json
12
  import os
13
- from pydantic import BaseModel
14
- from App.utilis import upload_file
15
-
16
- import subprocess
17
-
18
-
19
- def concatenate_videos(input_dir):
20
- # Get a list of all the mp4 files in the input directory
21
- files = sorted([f for f in os.listdir(input_dir) if f.endswith(".mp4")])
22
-
23
- # Generate the input file list for ffmpeg
24
- input_files = "|".join([f"file '{os.path.join(input_dir, f)}'" for f in files])
25
-
26
- output_file = f"{input_dir}/final.mp4"
27
- # Run the ffmpeg command to concatenate the videos
28
- subprocess.run(
29
- [
30
- "ffmpeg",
31
- "-f",
32
- "concat",
33
- "-safe",
34
- "0",
35
- "-i",
36
- f"concat:{input_files}",
37
- "-c",
38
- "copy",
39
- output_file,
40
- ]
41
- )
42
- bot.start()
43
- bot.send_file(-1002069945904, file=output_file, caption="finally done!")
44
- return output_file
45
-
46
-
47
- class YouTubeUploadTask(BaseModel):
48
- filename: str
49
- title: str = "Default Title"
50
- description: str = "Default Description"
51
- category_id: str = "22" # Default to a generic category, update as needed
52
- privacy: str = "private"
53
- tags: str = ""
54
- thumbnail: str = None
55
-
56
 
57
  # celery = Celery()
58
  # celery.config_from_object(celery_config)
@@ -77,26 +33,6 @@ def create_json_file(assets: List[Assets], asset_dir: str):
77
  f.write(json_string)
78
 
79
 
80
- # @celery.task(name="Constants")
81
- def create_constants_json_file(constants: Constants, asset_dir: str):
82
- temp_dir = asset_dir.replace("src/HelloWorld/Assets", "")
83
- instrunction_file = os.path.join(temp_dir, "ServerInstructions.json")
84
- filename = "Constants.json"
85
- if constants:
86
- json_string = json.dumps(constants.dict())
87
- else:
88
- json_string = json.dumps({})
89
- os.makedirs(asset_dir, exist_ok=True)
90
- with open(instrunction_file, "w") as f:
91
- if constants.instructions:
92
- f.write(json.dumps({"frames": constants.frames}))
93
- else:
94
- f.write(json.dumps({"frames": [0, constants.duration]}))
95
-
96
- with open(os.path.join(asset_dir, filename), "w") as f:
97
- f.write(json_string)
98
-
99
-
100
  def create_symlink(source_dir, target_dir, symlink_name):
101
  source_path = os.path.join(source_dir, symlink_name)
102
  target_path = os.path.join(target_dir, symlink_name)
@@ -116,6 +52,10 @@ def download_with_wget(link, download_dir, filename):
116
  def copy_remotion_app(src: str, dest: str):
117
  shutil.copytree(src, dest)
118
 
 
 
 
 
119
 
120
  # @celery.task(name="Unsilence")
121
  def unsilence(directory: str):
@@ -132,37 +72,6 @@ def install_dependencies(directory: str):
132
  os.system("npm install")
133
 
134
 
135
- # @celery.task(name="uploadTime")
136
- def upload_video_to_youtube(task_data: dict):
137
- # Convert dict to Pydantic model
138
- task = YouTubeUploadTask(**task_data)
139
-
140
- # Build the command
141
- command = [
142
- "/srv/youtube/youtubeuploader", # Adjust the path as needed
143
- "-filename",
144
- task.filename,
145
- "-title",
146
- task.title,
147
- "-description",
148
- task.description,
149
- "-categoryId",
150
- task.category_id,
151
- "-privacy",
152
- task.privacy,
153
- "-tags",
154
- task.tags,
155
- ]
156
-
157
- if task.thumbnail:
158
- command.extend(["-thumbnail", task.thumbnail])
159
-
160
- # Execute the command
161
- result = run(command, capture_output=True, text=True)
162
-
163
- return result.stdout
164
-
165
-
166
  # @celery.task(name="DownloadAssets")
167
  def download_assets(links: List[LinkInfo], temp_dir: str):
168
  public_dir = f"{temp_dir}/public"
@@ -175,44 +84,21 @@ def download_assets(links: List[LinkInfo], temp_dir: str):
175
  # @celery.task(name="RenderFile")
176
  def render_video(directory: str, output_directory: str):
177
  os.chdir(directory)
178
- os.system(
179
- f"npm run build --enable-multiprocess-on-linux --output {output_directory}"
180
- )
181
  print("complete")
182
 
183
 
184
  # @celery.task(name="send")
185
  async def cleanup_temp_directory(
186
- temp_dir: str,
187
- output_dir: str,
188
- video_task: EditorRequest,
189
- chat_id: int = -1002069945904,
190
  ):
191
- video_folder_dir = f"/tmp/Video/{video_task.constants.task}"
192
  try:
193
- if not SERVER_STATE.MASTER:
194
- await upload_file(
195
- output_dir,
196
- SERVER_STATE.SPACE_HOST,
197
- video_task.constants.chunk,
198
- video_task.constants.task,
199
- )
200
- else:
201
-
202
- os.makedirs(video_folder_dir, exist_ok=True)
203
- shutil.move(
204
- output_dir, f"{video_folder_dir}/{video_task.constants.chunk}.mp4"
205
- )
206
-
207
  except Exception as e:
208
  print(e)
209
  finally:
210
- remotion_app_dir = os.path.join("/srv", "Remotion-app")
211
- shutil.rmtree(remotion_app_dir)
212
- # use the cache
213
- shutil.copytree(temp_dir, remotion_app_dir)
214
- if not SERVER_STATE.CACHED:
215
- SERVER_STATE.CACHED = True
216
  # Cleanup: Remove the temporary directory
217
  shutil.rmtree(temp_dir, ignore_errors=True)
218
 
@@ -223,25 +109,20 @@ async def celery_task(video_task: EditorRequest):
223
  project_id = str(uuid.uuid4())
224
  temp_dir = f"/tmp/{project_id}"
225
  output_dir = f"/tmp/{project_id}/out/video.mp4"
226
- assets_dir = os.path.join(temp_dir, "src/HelloWorld/Assets")
227
 
228
- copy_remotion_app(remotion_app_dir, temp_dir)
229
-
230
- # use the cached stuff
231
- if not SERVER_STATE.CACHED:
232
- install_dependencies(temp_dir)
233
 
234
- create_constants_json_file(video_task.constants, assets_dir)
235
- create_json_file(video_task.assets, assets_dir)
236
- download_assets(video_task.links, temp_dir)
237
- render_video(temp_dir, output_dir)
238
- unsilence(temp_dir)
239
- await cleanup_temp_directory(temp_dir, output_dir, video_task)
 
240
 
241
  # chain(
242
  # copy_remotion_app.si(remotion_app_dir, temp_dir),
243
  # install_dependencies.si(temp_dir),
244
- # create_constants_json_file.si(video_task.constants, assets_dir),
245
  # create_json_file.si(video_task.assets, assets_dir),
246
  # download_assets.si(video_task.links, temp_dir) if video_task.links else None,
247
  # render_video.si(temp_dir, output_dir),
@@ -254,4 +135,4 @@ async def celery_task(video_task: EditorRequest):
254
 
255
  def handle_error(task_id, err, *args, **kwargs):
256
  print(f"Error in task {task_id}: {err}")
257
- # You can add additional error handling logic here
 
2
  import os, shutil, subprocess
3
  import uuid
4
  from urllib.parse import urlparse
5
+ from App import celery_config, bot
 
6
  from typing import List
7
+ from App.Editor.Schema import EditorRequest, LinkInfo, Assets
8
  from celery.signals import worker_process_init
9
  from asgiref.sync import async_to_sync
10
  import json
11
  import os
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
  # celery = Celery()
14
  # celery.config_from_object(celery_config)
 
33
  f.write(json_string)
34
 
35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36
  def create_symlink(source_dir, target_dir, symlink_name):
37
  source_path = os.path.join(source_dir, symlink_name)
38
  target_path = os.path.join(target_dir, symlink_name)
 
52
  def copy_remotion_app(src: str, dest: str):
53
  shutil.copytree(src, dest)
54
 
55
+ # # create symbolic link to prevent multiple installs
56
+ # source_dir = os.path.join(src, "node_module")
57
+ # create_symlink(source_dir, target_dir=dest, symlink_name="node_module")
58
+
59
 
60
  # @celery.task(name="Unsilence")
61
  def unsilence(directory: str):
 
72
  os.system("npm install")
73
 
74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  # @celery.task(name="DownloadAssets")
76
  def download_assets(links: List[LinkInfo], temp_dir: str):
77
  public_dir = f"{temp_dir}/public"
 
84
  # @celery.task(name="RenderFile")
85
  def render_video(directory: str, output_directory: str):
86
  os.chdir(directory)
87
+ os.system(f"npm run build --output {output_directory}")
 
 
88
  print("complete")
89
 
90
 
91
  # @celery.task(name="send")
92
  async def cleanup_temp_directory(
93
+ temp_dir: str, output_dir: str, chat_id: int = -1002069945904
 
 
 
94
  ):
 
95
  try:
96
+ print("sending...")
97
+ # bot.send_video(chat_id=chat_id,caption="Your Video Caption",file_name=output_dir)
98
+ await bot.send_file(chat_id, file=output_dir, caption="Your video caption")
 
 
 
 
 
 
 
 
 
 
 
99
  except Exception as e:
100
  print(e)
101
  finally:
 
 
 
 
 
 
102
  # Cleanup: Remove the temporary directory
103
  shutil.rmtree(temp_dir, ignore_errors=True)
104
 
 
109
  project_id = str(uuid.uuid4())
110
  temp_dir = f"/tmp/{project_id}"
111
  output_dir = f"/tmp/{project_id}/out/video.mp4"
 
112
 
113
+ assets_dir = os.path.join(temp_dir, "src/HelloWorld/Assets")
 
 
 
 
114
 
115
+ copy_remotion_app(remotion_app_dir, temp_dir),
116
+ install_dependencies(temp_dir),
117
+ create_json_file(video_task.assets, assets_dir),
118
+ download_assets(video_task.links, temp_dir) if video_task.links else None,
119
+ render_video(temp_dir, output_dir),
120
+ unsilence(temp_dir),
121
+ await cleanup_temp_directory(temp_dir, output_dir),
122
 
123
  # chain(
124
  # copy_remotion_app.si(remotion_app_dir, temp_dir),
125
  # install_dependencies.si(temp_dir),
 
126
  # create_json_file.si(video_task.assets, assets_dir),
127
  # download_assets.si(video_task.links, temp_dir) if video_task.links else None,
128
  # render_video.si(temp_dir, output_dir),
 
135
 
136
  def handle_error(task_id, err, *args, **kwargs):
137
  print(f"Error in task {task_id}: {err}")
138
+ # You can add additional error handling logic here