# h-point / hcaptcha_solver.py
# zhou12189108's picture
# Upload hcaptcha_solver.py
# a52bef8
# raw
# history blame
# 6.42 kB
from __future__ import annotations
from loguru import logger
from playwright.async_api import BrowserContext as ASyncContext, async_playwright, Page
from hcaptcha_challenger import ModelHub, install
from hcaptcha_challenger.agents import AgentT, Malenia
# Static demo page served in place of the target site's root document.
# The run of percent signs is a placeholder that is substituted with the
# (HTML-escaped) site key at request time.
_DEMO_PAGE_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<title>hCAPTCHA 演示</title>
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, user-scalable=yes">
<script src="https://js.hcaptcha.com/1/api.js" type="text/javascript" async defer></script>
</head>
<body>
<br><br>
<div class="sample-form">
<form id="hcaptcha-demo-form" method="POST">
<div id="hcaptcha-demo" class="h-captcha" data-sitekey="%%%%%%%%%%%" data-callback="onSuccess" data-expired-callback="onExpire"></div>
<script>
// success callback
var onSuccess = function(response) {
var errorDivs = document.getElementsByClassName("hcaptcha-error");
if (errorDivs.length) {
errorDivs[0].className = "";
}
var errorMsgs = document.getElementsByClassName("hcaptcha-error-message");
if (errorMsgs.length) {
errorMsgs[0].parentNode.removeChild(errorMsgs[0]);
}
var logEl = document.querySelector(".hcaptcha-success");
logEl.innerHTML = "挑战成功!"
};
var onExpire = function(response) {
var logEl = document.querySelector(".hcaptcha-success");
logEl.innerHTML = "令牌已过期。"
};
</script>
<div class="hcaptcha-success smsg" aria-live="polite"></div>
</body>
<script type="text/javascript">
// beacon example
function addEventHandler(object,szEvent,cbCallback){
if(!!object.addEventListener){ // for modern browsers or IE9+
return object.addEventListener(szEvent,cbCallback);
}
if(!!object.attachEvent){ // for IE <=8
return object.attachEvent(szEvent,cbCallback);
}
};
// Ex: triggers pageview beacon
addEventHandler(window,'load',function(){b();});
// Ex: triggers event beacon without pageview
addEventHandler(window,'load',function(){b({"vt": "e", "ec": "test_cat", "ea": "test_action"});});
</script>
</html>
"""


async def route_continuation(route, request, host, sitekey):
    """Answer the root URL of ``host`` with a local hCaptcha demo page.

    Meant to be used as a request-interception handler. A request whose URL
    is exactly ``https://{host}/`` is fulfilled with an in-memory HTML page
    embedding the hCaptcha widget for ``sitekey``; every other request is
    passed through unchanged.

    :param route: object exposing awaitable ``fulfill`` and ``continue_``.
    :param request: intercepted request; only its ``url`` attribute is read.
    :param host: host name whose root document is replaced.
    :param sitekey: hCaptcha site key written into the ``data-sitekey``
        attribute (HTML-escaped first).
    :return: None
    """
    # Function-scope import so this block does not depend on the file's
    # top-of-file import list.
    from html import escape

    # Only the exact root document of the target host is intercepted.
    if request.url != f"https://{host}/":
        # Requests for any other URL are forwarded without modification.
        await route.continue_()
        return

    print("start to solve")
    # Escape the key so a value containing quotes or angle brackets cannot
    # break out of the attribute; ordinary site keys are left unchanged.
    page_html = _DEMO_PAGE_TEMPLATE.replace(
        "%%%%%%%%%%%", escape(sitekey, quote=True)
    )
    # The page contains non-ASCII text, so declare the type and charset
    # explicitly instead of leaving it to browser sniffing.
    await route.fulfill(
        status=200,
        content_type="text/html; charset=utf-8",
        body=page_html,
    )
def patch_modelhub(modelhub: ModelHub):
    """
    Add extra candidate labels to ``modelhub.clip_candidates`` in place.

    Notes carried over from the original guidance:

    1. Patching clip_candidates lets image classification tasks be handled
       in self-supervised mode.
    2. Hints must be injected for every category that appears in a batch
       of images.
    3. The ObjectsYaml in the GitHub repository is updated regularly; when
       something new shows up, add hints the same way as below.
    4. This table is expected to change over time. Labels that no longer
       appear should be dropped from clip_candidates.
    5. Keep the number of candidate prompts moderate; too many prompts
       increase the computational complexity.

    :param modelhub: object whose ``clip_candidates`` mapping is updated
    :return: None
    """
    prompt = "the largest animal in real life"
    # Whitespace-separated list of labels; split() yields a fresh list on
    # every call, in the order written here.
    labels = (
        "parrot bee ladybug frog crab bat "
        "butterfly dragonfly giraffe tiger owl duck"
    )
    modelhub.clip_candidates.update({prompt: labels.split()})
def prelude(page: Page) -> AgentT:
    """Create a CLIP-enabled ``AgentT`` bound to ``page``.

    Runs ``install(upgrade=True, clip=True)``, loads a ``ModelHub`` via
    ``ModelHub.from_github_repo()``, parses its objects, applies
    ``patch_modelhub`` and hands the result to ``AgentT.from_page``.

    Guidance carried over from the original comments:

    * a background task should run ``install(upgrade=True)`` roughly every
      20 minutes;
    * ``install(upgrade=True, clip=True)`` must run before each agent is
      instantiated, which is why it is the first call here.

    :param page: the Playwright page the agent will control
    :return: the agent built by ``AgentT.from_page``
    """
    install(upgrade=True, clip=True)

    hub = ModelHub.from_github_repo()
    hub.parse_objects()
    # Custom pre-modifications to the hub; particularly useful for CLIP.
    patch_modelhub(hub)

    # The hub is registered externally so the agent sees the patched
    # configuration; clip=True enables zero-shot image classification.
    return AgentT.from_page(page=page, modelhub=hub, clip=True)
async def hit_challenge(context: ASyncContext, host, sitekey, times: int = 8):
await context.route('**/*', lambda route, request: route_continuation(route, request, host, sitekey))
page = await context.new_page()
agent = prelude(page)
await page.goto(f"https://{host}")
logger.info("startup sitelink", url=f"https://{host}")
await agent.handle_checkbox()
for pth in range(1, times):
# Handle challenge
result = await agent.execute()
if not agent.qr:
return
# Post-processing
match result:
case agent.status.CHALLENGE_BACKCALL | agent.status.CHALLENGE_RETRY:
logger.warning(f"retry", pth=pth, ash=agent.ash)
await page.wait_for_timeout(500)
fl = page.frame_locator(agent.HOOK_CHALLENGE)
await fl.locator("//div[@class='refresh button']").click()
case agent.status.CHALLENGE_SUCCESS:
logger.success(f"task done", pth=pth, ash=agent.ash)
rqdata = agent.cr.__dict__
await context.close()
return rqdata["generated_pass_UUID"]
async def bytedance(host, sitekey):
    """Run one solving session in headless Firefox and return its result.

    Starts Playwright, launches headless Firefox, creates a browser context
    with the ``en-US`` locale, applies ``Malenia.apply_stealth`` to it and
    delegates to ``hit_challenge``.

    :param host: host name forwarded to ``hit_challenge``
    :param sitekey: hCaptcha site key forwarded to ``hit_challenge``
    :return: whatever ``hit_challenge`` returns
    """
    async with async_playwright() as playwright:
        firefox = await playwright.firefox.launch(headless=True)
        stealth_context = await firefox.new_context(locale="en-US")
        await Malenia.apply_stealth(stealth_context)
        return await hit_challenge(stealth_context, host, sitekey)