# Spaces: Running
import hashlib | |
import os | |
import asyncio | |
import uuid | |
import shutil | |
from flask import Flask, jsonify, request, logging as flog | |
from flask_limiter.util import get_remote_address | |
import hcaptcha_solver | |
# Flask application object for this module; the app.run(...) call at the end
# of the file starts it.
app = Flask(__name__)
def get_ipaddr():
    """Return the client address of the current Flask request.

    The first entry of ``request.access_route`` is preferred and is also
    echoed to stdout. When that list is empty the function falls back to
    ``request.remote_addr`` and, failing that, to ``'127.0.0.1'``.
    """
    route = request.access_route
    if not route:
        return request.remote_addr or '127.0.0.1'
    client_ip = route[0]
    print(client_ip)
    return client_ip
# Alias for the default handler exposed by flask.logging.
# NOTE(review): nothing in the visible part of this file references
# `handler` -- confirm it is still needed.
handler = flog.default_handler
def generate_uuid():
    """Return a fresh 64-character lowercase hex identifier.

    A random UUID4 is rendered as text, encoded, and fed through SHA-256;
    the hex digest of that hash is the result.
    """
    hasher = hashlib.sha256()
    hasher.update(str(uuid.uuid4()).encode())
    return hasher.hexdigest()
def get_token():
    """Return the API token currently in force.

    The token is read from a file named ``token`` in the current working
    directory, with surrounding whitespace stripped. When that file does not
    exist, the built-in default ``"init_token"`` is returned instead.

    Returns:
        The stripped file contents, or ``"init_token"`` if there is no file.
    """
    default_token = "init_token"
    try:
        # The context manager closes the handle deterministically; a bare
        # open(...).read() leaves that to the garbage collector.
        with open("token", "r") as token_file:
            return token_file.read().strip()
    except FileNotFoundError:
        # Opening directly instead of checking for existence first avoids a
        # race with the file disappearing between the check and the open.
        return default_token
def check_request(required_data, data):
    """Validate a decoded JSON request body and its access token.

    Args:
        required_data: Iterable of keys that must all be present in ``data``.
        data: The decoded request payload. Anything other than a non-empty
            ``dict`` is rejected.

    Returns:
        ``True`` when every required key is present and ``data["token"]`` is
        a string equal to the value returned by ``get_token()``; ``False``
        otherwise. The reason for a rejection is printed to stdout together
        with the payload.
    """
    import hmac  # local import: only this function needs it

    # A JSON body may legally decode to a list, string or number. Such values
    # can satisfy the membership test below and would then fail on the
    # ["token"] lookup, so the shape is checked before anything else.
    if (
        not isinstance(data, dict)
        or not data
        or any(key not in data for key in required_data)
    ):
        print("Error:Invalid Request Data\n" + str(data))
        return False

    supplied = data.get("token")
    expected = get_token()
    # compare_digest runs in time independent of where the values differ.
    # Both sides are encoded to bytes because it rejects non-ASCII str, and
    # "surrogatepass" keeps lone surrogates from JSON escapes from raising.
    token_matches = isinstance(supplied, str) and hmac.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
    if not token_matches:
        print("Error:Invalid Token\n" + str(data))
        return False
    return True
def rate_limit_exceeded(e):
    """Build the JSON body and 429 status for a rate-limited request.

    ``e`` is accepted but not used. The value of ``get_remote_address()`` is
    printed to stdout before the response is built.
    NOTE(review): no handler registration is visible in this part of the
    file -- confirm how this function is wired up.
    """
    remote = get_remote_address()
    print(remote)
    body = jsonify(msg="Too many request")
    return body, 429
def method_not_allowed(e):
    """Build the JSON body and 405 status for a disallowed HTTP method.

    ``e`` is accepted but not used. The value of ``get_remote_address()`` is
    printed to stdout before the response is built.
    NOTE(review): no handler registration is visible in this part of the
    file -- confirm how this function is wired up.
    """
    remote = get_remote_address()
    print(remote)
    body = jsonify(msg="Unauthorized Request")
    return body, 405
def index():
    """Return a JSON object with ``status_code`` 200 and the caller's address.

    The address comes from ``get_ipaddr()``.
    """
    caller_ip = get_ipaddr()
    return jsonify(status_code=200, ip=caller_ip)
def update_token():
    """Replace the stored API token with the one supplied by the caller.

    The JSON request body must contain the current ``token`` and a
    ``new_token``.

    Returns:
        * ``403`` with ``msg="Unauthorized Request"`` when ``check_request``
          rejects the body.
        * ``400`` when ``new_token`` is not a non-blank string.
        * A JSON success message (default status) once the new token has
          been written to the ``token`` file.
    """
    require_data = ["token", "new_token"]
    data = request.get_json(force=True, silent=True)
    if not check_request(require_data, data):
        return jsonify(msg="Unauthorized Request"), 403

    new_token = data["new_token"]
    # A non-string value cannot be written to the text file, and a blank one
    # would be read back by get_token() (which strips whitespace) as an empty
    # token, so both are refused up front.
    if not isinstance(new_token, str) or not new_token.strip():
        return jsonify(msg="Invalid new_token", success=False), 400

    # The context manager guarantees the handle is closed even if the write
    # raises; plain "w" already truncates, so read access is not needed.
    with open("token", "w") as token_file:
        token_file.write(new_token)
    return jsonify(msg="Token updated successfully", success=True)
def _remove_tree_if_present(path):
    """Recursively delete ``path``; a path that is already gone is not an error."""
    try:
        shutil.rmtree(path)
    except FileNotFoundError:
        # Nothing to clean up, or another request removed it first.
        pass


def solver_captcha():
    """Run the hCaptcha solver for the host and site key in the request body.

    The JSON request body must contain ``token``, ``host`` and ``site_key``.
    A freshly generated name from ``generate_uuid()`` is passed to the solver
    as its third argument.

    Returns:
        ``403`` with ``msg="Unauthorized Request"`` when ``check_request``
        rejects the body; otherwise whatever
        ``hcaptcha_solver.bytedance(...)`` resolves to, unchanged.
    """
    require_data = ["token", "host", "site_key"]
    data = request.get_json(force=True, silent=True)
    if not check_request(require_data, data):
        return jsonify(msg="Unauthorized Request"), 403

    dir_path = generate_uuid()
    try:
        return asyncio.run(
            hcaptcha_solver.bytedance(data["host"], data["site_key"], dir_path)
        )
    finally:
        # Clean up whether the solver succeeded or raised, so a failed solve
        # does not leave its per-request directory (or "tmp_dir") behind.
        for leftover in (dir_path, "tmp_dir"):
            _remove_tree_if_present(leftover)
def update_model():
    """Run ``hcaptcha_solver.solver.install(upgrade=True)`` for an authorized caller.

    The JSON request body must contain ``token``. A body rejected by
    ``check_request`` yields ``403`` with ``msg="Unauthorized Request"``;
    otherwise the install call runs and a JSON acknowledgement is returned
    with status 200.
    """
    payload = request.get_json(force=True, silent=True)
    authorized = check_request(["token"], payload)
    if not authorized:
        return jsonify(msg="Unauthorized Request"), 403

    hcaptcha_solver.solver.install(upgrade=True)
    # NOTE(review): the response key is spelled "statue"; it is kept as-is
    # because existing clients may read it.
    acknowledgement = jsonify(statue="ok")
    return acknowledgement, 200
if __name__ == "__main__":
    # Start Flask's built-in server only when this file is executed as a
    # script, so importing the module (from tests or a WSGI runner) does not
    # block on a running server. Listens on every interface, port 8081.
    app.run(host="0.0.0.0", port=8081)