h-point / api.py
zhou12189108's picture
Upload 2 files
e014353
raw
history blame
2.79 kB
import hashlib
import os
import asyncio
import uuid
import shutil
from flask import Flask, jsonify, request, logging as flog
from flask_limiter.util import get_remote_address
import hcaptcha_solver
# Flask application object; the route and error-handler decorators in this
# file register their view functions on it.
app = Flask(__name__)
def get_ipaddr():
    """Return the client IP address for the current request.

    When ``request.access_route`` is non-empty, its first entry is printed
    to stdout and returned. Otherwise ``request.remote_addr`` is returned,
    falling back to ``'127.0.0.1'`` when that value is falsy.
    """
    route = request.access_route
    if not route:
        return request.remote_addr or '127.0.0.1'
    client_ip = route[0]
    print(client_ip)
    return client_ip
# Reference to Flask's default logging handler (``flask.logging.default_handler``).
# NOTE(review): not used anywhere else in the visible part of this file.
handler = flog.default_handler
def generate_uuid():
    """Return the SHA-256 hex digest (64 hex characters) of a fresh UUID4 string."""
    raw_id = uuid.uuid4()
    digest = hashlib.sha256(str(raw_id).encode())
    return digest.hexdigest()
def get_token():
    """Return the API token currently in effect.

    The token is read from the file ``token`` in the current working
    directory, with surrounding whitespace stripped. When that file does
    not exist, the built-in default ``"init_token"`` is returned.

    Returns:
        str: the stored token, or ``"init_token"`` if no token file exists.
    """
    default_token = "init_token"
    try:
        # Open directly instead of checking existence first: this avoids a
        # check-then-open race, and ``with`` guarantees the handle is closed.
        with open("token", "r") as token_file:
            return token_file.read().strip()
    except FileNotFoundError:
        return default_token
def check_request(required_data, data):
    """Check that a parsed JSON body is well-formed and carries the right token.

    Args:
        required_data: iterable of keys that must all be present in ``data``.
        data: the parsed request body (may be ``None`` or any JSON value).

    Returns:
        bool: ``True`` only when ``data`` is a non-empty dict containing every
        required key and its ``"token"`` value equals the token returned by
        ``get_token()``. Failures are reported on stdout and yield ``False``.
    """
    import hmac  # local import: constant-time comparison is only needed here

    # A JSON body can be a list or string; those would pass a bare ``in``
    # test and then fail on ``data["token"]``, so require a dict explicitly.
    if (
        not isinstance(data, dict)
        or not data
        or any(key not in data for key in required_data)
    ):
        print("Error:Invalid Request Data\n" + str(data))
        return False

    supplied = data.get("token")
    expected = get_token()
    # Compare as bytes so non-ASCII tokens are accepted by compare_digest;
    # "surrogatepass" keeps a malformed client string from raising here.
    token_ok = isinstance(supplied, str) and hmac.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8"),
    )
    if not token_ok:
        print("Error:Invalid Token\n" + str(data))
        return False
    return True
@app.errorhandler(429)
def rate_limit_exceeded(e):
    """Handle HTTP 429: print the remote address and reply with a JSON message."""
    client_addr = get_remote_address()
    print(client_addr)
    body = jsonify(msg="Too many request")
    return body, 429
@app.errorhandler(405)
def method_not_allowed(e):
    """Handle HTTP 405: print the remote address and reply with a JSON message."""
    status = 405
    print(get_remote_address())
    response = jsonify(msg="Unauthorized Request")
    return response, status
@app.route("/", methods=["GET"])
def index():
    """Return a JSON object with ``status_code`` 200 and the caller's IP address."""
    payload = {"status_code": 200, "ip": get_ipaddr()}
    return jsonify(**payload)
@app.route("/update/token", methods=["POST"])
def update_token():
    """Replace the stored API token with ``new_token`` from the JSON body.

    The body must contain ``token`` (the current token) and ``new_token``.
    Responds 403 when ``check_request`` rejects the body, 400 when
    ``new_token`` is not a non-blank string, and otherwise writes the new
    value to the ``token`` file and returns a success message.
    """
    require_data = ["token", "new_token"]
    data = request.get_json(force=True, silent=True)
    if not check_request(require_data, data):
        return jsonify(msg="Unauthorized Request"), 403

    new_token = data["new_token"]
    # Validate before opening the file: opening for write truncates it, so a
    # failing write (e.g. a non-string value) would leave an empty token file.
    if not isinstance(new_token, str) or not new_token.strip():
        return jsonify(msg="Invalid new_token", success=False), 400

    with open("token", "w") as token_file:
        token_file.write(new_token)
    return jsonify(msg="Token updated successfully", success=True)
@app.route("/api/solve", methods=["POST"])
def solver_captcha():
    """Run the captcha solver for the ``host`` / ``site_key`` in the JSON body.

    The body must contain ``token``, ``host`` and ``site_key``. Responds 403
    when ``check_request`` rejects the body; otherwise returns whatever
    ``hcaptcha_solver.bytedance`` produces.

    A freshly generated directory name is passed to the solver (presumably
    as its per-request working directory; confirm against hcaptcha_solver).
    That directory and ``tmp_dir`` are removed afterwards if they exist.
    """
    require_data = ["token", "host", "site_key"]
    data = request.get_json(force=True, silent=True)
    if not check_request(require_data, data):
        return jsonify(msg="Unauthorized Request"), 403

    dir_path = generate_uuid()
    try:
        return asyncio.run(
            hcaptcha_solver.bytedance(data["host"], data["site_key"], dir_path)
        )
    finally:
        # Clean up even when the solver raises, so failed requests do not
        # leave working directories behind on disk.
        for leftover in (dir_path, "tmp_dir"):
            if os.path.exists(leftover):
                shutil.rmtree(leftover)
@app.route("/api/update", methods=["POST"])
def update_model():
    """Trigger ``hcaptcha_solver.solver.install(upgrade=True)`` for an authorized caller.

    The JSON body must contain a valid ``token``; otherwise the response is
    403. On success the response is ``{"statue": "ok"}`` with status 200.
    """
    # NOTE(review): the response key is spelled "statue"; it is kept as-is
    # because existing clients may read that exact key.
    payload = request.get_json(force=True, silent=True)
    if check_request(["token"], payload):
        hcaptcha_solver.solver.install(upgrade=True)
        return jsonify(statue="ok"), 200
    return jsonify(msg="Unauthorized Request"), 403
# Start the Flask server on all network interfaces (0.0.0.0), port 8081.
# NOTE(review): there is no `if __name__ == "__main__":` guard, so this call
# also executes whenever the module is imported.
app.run(host="0.0.0.0", port=8081)