import asyncio import base64 import random import chardet import nopecha import requests from hcaptcha_challenger.agents import Malenia from playwright.async_api import BrowserContext as ASyncContext, async_playwright nopecha.api_key = '5nogeisu16i5tr5r' async def route_continuation(route, request, host, sitekey): # 检查请求的URL,只拦截特定网站的请求 if request.url == f"https://{host}/": print("start to solve") # 修改DNS解析结果 await route.fulfill(status=200, body=""" hCAPTCHA 演示

""".replace("%%%%%%%%%%%", sitekey)) else: # 对于其他网站的请求,不做修改 await route.continue_() def url_to_base64(url): try: # 获取 URL 的内容 response = requests.get(url) # 将内容转换为 base64 content_base64 = base64.b64encode(response.content).decode('utf-8') return content_base64 except requests.exceptions.RequestException as e: print(f"Error fetching URL: {e}") return None async def on_response(response, page): if response.url.startswith("https://api.hcaptcha.com/getcaptcha"): # 获取响应内容 content_type = response.headers.get('Content-Type') if content_type == "application/json": data = await response.json() else: print("content type:", content_type) content = await response.body() detected_encoding = chardet.detect(content)['encoding'] data = content.decode(detected_encoding) # 打印响应内容 print(data) examples_urls = data.get("requester_question_example") examples = [url_to_base64(i) for i in examples_urls] if examples_urls else None if data.get("request_type") == "image_label_binary": await classify_click(page, data, 0, examples) elif data.get("request_type") == "image_label_area_select": # await asyncio.sleep(random.uniform(0.1, 0.3)) # await page.wait_for_selector("//iframe[contains(@title, 'hCaptcha challenge')]") # frame_challenge = page.frame_locator("//iframe[contains(@title, 'hCaptcha challenge')]") # await frame_challenge.locator("//div[@class='refresh button']").click() await area_click(page, data, 0, examples) else: await asyncio.sleep(random.uniform(0.1, 0.3)) await page.wait_for_selector("//iframe[contains(@title, 'hCaptcha challenge')]") frame_challenge = page.frame_locator("//iframe[contains(@title, 'hCaptcha challenge')]") await frame_challenge.locator("//div[@class='refresh button']").click() elif response.url.startswith("https://api.hcaptcha.com/checkcaptcha"): data0 = await response.json() if data0.get("pass"): global tasks, token await page.close() token = data0.get("generated_pass_UUID") tasks.cancel() print(data0) async def classify_click(page, data, round0, examples): try: await page.wait_for_selector("//iframe[contains(@title, 'hCaptcha challenge')]") frame_challenge = page.frame_locator("//iframe[contains(@title, 'hCaptcha challenge')]") samples = frame_challenge.locator("//div[@class='task-image']") await frame_challenge.locator("//div[@tabindex='0']").nth(0).wait_for() count = await samples.count() print(count) clicks = nopecha.Recognition.solve( type='hcaptcha', task=data.get("requester_question").get("en"), image_urls=[data.get("tasklist")[i]["datapoint_uri"] for i in (range(9) if round0 == 0 else range(9, 18))], image_examples=examples ) for i in range(count): sample = samples.nth(i) await sample.wait_for() if clicks[i]: print("try to click") await sample.click(delay=200) print(clicks) await asyncio.sleep(random.uniform(0.1, 0.3)) fl = frame_challenge.locator("//div[@class='button-submit button']") await fl.click() if round0 == 0: await classify_click(page, data, 1, examples) except Exception as e: print(e) await classify_click(page, data, round0, examples) async def area_click(page, data, round0, examples): try: await page.wait_for_selector("//iframe[contains(@title, 'hCaptcha challenge')]") frame_challenge = page.frame_locator("//iframe[contains(@title, 'hCaptcha challenge')]") locator = frame_challenge.locator("//div[@class='challenge-view']//canvas") await locator.wait_for(state="visible") image = await locator.screenshot() if examples: clicks = nopecha.Recognition.solve( type='hcaptcha_area_select', task=data.get("requester_question").get("en"), image_data=[url_to_base64(data.get("tasklist")[0]["datapoint_uri"])], image_examples=examples ) else: clicks = nopecha.Recognition.solve( type='hcaptcha_area_select', task=data.get("requester_question").get("en"), image_data=[url_to_base64(data.get("tasklist")[0]["datapoint_uri"])] ) print(clicks) print(clicks["x"], clicks["y"]) print("try to click") bounds = await locator.bounding_box() print(bounds) await locator.click(delay=200, position={"x": int(bounds["width"] * clicks["x"] / 100), "y": int(bounds["height"] * clicks["y"] / 100)}) print("done") fl = frame_challenge.locator("//div[@class='button-submit button']") await fl.click() await asyncio.sleep(random.uniform(0.1, 0.3)) if round0 == 0: await area_click(page, data, 1, examples) except Exception as e: print(e) await area_click(page, data, round0, examples) async def hit_challenge(context: ASyncContext, host, sitekey, times: int = 8): await context.route('**/*', lambda route, request: route_continuation(route, request, host, sitekey)) page = await context.new_page() page.on('response', lambda response: on_response(response, page)) await page.goto(f"https://{host}") checkbox = page.frame_locator("//iframe[contains(@title,'checkbox')]") await checkbox.locator("#checkbox").click() await asyncio.sleep(3000) async def bytedance(host, sitekey): async with async_playwright() as p: browser = await p.firefox.launch(headless=True) context = await browser.new_context( locale="en-US" ) await Malenia.apply_stealth(context) await hit_challenge(context, host, sitekey) question = {} tasks = None token = None async def main(host, key): global tasks, token try: tasks = asyncio.gather(bytedance(host, key), return_exceptions=True) await tasks return token except asyncio.CancelledError: print("task done") return token except Exception as e: print(e) return token