| """ |
| 浏览器自动化获取 reCAPTCHA token |
| 使用 Playwright 访问页面并执行 reCAPTCHA 验证 |
| """ |
| import asyncio |
| import time |
| import re |
| from typing import Optional, Dict |
| from playwright.async_api import async_playwright, Browser, BrowserContext |
|
|
| from ..core.logger import debug_logger |
|
|
|
|
def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]:
    """Split a proxy URL into server and optional credential fields.

    Accepted form: ``protocol://[username:password@]host:port`` where
    ``protocol`` is ``socks5``, ``http`` or ``https``. Leading and trailing
    whitespace is ignored so that a value pasted into a config field parses
    the same way here as it does after the validator has stripped it.

    Args:
        proxy_url: Proxy URL to parse. ``None``, non-string and blank input
            are treated as unparseable instead of raising.

    Returns:
        A dict with ``server`` (``protocol://host:port``) plus ``username``
        and ``password`` when the URL carries both, or ``None`` when the
        input does not match the accepted form.
    """
    if not isinstance(proxy_url, str):
        return None

    proxy_pattern = r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$'
    match = re.match(proxy_pattern, proxy_url.strip())
    if match is None:
        return None

    protocol, username, password, host, port = match.groups()
    proxy_config = {'server': f'{protocol}://{host}:{port}'}

    # The credential group is optional as a whole, so both values are either
    # present together or both None.
    if username and password:
        proxy_config['username'] = username
        proxy_config['password'] = password

    return proxy_config
|
|
|
|
def validate_browser_proxy_url(proxy_url: str) -> tuple[bool, str]:
    """Check whether a proxy URL can be used for the browser.

    A blank value is accepted (it means "no proxy"). HTTP/HTTPS proxies are
    accepted with or without credentials; SOCKS5 is accepted only without
    credentials.

    Args:
        proxy_url: Proxy URL to validate; may be empty or ``None``.

    Returns:
        ``(is_valid, error_message)``; the message is empty when valid.
    """
    candidate = (proxy_url or "").strip()
    if not candidate:
        return True, ""

    parsed = parse_proxy_url(candidate)
    if not parsed:
        return False, "代理URL格式错误,正确格式:http://host:port 或 socks5://host:port"

    scheme = parsed['server'].partition('://')[0]
    authenticated = 'username' in parsed

    if scheme == 'socks5':
        if authenticated:
            return False, "浏览器不支持带认证的SOCKS5代理,请使用HTTP代理或移除SOCKS5认证"
        return True, ""

    if scheme in ('http', 'https'):
        return True, ""

    # Defensive fallback for a scheme the parser accepted but this
    # function does not know about.
    return False, f"不支持的代理协议:{scheme}"
|
|
|
|
class BrowserCaptchaService:
    """Obtain reCAPTCHA v3 tokens by driving a headless Chromium (singleton).

    The browser is launched once and reused; each token request runs in its
    own browser context, which is closed when the request finishes.
    """

    _instance: Optional['BrowserCaptchaService'] = None
    # Serialises first-time construction and launch in get_instance().
    _lock = asyncio.Lock()

    def __init__(self, db=None):
        """Set up state without launching anything (always headless).

        Args:
            db: Optional object exposing an async ``get_captcha_config()``;
                it is consulted for proxy settings when the browser starts.
        """
        self.headless = True
        self.playwright = None
        self.browser: Optional[Browser] = None
        self._initialized = False
        self.website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV"
        self.db = db

    @staticmethod
    def _redact_proxy_url(proxy_url: Optional[str]) -> str:
        """Return ``proxy_url`` in a form that is safe to write to logs.

        Anything between ``://`` and the last ``@`` of the authority part is
        replaced with ``***`` so proxy credentials never reach the log.
        A falsy value is rendered as ``'None'``.
        """
        if not proxy_url:
            return 'None'
        return re.sub(r'://[^/]*@', '://***@', str(proxy_url), count=1)

    @classmethod
    async def get_instance(cls, db=None) -> 'BrowserCaptchaService':
        """Return the shared instance, creating and launching it on first use.

        Args:
            db: Passed to the constructor when the instance is created;
                ignored once an instance exists.

        Returns:
            The shared, initialised service instance.

        Raises:
            Exception: Whatever ``initialize()`` raises when the launch fails.
        """
        if cls._instance is None:
            async with cls._lock:
                if cls._instance is None:
                    instance = cls(db)
                    await instance.initialize()
                    # Publish only after a successful launch: callers that
                    # arrive while the browser is starting wait on the lock
                    # instead of receiving a half-initialised object, and a
                    # failed launch leaves nothing behind, so the next call
                    # retries from scratch.
                    cls._instance = instance
        return cls._instance

    async def initialize(self):
        """Start Playwright and launch Chromium (no-op if already started).

        Proxy settings are read from ``self.db`` when it is set. A proxy URL
        that cannot be parsed is logged and the browser is launched without
        a proxy.

        Raises:
            Exception: Any error from the config lookup or the launch is
                logged and re-raised. Playwright is stopped first so a
                failed launch does not leak the driver.
        """
        if self._initialized:
            return

        try:
            # Resolve the proxy from the stored captcha configuration.
            proxy_url = None
            if self.db:
                captcha_config = await self.db.get_captcha_config()
                if captcha_config.browser_proxy_enabled and captcha_config.browser_proxy_url:
                    proxy_url = captcha_config.browser_proxy_url

            # Only the redacted form is logged; the raw URL may carry
            # username:password.
            safe_proxy = self._redact_proxy_url(proxy_url)

            debug_logger.log_info(f"[BrowserCaptcha] 正在启动浏览器... (proxy={safe_proxy})")
            self.playwright = await async_playwright().start()

            launch_options = {
                'headless': self.headless,
                'args': [
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-sandbox',
                    '--disable-setuid-sandbox'
                ]
            }

            if proxy_url:
                proxy_config = parse_proxy_url(proxy_url)
                if proxy_config:
                    launch_options['proxy'] = proxy_config
                    auth_info = "auth=yes" if 'username' in proxy_config else "auth=no"
                    debug_logger.log_info(f"[BrowserCaptcha] 代理配置: {proxy_config['server']} ({auth_info})")
                else:
                    debug_logger.log_warning(f"[BrowserCaptcha] 代理URL格式错误: {safe_proxy}")

            self.browser = await self.playwright.chromium.launch(**launch_options)
            self._initialized = True
            debug_logger.log_info(f"[BrowserCaptcha] ✅ 浏览器已启动 (headless={self.headless}, proxy={safe_proxy})")
        except Exception as e:
            debug_logger.log_error(f"[BrowserCaptcha] ❌ 浏览器启动失败: {str(e)}")
            # Playwright may already be running when the launch fails; stop
            # it so a retry does not stack up orphaned driver processes.
            if self.playwright is not None:
                try:
                    await self.playwright.stop()
                except Exception:
                    pass
                self.playwright = None
            self.browser = None
            raise

    async def get_token(self, project_id: str) -> Optional[str]:
        """Fetch a reCAPTCHA v3 token for a Flow project page.

        Args:
            project_id: Flow project ID, inserted into the page URL.

        Returns:
            The token string, or ``None`` when the token could not be
            obtained (errors after start-up are logged, not raised).

        Raises:
            Exception: Propagated from ``initialize()`` when the browser has
                not been started yet and the launch fails.
        """
        if not self._initialized:
            await self.initialize()

        start_time = time.time()
        context: Optional[BrowserContext] = None

        try:
            # A fresh context per request; it is closed in the finally block.
            context = await self.browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                locale='en-US',
                timezone_id='America/New_York'
            )
            page = await context.new_page()

            website_url = f"https://labs.google/fx/tools/flow/project/{project_id}"

            debug_logger.log_info(f"[BrowserCaptcha] 访问页面: {website_url}")

            # Navigation failure is tolerated: the reCAPTCHA script is
            # injected below if the page did not provide it.
            try:
                await page.goto(website_url, wait_until="domcontentloaded", timeout=30000)
            except Exception as e:
                debug_logger.log_warning(f"[BrowserCaptcha] 页面加载超时或失败: {str(e)}")

            # Check whether the page already exposes grecaptcha.execute.
            debug_logger.log_info("[BrowserCaptcha] 检查并加载 reCAPTCHA v3 脚本...")
            script_loaded = await page.evaluate("""
                () => {
                    if (window.grecaptcha && typeof window.grecaptcha.execute === 'function') {
                        return true;
                    }
                    return false;
                }
            """)

            if not script_loaded:
                # Inject the api.js script for this site key.
                debug_logger.log_info("[BrowserCaptcha] 注入 reCAPTCHA v3 脚本...")
                injected = await page.evaluate(f"""
                    () => {{
                        return new Promise((resolve) => {{
                            const script = document.createElement('script');
                            script.src = 'https://www.google.com/recaptcha/api.js?render={self.website_key}';
                            script.async = true;
                            script.defer = true;
                            script.onload = () => resolve(true);
                            script.onerror = () => resolve(false);
                            document.head.appendChild(script);
                        }});
                    }}
                """)
                if not injected:
                    # The promise resolves false from script.onerror; report
                    # it, then fall through to the polling loop as before.
                    debug_logger.log_warning("[BrowserCaptcha] reCAPTCHA 脚本注入失败")

            # Poll up to 20 times, 0.5 s apart, for grecaptcha.execute.
            debug_logger.log_info("[BrowserCaptcha] 等待reCAPTCHA初始化...")
            for i in range(20):
                grecaptcha_ready = await page.evaluate("""
                    () => {
                        return window.grecaptcha &&
                               typeof window.grecaptcha.execute === 'function';
                    }
                """)
                if grecaptcha_ready:
                    debug_logger.log_info(f"[BrowserCaptcha] reCAPTCHA 已准备好(等待了 {i*0.5} 秒)")
                    break
                await asyncio.sleep(0.5)
            else:
                # Loop finished without break: never became ready. Execution
                # is still attempted; the script below returns null if
                # grecaptcha is missing.
                debug_logger.log_warning("[BrowserCaptcha] reCAPTCHA 初始化超时,继续尝试执行...")

            # Extra one-second pause before executing.
            await page.wait_for_timeout(1000)

            # Run grecaptcha.execute in the page; the script returns null on
            # any in-page error.
            debug_logger.log_info("[BrowserCaptcha] 执行reCAPTCHA验证...")
            token = await page.evaluate("""
                async (websiteKey) => {
                    try {
                        if (!window.grecaptcha) {
                            console.error('[BrowserCaptcha] window.grecaptcha 不存在');
                            return null;
                        }

                        if (typeof window.grecaptcha.execute !== 'function') {
                            console.error('[BrowserCaptcha] window.grecaptcha.execute 不是函数');
                            return null;
                        }

                        // 确保grecaptcha已准备好
                        await new Promise((resolve, reject) => {
                            const timeout = setTimeout(() => {
                                reject(new Error('reCAPTCHA加载超时'));
                            }, 15000);

                            if (window.grecaptcha && window.grecaptcha.ready) {
                                window.grecaptcha.ready(() => {
                                    clearTimeout(timeout);
                                    resolve();
                                });
                            } else {
                                clearTimeout(timeout);
                                resolve();
                            }
                        });

                        // 执行reCAPTCHA v3
                        const token = await window.grecaptcha.execute(websiteKey, {
                            action: 'FLOW_GENERATION'
                        });

                        return token;
                    } catch (error) {
                        console.error('[BrowserCaptcha] reCAPTCHA执行错误:', error);
                        return null;
                    }
                }
            """, self.website_key)

            duration_ms = (time.time() - start_time) * 1000

            if token:
                debug_logger.log_info(f"[BrowserCaptcha] ✅ Token获取成功(耗时 {duration_ms:.0f}ms)")
                return token
            else:
                debug_logger.log_error("[BrowserCaptcha] Token获取失败(返回null)")
                return None

        except Exception as e:
            debug_logger.log_error(f"[BrowserCaptcha] 获取token异常: {str(e)}")
            return None
        finally:
            if context:
                try:
                    await context.close()
                except Exception:
                    # Best-effort cleanup. Catching Exception rather than
                    # everything lets task cancellation and interpreter
                    # shutdown signals propagate.
                    pass

    async def close(self):
        """Close the browser and stop Playwright; errors are logged, not raised.

        After this call the service is marked uninitialised, so the next
        ``get_token()`` launches a new browser.
        """
        try:
            if self.browser:
                try:
                    await self.browser.close()
                except Exception as e:
                    # "Connection closed" errors are not reported; anything
                    # else is logged as a warning.
                    if "Connection closed" not in str(e):
                        debug_logger.log_warning(f"[BrowserCaptcha] 关闭浏览器时出现异常: {str(e)}")
                finally:
                    self.browser = None

            if self.playwright:
                try:
                    await self.playwright.stop()
                except Exception:
                    # Best-effort: failures while stopping are ignored.
                    pass
                finally:
                    self.playwright = None

            self._initialized = False
            debug_logger.log_info("[BrowserCaptcha] 浏览器已关闭")
        except Exception as e:
            debug_logger.log_error(f"[BrowserCaptcha] 关闭浏览器异常: {str(e)}")
|
|