Spaces:
Running
Running
import contextlib | |
import gzip | |
import json | |
import os | |
import threading | |
from collections import ChainMap | |
from http.server import BaseHTTPRequestHandler, HTTPServer | |
import pytest | |
requests = pytest.importorskip("requests") | |
# TCP port the local test HTTP server listens on.
port = 9898
# Payload served for the "file" endpoints: 1000 newline-joined copies of a short line.
data = b"\n".join([b"some test data"] * 1000)
# Absolute URL of /index/realfile on the test server.
realfile = f"http://127.0.0.1:{port}/index/realfile"
# Minimal HTML page containing a single link to ``realfile``.
index = b'<a href="%s">Link</a>' % realfile.encode()
# Canned HTML listing read from data/listing.html next to this module.
# The ``with`` block closes the file handle deterministically instead of
# leaving it to the garbage collector (which triggers a ResourceWarning).
with open(
    os.path.join(os.path.dirname(__file__), "data", "listing.html"), "rb"
) as _listing_file:
    listing = _listing_file.read()
# True when running on Windows.
win = os.name == "nt"
def _make_listing(*paths):
    """Build an HTML listing with one absolute link per given server path.

    Each path is appended verbatim to ``http://127.0.0.1:{port}`` and labelled
    ``Link_<position>`` (zero-based). The anchors are joined with newlines and
    the result is returned as encoded bytes.
    """
    base_url = f"http://127.0.0.1:{port}"
    anchors = []
    for position, path in enumerate(paths):
        anchors.append(f'<a href="{base_url}{path}">Link_{position}</a>')
    return "\n".join(anchors).encode()
def reset_files():
    """Pause once, then discard every file stored in the handler's dynamic map.

    The code after the ``yield`` runs when the generator is resumed, so it
    acts as teardown for whatever ran while the generator was suspended.

    NOTE(review): this has the shape of a pytest yield-fixture, but no fixture
    decorator is visible here -- confirm how it is registered.
    """
    yield

    # Teardown: forget the files that were added while the generator was
    # suspended, so they do not leak into later users of the handler.
    uploaded = HTTPTestHandler.dynamic_files
    uploaded.clear()
class HTTPTestHandler(BaseHTTPRequestHandler):
    """In-memory HTTP handler whose behaviour is steered by request headers.

    Files are looked up by request path in ``files``, a ChainMap that layers
    ``dynamic_files`` (filled by POST/PUT) over the fixed ``static_files``.
    Only the *presence* of the control headers below matters, not their value.

    GET:
        give_path      reply 200 with a JSON body echoing the request path
        redirect       reply 301 to /index/realfile (unless already there)
        ignore_range   serve the whole file even when ``Range`` is sent
        use_206        answer an honoured range request with 206, not 200
        give_length    send Content-Length; together with ``gzip_encoding``
                       the body is gzipped and Content-Encoding is set
        give_range     send Content-Range
        give_mimetype  send ``Content-Type: text/html; charset=utf-8``

    HEAD:
        head_not_auth  reply 403
        head_ok        required; without it the reply is 405
        give_length / head_give_length
                       send Content-Length (``zero_length`` reports 0,
                       ``gzip_encoding`` reports the gzipped size)
        give_range     send Content-Range
        give_etag      send an ETag

    POST / PUT store the request body under the request path.
    """

    # Files that are always available.
    static_files = {
        "/index/realfile": data,
        "/index/otherfile": data,
        "/index": index,
        "/data/20020401": listing,
        "/simple/": _make_listing("/simple/file", "/simple/dir/"),
        "/simple/file": data,
        "/simple/dir/": _make_listing("/simple/dir/file"),
        "/simple/dir/file": data,
    }
    # Files uploaded at runtime; ChainMap writes always land in this first map.
    dynamic_files = {}
    files = ChainMap(dynamic_files, static_files)

    def _respond(self, code=200, headers=None, data=b""):
        """Send a complete response: status line, headers and optional body.

        Args:
            code: HTTP status code.
            headers: optional mapping of extra response headers; values are
                converted with ``str``. The mapping is not modified.
            data: response body as bytes; nothing is written when empty.
        """
        # Work on a copy so a dict supplied by the caller is never mutated.
        headers = dict(headers or {})
        headers["User-Agent"] = "test"
        self.send_response(code)
        for k, v in headers.items():
            self.send_header(k, str(v))
        self.end_headers()
        if data:
            self.wfile.write(data)

    @staticmethod
    def _slice_range(file_data, range_header):
        """Return ``(content_range, body)`` for a single-range header value.

        Handles ``unit=first-last``, ``unit=first-`` and ``unit=-suffix``.
        Positions past the end of the file are clamped to the file size.

        Raises:
            ValueError: if a position in the header is not an integer.
        """
        total = len(file_data)
        _, _, spec = range_header.partition("=")
        first, _, last = spec.partition("-")
        if first:
            start = int(first)
            # "first-" means "to the end of the file".
            stop = min(int(last) + 1, total) if last else total
            return f"bytes {start}-{stop - 1}/{total}", file_data[start:stop]
        # Suffix form: the final N bytes. Slicing from an explicit offset
        # (rather than ``[-N:]``) keeps N == 0 from selecting the whole file.
        suffix_len = min(int(last), total)
        offset = total - suffix_len
        return f"bytes {offset}-{total - 1}/{total}", file_data[offset:]

    def do_GET(self):
        """Serve a stored file, honouring the GET control headers."""
        file_path = self.path
        # Allow a trailing slash when only the slash-less path is registered.
        if file_path.endswith("/") and file_path.rstrip("/") in self.files:
            file_path = file_path.rstrip("/")
        file_data = self.files.get(file_path)
        if "give_path" in self.headers:
            return self._respond(200, data=json.dumps({"path": self.path}).encode())
        if "redirect" in self.headers and file_path != "/index/realfile":
            new_url = f"http://127.0.0.1:{port}/index/realfile"
            return self._respond(301, {"Location": new_url})
        if file_data is None:
            return self._respond(404)

        status = 200
        content_range = f"bytes 0-{len(file_data) - 1}/{len(file_data)}"
        if ("Range" in self.headers) and ("ignore_range" not in self.headers):
            content_range, file_data = self._slice_range(
                file_data, self.headers["Range"]
            )
            if "use_206" in self.headers:
                status = 206

        if "give_length" in self.headers:
            if "gzip_encoding" in self.headers:
                file_data = gzip.compress(file_data)
                response_headers = {
                    "Content-Length": len(file_data),
                    "Content-Encoding": "gzip",
                }
            else:
                response_headers = {"Content-Length": len(file_data)}
            self._respond(status, response_headers, file_data)
        elif "give_range" in self.headers:
            self._respond(status, {"Content-Range": content_range}, file_data)
        elif "give_mimetype" in self.headers:
            self._respond(
                status, {"Content-Type": "text/html; charset=utf-8"}, file_data
            )
        else:
            self._respond(status, data=file_data)

    def do_POST(self):
        """Store the request body under the request path and reply 200.

        The body is read either by ``Content-Length`` or, when that header is
        absent, as a chunked transfer.
        """
        length = self.headers.get("Content-Length")
        file_path = self.path.rstrip("/")
        if length is None:
            assert self.headers.get("Transfer-Encoding") == "chunked"
            self.files[file_path] = b"".join(self.read_chunks())
        else:
            # Header values are strings; rfile.read() requires an integer.
            self.files[file_path] = self.rfile.read(int(length))
        self._respond(200)

    do_PUT = do_POST

    def read_chunks(self):
        """Yield the chunks of a chunked request body, ending with ``b""``.

        Each chunk is preceded by a line holding its size in hexadecimal and
        followed by a line terminator, which is consumed after the chunk.
        An empty size line is treated like a zero-size (terminating) chunk.
        """
        length = -1
        while length != 0:
            line = self.rfile.readline().strip()
            length = int(line, 16) if line else 0
            yield self.rfile.read(length)
            # Consume the line terminator that follows the chunk data.
            self.rfile.readline()

    def do_HEAD(self):
        """Answer a HEAD request according to the HEAD control headers."""
        if "head_not_auth" in self.headers:
            return self._respond(
                403, {"Content-Length": 123}, b"not authorized for HEAD request"
            )
        elif "head_ok" not in self.headers:
            return self._respond(405)

        file_path = self.path.rstrip("/")
        file_data = self.files.get(file_path)
        if file_data is None:
            return self._respond(404)

        if ("give_length" in self.headers) or ("head_give_length" in self.headers):
            response_headers = {"Content-Length": len(file_data)}
            if "zero_length" in self.headers:
                response_headers["Content-Length"] = 0
            elif "gzip_encoding" in self.headers:
                # Report the size of the gzipped representation.
                file_data = gzip.compress(file_data)
                response_headers["Content-Encoding"] = "gzip"
                response_headers["Content-Length"] = len(file_data)
            self._respond(200, response_headers)
        elif "give_range" in self.headers:
            # Include the "bytes" unit so the value matches what GET sends.
            self._respond(
                200,
                {"Content-Range": f"bytes 0-{len(file_data) - 1}/{len(file_data)}"},
            )
        elif "give_etag" in self.headers:
            self._respond(200, {"ETag": "xxx"})
        else:
            self._respond(200)  # OK response, but no useful info
@contextlib.contextmanager
def serve():
    """Run the test HTTP server in a background thread inside a ``with`` block.

    The server binds to ``port`` on all interfaces and handles requests with
    ``HTTPTestHandler``. On exit from the ``with`` block the serving loop is
    stopped, the thread is joined and the listening socket is closed.

    Yields:
        str: the base URL of the server, ``http://127.0.0.1:{port}``.
    """
    server_address = ("", port)
    httpd = HTTPServer(server_address, HTTPTestHandler)
    th = threading.Thread(target=httpd.serve_forever)
    th.daemon = True
    th.start()
    try:
        yield f"http://127.0.0.1:{port}"
    finally:
        # Stop the serve_forever() loop and wait for its thread before
        # releasing the socket, so the loop never polls a closed socket.
        httpd.shutdown()
        th.join()
        httpd.server_close()
def server():
    """Yield the value produced by entering ``serve()`` as a context manager.

    The ``with`` block stays open while this generator is suspended and is
    exited when the generator is resumed or closed.

    NOTE(review): this has the shape of a pytest yield-fixture, but no fixture
    decorator is visible here -- confirm how it is registered.
    """
    with serve() as base_url:
        yield base_url