# Source: h-point / hcaptcha_solver.py
# Uploaded by zhou12189108 ("Upload hcaptcha_solver.py", revision 8a2a0e9, 11.6 kB).
import asyncio
import base64
import random
import re
import nopecha
import requests
from hcaptcha_challenger.agents import Malenia
from playwright.async_api import BrowserContext as ASyncContext, async_playwright
import os

# NopeCHA credential used by every nopecha.Recognition.solve() call in this file.
# Prefer supplying it through the NOPECHA_API_KEY environment variable so the
# secret does not have to live in source control; the historical literal is kept
# only as a fallback so existing deployments keep working unchanged.
nopecha.api_key = os.environ.get("NOPECHA_API_KEY", '5nogeisu16i5tr5r')
async def route_continuation(route, request, host, sitekey):
    """Answer the request for ``https://{host}/`` with a local hCaptcha demo page.

    Every other request is passed through untouched.

    Args:
        route: Playwright route object; ``fulfill`` or ``continue_`` is awaited on it.
        request: The intercepted request; only its ``url`` is inspected.
        host: Host name whose root document is replaced.
        sitekey: hCaptcha site key written into the page's ``data-sitekey`` attribute.
    """
    import html  # local import: only needed to escape the sitekey below

    # Only the root document of the target host is replaced; requests for
    # anything else are forwarded unmodified.
    if request.url != f"https://{host}/":
        await route.continue_()
        return

    print("start to solve")
    # Answer with a locally generated page instead of contacting the real site.
    demo_page = """
<!DOCTYPE html>
<html lang="en">
<head>
<title>hCAPTCHA 演示</title>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, user-scalable=yes">
<script src="https://js.hcaptcha.com/1/api.js" type="text/javascript" async defer></script>
</head>
<body>
<br><br>
<div class="sample-form">
<form id="hcaptcha-demo-form" method="POST">
<div id="hcaptcha-demo" class="h-captcha" data-sitekey="%%%%%%%%%%%" data-callback="onSuccess" data-expired-callback="onExpire"></div>
<script>
// success callback
var onSuccess = function(response) {
var errorDivs = document.getElementsByClassName("hcaptcha-error");
if (errorDivs.length) {
errorDivs[0].className = "";
}
var errorMsgs = document.getElementsByClassName("hcaptcha-error-message");
if (errorMsgs.length) {
errorMsgs[0].parentNode.removeChild(errorMsgs[0]);
}
var logEl = document.querySelector(".hcaptcha-success");
logEl.innerHTML = "挑战成功!"
};
var onExpire = function(response) {
var logEl = document.querySelector(".hcaptcha-success");
logEl.innerHTML = "令牌已过期。"
};
</script>
<div class="hcaptcha-success smsg" aria-live="polite"></div>
</body>
<script type="text/javascript">
// beacon example
function addEventHandler(object,szEvent,cbCallback){
if(!!object.addEventListener){ // for modern browsers or IE9+
return object.addEventListener(szEvent,cbCallback);
}
if(!!object.attachEvent){ // for IE <=8
return object.attachEvent(szEvent,cbCallback);
}
};
// Ex: triggers pageview beacon
addEventHandler(window,'load',function(){b();});
// Ex: triggers event beacon without pageview
addEventHandler(window,'load',function(){b({"vt": "e", "ec": "test_cat", "ea": "test_action"});});
</script>
</html>
"""
    # The sitekey lands inside a double-quoted HTML attribute, so escape it;
    # a well-formed key is unaffected, a malformed one can no longer break
    # out of the attribute.
    safe_sitekey = html.escape(sitekey, quote=True)
    await route.fulfill(
        status=200,
        # Declare the payload type explicitly instead of relying on sniffing.
        content_type="text/html; charset=utf-8",
        body=demo_page.replace("%%%%%%%%%%%", safe_sitekey),
    )
def url_to_base64(url, timeout=30):
    """Download ``url`` and return its body as a base64-encoded string.

    Args:
        url: Address of the resource to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely.

    Returns:
        The base64 text of the response body, or ``None`` when the request
        fails (connection error, timeout, or an HTTP error status).
    """
    try:
        response = requests.get(url, timeout=timeout)
        # Treat 4xx/5xx as failures so an error page is never encoded and
        # handed on as if it were the requested content.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching URL: {e}")
        return None
    return base64.b64encode(response.content).decode('utf-8')
async def handle_response(page):
    """Read the challenge shown in the hCaptcha iframe and start solving it.

    After a short pause, the prompt text, the optional example image URL and
    the task image URLs are read from the challenge iframe and passed to
    ``classify_click``. When the prompt has no ``div.prompt-padding`` element
    the challenge is refreshed instead.

    Args:
        page: Playwright page hosting the hCaptcha widget.

    On any error the exception is printed, the page is closed and the
    module-level ``tasks`` future (if set) is cancelled.
    """
    iframe_xpath = "//iframe[contains(@title, 'hCaptcha challenge')]"
    css_url = re.compile(r'url\("(.+?)"\)')

    await asyncio.sleep(2)
    try:
        await page.wait_for_selector(iframe_xpath, timeout=15000)
        frame_challenge = page.frame_locator(iframe_xpath)
        question_text = frame_challenge.locator("//div[@class='challenge-prompt']")
        element_handle = await question_text.element_handle()
        prompt_padding_element = await element_handle.query_selector("div.prompt-padding")
        if prompt_padding_element is None:
            # Not a layout this handler reads: request a different challenge.
            await asyncio.sleep(random.uniform(0.1, 0.3))
            await frame_challenge.locator("//div[@class='refresh button']").click()
            print("refresh")
            return

        span_element = await element_handle.query_selector("h2 > span")
        span_text = await span_element.inner_text()

        # ``examples`` is either the example image URL or None. It must never
        # be left holding a Locator, which would be sent to the solver as if
        # it were a URL.
        examples = None
        example_box = frame_challenge.locator(
            '//div[@class="challenge-example"]'
        ).locator('div.image[aria-hidden]')
        aria_hidden_value = await example_box.get_attribute("aria-hidden")
        if aria_hidden_value == "true":
            print("no examples")
        else:
            example_image = example_box.locator('//div[@class="image"]')
            await example_image.wait_for()
            style_attribute = await example_image.get_attribute("style")
            # get_attribute returns None when the attribute is missing.
            url_match = css_url.search(style_attribute or "")
            if url_match:
                examples = url_match.group(1)
                print(examples)

        task0 = frame_challenge.locator("//div[@class='task-image']")
        await frame_challenge.locator("//div[@tabindex='0']").nth(0).wait_for()
        count = await task0.count()
        print(span_text)
        print("done tasks for getting examples")

        data = []
        for i in range(count):
            img = task0.nth(i).locator('//div[@class="image"]')
            style_attribute = await img.get_attribute("style")
            url_match = css_url.search(style_attribute or "")
            if url_match:
                data.append(url_match.group(1))
        print("done tasks for getting url")
        await classify_click(page, data, 0, examples, span_text)
    except Exception as e:
        print(e)
        try:
            await page.close()
        finally:
            # Always signal the run to stop, even if closing the page failed;
            # ``tasks`` is None when no run has been started through main().
            if tasks is not None:
                tasks.cancel()
# Strong references to in-flight handle_response() tasks. The event loop only
# keeps weak references to tasks, so an unreferenced task may be collected
# before it finishes.
_pending_challenge_tasks = set()


async def on_response(response, page):
    """React to hCaptcha API responses observed on ``page``.

    * ``getcaptcha``: schedule ``handle_response(page)`` in the background.
    * ``checkcaptcha``: when the JSON body reports ``pass``, store
      ``generated_pass_UUID`` in the module-level ``token``, close the page
      and cancel the module-level ``tasks`` future. The decoded body is
      printed either way.

    Args:
        response: Playwright response object (``url`` and ``json()`` are used).
        page: Page that produced the response.
    """
    global tasks, token
    url = response.url
    if url.startswith("https://api.hcaptcha.com/getcaptcha"):
        handler = asyncio.create_task(handle_response(page))
        _pending_challenge_tasks.add(handler)
        handler.add_done_callback(_pending_challenge_tasks.discard)
    elif url.startswith("https://api.hcaptcha.com/checkcaptcha"):
        data0 = await response.json()
        if data0.get("pass"):
            # Record the token before any further await so a failure while
            # closing the page cannot lose an already-solved challenge.
            token = data0.get("generated_pass_UUID")
            try:
                await page.close()
            except Exception as e:
                print(e)
            if tasks is not None:
                tasks.cancel()
        print(data0)
async def classify_click(page, data, round0, examples, quetsion0):
    """Solve one round of an image-grid challenge and submit it.

    The task image URLs are sent to ``nopecha.Recognition.solve``; every tile
    whose result entry is truthy is clicked, then the submit button is
    pressed. When ``round0`` is 0 the tile URLs are collected again and the
    function calls itself once more with ``round0`` set to 1.

    Args:
        page: Playwright page hosting the hCaptcha widget.
        data: URLs of the task images, in tile order.
        round0: 0 for the first round, 1 for the follow-up round.
        examples: Example image URL to send along, or a falsy value for none.
        quetsion0: Challenge prompt text sent to the solver as ``task``.

    Any exception is printed and the module-level ``tasks`` future (if set)
    is cancelled.
    """
    iframe_xpath = "//iframe[contains(@title, 'hCaptcha challenge')]"
    css_url = re.compile(r'url\("(.+?)"\)')
    try:
        await page.wait_for_selector(iframe_xpath)
        frame_challenge = page.frame_locator(iframe_xpath)
        samples = frame_challenge.locator("//div[@class='task-image']")
        await frame_challenge.locator("//div[@tabindex='0']").nth(0).wait_for()
        count = await samples.count()
        print(count)

        solve_kwargs = {"type": 'hcaptcha', "task": quetsion0, "image_urls": data}
        if examples:
            solve_kwargs["image_examples"] = examples
        # The solver call is synchronous; run it in a worker thread so the
        # event loop (and with it the browser connection) stays responsive.
        loop = asyncio.get_running_loop()
        clicks = await loop.run_in_executor(
            None, lambda: nopecha.Recognition.solve(**solve_kwargs)
        )

        # zip() stops at the shorter side, so a result list shorter than the
        # tile count cannot raise IndexError.
        for i, should_click in zip(range(count), clicks):
            sample = samples.nth(i)
            await sample.wait_for()
            if should_click:
                print("try to click")
                await sample.click(delay=200)
        print(clicks)

        await asyncio.sleep(random.uniform(0.1, 0.3))
        fl = frame_challenge.locator("//div[@class='button-submit button']")
        await fl.click()

        if round0 == 0:
            # Give the next grid time to load, then read its tile URLs.
            await asyncio.sleep(2)
            await page.wait_for_selector(iframe_xpath)
            frame_challenge = page.frame_locator(iframe_xpath)
            task0 = frame_challenge.locator("//div[@class='task-image']")
            await frame_challenge.locator("//div[@tabindex='0']").nth(0).wait_for()
            count = await task0.count()
            data = []
            for i in range(count):
                img = task0.nth(i).locator('//div[@class="image"]')
                style_attribute = await img.get_attribute("style")
                # get_attribute returns None when the attribute is missing.
                url_match = css_url.search(style_attribute or "")
                if url_match:
                    data.append(url_match.group(1))
            print("done tasks for getting url")
            await classify_click(page, data, 1, examples, quetsion0)
    except Exception as e:
        print(e)
        # ``tasks`` is None when no run has been started through main().
        if tasks is not None:
            tasks.cancel()
async def area_click(page, data, round0, examples, retries=3):
    """Solve one round of an area-select challenge by clicking on the canvas.

    The task image is downloaded with ``url_to_base64`` and sent to
    ``nopecha.Recognition.solve``; the returned ``x``/``y`` values are used as
    percentages of the canvas size to compute the click position. The submit
    button is then pressed. When ``round0`` is 0 the function calls itself
    once more with ``round0`` set to 1.

    Args:
        page: Playwright page hosting the hCaptcha widget.
        data: Challenge payload; ``requester_question["en"]`` and
            ``tasklist[0]["datapoint_uri"]`` are read from it.
        round0: 0 for the first round, 1 for the follow-up round.
        examples: Example image(s) to send along, or a falsy value for none.
        retries: How many times a failed attempt is repeated before giving up.
            The previous implementation retried without any limit, which
            never terminates when the failure is permanent.
    """
    iframe_xpath = "//iframe[contains(@title, 'hCaptcha challenge')]"
    loop = asyncio.get_running_loop()

    for _attempt in range(max(retries, 0) + 1):
        try:
            await page.wait_for_selector(iframe_xpath)
            frame_challenge = page.frame_locator(iframe_xpath)
            locator = frame_challenge.locator("//div[@class='challenge-view']//canvas")
            # The screenshot taken here previously was never used, so it is
            # no longer captured.
            await locator.wait_for(state="visible")

            prompt = data.get("requester_question").get("en")
            image_url = data.get("tasklist")[0]["datapoint_uri"]
            # Both the download and the solver call are synchronous; run them
            # in worker threads so the event loop is not blocked.
            image_b64 = await loop.run_in_executor(None, url_to_base64, image_url)
            if image_b64 is None:
                raise RuntimeError("could not download the challenge image")
            solve_kwargs = {
                "type": 'hcaptcha_area_select',
                "task": prompt,
                "image_data": [image_b64],
            }
            if examples:
                solve_kwargs["image_examples"] = examples
            clicks = await loop.run_in_executor(
                None, lambda: nopecha.Recognition.solve(**solve_kwargs)
            )
            print(clicks)
            print(clicks["x"], clicks["y"])

            print("try to click")
            bounds = await locator.bounding_box()
            print(bounds)
            await locator.click(
                delay=200,
                position={
                    "x": int(bounds["width"] * clicks["x"] / 100),
                    "y": int(bounds["height"] * clicks["y"] / 100),
                },
            )
            print("done")
            fl = frame_challenge.locator("//div[@class='button-submit button']")
            await fl.click()
            await asyncio.sleep(random.uniform(0.1, 0.3))
            if round0 == 0:
                await area_click(page, data, 1, examples, retries)
            return
        except Exception as e:
            print(e)
    print("area_click: giving up after repeated failures")
async def hit_challenge(context: ASyncContext, host, sitekey, times: int = 8):
    """Open the demo page for ``host`` and click the hCaptcha checkbox.

    All requests of ``context`` are routed through ``route_continuation`` and
    every response of the new page is forwarded to ``on_response``. After the
    checkbox click the coroutine sleeps for 3000 seconds; presumably it is
    meant to be cancelled from outside once a result is known.

    Args:
        context: Playwright browser context to work in.
        host: Host name to navigate to (``https://{host}``).
        sitekey: hCaptcha site key handed to ``route_continuation``.
        times: Accepted for compatibility; not used by this function.
    """
    def _reroute(route, request):
        return route_continuation(route, request, host, sitekey)

    await context.route('**/*', _reroute)
    page = await context.new_page()

    def _forward_response(response):
        return on_response(response, page)

    page.on('response', _forward_response)

    target_url = f"https://{host}"
    await page.goto(target_url)

    checkbox_frame = page.frame_locator("//iframe[contains(@title,'checkbox')]")
    checkbox_button = checkbox_frame.locator("#checkbox")
    await checkbox_button.click()

    # Keep the page alive while the response handlers do the actual work.
    await asyncio.sleep(3000)
async def bytedance(host, sitekey):
    """Launch headless Firefox and run ``hit_challenge`` for ``host``.

    A fresh ``en-US`` browser context is created and passed through
    ``Malenia.apply_stealth`` before the challenge is started.

    Args:
        host: Host name forwarded to ``hit_challenge``.
        sitekey: hCaptcha site key forwarded to ``hit_challenge``.
    """
    async with async_playwright() as playwright:
        firefox = await playwright.firefox.launch(headless=True)
        stealth_context = await firefox.new_context(locale="en-US")
        await Malenia.apply_stealth(stealth_context)
        await hit_challenge(stealth_context, host, sitekey)
# Module-level mapping; nothing in the code visible here reads or writes it.
question = {}
# The asyncio.gather() future created by main(); the response handlers call
# tasks.cancel() to end the run. None until main() has been called.
tasks = None
# Value of "generated_pass_UUID" stored by on_response() once a challenge
# passes; returned by main(). None until then.
token = None
async def main(host, key):
    """Run one solving attempt and return the resulting token.

    ``bytedance(host, key)`` is wrapped in an ``asyncio.gather`` future that
    is stored in the module-level ``tasks`` so the response handlers can
    cancel it when they are done.

    Args:
        host: Host name to solve the challenge for.
        key: hCaptcha site key.

    Returns:
        The token stored by the response handlers during this call, or
        ``None`` when no challenge was passed.
    """
    global tasks, token
    # Reset first so a token left over from an earlier call is never
    # returned for a run that failed.
    token = None
    try:
        tasks = asyncio.gather(bytedance(host, key), return_exceptions=True)
        outcomes = await tasks
        # With return_exceptions=True failures come back as results; print
        # them so they are not silently lost.
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                print(outcome)
    except asyncio.CancelledError:
        # Cancellation of ``tasks`` is how the handlers signal completion.
        print("task done")
    except Exception as e:
        print(e)
    return token
# asyncio.run(main("free.vps.vc", "3bae0a5b-f2b8-43ef-98b7-76865a8a3997"))