"""Obtain an hCaptcha pass token by driving a headless Firefox page.

The target host's root URL is intercepted and answered with a stub page that
embeds the hCaptcha widget for the given sitekey.  Responses from the hCaptcha
API are watched: a ``getcaptcha`` response triggers solving through the
NopeCHA recognition API, and a passing ``checkcaptcha`` response stores the
token in the module-level ``token`` and cancels the running ``tasks`` future.
"""
import asyncio
import base64
import os
import random
import re

import nopecha
import requests
from hcaptcha_challenger.agents import Malenia
from playwright.async_api import BrowserContext as ASyncContext, async_playwright

# SECURITY(review): an API key is committed in source.  Prefer supplying it via
# the NOPECHA_API_KEY environment variable; the literal is kept only as a
# backward-compatible fallback and should be rotated and removed.
nopecha.api_key = os.environ.get("NOPECHA_API_KEY", '5nogeisu16i5tr5r')

# Module-level state shared between main() and the response handlers.
question = {}
tasks = None  # the asyncio.gather() future created by main()
token = None  # "generated_pass_UUID" of the last passing checkcaptcha response

CHALLENGE_IFRAME = "//iframe[contains(@title, 'hCaptcha challenge')]"

# hit_challenge() parks on a sleep of this length after clicking the checkbox;
# the run normally ends earlier because a handler cancels ``tasks``.
SOLVE_WAIT_SECONDS = 3000

_BACKGROUND_URL_RE = re.compile(r'url\("(.+?)"\)')

# Strong references to fire-and-forget tasks so they are not garbage-collected
# while still running (see the asyncio.create_task documentation).
_background_tasks = set()

# NOTE(review): the markup of this stub page was missing from the reviewed
# source (only the title text survived, so the sitekey placeholder was never
# present and .replace() was a no-op).  It is reconstructed here as the minimal
# widget embed from the hCaptcha developer guide -- confirm against the
# original page before relying on it.
_DEMO_PAGE_TEMPLATE = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>hCAPTCHA 演示</title>
<script src="https://js.hcaptcha.com/1/api.js" async defer></script>
</head>
<body>
<div class="h-captcha" data-sitekey="%%%%%%%%%%%"></div>
</body>
</html>
"""


def _cancel_tasks():
    """Cancel the future stored in ``tasks``, if main() has created one."""
    if tasks is not None:
        tasks.cancel()


def _background_url(style):
    """Return the URL inside ``url("...")`` in a style string, or None."""
    if not style:
        return None
    match = _BACKGROUND_URL_RE.search(style)
    return match.group(1) if match else None


async def _collect_task_image_urls(frame_challenge):
    """Return the background-image URLs of the challenge's task-image tiles."""
    tiles = frame_challenge.locator("//div[@class='task-image']")
    await frame_challenge.locator("//div[@tabindex='0']").nth(0).wait_for()
    count = await tiles.count()
    urls = []
    for i in range(count):
        image = tiles.nth(i).locator('//div[@class="image"]')
        url = _background_url(await image.get_attribute("style"))
        if url:
            urls.append(url)
    return urls


async def route_continuation(route, request, host, sitekey):
    """Serve the stub widget page for the host's root URL; pass the rest through.

    Args:
        route: Playwright route to fulfil or continue.
        request: the intercepted request.
        host: host name whose root document is replaced.
        sitekey: hCaptcha sitekey substituted into the stub page.
    """
    # Only the request for the target site's root URL is intercepted.
    if request.url == f"https://{host}/":
        print("start to solve")
        # Answer with the local stub page instead of contacting the real site.
        await route.fulfill(
            status=200,
            body=_DEMO_PAGE_TEMPLATE.replace("%%%%%%%%%%%", sitekey),
        )
    else:
        # Every other request is forwarded unmodified.
        await route.continue_()


def url_to_base64(url):
    """Download ``url`` and return its body as a base64 string.

    Returns None (after printing the error) when the request fails, times
    out, or answers with an HTTP error status.
    """
    try:
        # A timeout keeps a stalled download from hanging the caller forever.
        response = requests.get(url, timeout=30)
        # Do not base64-encode an HTTP error page as if it were the image.
        response.raise_for_status()
        return base64.b64encode(response.content).decode('utf-8')
    except requests.exceptions.RequestException as e:
        print(f"Error fetching URL: {e}")
        return None


async def _read_example_url(frame_challenge):
    """Return the challenge's example-image URL, or None when there is none."""
    example_area = frame_challenge.locator('//div[@class="challenge-example"]')
    example = example_area.locator('div.image[aria-hidden]')
    if await example.get_attribute("aria-hidden") == "true":
        print("no examples")
        return None
    example_image = example.locator('//div[@class="image"]')
    await example_image.wait_for()
    url = _background_url(await example_image.get_attribute("style"))
    if url:
        print(url)
    # When no URL can be parsed, report "no example" rather than handing a
    # Locator object to the solver.
    return url


async def handle_response(page):
    """React to a freshly loaded challenge: solve it, or refresh it.

    If the prompt contains a ``div.prompt-padding`` element, the question
    text, optional example image and tile URLs are collected and passed to
    classify_click(); otherwise the challenge's refresh button is clicked.
    On any error the page is closed and the running ``tasks`` is cancelled.
    """
    await asyncio.sleep(2)
    try:
        await page.wait_for_selector(CHALLENGE_IFRAME, timeout=15000)
        frame_challenge = page.frame_locator(CHALLENGE_IFRAME)
        prompt = frame_challenge.locator("//div[@class='challenge-prompt']")
        prompt_handle = await prompt.element_handle()
        prompt_padding = await prompt_handle.query_selector("div.prompt-padding")

        if prompt_padding is None:
            await asyncio.sleep(random.uniform(0.1, 0.3))
            await frame_challenge.locator("//div[@class='refresh button']").click()
            print("refresh")
            return

        span_element = await prompt_handle.query_selector("h2 > span")
        span_text = await span_element.inner_text()
        examples = await _read_example_url(frame_challenge)

        print(span_text)
        print("done tasks for getting examples")
        data = await _collect_task_image_urls(frame_challenge)
        print("done tasks for getting url")
        await classify_click(page, data, 0, examples, span_text)
    except Exception as e:
        print(e)
        await page.close()
        _cancel_tasks()


async def on_response(response, page):
    """Dispatch on hCaptcha API responses observed on ``page``.

    A ``getcaptcha`` response schedules handle_response(); a ``checkcaptcha``
    response whose JSON has a truthy ``pass`` stores ``generated_pass_UUID``
    in ``token``, closes the page and cancels ``tasks``.
    """
    global token
    if response.url.startswith("https://api.hcaptcha.com/getcaptcha"):
        task = asyncio.create_task(handle_response(page))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
    elif response.url.startswith("https://api.hcaptcha.com/checkcaptcha"):
        data0 = await response.json()
        if data0.get("pass"):
            # Record the token first so that a failure while closing the page
            # can neither lose it nor skip the cancellation.
            token = data0.get("generated_pass_UUID")
            try:
                await page.close()
            finally:
                _cancel_tasks()
        print(data0)


async def classify_click(page, data, round0, examples, quetsion0):
    """Ask NopeCHA which tiles match the question, click them and submit.

    Args:
        page: Playwright page showing the challenge.
        data: image URLs of the tiles, in tile order.
        round0: 0 for the first screen; after submitting it, the tiles are
            re-read and the function runs once more with round0 == 1.
        examples: example image URL, or a falsy value when there is none.
        quetsion0: the challenge question text sent as the solver task.

    On any error the exception is printed and ``tasks`` is cancelled.
    """
    try:
        await page.wait_for_selector(CHALLENGE_IFRAME)
        frame_challenge = page.frame_locator(CHALLENGE_IFRAME)
        samples = frame_challenge.locator("//div[@class='task-image']")
        await frame_challenge.locator("//div[@tabindex='0']").nth(0).wait_for()
        count = await samples.count()
        print(count)

        solve_kwargs = {"type": 'hcaptcha', "task": quetsion0, "image_urls": data}
        if examples:
            solve_kwargs["image_examples"] = examples
        # NOTE(review): this call is synchronous and blocks the event loop
        # while the recognition request is in flight.
        clicks = nopecha.Recognition.solve(**solve_kwargs)

        for i in range(count):
            sample = samples.nth(i)
            await sample.wait_for()
            if clicks[i]:
                print("try to click")
                await sample.click(delay=200)
        print(clicks)

        await asyncio.sleep(random.uniform(0.1, 0.3))
        await frame_challenge.locator("//div[@class='button-submit button']").click()

        if round0 == 0:
            await asyncio.sleep(2)
            await page.wait_for_selector(CHALLENGE_IFRAME)
            frame_challenge = page.frame_locator(CHALLENGE_IFRAME)
            data = await _collect_task_image_urls(frame_challenge)
            print("done tasks for getting url")
            await classify_click(page, data, 1, examples, quetsion0)
    except Exception as e:
        print(e)
        _cancel_tasks()


async def area_click(page, data, round0, examples, retries=3):
    """Solve an area-select challenge by clicking the point NopeCHA returns.

    Args:
        page: Playwright page showing the challenge.
        data: challenge payload; ``requester_question["en"]`` and
            ``tasklist[0]["datapoint_uri"]`` are read from it.
        round0: 0 for the first screen; a second pass follows with 1.
        examples: example image(s) forwarded to the solver when truthy.
        retries: how many more attempts are made after a failure.  The
            original retried without limit, so a persistent error recursed
            forever.
    """
    try:
        await page.wait_for_selector(CHALLENGE_IFRAME)
        frame_challenge = page.frame_locator(CHALLENGE_IFRAME)
        locator = frame_challenge.locator("//div[@class='challenge-view']//canvas")
        await locator.wait_for(state="visible")

        image_b64 = url_to_base64(data.get("tasklist")[0]["datapoint_uri"])
        if image_b64 is None:
            raise RuntimeError("could not download the challenge image")

        solve_kwargs = {
            "type": 'hcaptcha_area_select',
            "task": data.get("requester_question").get("en"),
            "image_data": [image_b64],
        }
        if examples:
            solve_kwargs["image_examples"] = examples
        clicks = nopecha.Recognition.solve(**solve_kwargs)
        print(clicks)
        print(clicks["x"], clicks["y"])

        print("try to click")
        bounds = await locator.bounding_box()
        print(bounds)
        # The returned x/y are used as percentages of the canvas size.
        await locator.click(
            delay=200,
            position={
                "x": int(bounds["width"] * clicks["x"] / 100),
                "y": int(bounds["height"] * clicks["y"] / 100),
            },
        )
        print("done")

        await frame_challenge.locator("//div[@class='button-submit button']").click()
        await asyncio.sleep(random.uniform(0.1, 0.3))
        if round0 == 0:
            await area_click(page, data, 1, examples, retries)
    except Exception as e:
        print(e)
        if retries > 0:
            await area_click(page, data, round0, examples, retries - 1)


async def hit_challenge(context: ASyncContext, host, sitekey, times: int = 8):
    """Open the stub page for ``host``, click the checkbox and wait.

    Args:
        context: browser context to route and open the page in.
        host: host whose root URL is served by route_continuation().
        sitekey: hCaptcha sitekey for the stub page.
        times: unused; kept so existing callers keep working.
    """
    await context.route(
        '**/*',
        lambda route, request: route_continuation(route, request, host, sitekey),
    )
    page = await context.new_page()
    page.on('response', lambda response: on_response(response, page))
    await page.goto(f"https://{host}")
    checkbox = page.frame_locator("//iframe[contains(@title,'checkbox')]")
    await checkbox.locator("#checkbox").click()
    # Keep the page alive while the response handlers work; they cancel
    # ``tasks`` when the challenge passes or fails.
    await asyncio.sleep(SOLVE_WAIT_SECONDS)


async def bytedance(host, sitekey):
    """Launch headless Firefox and run hit_challenge() in a fresh context."""
    async with async_playwright() as p:
        browser = await p.firefox.launch(headless=True)
        try:
            context = await browser.new_context(locale="en-US")
            await Malenia.apply_stealth(context)
            await hit_challenge(context, host, sitekey)
        finally:
            # Close the browser even when the run is cancelled or fails.
            await browser.close()


async def main(host, key):
    """Run one solving attempt and return the pass token, or None.

    Args:
        host: host whose root URL is replaced by the stub page.
        key: hCaptcha sitekey.

    Returns:
        The ``generated_pass_UUID`` captured during this run, or None when
        no passing response was seen.
    """
    global tasks, token
    # Reset so that a failed run cannot return a token from an earlier call.
    token = None
    try:
        tasks = asyncio.gather(bytedance(host, key), return_exceptions=True)
        results = await tasks
        # return_exceptions=True hands failures back as values; surface them
        # instead of dropping them silently.
        for result in results:
            if isinstance(result, Exception):
                print(result)
        return token
    except asyncio.CancelledError:
        print("task done")
        return token
    except Exception as e:
        print(e)
        return token


# Example usage:
# asyncio.run(main("free.vps.vc", "3bae0a5b-f2b8-43ef-98b7-76865a8a3997"))