| | import os |
| | import base64 |
| | import hashlib |
| | import mimetypes |
| | from datetime import datetime |
| | from io import BytesIO |
| |
|
| | from flask import Flask, request, jsonify, redirect, render_template |
| | from huggingface_hub import HfApi, HfFileSystem |
| | from PIL import Image |
| | import uuid |
| |
|
| | app = Flask(__name__) |
| |
|
| | |
| | HF_TOKEN = os.environ.get("HF_TOKEN") |
| | DATASET_ID = os.environ.get("DATASET_ID") |
| | MAX_SIZE = int(os.environ.get("MAX_SIZE", 10)) * 1024 * 1024 |
| | API_KEY = os.environ.get("API_KEY") |
| |
|
| | fs = HfFileSystem(token=HF_TOKEN) |
| | hf_api = HfApi(token=HF_TOKEN) |
| |
|
| | def validate_auth(): |
| | if not API_KEY: |
| | return True |
| | header_key = request.headers.get("X-API-Key") |
| | param_key = request.args.get("key") |
| | return header_key == API_KEY or param_key == API_KEY |
| |
|
| | def generate_filename(data, mime_type=None): |
| | ext = mimetypes.guess_extension(mime_type or "application/octet-stream") or ".bin" |
| | unique_id = uuid.uuid4().hex[:8] |
| | timestamp = datetime.now().strftime("%Y%m%d%H%M%S") |
| | return f"{timestamp}-{unique_id}{ext}" |
| |
|
| | def save_to_dataset(content, filename): |
| | path = f"datasets/{DATASET_ID}/resolve/main/images/{filename}" |
| | fs.mkdir(f"datasets/{DATASET_ID}/images", exist_ok=True) |
| | fs.write_bytes(f"datasets/{DATASET_ID}/images/{filename}", content) |
| | return f"https://hf-mirror.com/datasets/{DATASET_ID}/resolve/main/images/{filename}" |
| |
|
| | @app.route("/", methods=["GET"]) |
| | def index(): |
| | return render_template("index.html") |
| |
|
| | @app.route("/api/1/upload", methods=["GET", "POST"]) |
| | def upload(): |
| | |
| | if not validate_auth(): |
| | return jsonify({ |
| | "status_code": 401, |
| | "error": {"message": "Invalid API key"}, |
| | "status_txt": "Unauthorized" |
| | }), 401 |
| |
|
| | |
| | source = None |
| | if request.method == "POST": |
| | if "source" in request.files: |
| | file = request.files["source"] |
| | source = file.read() |
| | else: |
| | source = request.form.get("source") |
| | else: |
| | source = request.args.get("source") |
| |
|
| | |
| | if isinstance(source, bytes): |
| | content = source |
| | elif source.startswith(("http://", "https://")): |
| | |
| | return jsonify({"error": "URL download not implemented"}), 501 |
| | elif source.startswith("data:"): |
| | header, data = source.split(",", 1) |
| | mime_type = header.split(":")[1].split(";")[0] |
| | content = base64.b64decode(data) |
| | else: |
| | content = base64.b64decode(source) |
| |
|
| | |
| | if len(content) > MAX_SIZE: |
| | return jsonify({ |
| | "status_code": 413, |
| | "error": {"message": f"File exceeds {MAX_SIZE//1024//1024}MB limit"}, |
| | "status_txt": "Payload Too Large" |
| | }), 413 |
| |
|
| | |
| | try: |
| | image = Image.open(BytesIO(content)) |
| | width, height = image.size |
| | mime_type = image.get_format_mimetype() |
| | except: |
| | width = height = 0 |
| | mime_type = None |
| |
|
| | |
| | filename = generate_filename(content, mime_type) |
| | file_url = save_to_dataset(content, filename) |
| |
|
| | |
| | response_format = request.args.get("format", "json") |
| | if response_format == "txt": |
| | return file_url |
| | elif response_format == "redirect": |
| | return redirect(file_url, code=302) |
| | else: |
| | return jsonify({ |
| | "status_code": 200, |
| | "success": {"message": "file uploaded", "code": 200}, |
| | "image": { |
| | "filename": filename, |
| | "url": file_url, |
| | "size": len(content), |
| | "width": width, |
| | "height": height, |
| | "mime": mime_type, |
| | "size_formatted": f"{len(content)/1024/1024:.1f} MB" |
| | }, |
| | "status_txt": "OK" |
| | }) |
| |
|
| | if __name__ == "__main__": |
| | app.run(host="0.0.0.0", port=7860, debug=True) |
| |
|
| |
|