qingxu98 commited on
Commit
c7d9c76
1 Parent(s): 19aa87f

improve multi p

Browse files
docker_as_a_service/docker_as_a_service.py CHANGED
@@ -13,6 +13,7 @@ import os
13
  import asyncio
14
  import subprocess
15
  import uuid
 
16
  import threading
17
  import queue
18
  from shared_utils.docker_as_service_api import DockerServiceApiComModel
@@ -43,7 +44,7 @@ def read_output(stream, output_queue):
43
  async def stream_generator(request_obj):
44
  import tempfile
45
  # Create a temporary directory
46
- with tempfile.TemporaryDirectory() as temp_dir:
47
 
48
  # Construct the docker command
49
  download_folder = temp_dir
@@ -81,31 +82,42 @@ async def stream_generator(request_obj):
81
  while True:
82
  print("looping")
83
  # Check if there is any output in the queue
84
- try:
85
- output_stdout = stdout_queue.get_nowait() # Non-blocking get
86
- if output_stdout:
87
- print(output_stdout)
88
- yield yield_message(output_stdout)
89
-
90
- output_stderr = stderr_queue.get_nowait() # Non-blocking get
91
- if output_stderr:
92
- print(output_stdout)
93
- yield yield_message(output_stderr)
94
- except queue.Empty:
95
- pass # No output available
 
 
 
 
 
 
 
 
 
 
96
 
97
  # Break the loop if the process has finished
98
  if process.poll() is not None:
99
  break
100
 
101
- await asyncio.sleep(0.25)
102
 
103
  # Get the return code
104
  return_code = process.returncode
105
- yield yield_message("(return code:) " + str(return_code))
106
 
107
  # print(f"Successfully downloaded video {video_id}")
108
- existing_file_after_download = list(os.listdir(download_folder))
 
109
  # get the difference
110
  downloaded_files = [
111
  f for f in existing_file_after_download if f not in existing_file_before_download
 
13
  import asyncio
14
  import subprocess
15
  import uuid
16
+ import glob
17
  import threading
18
  import queue
19
  from shared_utils.docker_as_service_api import DockerServiceApiComModel
 
44
  async def stream_generator(request_obj):
45
  import tempfile
46
  # Create a temporary directory
47
+ with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
48
 
49
  # Construct the docker command
50
  download_folder = temp_dir
 
82
  while True:
83
  print("looping")
84
  # Check if there is any output in the queue
85
+
86
+ stdout_this_round = ""
87
+ stderr_this_round = ""
88
+ while True:
89
+ try:
90
+ output_stdout = stdout_queue.get_nowait() # Non-blocking get
91
+ if output_stdout:
92
+ stdout_this_round += output_stdout
93
+ print(output_stdout)
94
+ except queue.Empty:
95
+ yield yield_message(stdout_this_round)
96
+ break
97
+
98
+ while True:
99
+ try:
100
+ output_stderr = stderr_queue.get_nowait() # Non-blocking get
101
+ if output_stderr:
102
+ stderr_this_round += output_stderr
103
+ print(output_stderr)
104
+ except queue.Empty:
105
+ yield yield_message(stderr_this_round)
106
+ break
107
 
108
  # Break the loop if the process has finished
109
  if process.poll() is not None:
110
  break
111
 
112
+ await asyncio.sleep(0.5)
113
 
114
  # Get the return code
115
  return_code = process.returncode
116
+ yield yield_message("(daas return code:) " + str(return_code))
117
 
118
  # print(f"Successfully downloaded video {video_id}")
119
+ existing_file_after_download = glob.glob(os.path.join(download_folder, '**', '*'))
120
+ # existing_file_after_download = list(os.listdir(download_folder))
121
  # get the difference
122
  downloaded_files = [
123
  f for f in existing_file_after_download if f not in existing_file_before_download
docker_as_a_service/docker_as_a_service_host.py ADDED
@@ -0,0 +1,230 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ DaaS (Docker as a Service) is a service
3
+ that allows users to run docker commands on the server side.
4
+ """
5
+
6
+ from fastapi import FastAPI
7
+ from fastapi.responses import StreamingResponse
8
+ from fastapi import FastAPI, File, UploadFile, HTTPException
9
+ from pydantic import BaseModel, Field
10
+ from typing import Optional, Dict
11
+ import time
12
+ import os
13
+ import asyncio
14
+ import subprocess
15
+ import uuid
16
+ import glob
17
+ import threading
18
+ import queue
19
+ from shared_utils.docker_as_service_api import DockerServiceApiComModel
20
+
21
+ app = FastAPI()
22
+
23
+ def python_obj_to_pickle_file_bytes(obj):
24
+ import pickle
25
+ import io
26
+ with io.BytesIO() as f:
27
+ pickle.dump(obj, f)
28
+ return f.getvalue()
29
+
30
+ def yield_message(message):
31
+ dsacm = DockerServiceApiComModel(server_message=message)
32
+ return python_obj_to_pickle_file_bytes(dsacm)
33
+
34
+ def read_output(stream, output_queue):
35
+ while True:
36
+ line_stdout = stream.readline()
37
+ # print('recv')
38
+ if line_stdout:
39
+ output_queue.put(line_stdout)
40
+ else:
41
+ break
42
+
43
+
44
+ async def stream_generator(request_obj):
45
+ import tempfile
46
+ # Create a temporary directory
47
+ with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
48
+
49
+ # Construct the docker command
50
+ download_folder = temp_dir
51
+
52
+ # Get list of existing files before download
53
+ existing_file_before_download = []
54
+
55
+ video_id = request_obj.client_command
56
+ cmd = [
57
+ '/root/.dotnet/tools/BBDown',
58
+ video_id,
59
+ '--use-app-api',
60
+ '--work-dir',
61
+ f'{os.path.abspath(temp_dir)}'
62
+ ]
63
+ cmd_chmod = []
64
+
65
+
66
+ cmd = [
67
+ 'docker', 'run', '--rm',
68
+ '-v',
69
+ f'{os.path.abspath(temp_dir)}:/downloads',
70
+ 'bbdown',
71
+ video_id,
72
+ '--use-app-api',
73
+ '--work-dir',
74
+ '/downloads'
75
+ ]
76
+ cmd_chmod = [
77
+ 'docker', 'run', '--rm',
78
+ '-v',
79
+ f'{os.path.abspath(temp_dir)}:/downloads',
80
+ '--entrypoint=""', # override the entrypoint
81
+ 'bbdown', # image name
82
+ # chmod -R 777 /downloads
83
+ 'chmod',
84
+ '-R',
85
+ '777',
86
+ '/downloads'
87
+ ]
88
+
89
+
90
+ cmd = ' '.join(cmd)
91
+ yield yield_message(cmd)
92
+ process = subprocess.Popen(cmd,
93
+ stdout=subprocess.PIPE,
94
+ stderr=subprocess.PIPE,
95
+ shell=True,
96
+ text=True)
97
+
98
+ stdout_queue = queue.Queue()
99
+ thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
100
+ thread.daemon = True
101
+ thread.start()
102
+ stderr_queue = queue.Queue()
103
+ thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))
104
+ thread.daemon = True
105
+ thread.start()
106
+
107
+ while True:
108
+ print("looping")
109
+ # Check if there is any output in the queue
110
+
111
+ stdout_this_round = ""
112
+ stderr_this_round = ""
113
+ while True:
114
+ try:
115
+ output_stdout = stdout_queue.get_nowait() # Non-blocking get
116
+ if output_stdout:
117
+ stdout_this_round += output_stdout
118
+ print(output_stdout)
119
+ except queue.Empty:
120
+ yield yield_message(stdout_this_round)
121
+ break
122
+
123
+ while True:
124
+ try:
125
+ output_stderr = stderr_queue.get_nowait() # Non-blocking get
126
+ if output_stderr:
127
+ stderr_this_round += output_stderr
128
+ print(output_stderr)
129
+ except queue.Empty:
130
+ yield yield_message(stderr_this_round)
131
+ break
132
+
133
+ # Break the loop if the process has finished
134
+ if process.poll() is not None:
135
+ break
136
+
137
+ await asyncio.sleep(0.5)
138
+
139
+ # Get the return code
140
+ return_code = process.returncode
141
+ yield yield_message("(daas return code:) " + str(return_code))
142
+
143
+ # change files mod to 777
144
+ if cmd_chmod:
145
+ docker_chmod_res = subprocess.call(' '.join(cmd_chmod), shell=True)
146
+
147
+ # print(f"Successfully downloaded video {video_id}")
148
+ existing_file_after_download = glob.glob(os.path.join(download_folder, '**', '*'))
149
+ # existing_file_after_download = list(os.listdir(download_folder))
150
+ # get the difference
151
+ downloaded_files = [
152
+ f for f in existing_file_after_download if f not in existing_file_before_download
153
+ ]
154
+ downloaded_files_path = [
155
+ os.path.join(download_folder, f) for f in existing_file_after_download if f not in existing_file_before_download
156
+ ]
157
+ # read file
158
+ server_file_attach = {}
159
+ for fp, fn in zip(downloaded_files_path, downloaded_files):
160
+ if os.path.isdir(fp): continue
161
+ with open(fp, "rb") as f:
162
+ file_bytes = f.read()
163
+ server_file_attach[fn] = file_bytes
164
+
165
+ dsacm = DockerServiceApiComModel(
166
+ server_message="complete",
167
+ server_file_attach=server_file_attach,
168
+ )
169
+ yield python_obj_to_pickle_file_bytes(dsacm)
170
+
171
+
172
+ def simple_generator(return_obj):
173
+ dsacm = DockerServiceApiComModel(
174
+ server_message=return_obj,
175
+ )
176
+ yield python_obj_to_pickle_file_bytes(dsacm)
177
+
178
+ @app.post("/stream")
179
+ async def stream_response(file: UploadFile = File(...)):
180
+ # read the file in memory, treat it as pickle file, and unpickle it
181
+ import pickle
182
+ import io
183
+ content = await file.read()
184
+ with io.BytesIO(content) as f:
185
+ request_obj = pickle.load(f)
186
+ # process the request_obj
187
+ return StreamingResponse(stream_generator(request_obj), media_type="application/octet-stream")
188
+
189
+ @app.post("/search")
190
+ async def stream_response(file: UploadFile = File(...)):
191
+ # read the file in memory, treat it as pickle file, and unpickle it
192
+ import pickle
193
+ import io
194
+ content = await file.read()
195
+ with io.BytesIO(content) as f:
196
+ request_obj = pickle.load(f)
197
+
198
+ # process the request_obj
199
+ keyword = request_obj.client_command
200
+
201
+ from experimental_mods.get_search_kw_api_stop import search_videos
202
+ # Default parameters for video search
203
+ csrf_token = '40a227fcf12c380d7d3c81af2cd8c5e8' # Using default from main()
204
+ search_type = 'default'
205
+ max_pages = 1
206
+ output_path = 'search_results'
207
+ config_path = 'experimental_mods/config.json'
208
+
209
+ # Search for videos and return the first result
210
+ videos = search_videos(
211
+ keyword=keyword,
212
+ csrf_token=csrf_token,
213
+ search_type=search_type,
214
+ max_pages=max_pages,
215
+ output_path=output_path,
216
+ config_path=config_path,
217
+ early_stop=True
218
+ )
219
+
220
+ return StreamingResponse(simple_generator(videos), media_type="application/octet-stream")
221
+
222
+ @app.get("/")
223
+ async def hi():
224
+ return "Hello, this is Docker as a Service (DaaS)! If you want to use this service, you must duplicate this space. " \
225
+ "您好,这里是Docker作为服务(DaaS)!如果您想使用此服务,您必须复制此空间。复制方法:点击https://huggingface.co/spaces/hamercity/bbdown页面右上角的三个点,然后选择“复制空间”。" \
226
+ "此外,在设置中,你还需要修改URL,例如:DAAS_SERVER_URL = \"https://你的用户名-你的空间名.hf.space/stream\""
227
+
228
+ if __name__ == "__main__":
229
+ import uvicorn
230
+ uvicorn.run(app, host="0.0.0.0", port=49000)