|
""" |
|
DaaS (Docker as a Service) is a service |
|
that allows users to run docker commands on the server side. |
|
""" |
|
|
|
from fastapi import FastAPI |
|
from fastapi.responses import StreamingResponse |
|
from fastapi import FastAPI, File, UploadFile, HTTPException |
|
from pydantic import BaseModel, Field |
|
from typing import Optional, Dict |
|
import time |
|
import os |
|
import asyncio |
|
import subprocess |
|
import uuid |
|
import glob |
|
import threading |
|
import queue |
|
from shared_utils.docker_as_service_api import DockerServiceApiComModel |
|
|
|
app = FastAPI() |
|
|
|
def python_obj_to_pickle_file_bytes(obj): |
|
import pickle |
|
import io |
|
with io.BytesIO() as f: |
|
pickle.dump(obj, f) |
|
return f.getvalue() |
|
|
|
def yield_message(message): |
|
dsacm = DockerServiceApiComModel(server_message=message) |
|
return python_obj_to_pickle_file_bytes(dsacm) |
|
|
|
def read_output(stream, output_queue): |
|
while True: |
|
line_stdout = stream.readline() |
|
|
|
if line_stdout: |
|
output_queue.put(line_stdout) |
|
else: |
|
break |
|
|
|
|
|
async def stream_generator(request_obj): |
|
import tempfile |
|
|
|
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir: |
|
|
|
|
|
download_folder = temp_dir |
|
|
|
|
|
existing_file_before_download = [] |
|
|
|
video_id = request_obj.client_command |
|
cmd = [ |
|
'/root/.dotnet/tools/BBDown', |
|
video_id, |
|
'--use-app-api', |
|
'--work-dir', |
|
f'{os.path.abspath(temp_dir)}' |
|
] |
|
cmd_chmod = [] |
|
|
|
|
|
cmd = [ |
|
'docker', 'run', '--rm', |
|
'-v', |
|
f'{os.path.abspath(temp_dir)}:/downloads', |
|
'bbdown', |
|
video_id, |
|
'--use-app-api', |
|
'--work-dir', |
|
'/downloads' |
|
] |
|
cmd_chmod = [ |
|
'docker', 'run', '--rm', |
|
'-v', |
|
f'{os.path.abspath(temp_dir)}:/downloads', |
|
'--entrypoint=""', |
|
'bbdown', |
|
|
|
'chmod', |
|
'-R', |
|
'777', |
|
'/downloads' |
|
] |
|
|
|
|
|
cmd = ' '.join(cmd) |
|
yield yield_message(cmd) |
|
process = subprocess.Popen(cmd, |
|
stdout=subprocess.PIPE, |
|
stderr=subprocess.PIPE, |
|
shell=True, |
|
text=True) |
|
|
|
stdout_queue = queue.Queue() |
|
thread = threading.Thread(target=read_output, args=(process.stdout, stdout_queue)) |
|
thread.daemon = True |
|
thread.start() |
|
stderr_queue = queue.Queue() |
|
thread = threading.Thread(target=read_output, args=(process.stderr, stderr_queue)) |
|
thread.daemon = True |
|
thread.start() |
|
|
|
while True: |
|
print("looping") |
|
|
|
|
|
stdout_this_round = "" |
|
stderr_this_round = "" |
|
while True: |
|
try: |
|
output_stdout = stdout_queue.get_nowait() |
|
if output_stdout: |
|
stdout_this_round += output_stdout |
|
print(output_stdout) |
|
except queue.Empty: |
|
yield yield_message(stdout_this_round) |
|
break |
|
|
|
while True: |
|
try: |
|
output_stderr = stderr_queue.get_nowait() |
|
if output_stderr: |
|
stderr_this_round += output_stderr |
|
print(output_stderr) |
|
except queue.Empty: |
|
yield yield_message(stderr_this_round) |
|
break |
|
|
|
|
|
if process.poll() is not None: |
|
break |
|
|
|
await asyncio.sleep(0.5) |
|
|
|
|
|
return_code = process.returncode |
|
yield yield_message("(daas return code:) " + str(return_code)) |
|
|
|
|
|
if cmd_chmod: |
|
docker_chmod_res = subprocess.call(' '.join(cmd_chmod), shell=True) |
|
|
|
|
|
existing_file_after_download = glob.glob(os.path.join(download_folder, '**', '*')) |
|
|
|
|
|
downloaded_files = [ |
|
f for f in existing_file_after_download if f not in existing_file_before_download |
|
] |
|
downloaded_files_path = [ |
|
os.path.join(download_folder, f) for f in existing_file_after_download if f not in existing_file_before_download |
|
] |
|
|
|
server_file_attach = {} |
|
for fp, fn in zip(downloaded_files_path, downloaded_files): |
|
if os.path.isdir(fp): continue |
|
with open(fp, "rb") as f: |
|
file_bytes = f.read() |
|
server_file_attach[fn] = file_bytes |
|
|
|
dsacm = DockerServiceApiComModel( |
|
server_message="complete", |
|
server_file_attach=server_file_attach, |
|
) |
|
yield python_obj_to_pickle_file_bytes(dsacm) |
|
|
|
|
|
def simple_generator(return_obj): |
|
dsacm = DockerServiceApiComModel( |
|
server_message=return_obj, |
|
) |
|
yield python_obj_to_pickle_file_bytes(dsacm) |
|
|
|
@app.post("/stream") |
|
async def stream_response(file: UploadFile = File(...)): |
|
|
|
import pickle |
|
import io |
|
content = await file.read() |
|
with io.BytesIO(content) as f: |
|
request_obj = pickle.load(f) |
|
|
|
return StreamingResponse(stream_generator(request_obj), media_type="application/octet-stream") |
|
|
|
@app.post("/search") |
|
async def stream_response(file: UploadFile = File(...)): |
|
|
|
import pickle |
|
import io |
|
content = await file.read() |
|
with io.BytesIO(content) as f: |
|
request_obj = pickle.load(f) |
|
|
|
|
|
keyword = request_obj.client_command |
|
|
|
from experimental_mods.get_search_kw_api_stop import search_videos |
|
|
|
csrf_token = '40a227fcf12c380d7d3c81af2cd8c5e8' |
|
search_type = 'default' |
|
max_pages = 1 |
|
output_path = 'search_results' |
|
config_path = 'experimental_mods/config.json' |
|
|
|
|
|
videos = search_videos( |
|
keyword=keyword, |
|
csrf_token=csrf_token, |
|
search_type=search_type, |
|
max_pages=max_pages, |
|
output_path=output_path, |
|
config_path=config_path, |
|
early_stop=True |
|
) |
|
|
|
return StreamingResponse(simple_generator(videos), media_type="application/octet-stream") |
|
|
|
@app.get("/") |
|
async def hi(): |
|
return "Hello, this is Docker as a Service (DaaS)! If you want to use this service, you must duplicate this space. " \ |
|
"您好,这里是Docker作为服务(DaaS)!如果您想使用此服务,您必须复制此空间。复制方法:点击https://huggingface.co/spaces/hamercity/bbdown页面右上角的三个点,然后选择“复制空间”。" \ |
|
"此外,在设置中,你还需要修改URL,例如:DAAS_SERVER_URL = \"https://你的用户名-你的空间名.hf.space/stream\"" |
|
|
|
if __name__ == "__main__": |
|
import uvicorn |
|
uvicorn.run(app, host="0.0.0.0", port=49000) |
|
|