File size: 7,647 Bytes
c7d9c76 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
"""
DaaS (Docker as a Service) is a service
that allows users to run docker commands on the server side.
"""
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, Dict
import time
import os
import asyncio
import subprocess
import uuid
import glob
import threading
import queue
from shared_utils.docker_as_service_api import DockerServiceApiComModel
app = FastAPI()
def python_obj_to_pickle_file_bytes(obj):
import pickle
import io
with io.BytesIO() as f:
pickle.dump(obj, f)
return f.getvalue()
def yield_message(message):
dsacm = DockerServiceApiComModel(server_message=message)
return python_obj_to_pickle_file_bytes(dsacm)
def read_output(stream, output_queue):
while True:
line_stdout = stream.readline()
# print('recv')
if line_stdout:
output_queue.put(line_stdout)
else:
break
async def stream_generator(request_obj):
import tempfile
# Create a temporary directory
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
# Construct the docker command
download_folder = temp_dir
# Get list of existing files before download
existing_file_before_download = []
video_id = request_obj.client_command
cmd = [
'/root/.dotnet/tools/BBDown',
video_id,
'--use-app-api',
'--work-dir',
f'{os.path.abspath(temp_dir)}'
]
cmd_chmod = []
cmd = [
'docker', 'run', '--rm',
'-v',
f'{os.path.abspath(temp_dir)}:/downloads',
'bbdown',
video_id,
'--use-app-api',
'--work-dir',
'/downloads'
]
cmd_chmod = [
'docker', 'run', '--rm',
'-v',
f'{os.path.abspath(temp_dir)}:/downloads',
'--entrypoint=""', # override the entrypoint
'bbdown', # image name
# chmod -R 777 /downloads
'chmod',
'-R',
'777',
'/downloads'
]
cmd = ' '.join(cmd)
yield yield_message(cmd)
process = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
text=True)
stdout_queue = queue.Queue()
thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue))
thread.daemon = True
thread.start()
stderr_queue = queue.Queue()
thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue))
thread.daemon = True
thread.start()
while True:
print("looping")
# Check if there is any output in the queue
stdout_this_round = ""
stderr_this_round = ""
while True:
try:
output_stdout = stdout_queue.get_nowait() # Non-blocking get
if output_stdout:
stdout_this_round += output_stdout
print(output_stdout)
except queue.Empty:
yield yield_message(stdout_this_round)
break
while True:
try:
output_stderr = stderr_queue.get_nowait() # Non-blocking get
if output_stderr:
stderr_this_round += output_stderr
print(output_stderr)
except queue.Empty:
yield yield_message(stderr_this_round)
break
# Break the loop if the process has finished
if process.poll() is not None:
break
await asyncio.sleep(0.5)
# Get the return code
return_code = process.returncode
yield yield_message("(daas return code:) " + str(return_code))
# change files mod to 777
if cmd_chmod:
docker_chmod_res = subprocess.call(' '.join(cmd_chmod), shell=True)
# print(f"Successfully downloaded video {video_id}")
existing_file_after_download = glob.glob(os.path.join(download_folder, '**', '*'))
# existing_file_after_download = list(os.listdir(download_folder))
# get the difference
downloaded_files = [
f for f in existing_file_after_download if f not in existing_file_before_download
]
downloaded_files_path = [
os.path.join(download_folder, f) for f in existing_file_after_download if f not in existing_file_before_download
]
# read file
server_file_attach = {}
for fp, fn in zip(downloaded_files_path, downloaded_files):
if os.path.isdir(fp): continue
with open(fp, "rb") as f:
file_bytes = f.read()
server_file_attach[fn] = file_bytes
dsacm = DockerServiceApiComModel(
server_message="complete",
server_file_attach=server_file_attach,
)
yield python_obj_to_pickle_file_bytes(dsacm)
def simple_generator(return_obj):
dsacm = DockerServiceApiComModel(
server_message=return_obj,
)
yield python_obj_to_pickle_file_bytes(dsacm)
@app.post("/stream")
async def stream_response(file: UploadFile = File(...)):
# read the file in memory, treat it as pickle file, and unpickle it
import pickle
import io
content = await file.read()
with io.BytesIO(content) as f:
request_obj = pickle.load(f)
# process the request_obj
return StreamingResponse(stream_generator(request_obj), media_type="application/octet-stream")
@app.post("/search")
async def stream_response(file: UploadFile = File(...)):
# read the file in memory, treat it as pickle file, and unpickle it
import pickle
import io
content = await file.read()
with io.BytesIO(content) as f:
request_obj = pickle.load(f)
# process the request_obj
keyword = request_obj.client_command
from experimental_mods.get_search_kw_api_stop import search_videos
# Default parameters for video search
csrf_token = '40a227fcf12c380d7d3c81af2cd8c5e8' # Using default from main()
search_type = 'default'
max_pages = 1
output_path = 'search_results'
config_path = 'experimental_mods/config.json'
# Search for videos and return the first result
videos = search_videos(
keyword=keyword,
csrf_token=csrf_token,
search_type=search_type,
max_pages=max_pages,
output_path=output_path,
config_path=config_path,
early_stop=True
)
return StreamingResponse(simple_generator(videos), media_type="application/octet-stream")
@app.get("/")
async def hi():
return "Hello, this is Docker as a Service (DaaS)! If you want to use this service, you must duplicate this space. " \
"您好,这里是Docker作为服务(DaaS)!如果您想使用此服务,您必须复制此空间。复制方法:点击https://huggingface.co/spaces/hamercity/bbdown页面右上角的三个点,然后选择“复制空间”。" \
"此外,在设置中,你还需要修改URL,例如:DAAS_SERVER_URL = \"https://你的用户名-你的空间名.hf.space/stream\""
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=49000)
|