Xudong Xiao commited on
Commit
0a00054
1 Parent(s): 7df592d

Create task from Youtube link & query status of a task using taskId

Browse files
Files changed (4) hide show
  1. entries/run.py +0 -1
  2. src/task.py +34 -16
  3. src/web/api_specs.yaml +79 -0
  4. src/web/web.py +18 -7
entries/run.py CHANGED
@@ -66,7 +66,6 @@ if __name__ == "__main__":
66
  task_list.append({"id": task_id, "status": "created", "resource_status:": "local"})
67
  stream = open(local_dir.joinpath("task_queue.yaml"), "w")
68
  dump(tasks_queue, stream)
69
-
70
  task.run_pipeline()
71
 
72
 
 
66
  task_list.append({"id": task_id, "status": "created", "resource_status:": "local"})
67
  stream = open(local_dir.joinpath("task_queue.yaml"), "w")
68
  dump(tasks_queue, stream)
 
69
  task.run_pipeline()
70
 
71
 
src/task.py CHANGED
@@ -1,12 +1,9 @@
1
- from src.srt_util.srt import SrtScript
2
- from src.srt_util.srt2ass import srt2ass
 
3
  import openai
4
- import stable_whisper
5
- import torch
6
- import whisper
7
  from pytube import YouTube
8
  from os import getenv
9
- from enum import Enum
10
  from pathlib import Path
11
  from enum import Enum, auto
12
  import logging
@@ -44,19 +41,30 @@ SRT_Script : SrtScript
44
  - (Optional) mp4
45
  """
46
 
47
- class TaskStatus(Enum):
48
- CREATED = (auto(), None)
49
- INITIALIZING_ASR = (auto(), None)
50
- PRE_PROCESSING = (auto(), None)
51
- TRANSLATING = (auto(), 0.0)
52
- POST_PROCESSING = (auto(), None)
53
- OUTPUT_MODULE = (auto(), None)
54
 
55
 
56
 
57
  class Task:
 
 
 
 
 
 
 
 
 
 
58
  def __init__(self, task_id, task_local_dir, launch_info):
59
- self.status = TaskStatus.CREATED
 
60
  openai.api_key = getenv("OPENAI_API_KEY")
61
  self.launch_info = launch_info
62
  self.task_local_dir = task_local_dir
@@ -66,6 +74,7 @@ class Task:
66
  self.task_id = task_id
67
  self.progress = NotImplemented
68
  self.SRT_Script = None
 
69
 
70
 
71
  @staticmethod
@@ -88,26 +97,36 @@ class Task:
88
  # Module 1 ASR: audio --> SRT_script
89
  def get_srt_class(self, whisper_model='tiny', method="stable"):
90
  # Instead of using the script_en variable directly, we'll use script_input
 
 
91
  pass
92
 
93
  # Module 2: SRT preprocess: perform preprocess steps
94
  def preprocess(self):
 
 
95
  pass
96
 
97
  def update_translation_progress(self, new_progress):
98
  if self.progress == TaskStatus.TRANSLATING:
99
  self.progress = TaskStatus.TRANSLATING.value[0], new_progress
 
100
 
101
  # Module 3: perform srt translation
102
  def translation(self):
 
103
  pass
104
 
105
  # Module 4: perform srt post process steps
106
  def postprocess(self):
 
 
107
  pass
108
 
109
  # Module 5: output module
110
  def output_render(self):
 
 
111
  pass
112
 
113
  def run_pipeline(self):
@@ -115,8 +134,7 @@ class Task:
115
  self.preprocess()
116
  self.translation()
117
  self.postprocess()
118
- out = self.output_render()
119
- return out
120
 
121
  class YoutubeTask(Task):
122
  def __init__(self, task_id, task_local_dir, launch_info, youtube_url):
 
1
+ import threading
2
+ import time
3
+
4
  import openai
 
 
 
5
  from pytube import YouTube
6
  from os import getenv
 
7
  from pathlib import Path
8
  from enum import Enum, auto
9
  import logging
 
41
  - (Optional) mp4
42
  """
43
 
44
+ class TaskStatus(str, Enum):
45
+ CREATED = 'CREATED'
46
+ INITIALIZING_ASR = 'INITIALIZING_ASR'
47
+ PRE_PROCESSING = 'PRE_PROCESSING'
48
+ TRANSLATING = 'TRANSLATING'
49
+ POST_PROCESSING = 'POST_PROCESSING'
50
+ OUTPUT_MODULE = 'OUTPUT_MODULE'
51
 
52
 
53
 
54
  class Task:
55
+ @property
56
+ def status(self):
57
+ with self.__status_lock:
58
+ return self.__status
59
+
60
+ @status.setter
61
+ def status(self, new_status):
62
+ with self.__status_lock:
63
+ self.__status = new_status
64
+
65
  def __init__(self, task_id, task_local_dir, launch_info):
66
+ self.__status_lock = threading.Lock()
67
+ self.__status = TaskStatus.CREATED
68
  openai.api_key = getenv("OPENAI_API_KEY")
69
  self.launch_info = launch_info
70
  self.task_local_dir = task_local_dir
 
74
  self.task_id = task_id
75
  self.progress = NotImplemented
76
  self.SRT_Script = None
77
+ self.result = None
78
 
79
 
80
  @staticmethod
 
97
  # Module 1 ASR: audio --> SRT_script
98
  def get_srt_class(self, whisper_model='tiny', method="stable"):
99
  # Instead of using the script_en variable directly, we'll use script_input
100
+ self.status = TaskStatus.INITIALIZING_ASR
101
+ time.sleep(5)
102
  pass
103
 
104
  # Module 2: SRT preprocess: perform preprocess steps
105
  def preprocess(self):
106
+ self.status = TaskStatus.PRE_PROCESSING
107
+ time.sleep(5)
108
  pass
109
 
110
  def update_translation_progress(self, new_progress):
111
  if self.progress == TaskStatus.TRANSLATING:
112
  self.progress = TaskStatus.TRANSLATING.value[0], new_progress
113
+ time.sleep(5)
114
 
115
  # Module 3: perform srt translation
116
  def translation(self):
117
+ time.sleep(5)
118
  pass
119
 
120
  # Module 4: perform srt post process steps
121
  def postprocess(self):
122
+ self.status = TaskStatus.POST_PROCESSING
123
+ time.sleep(5)
124
  pass
125
 
126
  # Module 5: output module
127
  def output_render(self):
128
+ self.status = TaskStatus.OUTPUT_MODULE
129
+ return "TODO"
130
  pass
131
 
132
  def run_pipeline(self):
 
134
  self.preprocess()
135
  self.translation()
136
  self.postprocess()
137
+ self.result = self.output_render()
 
138
 
139
  class YoutubeTask(Task):
140
  def __init__(self, task_id, task_local_dir, launch_info, youtube_url):
src/web/api_specs.yaml ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ openapi: 3.0.3
2
+ info:
3
+ title: Pigeon AI
4
+ description: Pigeon AI
5
+ version: 1.0.0
6
+ servers:
7
+ - url: 'https'
8
+ paths:
9
+ /api/task:
10
+ post:
11
+ summary: Create a task
12
+ operationId: createTask
13
+ requestBody:
14
+ content:
15
+ application/json:
16
+ schema:
17
+ $ref: '#/components/schemas/youtubeLink'
18
+ responses:
19
+ '200':
20
+ description: OK
21
+ content:
22
+ application/json:
23
+ schema:
24
+ $ref: '#/components/schemas/task'
25
+ /api/task/{taskId}/status:
26
+ get:
27
+ summary: Get task status
28
+ operationId: getTask
29
+ parameters:
30
+ - name: taskId
31
+ in: path
32
+ required: true
33
+ description: task id
34
+ schema:
35
+ type: string
36
+ responses:
37
+ '200':
38
+ description: OK
39
+ content:
40
+ application/json:
41
+ schema:
42
+ $ref: '#/components/schemas/taskStatus'
43
+ '404':
44
+ description: Not Found
45
+ content:
46
+ application/json:
47
+ schema:
48
+ $ref: '#/components/schemas/error'
49
+
50
+ components:
51
+ schemas:
52
+ youtubeLink:
53
+ type: object
54
+ properties:
55
+ youtubeLink:
56
+ type: string
57
+ description: youtube link
58
+ example: https://www.youtube.com/watch?v=5qap5aO4i9A
59
+ task:
60
+ type: object
61
+ properties:
62
+ taskId:
63
+ type: string
64
+ description: task id generated by uuid
65
+ example: 7a765280-1a72-47e4-8747-8a38cdbaca91
66
+ taskStatus:
67
+ type: object
68
+ properties:
69
+ status:
70
+ type: string
71
+ description: task status
72
+ example: PROCESSING
73
+ error:
74
+ type: object
75
+ properties:
76
+ error:
77
+ type: string
78
+ description: error message
79
+ example: 'Invalid youtube link'
src/web/web.py CHANGED
@@ -2,27 +2,38 @@ import yaml
2
  from flask import Flask, request, jsonify
3
  from concurrent.futures import ThreadPoolExecutor
4
  from src.task import Task
 
5
 
6
  app = Flask(__name__)
7
 
8
  # Global thread pool
9
  executor = ThreadPoolExecutor(max_workers=4) # Adjust max_workers as per your requirement
10
 
 
 
 
11
  @app.route('/api/task', methods=['POST'])
12
- def create_task():
 
13
  data = request.get_json()
14
-
15
- if not data or 'youtube_link' not in data:
16
  return jsonify({'error': 'YouTube link not provided'}), 400
17
- youtube_link = data['youtube_link']
18
  launch_config = yaml.load(open("./configs/local_launch.yaml"), Loader=yaml.Loader)
19
- task = Task.fromYoutubeLink(youtube_link, 1, launch_config)
20
-
 
21
  # Submit task to thread pool
22
  executor.submit(task.run)
23
 
24
- return jsonify({'task_id': task.task_id})
25
 
 
 
 
 
 
 
26
 
27
  if __name__ == '__main__':
28
  app.run(debug=True)
 
2
  from flask import Flask, request, jsonify
3
  from concurrent.futures import ThreadPoolExecutor
4
  from src.task import Task
5
+ from uuid import uuid4
6
 
7
  app = Flask(__name__)
8
 
9
  # Global thread pool
10
  executor = ThreadPoolExecutor(max_workers=4) # Adjust max_workers as per your requirement
11
 
12
+ # thread safe task map to store task status
13
+ task_map = {}
14
+
15
  @app.route('/api/task', methods=['POST'])
16
+ def create_task_youtube():
17
+ global task_map
18
  data = request.get_json()
19
+ if not data or 'youtubeLink' not in data:
 
20
  return jsonify({'error': 'YouTube link not provided'}), 400
21
+ youtube_link = data['youtubeLink']
22
  launch_config = yaml.load(open("./configs/local_launch.yaml"), Loader=yaml.Loader)
23
+ task_id = str(uuid4())
24
+ task = Task.fromYoutubeLink(youtube_link, task_id, launch_config)
25
+ task_map[task_id] = task
26
  # Submit task to thread pool
27
  executor.submit(task.run)
28
 
29
+ return jsonify({'taskId': task.task_id})
30
 
31
+ @app.route('/api/task/<taskId>/status', methods=['GET'])
32
+ def get_task_status(taskId):
33
+ global task_map
34
+ if taskId not in task_map:
35
+ return jsonify({'error': 'Task not found'}), 404
36
+ return jsonify({'status': task_map[taskId].status})
37
 
38
  if __name__ == '__main__':
39
  app.run(debug=True)