| |
| from __future__ import annotations |
| import os |
| import time |
| import logging |
| import threading |
| import asyncio |
| from typing import Optional, Dict, Any, Tuple |
| from concurrent.futures import ThreadPoolExecutor |
| from urllib.parse import quote_plus, urljoin |
|
|
| from fastapi import FastAPI, HTTPException, Query, Body |
| from pydantic import BaseModel |
| from starlette.responses import JSONResponse |
|
|
| from selenium import webdriver |
| from selenium.webdriver.chrome.options import Options |
| from selenium.webdriver.chrome.service import Service |
| from selenium.common.exceptions import WebDriverException, SessionNotCreatedException |
| from webdriver_manager.chrome import ChromeDriverManager |
| from bs4 import BeautifulSoup |
| from selenium.webdriver.common.by import By |
| from selenium.webdriver.support.ui import WebDriverWait |
| from selenium.webdriver.support import expected_conditions as EC |
| from selenium.common.exceptions import TimeoutException |
|
|
| |
| from pyvirtualdisplay import Display |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger("fast_fetcher") |
|
|
| |
class BrowserManager:
    """Owns one long-lived Chrome WebDriver (plus an Xvfb display when needed).

    The driver is started eagerly in ``__init__`` with retries. ``fetch_html``
    is the public fetch entry point; it serializes all browser access through
    a lock so one manager can be shared across threads, and it restarts the
    browser after a WebDriver failure so the next call can succeed.
    """

    def __init__(
        self,
        headless: bool = True,
        user_agent: Optional[str] = None,
        window_size: str = "1366,768",
        disable_images: bool = True,
        block_resource_urls: Optional[list[str]] = None,
    ):
        self.headless = headless
        self.user_agent = user_agent
        self.window_size = window_size
        self.disable_images = disable_images
        # Default blocklist keeps ad/analytics beacons from slowing page loads.
        self.block_resource_urls = block_resource_urls or [
            "*.doubleclick.net/*",
            "*.google-analytics.com/*",
            "*.googlesyndication.com/*",
            "*.adservice.google.com/*",
        ]
        self._driver_lock = threading.Lock()
        self._driver: Optional[webdriver.Chrome] = None
        self._display: Optional[Display] = None
        self._start_driver_with_retries()

    def _build_options(self) -> Options:
        """Assemble Chrome launch flags and content preferences."""
        opts = Options()

        chrome_bin = os.environ.get("CHROME_BIN", "/usr/bin/google-chrome-stable")
        if os.path.exists(chrome_bin):
            opts.binary_location = chrome_bin
            logger.debug("Using chrome binary: %s", chrome_bin)
        else:
            logger.warning("Chrome binary not found at %s (will rely on system/browser manager).", chrome_bin)

        if self.headless:
            # FIX: pass only the modern flag. The previous code also appended
            # legacy "--headless", which conflicts with "--headless=new" on
            # current Chrome builds.
            opts.add_argument("--headless=new")

        # Container-friendly hardening/stability flags.
        opts.add_argument("--no-sandbox")
        opts.add_argument("--disable-setuid-sandbox")
        opts.add_argument("--disable-dev-shm-usage")
        opts.add_argument("--disable-gpu")
        opts.add_argument("--disable-extensions")
        opts.add_argument("--disable-blink-features=AutomationControlled")
        opts.add_argument("--disable-software-rasterizer")
        opts.add_argument(f"--window-size={self.window_size}")
        # Port 0 lets Chrome pick a free debugging port (avoids collisions).
        opts.add_argument("--remote-debugging-port=0")

        if self.user_agent:
            opts.add_argument(f"--user-agent={self.user_agent}")

        if self.disable_images:
            # 2 == "block" for Chrome managed content settings.
            prefs = {
                "profile.managed_default_content_settings.images": 2,
                "profile.managed_default_content_settings.stylesheets": 2,
                "profile.managed_default_content_settings.fonts": 2,
            }
            opts.add_experimental_option("prefs", prefs)

        opts.add_experimental_option("excludeSwitches", ["enable-logging"])
        opts.add_experimental_option("useAutomationExtension", False)
        return opts

    def _start_driver_with_retries(self, attempts: int = 3, delay_seconds: float = 1.0):
        """Start the driver, retrying a few times before giving up.

        Raises:
            RuntimeError: when all attempts fail (chained to the last error).
        """
        last_exc = None
        for attempt in range(1, attempts + 1):
            try:
                logger.info("Starting Chrome driver (attempt %d/%d)...", attempt, attempts)
                self._start_driver()
                logger.info("Chrome driver started successfully.")
                return
            except Exception as exc:
                logger.exception("Failed to start driver on attempt %d: %s", attempt, exc)
                last_exc = exc
                time.sleep(delay_seconds)
        raise RuntimeError(f"Unable to start Chrome driver after {attempts} attempts: {last_exc}") from last_exc

    def _start_xvfb_if_needed(self):
        """Start a virtual X display when running headed without a DISPLAY."""
        if not self.headless and os.environ.get("DISPLAY", "") == "":
            try:
                logger.info("No DISPLAY found and headless=False — starting virtual X display (Xvfb).")
                parts = self.window_size.split(",")
                self._display = Display(visible=0, size=(int(parts[0]), int(parts[1])))
                self._display.start()
                logger.info("Virtual X display started (DISPLAY=%s).", os.environ.get("DISPLAY"))
            except Exception as e:
                logger.exception("Failed to start virtual display: %s", e)
                raise

    def _stop_xvfb_if_started(self):
        """Tear down the virtual display if one was started (best-effort)."""
        if self._display:
            try:
                self._display.stop()
                logger.info("Virtual X display stopped.")
            except Exception:
                pass  # already dead or never fully started — nothing to do
            self._display = None

    def _start_driver(self):
        """Launch Chrome, trying three strategies in order.

        1. Selenium Manager (plain ``webdriver.Chrome(options=...)``).
        2. webdriver-manager's downloaded chromedriver.
        3. A system chromedriver at /usr/bin/chromedriver.

        Raises:
            RuntimeError: when every strategy fails.
        """
        self._start_xvfb_if_needed()

        opts = self._build_options()

        primary_exc = None
        fallback_exc = None
        try:
            logger.debug("Attempting to start Chrome via Selenium Manager (webdriver.Chrome(options=opts))")
            self._driver = webdriver.Chrome(options=opts)
            # Smoke test: some environments start a browser that dies at once.
            try:
                self._driver.execute_script("return navigator.userAgent")
            except Exception as e:
                raise RuntimeError("Browser started by Selenium Manager but crashed immediately.") from e

            self._post_start_setup()
            return
        except Exception as e_primary:
            primary_exc = e_primary
            logger.warning("Selenium Manager attempt failed: %s", e_primary)

        try:
            driver_path = ChromeDriverManager().install()
            logger.info("webdriver-manager installed chromedriver: %s", driver_path)
            try:
                os.chmod(driver_path, 0o755)
            except Exception:
                logger.debug("chmod on chromedriver failed or unnecessary.")

            service = Service(driver_path)
            self._driver = webdriver.Chrome(service=service, options=opts)
            self._post_start_setup()
            return
        except Exception as e_fallback:
            fallback_exc = e_fallback
            logger.exception("webdriver-manager fallback failed: %s", e_fallback)

        try:
            sys_path = "/usr/bin/chromedriver"
            if os.path.exists(sys_path):
                logger.info("Trying system chromedriver at %s", sys_path)
                try:
                    os.chmod(sys_path, 0o755)
                except Exception:
                    pass  # may not own the file; Chrome may still exec it
                service = Service(sys_path)
                self._driver = webdriver.Chrome(service=service, options=opts)
                self._post_start_setup()
                return
        except Exception as e_sys:
            logger.exception("System chromedriver attempt failed: %s", e_sys)

        # Nothing worked: release the virtual display before reporting failure.
        self._stop_xvfb_if_started()
        raise RuntimeError(f"Failed to start Chrome driver. primary_error={primary_exc}, fallback_error={fallback_exc}")

    def _post_start_setup(self):
        """Best-effort post-launch config: page timeout and CDP request blocking."""
        try:
            self._driver.set_page_load_timeout(60)
            try:
                self._driver.execute_cdp_cmd("Network.enable", {})
                if self.block_resource_urls:
                    self._driver.execute_cdp_cmd("Network.setBlockedURLs", {"urls": self.block_resource_urls})
            except Exception:
                # CDP blocking is only an optimization — log instead of hiding.
                logger.debug("CDP request blocking unavailable; continuing without it.")
        except Exception:
            logger.debug("Post-start driver setup failed; continuing with defaults.")

    def fetch_html(
        self,
        url: str,
        wait_seconds: Optional[float] = 10.0,
        wait_for_selector: Optional[str] = None,
    ) -> str:
        """Navigate to ``url`` and return the rendered page source.

        When ``wait_for_selector`` is given, waits up to ``wait_seconds`` for
        that CSS selector to appear (timeouts are tolerated); otherwise waits
        briefly for ``document.readyState == "complete"``.

        Raises:
            RuntimeError: on a WebDriver failure (the browser is restarted
                first so subsequent calls can succeed).
        """
        with self._driver_lock:
            # FIX: the None-check now happens under the lock so two threads
            # cannot both try to start a replacement driver.
            if self._driver is None:
                self._start_driver_with_retries()
            driver = self._driver
            try:
                driver.get(url)

                if wait_for_selector and wait_seconds:
                    try:
                        WebDriverWait(driver, wait_seconds).until(
                            EC.presence_of_element_located((By.CSS_SELECTOR, wait_for_selector))
                        )
                    except TimeoutException:
                        pass  # selector never appeared; return what rendered
                elif wait_seconds:
                    try:
                        WebDriverWait(driver, min(wait_seconds, 3)).until(
                            lambda d: d.execute_script("return document.readyState") == "complete"
                        )
                    except Exception:
                        time.sleep(0.5)  # give slow pages a last beat

                return driver.page_source
            except WebDriverException as e:
                logger.exception("WebDriver exception during fetch: %s", e)
                # Restart the browser so subsequent requests aren't doomed.
                try:
                    self._safe_quit_driver()
                except Exception:
                    pass
                self._start_driver_with_retries()
                # FIX: chain the original exception for debuggability.
                raise RuntimeError(f"WebDriver error during fetch: {e}") from e

    def _safe_quit_driver(self):
        """Quit the driver (ignoring errors) and release the virtual display."""
        if self._driver:
            try:
                self._driver.quit()
            except Exception:
                pass  # browser may already be gone
            self._driver = None
        self._stop_xvfb_if_started()

    def close(self):
        """Public shutdown hook: release the browser and display."""
        self._safe_quit_driver()
|
|
|
|
| |
# Base origin used to resolve the relative hrefs Google emits in a SERP.
_GOOGLE_BASE_URL = "https://www.google.com"


def _safe_text(el) -> str:
    """Stripped text content of a tag, or '' when the tag is None."""
    return el.get_text(strip=True) if el else ""


def _safe_attr(el, attr) -> str:
    """Attribute value of a tag, or '' when the tag/attribute is missing."""
    return el.get(attr) if el and el.has_attr(attr) else ""


def _abs_url(url) -> str:
    """Resolve a possibly-relative URL against the Google origin."""
    return urljoin(_GOOGLE_BASE_URL, url) if url else ""


def _clean_thumb(src):
    """Absolute thumbnail URL, or None for empty/inline (data:) sources."""
    if src and not src.startswith("data:"):
        return _abs_url(src)
    return None


def _is_ad_element(element) -> bool:
    """True when the element sits inside one of Google's ad containers."""
    for parent in element.parents:
        if parent.get("id") in ["tads", "tadsb"] or "ads-ad" in parent.get("class", []):
            return True
    return False


def _extract_web_results(soup) -> list:
    """Organic web results (.tF2Cxc), numbered sequentially, ads skipped."""
    results = []
    for result in soup.select(".tF2Cxc"):
        if _is_ad_element(result):
            continue
        title_tag = result.select_one("h3")
        link_tag = result.select_one("a")
        if not (title_tag and link_tag):
            continue
        entry = {
            "no": len(results) + 1,
            "title": _safe_text(title_tag),
            "link": _abs_url(_safe_attr(link_tag, "href")),
            "displayed_url": _safe_text(result.select_one("cite")),
            "snippet": _safe_text(result.select_one(".VwiC3b")),
        }
        read_more_tag = result.select_one(".vzmbzf")
        extra = []
        if read_more_tag:
            read_more_url = _abs_url(_safe_attr(read_more_tag, "href"))
            if read_more_url:
                extra.append({"read_more": read_more_url})
        if extra:
            entry["extra"] = extra
        results.append(entry)
    return results


def _extract_image_results(soup) -> list:
    """Image-pack entries (.eA0Zlc) that have a usable (non-data:) thumbnail."""
    results = []
    for img_item in soup.select(".eA0Zlc"):
        img_tag = img_item.select_one("img")
        thumb = _clean_thumb(_safe_attr(img_tag, "data-src") or _safe_attr(img_tag, "src"))
        if not thumb:
            continue
        results.append({
            "thumbnail": thumb,
            "alt": _safe_attr(img_tag, "alt"),
            "source": _safe_text(img_item.select_one(".s0fJje span")),
            "link": _abs_url(_safe_attr(img_item.select_one("a"), "href")),
        })
    return results


def _extract_video_results(soup) -> list:
    """Video carousel entries (.KYaZsb) with a title and a link."""
    results = []
    for video in soup.select(".KYaZsb"):
        title_tag = video.select_one(".tNxQIb.ynAwRc")
        link_tag = video.select_one("a.rIRoqf")
        if not (title_tag and link_tag):
            continue
        thumb_img = video.select_one(".AZJdrc img")
        results.append({
            "title": _safe_text(title_tag),
            "link": _abs_url(_safe_attr(link_tag, "href")),
            "thumbnail": _clean_thumb(_safe_attr(thumb_img, "data-src") or _safe_attr(thumb_img, "src")),
            "duration": _safe_text(video.select_one(".c8rnLc")),
            "channel": _safe_text(video.select_one(".Sg4azc span:first-child")),
            "date": _safe_text(video.select_one(".rbYSKb span")),
            "description_snippet": _safe_text(video.select_one(".wNifxf .p4wth")),
        })
    return results


def _extract_news_results(soup) -> list:
    """Top-stories entries (.m7jPZ) with a title and a link."""
    results = []
    for news in soup.select(".m7jPZ"):
        title_tag = news.select_one(".n0jPhd")
        link_tag = news.select_one("a")
        if not (title_tag and link_tag):
            continue
        thumb_img = news.select_one(".uhHOwf img")
        results.append({
            "title": _safe_text(title_tag),
            "link": _abs_url(_safe_attr(link_tag, "href")),
            "source": _safe_text(news.select_one(".MgUUmf span")),
            "time": _safe_text(news.select_one(".rbYSKb span")),
            "thumbnail": _clean_thumb(_safe_attr(thumb_img, "data-src") or _safe_attr(thumb_img, "src")),
        })
    return results


def _extract_knowledge_panel(soup):
    """Right-hand knowledge panel (#rhs) as a dict, or None when absent/empty."""
    rhs = soup.find(id="rhs")
    if not rhs:
        return None

    panel = {}
    title_tag = rhs.select_one(".PZPZlf.ssJ7i")
    subtitle_tag = rhs.select_one(".iAIpCb span")
    if title_tag:
        panel["title"] = _safe_text(title_tag)
    if subtitle_tag:
        panel["subtitle"] = _safe_text(subtitle_tag)

    desc_tag = rhs.select_one(".kno-rdesc span")
    if desc_tag:
        panel["description"] = _safe_text(desc_tag)

    facts = {}
    for fact in rhs.select(".zloOqf"):
        label_tag = fact.select_one(".w8qArf")
        value_tag = fact.select_one(".LrzXr")
        if not (label_tag and value_tag):
            continue
        label = _safe_text(label_tag).replace(":", "").strip()
        links = value_tag.find_all("a")
        # Multi-link values become a list of names; otherwise the plain text.
        if links and len(links) > 1:
            names = [_safe_text(a) for a in links if _safe_text(a)]
            if names:
                facts[label] = names
        else:
            text = _safe_text(value_tag)
            if text:
                facts[label] = text
    if facts:
        panel["facts"] = facts

    profiles = []
    for profile in rhs.select(".dRrfkf a"):
        name_tag = profile.select_one(".CtCigf")
        link = _safe_attr(profile, "href")
        if name_tag and link:
            profiles.append({
                "platform": _safe_text(name_tag),
                "link": _abs_url(link),
            })
    if profiles:
        panel["profiles"] = profiles

    return panel or None


def _extract_ai_overview(soup):
    """AI-overview text (.p2M1Qe .f5cPye), or None when absent/empty."""
    container = soup.select_one(".p2M1Qe .f5cPye")
    if container:
        text = _safe_text(container)
        if text:
            return text
    return None


def _extract_all_thumbnails(soup):
    """Sorted, de-duplicated list of every usable image URL, or None."""
    thumbnails = set()
    for img in soup.select("img[data-src], img[src]"):
        clean = _clean_thumb(_safe_attr(img, "data-src") or _safe_attr(img, "src"))
        if clean:
            thumbnails.add(clean)
    return sorted(thumbnails) if thumbnails else None


def EXTRACT_DATA(html: str) -> Dict[str, Any]:
    """Parse a Google SERP's HTML into a structured result dict.

    Only non-empty sections are included. Possible keys: ``web_results``,
    ``image_results``, ``video_results``, ``news_results``,
    ``knowledge_panel``, ``ai_overview`` and ``all_thumbnail_urls``.
    """
    soup = BeautifulSoup(html, "html.parser")

    sections = {
        "web_results": _extract_web_results(soup),
        "image_results": _extract_image_results(soup),
        "video_results": _extract_video_results(soup),
        "news_results": _extract_news_results(soup),
        "knowledge_panel": _extract_knowledge_panel(soup),
        "ai_overview": _extract_ai_overview(soup),
        "all_thumbnail_urls": _extract_all_thumbnails(soup),
    }
    # Drop empty/None sections, mirroring the conditional-insert behavior.
    return {key: value for key, value in sections.items() if value}
|
|
|
|
| |
class BrowserPool:
    """A fixed-size pool of BrowserManager instances handed out round-robin."""

    def __init__(self, pool_size: int = 1, headless: bool = True):
        # A pool always holds at least one browser, even if asked for fewer.
        self.pool_size = max(1, pool_size)
        self.managers = [BrowserManager(headless=headless) for _ in range(self.pool_size)]
        self._rr_index = 0
        self._rr_lock = threading.Lock()

    def pick_manager(self) -> BrowserManager:
        """Return the next manager in round-robin order (thread-safe)."""
        with self._rr_lock:
            idx, self._rr_index = self._rr_index, (self._rr_index + 1) % self.pool_size
        return self.managers[idx]

    def close_all(self):
        """Shut every pooled browser down, best-effort."""
        for manager in self.managers:
            try:
                manager.close()
            except Exception:
                pass  # keep closing the rest even if one browser misbehaves
|
class SimpleTTLCache:
    """Tiny thread-safe cache whose entries expire ttl_seconds after insertion."""

    def __init__(self, ttl_seconds: int = 20):
        self.ttl = ttl_seconds
        self._cache: Dict[str, Tuple[float, Any]] = {}
        self._lock = threading.Lock()

    def get(self, key: str):
        """Return the cached value, or None when absent or expired.

        Expired entries are evicted as a side effect of the lookup.
        """
        with self._lock:
            try:
                stored_at, value = self._cache[key]
            except KeyError:
                return None
            if time.time() - stored_at > self.ttl:
                del self._cache[key]
                return None
            return value

    def set(self, key: str, value: Any):
        """Store value under key, stamped with the current wall-clock time."""
        with self._lock:
            self._cache[key] = (time.time(), value)
|
|
class SearchRequest(BaseModel):
    """Body for POST /search; at least one of ``query`` or ``url`` must be set."""
    query: Optional[str] = None  # free-text search terms (URL-encoded by the handler)
    url: Optional[str] = None  # explicit target URL; takes precedence over query
    wait_for_selector: Optional[str] = None  # CSS selector to await before scraping
    headless: Optional[bool] = True  # NOTE(review): accepted but never read by the handler — confirm intent
|
|
# Application-wide singletons: POOL and EXECUTOR are populated on startup
# (see startup_event) and torn down on shutdown; CACHE is usable immediately.
app = FastAPI(title="fast_fetcher_api", version="0.1")
POOL: Optional[BrowserPool] = None  # shared browser pool, set at startup
EXECUTOR: Optional[ThreadPoolExecutor] = None  # runs blocking Selenium work off the event loop
CACHE = SimpleTTLCache(ttl_seconds=25)  # short-lived response cache for repeat queries
|
|
@app.on_event("startup")
async def startup_event():
    """Create the shared browser pool and worker executor at app boot."""
    global POOL, EXECUTOR
    # headless=False here: BrowserManager starts an Xvfb display when no
    # DISPLAY exists, so pages still render without a visible screen.
    POOL = BrowserPool(pool_size=1, headless=False)
    EXECUTOR = ThreadPoolExecutor(max_workers=2)
    app.state.executor = EXECUTOR
    app.state.pool = POOL
    # FIX: report the actual pool size instead of a hard-coded constant.
    logger.info("Startup: browser pool created (size=%d).", POOL.pool_size)
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Release every pooled browser and stop the worker threads on shutdown."""
    global POOL, EXECUTOR
    pool, executor = POOL, EXECUTOR
    if pool:
        pool.close_all()
    if executor:
        executor.shutdown(wait=True)
    logger.info("Shutdown: browsers closed and executor stopped.")
|
|
def _blocking_fetch_and_extract(manager: BrowserManager, url: str, wait_for_selector: Optional[str], wait_seconds: Optional[float]):
    """Fetch ``url`` through ``manager`` and parse it; runs in a worker thread.

    Returns a dict with the fetched ``url``, the elapsed ``duration`` in
    seconds, and the extracted ``data``.
    """
    # FIX: perf_counter() is monotonic, so the reported duration cannot go
    # negative or jump when the system clock is adjusted (time.time() can).
    start = time.perf_counter()
    html = manager.fetch_html(url, wait_seconds=wait_seconds, wait_for_selector=wait_for_selector)
    extracted = EXTRACT_DATA(html)
    duration = time.perf_counter() - start
    return {"url": url, "duration": duration, "data": extracted}
|
|
@app.get("/health")
async def health():
    """Liveness probe: reports OK whenever the process is serving requests."""
    return {"status": "ok"}
|
|
@app.get("/search")
async def search(query: str = Query(..., min_length=1), wait_for_selector: Optional[str] = None):
    """Run a Google search for ``query`` and return the extracted SERP data.

    Responses are cached briefly, keyed on (query, wait_for_selector).

    Raises:
        HTTPException: 400 when the query is blank after stripping.
    """
    q = query.strip()
    if not q:
        # min_length=1 still admits whitespace-only input; reject it here.
        raise HTTPException(status_code=400, detail="query parameter required")

    url = f"https://www.google.com/search?q={quote_plus(q)}"
    cache_key = f"search:{q}:{wait_for_selector}"

    cached = CACHE.get(cache_key)
    if cached:
        return JSONResponse(content={"cached": True, **cached})

    manager = app.state.pool.pick_manager()
    # FIX: asyncio.get_event_loop() is deprecated inside coroutines; the
    # running loop is the correct, unambiguous handle here.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        app.state.executor, _blocking_fetch_and_extract, manager, url, wait_for_selector, 5.0
    )
    CACHE.set(cache_key, result)
    return JSONResponse(content={"cached": False, **result})
|
|
@app.get("/fetch")
async def fetch(url: str = Query(..., min_length=5), wait_for_selector: Optional[str] = None):
    """Fetch an arbitrary URL through the browser pool and return extracted data.

    Unlike /search, results are not cached.
    """
    manager = app.state.pool.pick_manager()
    # FIX: asyncio.get_event_loop() is deprecated inside coroutines; use the
    # running loop instead.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        app.state.executor, _blocking_fetch_and_extract, manager, url, wait_for_selector, 6.0
    )
    return JSONResponse(content=result)
|
|
@app.post("/search")
async def post_search(body: SearchRequest = Body(...)):
    """POST variant of /search accepting either a raw query or a full URL.

    Responses are cached briefly, keyed on (target, wait_for_selector).

    Raises:
        HTTPException: 400 when neither query nor url is provided.
    """
    if not (body.query or body.url):
        raise HTTPException(status_code=400, detail="Either query or url must be provided")
    # An explicit URL takes precedence over a query string.
    if body.url:
        target = body.url
    else:
        target = f"https://www.google.com/search?q={quote_plus(body.query)}"

    cache_key = f"search_post:{target}:{body.wait_for_selector}"
    cached = CACHE.get(cache_key)
    if cached:
        return JSONResponse(content={"cached": True, **cached})

    manager = app.state.pool.pick_manager()
    # FIX: asyncio.get_event_loop() is deprecated inside coroutines; use the
    # running loop instead.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        app.state.executor, _blocking_fetch_and_extract, manager, target, body.wait_for_selector, 6.0
    )
    CACHE.set(cache_key, result)
    return JSONResponse(content={"cached": False, **result})
|
|