Spanicin commited on
Commit
6e03c09
·
verified ·
1 Parent(s): 548190a

Update app_celery.py

Browse files
Files changed (1) hide show
  1. app_celery.py +26 -46
app_celery.py CHANGED
@@ -77,6 +77,7 @@ celery.conf.update(app.config)
77
 
78
  TEMP_DIR = None
79
  start_time = None
 
80
 
81
  app.config['temp_response'] = None
82
  app.config['generation_thread'] = None
@@ -419,6 +420,7 @@ def process_video_for_chunk(audio_chunk_path, args_dict, chunk_index):
419
  @app.route("/run", methods=['POST'])
420
  def generate_video():
421
  global start_time
 
422
  start_time = time.time()
423
  global TEMP_DIR
424
  TEMP_DIR = create_temp_dir()
@@ -520,64 +522,42 @@ def generate_video():
520
  # args.device = "cpu"
521
 
522
  try:
523
- # base64_video, temp_file_path, duration = main(args)
524
- # final_video_path = app.config['final_video_path']
525
- # print('final_video_path',final_video_path)
526
- chunk_tasks = []
527
  for index, audio_chunk in enumerate(audio_chunks):
528
  print(f"Submitting chunk {index} with audio_chunk: {audio_chunk}")
529
  task = process_video_for_chunk.apply_async(args=[audio_chunk, args_dict, index])
530
  print(f"Task {task.id} submitted for chunk {index}")
531
  chunk_tasks.append(task)
532
  print("chunk_tasks",chunk_tasks)
 
533
 
534
- # # video_chunk_paths = [task.get() for task in chunk_tasks]
535
- # # print(f"Video chunks generated: {video_chunk_paths}")
536
- # video_chunk_paths = []
537
- # for task in chunk_tasks:
538
- # try:
539
- # video_chunk_path = task.get() # Wait for the task to complete
540
- # video_chunk_paths.append(video_chunk_path)
541
- # except Exception as e:
542
- # print(f"Error while fetching task result: {str(e)}")
543
- # return jsonify({'status': 'error', 'message': str(e)}), 500
544
-
545
- # print(f"Video chunks generated: {video_chunk_paths}")
546
- # preprocess_dir = os.path.join("/tmp", "preprocess_data")
547
- # custom_cleanup(TEMP_DIR.name, preprocess_dir)
548
-
549
- # print("Temporary files cleaned up, but preprocess_data is retained.")
550
- # return jsonify({
551
- # 'status': 'completed',
552
- # 'video_chunk_paths': video_chunk_paths
553
- # })
554
-
555
-
556
- @stream_with_context
557
- def generate_chunks():
558
- video_chunk_paths = []
559
- for index, task in enumerate(chunk_tasks):
560
- try:
561
- video_chunk_path = task.get() # Wait for each task to complete
562
- video_chunk_paths.append(video_chunk_path)
563
- print("video_chunk_paths",video_chunk_paths)
564
- yield f'data: {video_chunk_path}\n\n' # Send the chunk path to the frontend
565
- print(f"Chunk {index} generated and sent: {video_chunk_path}")
566
- except Exception as e:
567
- print(f"Error while fetching task result: {str(e)}")
568
- yield f'data: error\n\n'
569
-
570
- preprocess_dir = os.path.join("/tmp", "preprocess_data")
571
- custom_cleanup(TEMP_DIR.name, preprocess_dir)
572
- print("Temporary files cleaned up, but preprocess_data is retained.")
573
-
574
- # Return the generator that streams the data as it becomes available
575
- return Response(generate_chunks(), content_type='text/event-stream')
576
 
577
  except Exception as e:
578
  return jsonify({'status': 'error', 'message': str(e)}), 500
579
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
580
 
 
 
581
 
582
  @app.route("/health", methods=["GET"])
583
  def health_status():
 
77
 
78
  TEMP_DIR = None
79
  start_time = None
80
+ chunk_tasks = []
81
 
82
  app.config['temp_response'] = None
83
  app.config['generation_thread'] = None
 
420
  @app.route("/run", methods=['POST'])
421
  def generate_video():
422
  global start_time
423
+ global chunk_tasks
424
  start_time = time.time()
425
  global TEMP_DIR
426
  TEMP_DIR = create_temp_dir()
 
522
  # args.device = "cpu"
523
 
524
  try:
 
 
 
 
525
  for index, audio_chunk in enumerate(audio_chunks):
526
  print(f"Submitting chunk {index} with audio_chunk: {audio_chunk}")
527
  task = process_video_for_chunk.apply_async(args=[audio_chunk, args_dict, index])
528
  print(f"Task {task.id} submitted for chunk {index}")
529
  chunk_tasks.append(task)
530
  print("chunk_tasks",chunk_tasks)
531
+ return jsonify({'status': 'Video generation started'}), 200
532
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
533
 
534
  except Exception as e:
535
  return jsonify({'status': 'error', 'message': str(e)}), 500
536
 
537
+ @app.route("/stream", methods=['GET'])
538
+ def stream_video_chunks():
539
+ global chunk_tasks
540
+
541
+ @stream_with_context
542
+ def generate_chunks():
543
+ video_chunk_paths = []
544
+ for index, task in enumerate(chunk_tasks):
545
+ try:
546
+ video_chunk_path = task.get() # Wait for each task to complete
547
+ video_chunk_paths.append(video_chunk_path)
548
+ print("video_chunk_paths",video_chunk_paths)
549
+ yield f'data: {video_chunk_path}\n\n' # Send the chunk path to the frontend
550
+ print(f"Chunk {index} generated and sent: {video_chunk_path}")
551
+ except Exception as e:
552
+ print(f"Error while fetching task result: {str(e)}")
553
+ yield f'data: error\n\n'
554
+
555
+ preprocess_dir = os.path.join("/tmp", "preprocess_data")
556
+ custom_cleanup(TEMP_DIR.name, preprocess_dir)
557
+ print("Temporary files cleaned up, but preprocess_data is retained.")
558
 
559
+ # Return the generator that streams the data as it becomes available
560
+ return Response(generate_chunks(), content_type='text/event-stream')
561
 
562
  @app.route("/health", methods=["GET"])
563
  def health_status():