"""HackRx Round 5 API.

Loads a challenge page in headless Chrome, clicks its "Start" control when
one is found, collects values exposed through the page source, the browser
console, JavaScript globals and localStorage, and answers questions about
them with simple rule-based lookups.
"""

import os
import re
import json
import base64
import logging
import time
from typing import List, Dict, Any, Optional, Tuple
from urllib.parse import urlparse

from fastapi import FastAPI
from pydantic import BaseModel
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service

# -------------------------
# Logging
# -------------------------
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
logger = logging.getLogger("hackrx-round5")

# -------------------------
# FastAPI app
# -------------------------
app = FastAPI(title="HackRx Round 5 API", version="1.0.0")

# -------------------------
# Constants
# -------------------------
# Answer returned whenever no rule produces a value.
NOT_FOUND_ANSWER = "Challenge information not found"

# Matches e.g. "completion code: ABC123" / "completion_code ABC123".
COMPLETION_CODE_RE = re.compile(
    r"completion[_\s]*code[:\s]*([A-Za-z0-9\-_]{6,})", flags=re.I
)

# Candidate JWT shapes: the usual "eyJ..." header, then any three long
# base64url segments. The two patterns overlap, so matches are de-duplicated
# by the caller.
JWT_PATTERNS = (
    re.compile(r"eyJ[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+"),
    re.compile(r"[A-Za-z0-9_-]{20,}\.[A-Za-z0-9_-]{20,}\.[A-Za-z0-9_-]{20,}"),
)

# Locators tried in order to find the "Start Challenge" control.
# normalize-space(.) is used instead of text() so buttons whose label is
# wrapped in child elements (e.g. <button><span>Start</span></button>) match.
START_BUTTON_LOCATORS = (
    (
        By.XPATH,
        "//button[contains(normalize-space(.), 'Start')]"
        " | //input[contains(@value, 'Start')]",
    ),
    (By.CSS_SELECTOR, "button[id*='start']"),
    (By.CSS_SELECTOR, "button[class*='start']"),
    (By.CSS_SELECTOR, "input[value*='Start']"),
    (By.CSS_SELECTOR, "a[href*='start']"),
    (By.CSS_SELECTOR, "[onclick*='start']"),
)

# localStorage keys containing any of these words are recorded.
LOCAL_STORAGE_KEYWORDS = ("challenge", "code", "completion")

PAGE_LOAD_TIMEOUT_S = 10
PAGE_SETTLE_S = 2
POST_CLICK_SETTLE_S = 3
VISIBLE_TEXT_LIMIT = 6000

JS_GLOBALS_SCRIPT = """
    var data = {};
    if (window.challengeData) data.challengeData = window.challengeData;
    if (window.challenge) data.challenge = window.challenge;
    if (window.completionCode) data.completionCode = window.completionCode;
    return data;
"""

# Copy items into a plain object so only stored key/value pairs come back,
# rather than whatever the driver makes of the Storage object itself.
JS_LOCAL_STORAGE_SCRIPT = """
    var items = {};
    for (var i = 0; i < window.localStorage.length; i++) {
        var key = window.localStorage.key(i);
        items[key] = window.localStorage.getItem(key);
    }
    return items;
"""


# -------------------------
# Models
# -------------------------
class ChallengeRequest(BaseModel):
    """Request body for POST /challenge: page URL plus questions about it."""

    url: str
    questions: List[str]


class ChallengeResponse(BaseModel):
    """Response body for POST /challenge: one answer per question, in order."""

    answers: List[str]


# -------------------------
# Helpers
# -------------------------
def try_decode_jwt(token: str) -> Dict[str, Any]:
    """Decode the payload of a JWT without verifying its signature.

    Args:
        token: Candidate token of the form ``header.payload.signature``.

    Returns:
        The payload as a dict, or ``{}`` when the token does not have three
        segments, cannot be decoded, or its payload is not a JSON object.
    """
    try:
        parts = token.split(".")
        if len(parts) != 3:
            return {}
        # base64url segments in JWTs are unpadded; restore the padding.
        payload_b64 = parts[1] + "=" * (-len(parts[1]) % 4)
        payload_json = base64.urlsafe_b64decode(payload_b64).decode("utf-8")
        decoded_payload = json.loads(payload_json)
    except Exception as e:  # best-effort: never raise to the caller
        logger.error("JWT decode error: %s", e)
        return {}

    # Callers iterate with .items(); a JSON list/scalar would break them.
    if not isinstance(decoded_payload, dict):
        return {}

    logger.info("Decoded JWT payload: %s", decoded_payload)
    return decoded_payload


def setup_chrome_driver() -> Optional[webdriver.Chrome]:
    """Create a headless Chrome driver.

    Returns:
        The driver, or ``None`` if Chrome could not be started.
    """
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # Run in background
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument(
        "--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/118.0 Safari/537.36"
    )

    # Enable logging to capture console messages
    chrome_options.add_argument("--enable-logging")
    chrome_options.add_argument("--log-level=0")
    # Ask ChromeDriver for every console level; without this the 'browser'
    # log may omit plain console.log() output.
    chrome_options.set_capability("goog:loggingPrefs", {"browser": "ALL"})

    try:
        return webdriver.Chrome(options=chrome_options)
    except Exception as e:
        logger.error("Failed to create Chrome driver: %s", e)
        return None


def _is_http_url(url: str) -> bool:
    """Return True when *url* is an absolute http(s) URL."""
    try:
        parsed = urlparse(url)
    except ValueError:
        return False
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)


def _click_start_button(driver) -> bool:
    """Click the first element matching a start-button locator.

    Returns True when a click succeeded, False when nothing was clickable.
    """
    for by, selector in START_BUTTON_LOCATORS:
        try:
            elements = driver.find_elements(by, selector)
            if not elements:
                continue
            logger.info("Found start button with selector: %s", selector)
            elements[0].click()
        except WebDriverException as e:
            logger.debug("Selector %s failed: %s", selector, e)
            continue
        time.sleep(POST_CLICK_SETTLE_S)  # Wait for challenge to start
        return True
    return False


def _collect_console_logs(driver) -> List[str]:
    """Return the messages currently in the browser console log."""
    try:
        entries = driver.get_log("browser")
    except WebDriverException as e:
        logger.warning("Could not get console logs: %s", e)
        return []

    messages = [entry["message"] for entry in entries]
    for message in messages:
        logger.info("Console log: %s", message)
    return messages


def _extract_jwt_values(text: str) -> Tuple[Dict[str, Any], List[str]]:
    """Decode every distinct JWT-looking token found in *text*.

    Returns the merged payload claims and one ``"jwt key=value"`` entry per
    claim.
    """
    jwt_data: Dict[str, Any] = {}
    hidden: List[str] = []
    seen_tokens = set()

    for pattern in JWT_PATTERNS:
        for token in pattern.findall(text):
            if token in seen_tokens:
                continue  # both patterns match ordinary JWTs
            seen_tokens.add(token)
            logger.info("Found JWT token: %s...", token[:50])
            data = try_decode_jwt(token)
            if data:
                jwt_data.update(data)
                hidden.extend(f"jwt {k}={v}" for k, v in data.items())
    return jwt_data, hidden


def _extract_console_values(console_logs: List[str]) -> List[str]:
    """Pull completion codes and completion messages out of console logs."""
    hidden: List[str] = []
    for log in console_logs:
        for code in COMPLETION_CODE_RE.findall(log):
            hidden.append(f"completion_code {code}")

        lowered = log.lower()
        if "challenge" in lowered and (
            "complete" in lowered or "finished" in lowered
        ):
            hidden.append(f"console_message {log}")
    return hidden


def _read_js_globals(driver) -> List[str]:
    """Record well-known challenge globals defined on ``window``."""
    try:
        js_result = driver.execute_script(JS_GLOBALS_SCRIPT)
    except WebDriverException as e:
        logger.debug("JS execution failed: %s", e)
        return []

    hidden: List[str] = []
    if isinstance(js_result, dict):
        for k, v in js_result.items():
            hidden.append(f"js_global {k}={v}")
            logger.info("Found JS global: %s = %s", k, v)
    return hidden


def _read_local_storage(driver) -> List[str]:
    """Record localStorage items whose key looks challenge-related."""
    try:
        local_storage = driver.execute_script(JS_LOCAL_STORAGE_SCRIPT)
    except WebDriverException as e:
        logger.debug("LocalStorage check failed: %s", e)
        return []

    if not isinstance(local_storage, dict):
        return []
    return [
        f"localStorage {k}={v}"
        for k, v in local_storage.items()
        if any(keyword in k.lower() for keyword in LOCAL_STORAGE_KEYWORDS)
    ]


# -------------------------
# Interactive Scraper
# -------------------------
def scrape_with_selenium(url: str) -> Dict[str, Any]:
    """Scrape webpage with Selenium, click Start Challenge, and extract data.

    Args:
        url: Absolute http(s) URL of the challenge page.

    Returns:
        A dict with keys ``title``, ``visible_text``, ``hidden_values``,
        ``jwt_data``, ``console_logs`` and ``button_clicked``; or ``{}`` when
        the URL is rejected, Chrome cannot start, or scraping fails.
    """
    # The URL comes from the request body; keep the browser away from
    # file://, chrome:// and similar local schemes.
    if not _is_http_url(url):
        logger.error("Refusing to load non-HTTP(S) URL: %s", url)
        return {}

    driver = setup_chrome_driver()
    if not driver:
        return {}

    try:
        logger.info("Loading URL: %s", url)
        driver.get(url)

        # Wait for page to load
        WebDriverWait(driver, PAGE_LOAD_TIMEOUT_S).until(
            EC.presence_of_element_located((By.TAG_NAME, "body"))
        )
        time.sleep(PAGE_SETTLE_S)

        button_clicked = _click_start_button(driver)
        if not button_clicked:
            logger.warning(
                "Could not find Start Challenge button, "
                "proceeding with current page"
            )

        # Snapshot the page and console after the interaction.
        html = driver.page_source
        console_logs = _collect_console_logs(driver)

        jwt_data, hidden_values = _extract_jwt_values(
            "\n".join([html, *console_logs])
        )
        hidden_values.extend(_extract_console_values(console_logs))
        hidden_values.extend(_read_js_globals(driver))
        hidden_values.extend(_read_local_storage(driver))

        logger.info("Found %d hidden values", len(hidden_values))
        logger.info("JWT data: %s", jwt_data)

        body_text = driver.find_element(By.TAG_NAME, "body").text
        return {
            "title": driver.title,
            "visible_text": body_text[:VISIBLE_TEXT_LIMIT],
            "hidden_values": hidden_values,
            "jwt_data": jwt_data,
            "console_logs": console_logs,
            "button_clicked": button_clicked,
        }
    except Exception as e:  # boundary: any failure yields "no content"
        logger.error("Selenium scraping failed for %s: %s", url, e)
        return {}
    finally:
        driver.quit()


# -------------------------
# Answer extractor
# -------------------------
def _split_hidden(entry: str) -> Tuple[str, str]:
    """Split a hidden-value entry into ``(label, value)``.

    Entries look like ``"jwt key=value"`` or, for completion codes and
    console messages, ``"label value"`` with no ``=``; both are handled.
    """
    label, sep, value = entry.partition("=")
    if not sep:
        label, _, value = entry.partition(" ")
    return label.strip(), value.strip()


def answer_question(question: str, content: Dict[str, Any]) -> str:
    """Enhanced rule-based extraction for Round 5 questions.

    Args:
        question: Free-text question; matched case-insensitively on keywords.
        content: Scrape result as returned by ``scrape_with_selenium``.

    Returns:
        The extracted answer, or ``"Challenge information not found"`` when
        no rule matches.
    """
    ql = question.lower()
    hidden = content.get("hidden_values", [])
    jwt_data = content.get("jwt_data", {})
    console_logs = content.get("console_logs", [])

    logger.info("Answering question: %s", question)
    logger.info("Available JWT data: %s", jwt_data)
    logger.info("Hidden values count: %d", len(hidden))

    # Challenge ID extraction
    if "challenge id" in ql or "challengeid" in ql:
        # First check JWT data directly
        if "challengeID" in jwt_data:
            result = str(jwt_data["challengeID"])
            logger.info("Found challengeID in JWT: %s", result)
            return result

        # Check hidden values
        for h in hidden:
            if "challengeid" in h.lower():
                result = _split_hidden(h)[1]
                logger.info("Found challengeID in hidden values: %s", result)
                return result

    # Completion code extraction
    if "completion" in ql and "code" in ql:
        # Look for explicit completion codes. The label is compared with
        # separators removed so "completion_code", "completionCode" and
        # "completion code" all match.
        for h in hidden:
            label, value = _split_hidden(h)
            squashed = label.lower().replace("_", "").replace(" ", "")
            if "completioncode" in squashed and value:
                logger.info("Found completion code: %s", value)
                return value

        # Look in console logs for completion codes
        for log in console_logs:
            match = COMPLETION_CODE_RE.search(log)
            if match:
                result = match.group(1)
                logger.info("Found completion code in console: %s", result)
                return result

        # Look for any long tokens that might be completion codes
        for h in hidden:
            lowered = h.lower()
            if "token" in lowered or "code" in lowered:
                token = _split_hidden(h)[1]
                # Assuming completion codes are reasonably long
                if len(token) > 15:
                    logger.info("Found potential completion code: %s", token)
                    return token

    # Challenge name extraction
    if "challenge name" in ql:
        # NOTE(review): the name is read from a JWT claim called "coolGuy";
        # confirm this claim name against the challenge page.
        if "coolGuy" in jwt_data:
            result = str(jwt_data["coolGuy"])
            logger.info("Found challenge name in JWT: %s", result)
            return result

    # Fallback: return any relevant data from JWT
    for key, value in jwt_data.items():
        if key not in ("iat", "exp") and isinstance(value, str):
            logger.info("Fallback: returning JWT field %s: %s", key, value)
            return str(value)

    logger.warning("No matching data found for question")
    return NOT_FOUND_ANSWER


# -------------------------
# Routes
# -------------------------
@app.get("/")
def root():
    """Describe the service and its endpoints."""
    return {
        "message": "HackRx Round 5 API - Ready (with Selenium support)",
        "endpoints": {"challenge": "POST /challenge", "health": "GET /health"},
    }


@app.get("/health")
def health():
    """Liveness probe."""
    return {"status": "healthy"}


@app.post("/challenge", response_model=ChallengeResponse)
def challenge(req: ChallengeRequest):
    """Scrape ``req.url`` and answer each of ``req.questions`` in order."""
    logger.info(
        "Round 5 request: url=%s, questions=%s", req.url, req.questions
    )

    content = scrape_with_selenium(req.url)
    if not content:
        return ChallengeResponse(
            answers=[NOT_FOUND_ANSWER for _ in req.questions]
        )

    answers = []
    for q in req.questions:
        ans = answer_question(q, content)
        answers.append(ans)
        logger.info("Q: %s → A: %s", q, ans)

    logger.info("Final answers: %s", answers)
    return ChallengeResponse(answers=answers)