# Spaces: Sleeping / Sleeping  (appears to be hosting-page status text captured with the file; not code)
import base64
import json
import logging
import os
import re
import time
from typing import Any, Dict, List, Optional

from fastapi import FastAPI
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# -------------------------
# Logging
# -------------------------
# The level name comes from the LOG_LEVEL environment variable (default INFO).
# It is stripped and upper-cased because logging only recognises upper-case
# level names ("debug" would raise ValueError); an empty value falls back to INFO.
logging.basicConfig(level=(os.getenv("LOG_LEVEL") or "INFO").strip().upper())
logger = logging.getLogger("hackrx-round5")
# -------------------------
# FastAPI app
# -------------------------
# Application instance, configured with the API title and version.
app = FastAPI(
    title="HackRx Round 5 API",
    version="1.0.0",
)
# -------------------------
# Models
# -------------------------
class ChallengeRequest(BaseModel):
    """Request body: the page URL to scrape and the questions to answer."""

    # Address of the challenge page that gets loaded in the browser.
    url: str
    # Questions to answer from the scraped page, one answer each.
    questions: List[str]
class ChallengeResponse(BaseModel):
    """Response body: one answer string per submitted question, in order."""

    # Answers aligned index-for-index with the request's questions.
    answers: List[str]
# -------------------------
# Helpers
# -------------------------
def try_decode_jwt(token: str) -> Dict[str, Any]:
    """Decode the payload segment of a JWT without verifying its signature.

    The token is split on "."; the middle segment is base64url-decoded
    (after restoring the padding that JWTs omit) and parsed as JSON.

    Args:
        token: Candidate JWT in ``header.payload.signature`` form.

    Returns:
        The decoded payload when it is a JSON object. An empty dict when the
        input is not a string, does not have exactly three segments, cannot
        be decoded, or decodes to JSON that is not an object.
    """
    if not isinstance(token, str):
        return {}

    segments = token.split(".")
    if len(segments) != 3:
        return {}

    payload_segment = segments[1]
    # JWTs strip base64 padding; restore it so the decoder accepts the input.
    padded = payload_segment + "=" * (-len(payload_segment) % 4)
    try:
        payload_json = base64.urlsafe_b64decode(padded).decode("utf-8")
        decoded_payload = json.loads(payload_json)
    except (ValueError, RecursionError) as e:
        # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are all
        # ValueError subclasses; RecursionError covers absurdly nested JSON.
        logger.error(f"JWT decode error: {e}")
        return {}

    if not isinstance(decoded_payload, dict):
        # Callers use .items()/.update() on the result, so a JSON array or
        # scalar must not be passed through.
        logger.warning("JWT payload is not a JSON object; ignoring token")
        return {}

    logger.info(f"Decoded JWT payload: {decoded_payload}")
    return decoded_payload
def setup_chrome_driver() -> Optional[webdriver.Chrome]:
    """Create a headless Chrome WebDriver configured for scraping.

    Returns:
        A started ``webdriver.Chrome`` instance, or ``None`` when the driver
        could not be created (the failure is logged, not raised).
    """
    chrome_options = Options()
    arguments = (
        "--headless",  # Run in background
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
        "--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0 Safari/537.36",
        # Chrome's own process logging
        "--enable-logging",
        "--log-level=0",
    )
    for argument in arguments:
        chrome_options.add_argument(argument)

    # Ask ChromeDriver to record every browser console level so that
    # driver.get_log("browser") can return console.log() output as well as
    # warnings/errors; the command-line switches above do not control that log.
    chrome_options.set_capability("goog:loggingPrefs", {"browser": "ALL"})

    try:
        return webdriver.Chrome(options=chrome_options)
    except Exception as e:
        # Best effort: callers treat None as "no browser available".
        logger.error(f"Failed to create Chrome driver: {e}")
        return None
# -------------------------
# Interactive Scraper
# -------------------------
def _click_start_button(driver) -> bool:
    """Click the first element that looks like a "Start" control; return True if a click succeeded."""
    locators = [
        # "." (the element's full string value) also matches labels nested in
        # child elements, which contains(text(), ...) would miss.
        (By.XPATH, "//button[contains(., 'Start')] | //input[contains(@value, 'Start')]"),
        (By.CSS_SELECTOR, "button[id*='start']"),
        (By.CSS_SELECTOR, "button[class*='start']"),
        (By.CSS_SELECTOR, "input[value*='Start']"),
        (By.CSS_SELECTOR, "a[href*='start']"),
        (By.CSS_SELECTOR, "[onclick*='start']"),
    ]
    for by, selector in locators:
        try:
            elements = driver.find_elements(by, selector)
            if elements:
                logger.info(f"Found start button with selector: {selector}")
                elements[0].click()
                time.sleep(3)  # Wait for challenge to start
                return True
        except Exception as e:
            # A failed lookup or click just moves on to the next locator.
            logger.debug(f"Selector {selector} failed: {e}")
    return False


def _collect_console_logs(driver) -> List[str]:
    """Return the messages of the browser console log entries (best effort)."""
    messages: List[str] = []
    try:
        for entry in driver.get_log('browser'):
            messages.append(entry['message'])
            logger.info(f"Console log: {entry['message']}")
    except Exception as e:
        logger.warning(f"Could not get console logs: {e}")
    return messages


def _extract_jwt_values(text: str):
    """Decode each distinct JWT-looking token in *text*; return ``(hidden_values, jwt_data)``."""
    jwt_patterns = [
        r"eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+",
        r"[A-Za-z0-9_-]{20,}\.[A-Za-z0-9_-]{20,}\.[A-Za-z0-9_-]{20,}",
    ]
    hidden_values: List[str] = []
    jwt_data: Dict[str, Any] = {}
    seen_tokens = set()
    for pattern in jwt_patterns:
        for token in re.findall(pattern, text):
            # Both patterns usually match the same token; handle it only once.
            if token in seen_tokens:
                continue
            seen_tokens.add(token)
            logger.info(f"Found JWT token: {token[:50]}...")
            data = try_decode_jwt(token)
            if not data or not isinstance(data, dict):
                continue
            jwt_data.update(data)
            hidden_values.extend(f"jwt {k}={v}" for k, v in data.items())
    return hidden_values, jwt_data


def _extract_console_values(console_logs: List[str]) -> List[str]:
    """Pull completion codes and challenge-completion messages out of console logs."""
    found: List[str] = []
    for log in console_logs:
        codes = re.findall(r"completion[_\s]*code[:\s]*([A-Za-z0-9\-_]{6,})", log, flags=re.I)
        found.extend(f"completion_code {code}" for code in codes)
        lowered = log.lower()
        if "challenge" in lowered and ("complete" in lowered or "finished" in lowered):
            found.append(f"console_message {log}")
    return found


def _read_js_globals(driver) -> List[str]:
    """Report the challenge-related window globals that are set on the page (best effort)."""
    found: List[str] = []
    try:
        js_result = driver.execute_script("""
            var data = {};
            if (window.challengeData) data.challengeData = window.challengeData;
            if (window.challenge) data.challenge = window.challenge;
            if (window.completionCode) data.completionCode = window.completionCode;
            return data;
        """)
        if js_result:
            for k, v in js_result.items():
                found.append(f"js_global {k}={v}")
                logger.info(f"Found JS global: {k} = {v}")
    except Exception as e:
        logger.debug(f"JS execution failed: {e}")
    return found


def _read_local_storage(driver) -> List[str]:
    """Report localStorage entries whose key mentions challenge/code/completion (best effort)."""
    keywords = ('challenge', 'code', 'completion')
    found: List[str] = []
    try:
        local_storage = driver.execute_script("return window.localStorage;")
        if local_storage:
            for k, v in local_storage.items():
                if any(keyword in k.lower() for keyword in keywords):
                    found.append(f"localStorage {k}={v}")
    except Exception as e:
        logger.debug(f"LocalStorage check failed: {e}")
    return found


def scrape_with_selenium(url: str) -> Dict[str, Any]:
    """Scrape webpage with Selenium, click Start Challenge, and extract data.

    Loads *url* in a headless browser, tries to click a "Start" control, then
    gathers JWT payloads, completion codes, console messages, selected JS
    globals and localStorage entries.

    Args:
        url: Address of the page to load.

    Returns:
        A dict with the keys ``title``, ``visible_text`` (first 6000
        characters of the body text), ``hidden_values``, ``jwt_data``,
        ``console_logs`` and ``button_clicked``; an empty dict when the
        browser could not be started or scraping failed.
    """
    # NOTE(review): url is handed to the browser unchecked; if it comes from an
    # untrusted request, consider restricting it to http(s) targets.
    driver = None
    try:
        driver = setup_chrome_driver()
        if not driver:
            return {}

        logger.info(f"Loading URL: {url}")
        driver.get(url)
        # Wait for page to load
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        time.sleep(2)

        button_clicked = _click_start_button(driver)
        if not button_clicked:
            logger.warning("Could not find Start Challenge button, proceeding with current page")

        # Page source and console output after the interaction.
        html = driver.page_source
        console_logs = _collect_console_logs(driver)

        # Join with spaces so a token at the end of the HTML cannot fuse with
        # the start of the first console message.
        all_text = " ".join([html, *console_logs])
        hidden_values, jwt_data = _extract_jwt_values(all_text)
        hidden_values.extend(_extract_console_values(console_logs))
        hidden_values.extend(_read_js_globals(driver))
        hidden_values.extend(_read_local_storage(driver))

        logger.info(f"Found {len(hidden_values)} hidden values")
        logger.info(f"JWT data: {jwt_data}")
        return {
            "title": driver.title,
            "visible_text": driver.find_element(By.TAG_NAME, "body").text[:6000],
            "hidden_values": hidden_values,
            "jwt_data": jwt_data,
            "console_logs": console_logs,
            "button_clicked": button_clicked,
        }
    except Exception as e:
        logger.error(f"Selenium scraping failed for {url}: {e}")
        return {}
    finally:
        if driver:
            try:
                driver.quit()
            except Exception as e:
                # A failing shutdown must not replace the scraped result.
                logger.warning(f"Failed to quit Chrome driver: {e}")
# -------------------------
# Answer extractor
# -------------------------
_COMPLETION_CODE_RE = re.compile(
    r"completion[_\s]*code[:\s]*([A-Za-z0-9\-_]{6,})", flags=re.I
)


def _hidden_value(entry: str) -> str:
    """Return the value part of a hidden-value entry.

    Entries are either "<source> key=value" or, for completion codes found in
    console logs, "completion_code <code>" (no "=" separator).
    """
    prefix = "completion_code "
    if entry.startswith(prefix):
        return entry[len(prefix):].strip()
    return entry.split("=", 1)[-1].strip()


def answer_question(question: str, content: Dict[str, Any]) -> str:
    """Enhanced rule-based extraction for Round 5 questions.

    The question is matched by keyword (challenge ID, completion code,
    challenge name); if no rule produces an answer, the first string-valued
    JWT field other than ``iat``/``exp`` is returned.

    Args:
        question: Natural-language question text.
        content: Scrape result; the keys ``hidden_values``, ``jwt_data`` and
            ``console_logs`` are read, each defaulting to empty when absent.

    Returns:
        The extracted answer, or "Challenge information not found".
    """
    ql = question.lower()
    hidden = content.get("hidden_values", [])
    jwt_data = content.get("jwt_data", {})
    console_logs = content.get("console_logs", [])
    logger.info(f"Answering question: {question}")
    logger.info(f"Available JWT data: {jwt_data}")
    logger.info(f"Hidden values count: {len(hidden)}")

    # Challenge ID extraction
    if "challenge id" in ql or "challengeid" in ql:
        # First check JWT data directly
        if "challengeID" in jwt_data:
            result = str(jwt_data["challengeID"])
            logger.info(f"Found challengeID in JWT: {result}")
            return result
        # Check hidden values
        for h in hidden:
            if "challengeid" in h.lower():
                result = _hidden_value(h)
                logger.info(f"Found challengeID in hidden values: {result}")
                return result

    # Completion code extraction
    if "completion" in ql and "code" in ql:
        # Look for explicit completion codes
        for h in hidden:
            if "completion_code" in h.lower():
                # _hidden_value strips the "completion_code " prefix, which a
                # plain split on "=" would leave in the answer.
                result = _hidden_value(h)
                logger.info(f"Found completion code: {result}")
                return result
        # Look in console logs for completion codes
        for log in console_logs:
            completion_matches = _COMPLETION_CODE_RE.findall(log)
            if completion_matches:
                result = completion_matches[0]
                logger.info(f"Found completion code in console: {result}")
                return result
        # Look for any long tokens that might be completion codes
        for h in hidden:
            lowered = h.lower()
            if "token" in lowered or "code" in lowered:
                token = _hidden_value(h)
                if len(token) > 15:  # Assuming completion codes are reasonably long
                    logger.info(f"Found potential completion code: {token}")
                    return token

    # Challenge name extraction
    if "challenge name" in ql:
        if "coolGuy" in jwt_data:
            result = str(jwt_data["coolGuy"])
            logger.info(f"Found challenge name in JWT: {result}")
            return result

    # Fallback: return any relevant data from JWT
    if jwt_data:
        for key, value in jwt_data.items():
            if key not in ["iat", "exp"] and isinstance(value, str):
                logger.info(f"Fallback: returning JWT field {key}: {value}")
                return str(value)

    logger.warning("No matching data found for question")
    return "Challenge information not found"
# -------------------------
# Routes
# -------------------------
# Registered on the app so the handler is actually reachable; without a route
# decorator FastAPI never serves it. NOTE(review): "/" is assumed — confirm.
@app.get("/")
def root():
    """Return a readiness message and a map of the service's endpoints."""
    return {
        "message": "HackRx Round 5 API - Ready (with Selenium support)",
        "endpoints": {"challenge": "POST /challenge", "health": "GET /health"},
    }
# Registered as GET /health, the path advertised by root()'s endpoint map;
# without a route decorator FastAPI never serves this handler.
@app.get("/health")
def health():
    """Return a static health-check payload."""
    return {"status": "healthy"}
# Registered as POST /challenge, the path advertised by root()'s endpoint map;
# without a route decorator FastAPI never serves this handler.
@app.post("/challenge", response_model=ChallengeResponse)
def challenge(req: ChallengeRequest):
    """Scrape the requested page and answer each question from what was found.

    Args:
        req: Request carrying the page URL and the list of questions.

    Returns:
        A ``ChallengeResponse`` with one answer per question, in order. When
        scraping yields nothing, every answer is
        "Challenge information not found".
    """
    logger.info(f"Round 5 request: url={req.url}, questions={req.questions}")
    content = scrape_with_selenium(req.url)
    if not content:
        # Empty scrape result: browser unavailable or scraping failed.
        return ChallengeResponse(
            answers=["Challenge information not found" for _ in req.questions]
        )

    answers = []
    for q in req.questions:
        ans = answer_question(q, content)
        answers.append(ans)
        logger.info(f"Q: {q} → A: {ans}")
    logger.info(f"Final answers: {answers}")
    return ChallengeResponse(answers=answers)