File size: 7,647 Bytes
c7d9c76
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""
DaaS (Docker as a Service) is a service 
that allows users to run docker commands on the server side.
"""

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi import FastAPI, File, UploadFile, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, Dict
import time
import os
import asyncio
import subprocess
import uuid
import glob
import threading
import queue
from shared_utils.docker_as_service_api import DockerServiceApiComModel

# ASGI application object; the route decorators in this module register their handlers on it.
app = FastAPI()

def python_obj_to_pickle_file_bytes(obj):
    """Serialize *obj* with pickle and return the resulting bytes."""
    import pickle

    return pickle.dumps(obj)

def yield_message(message):
    """Wrap *message* as the ``server_message`` of a ``DockerServiceApiComModel`` and pickle it.

    Returns the pickled bytes produced by ``python_obj_to_pickle_file_bytes``.
    """
    return python_obj_to_pickle_file_bytes(
        DockerServiceApiComModel(server_message=message)
    )

def read_output(stream, output_queue):
    """Copy lines from *stream* into *output_queue* until ``readline()`` returns an empty value.

    Intended to run in a worker thread so that a blocking ``readline()`` does
    not stall the caller.
    """
    # An empty (falsy) line signals end-of-stream for both text and binary streams.
    while (line := stream.readline()):
        output_queue.put(line)


def _drain_queue(pending):
    """Return everything currently buffered in *pending* as one string, without blocking."""
    chunks = []
    while True:
        try:
            line = pending.get_nowait()
        except queue.Empty:
            break
        if line:
            chunks.append(line)
            print(line)  # mirror the tool output in the server log
    return "".join(chunks)


def _collect_files(folder):
    """Map the path of every non-directory entry found below *folder* to its raw bytes."""
    attachments = {}
    # recursive=True is required for '**' to span zero or more directories;
    # without it the pattern only matches entries exactly one level down and
    # misses files written directly into *folder*.
    for path in glob.glob(os.path.join(folder, '**', '*'), recursive=True):
        if os.path.isdir(path):
            continue
        with open(path, "rb") as f:
            attachments[path] = f.read()
    return attachments


async def stream_generator(request_obj):
    """Run the ``bbdown`` docker image for a client request and stream its progress.

    Args:
        request_obj: object whose ``client_command`` attribute is a string that
            is passed to the ``bbdown`` image as its argument(s).

    Yields:
        Pickled ``DockerServiceApiComModel`` payloads (bytes), in this order:
        the command line being run; then, on every poll, one message with the
        new stdout text and one with the new stderr text (either may be
        empty); a ``"(daas return code:) N"`` message; and finally a
        ``"complete"`` message whose ``server_file_attach`` maps each file
        found in the temporary download folder to its bytes.

    Raises:
        TypeError: if ``client_command`` is not a string.
    """
    import shlex
    import tempfile

    video_id = request_obj.client_command
    if not isinstance(video_id, str):
        raise TypeError("client_command must be a string")

    # Tokenize the client string ourselves instead of handing it to a shell, so
    # shell metacharacters (;, |, $(), ...) in client input are never interpreted.
    try:
        video_args = shlex.split(video_id)
    except ValueError:
        # Unbalanced quotes: pass the raw text through as a single argument.
        video_args = [video_id]

    # The download folder only lives for the duration of this request.
    with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
        download_folder = temp_dir
        volume = f'{os.path.abspath(temp_dir)}:/downloads'

        cmd_prefix = ['docker', 'run', '--rm', '-v', volume, 'bbdown']
        cmd_suffix = ['--use-app-api', '--work-dir', '/downloads']
        cmd = [*cmd_prefix, *video_args, *cmd_suffix]
        cmd_chmod = [
            'docker', 'run', '--rm',
            '-v',
            volume,
            '--entrypoint=',  # empty value overrides the image entrypoint
            'bbdown',   # image name
            # chmod -R 777 /downloads
            'chmod',
            '-R',
            '777',
            '/downloads'
        ]

        # Report the command exactly as the client typed it.
        yield yield_message(' '.join([*cmd_prefix, video_id, *cmd_suffix]))

        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # An undecodable byte must not kill a reader thread; a dead
                # reader would let the pipe fill up and stall the child.
                errors="replace",
            )
        except OSError as err:
            # docker could not be started at all; report it and fall through
            # so the client still receives the closing messages.
            yield yield_message(str(err))
            process = None

        if process is None:
            # Same code a shell reports for "command not found".
            return_code = 127
        else:
            stdout_queue = queue.Queue()
            stderr_queue = queue.Queue()
            readers = []
            for stream, sink in ((process.stdout, stdout_queue), (process.stderr, stderr_queue)):
                reader = threading.Thread(target=read_output, args=(stream, sink), daemon=True)
                reader.start()
                readers.append(reader)

            try:
                while True:
                    # Check for exit BEFORE draining: once the process is gone,
                    # wait for the readers to hit EOF so the final drain also
                    # picks up the last lines of output.
                    finished = process.poll() is not None
                    if finished:
                        for reader in readers:
                            await asyncio.to_thread(reader.join, 5)

                    yield yield_message(_drain_queue(stdout_queue))
                    yield yield_message(_drain_queue(stderr_queue))

                    if finished:
                        break
                    await asyncio.sleep(0.5)
            finally:
                # Reached with the process still alive when the consumer goes
                # away mid-stream; do not leave the download running.
                if process.poll() is None:
                    process.terminate()
                    try:
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        process.kill()
                        process.wait()

            return_code = process.returncode

        yield yield_message("(daas return code:) " + str(return_code))

        # change files mod to 777 (best effort; the result is not inspected).
        # Runs in a worker thread so the event loop is not blocked meanwhile.
        if process is not None:
            await asyncio.to_thread(subprocess.call, cmd_chmod)

        server_file_attach = await asyncio.to_thread(_collect_files, download_folder)

        dsacm = DockerServiceApiComModel(
            server_message="complete",
            server_file_attach=server_file_attach,
        )
        yield python_obj_to_pickle_file_bytes(dsacm)


def simple_generator(return_obj):
    """Yield one pickled ``DockerServiceApiComModel`` whose ``server_message`` is *return_obj*."""
    reply = DockerServiceApiComModel(server_message=return_obj)
    payload = python_obj_to_pickle_file_bytes(reply)
    yield payload

@app.post("/stream")
async def stream_response(file: UploadFile = File(...)):
    """Handle ``POST /stream``: unpickle the uploaded request and stream the download progress.

    Args:
        file: uploaded file whose content is a pickled request object exposing
            a ``client_command`` attribute.

    Returns:
        A ``StreamingResponse`` (``application/octet-stream``) fed by
        ``stream_generator``.

    Raises:
        HTTPException: status 400 when the upload cannot be unpickled or the
            resulting object has no ``client_command`` attribute.
    """
    import pickle

    content = await file.read()

    # SECURITY NOTE(review): unpickling executes code chosen by whoever built
    # the payload, so this endpoint must only be reachable by trusted clients.
    # Consider replacing pickle with a data-only format such as JSON.
    try:
        request_obj = pickle.loads(content)
    except Exception as err:
        # Unpickling can raise many unrelated exception types; at this request
        # boundary they all mean "bad payload".
        raise HTTPException(status_code=400, detail="upload is not a valid pickle payload") from err

    # Validate here: once streaming has begun the status code can no longer
    # signal the error.
    if not hasattr(request_obj, "client_command"):
        raise HTTPException(status_code=400, detail="payload has no client_command attribute")

    return StreamingResponse(stream_generator(request_obj), media_type="application/octet-stream")

@app.post("/search")
async def stream_response(file: UploadFile = File(...)):
    """Handle ``POST /search``: unpickle the uploaded request and run a video search.

    Args:
        file: uploaded file whose content is a pickled request object; its
            ``client_command`` attribute is used as the search keyword.

    Returns:
        A ``StreamingResponse`` (``application/octet-stream``) carrying one
        pickled ``DockerServiceApiComModel`` whose ``server_message`` is the
        value returned by ``search_videos``.

    Raises:
        HTTPException: status 400 when the upload cannot be unpickled or the
            resulting object has no ``client_command`` attribute.
    """
    # NOTE(review): this handler has the same function name as the /stream
    # handler, so the module-level name ``stream_response`` ends up bound to
    # this function only. Consider renaming once no caller relies on the name.
    import pickle

    content = await file.read()

    # SECURITY NOTE(review): unpickling executes code chosen by whoever built
    # the payload, so this endpoint must only be reachable by trusted clients.
    # Consider replacing pickle with a data-only format such as JSON.
    try:
        request_obj = pickle.loads(content)
    except Exception as err:
        # Unpickling can raise many unrelated exception types; at this request
        # boundary they all mean "bad payload".
        raise HTTPException(status_code=400, detail="upload is not a valid pickle payload") from err

    if not hasattr(request_obj, "client_command"):
        raise HTTPException(status_code=400, detail="payload has no client_command attribute")

    # process the request_obj
    keyword = request_obj.client_command

    from experimental_mods.get_search_kw_api_stop import search_videos
    # Default parameters for video search
    csrf_token = '40a227fcf12c380d7d3c81af2cd8c5e8'  # Using default from main()
    search_type = 'default'
    max_pages = 1
    output_path = 'search_results'
    config_path = 'experimental_mods/config.json'

    videos = search_videos(
        keyword=keyword,
        csrf_token=csrf_token,
        search_type=search_type,
        max_pages=max_pages,
        output_path=output_path,
        config_path=config_path,
        early_stop=True
    )

    return StreamingResponse(simple_generator(videos), media_type="application/octet-stream")

@app.get("/")
async def hi():
    """Return the landing text (English, then Chinese) telling visitors to duplicate this space."""
    greeting_en = "Hello, this is Docker as a Service (DaaS)! If you want to use this service, you must duplicate this space. "
    howto_zh = "您好,这里是Docker作为服务(DaaS)!如果您想使用此服务,您必须复制此空间。复制方法:点击https://huggingface.co/spaces/hamercity/bbdown页面右上角的三个点,然后选择“复制空间”。"
    config_zh = '此外,在设置中,你还需要修改URL,例如:DAAS_SERVER_URL = "https://你的用户名-你的空间名.hf.space/stream"'
    return greeting_en + howto_zh + config_zh

if __name__ == "__main__":
    # When executed as a script, serve `app` with uvicorn on all interfaces, port 49000.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=49000)