import asyncio from contextlib import asynccontextmanager from typing import Optional from fastapi import APIRouter, FastAPI from fastapi.routing import APIRouter import httpx from pydantic import BaseModel, Field from playwright.async_api import async_playwright, Browser, BrowserContext, Page import logging import uvicorn from scrap import PatentScrapBulkResponse, scrap_patent_async, scrap_patent_bulk_async from serp import SerpQuery, SerpResults, query_arxiv, query_bing_search, query_brave_search, query_ddg_search, query_google_patents, query_google_scholar from utils import log_gathered_exceptions logging.basicConfig( level=logging.INFO, format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # playwright global context playwright = None pw_browser: Optional[Browser] = None # httpx client httpx_client = httpx.AsyncClient(timeout=30, limits=httpx.Limits( max_connections=30, max_keepalive_connections=20)) @asynccontextmanager async def api_lifespan(app: FastAPI): global playwright, pw_browser playwright = await async_playwright().start() pw_browser = await playwright.chromium.launch(headless=True) yield await pw_browser.close() await playwright.stop() app = FastAPI(lifespan=api_lifespan, docs_url="/", title="SERPent", description=open("docs/docs.md").read()) # Router for scrapping related endpoints scrap_router = APIRouter(prefix="/scrap", tags=["scrapping"]) # Router for SERP-scrapping related endpoints serp_router = APIRouter(prefix="/serp", tags=["serp scrapping"]) # ===================== Search endpoints ===================== @serp_router.post("/search_scholar") async def search_google_scholar(params: SerpQuery): """Queries google scholar for the specified query""" logging.info(f"Searching Google Scholar for queries: {params.queries}") results = await asyncio.gather(*[query_google_scholar(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True) log_gathered_exceptions(results, "google scholar search", params) # Filter out exceptions and flatten the results filtered_results = [r for r in results if not isinstance(r, Exception)] flattened_results = [ item for sublist in filtered_results for item in sublist] # all queries failed, return the last exception if len(filtered_results) == 0: return SerpResults(results=[], error=str(results[-1])) return SerpResults(results=flattened_results, error=None) @serp_router.post("/search_arxiv") async def search_arxiv(params: SerpQuery): """Searches arxiv for the specified queries and returns the found documents.""" logging.info(f"Searching Arxiv for queries: {params.queries}") results = await asyncio.gather(*[query_arxiv(httpx_client, q, params.n_results) for q in params.queries], return_exceptions=True) log_gathered_exceptions(results, "arxiv search", params) filtered_results = [r for r in results if not isinstance(r, Exception)] flattened_results = [ item for sublist in filtered_results for item in sublist] if len(filtered_results) == 0: return SerpResults(results=[], error=str(results[-1])) return SerpResults(results=flattened_results, error=None) @serp_router.post("/search_patents") async def search_patents(params: SerpQuery) -> SerpResults: """Searches google patents for the specified queries and returns the found documents.""" logging.info(f"Searching Google Patents for queries: {params.queries}") results = await asyncio.gather(*[query_google_patents(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True) log_gathered_exceptions(results, "google patent search", params) # Filter out exceptions and flatten the results filtered_results = [r for r in results if not isinstance(r, Exception)] flattened_results = [ item for sublist in filtered_results for item in sublist] # all queries failed, return the last exception if len(filtered_results) == 0: return SerpResults(results=[], error=str(results[-1])) return SerpResults(results=flattened_results, error=None) @serp_router.post("/search_brave") async def search_brave(params: SerpQuery) -> SerpResults: """Searches brave search for the specified queries and returns the found documents.""" logging.info(f"Searching Brave Search for queries: {params.queries}") results = await asyncio.gather(*[query_brave_search(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True) log_gathered_exceptions(results, "brave search", params) # Filter out exceptions and flatten the results filtered_results = [r for r in results if not isinstance(r, Exception)] flattened_results = [ item for sublist in filtered_results for item in sublist] # all queries failed, return the last exception if len(filtered_results) == 0: return SerpResults(results=[], error=str(results[-1])) return SerpResults(results=flattened_results, error=None) @serp_router.post("/search_bing") async def search_bing(params: SerpQuery) -> SerpResults: """Searches Bing search for the specified queries and returns the found documents.""" logging.info(f"Searching Bing Search for queries: {params.queries}") results = await asyncio.gather(*[query_bing_search(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True) log_gathered_exceptions(results, "bing search", params) # Filter out exceptions and flatten the results filtered_results = [r for r in results if not isinstance(r, Exception)] flattened_results = [ item for sublist in filtered_results for item in sublist] # all queries failed, return the last exception if len(filtered_results) == 0: return SerpResults(results=[], error=str(results[-1])) return SerpResults(results=flattened_results, error=None) @serp_router.post("/search_duck") async def search_duck(params: SerpQuery) -> SerpResults: """Searches duckduckgo for the specified queries and returns the found documents""" logging.info(f"Searching DuckDuckGo for queries: {params.queries}") results = await asyncio.gather(*[query_ddg_search(q, params.n_results) for q in params.queries], return_exceptions=True) log_gathered_exceptions(results, "duckduckgo search", params) # Filter out exceptions and flatten the results filtered_results = [r for r in results if not isinstance(r, Exception)] flattened_results = [ item for sublist in filtered_results for item in sublist] # all queries failed, return the last exception if len(filtered_results) == 0: return SerpResults(results=[], error=str(results[-1])) return SerpResults(results=flattened_results, error=None) @serp_router.post("/search") async def search(params: SerpQuery): """Attempts to search the specified queries using ALL backends""" results = [] for q in params.queries: try: logging.info(f"Querying DDG with query: `{q}`") res = await query_ddg_search(q, params.n_results) results.extend(res) continue except Exception as e: logging.error(f"Failed to query DDG with query `{q}`: {e}") logging.info("Trying with next browser backend.") try: logging.info(f"Querying Brave Search with query: `{q}`") res = await query_brave_search(pw_browser, q, params.n_results) results.extend(res) continue except Exception as e: logging.error( f"Failed to query Brave Search with query `{q}`: {e}") logging.info("Trying with next browser backend.") try: logging.info(f"Querying Bing with query: `{q}`") res = await query_bing_search(pw_browser, q, params.n_results) results.extend(res) continue except Exception as e: logging.error(f"Failed to query Bing search with query `{q}`: {e}") logging.info("Trying with next browser backend.") if len(results) == 0: return SerpResults(results=[], error="All backends are rate-limited.") return SerpResults(results=results, error=None) # =========================== Scrapping endpoints =========================== # TODO: return a proper error response if the patent is not found or scrapping fails @scrap_router.get("/scrap_patent/{patent_id}") async def scrap_patent(patent_id: str): """Scraps the specified patent from Google Patents.""" try: patent = await scrap_patent_async(httpx_client, f"https://patents.google.com/patent/{patent_id}/en") return patent except Exception as e: logging.warning(f"Failed to scrap patent {patent_id}: {e}") return None class ScrapPatentsRequest(BaseModel): """Request model for scrapping multiple patents.""" patent_ids: list[str] = Field(..., description="List of patent IDs to scrap") @scrap_router.post("/scrap_patents_bulk", response_model=PatentScrapBulkResponse) async def scrap_patents(params: ScrapPatentsRequest) -> PatentScrapBulkResponse: """Scraps multiple patents from Google Patents.""" patents = await scrap_patent_bulk_async(httpx_client, params.patent_ids) return patents # =============================================================================== app.include_router(serp_router) app.include_router(scrap_router) uvicorn.run(app, host="0.0.0.0", port=7860)