# SERPent / app.py
import asyncio
from contextlib import asynccontextmanager
from typing import Optional
from fastapi import APIRouter, FastAPI
import httpx
from pydantic import BaseModel, Field
from playwright.async_api import async_playwright, Browser
import logging
import uvicorn
from scrap import PatentScrapBulkResponse, scrap_patent_async, scrap_patent_bulk_async
from serp import SerpQuery, SerpResults, query_arxiv, query_bing_search, query_brave_search, query_ddg_search, query_google_patents, query_google_scholar
from utils import log_gathered_exceptions
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# playwright global context
playwright = None
pw_browser: Optional[Browser] = None
# Shared httpx client for non-browser backends (arXiv queries, patent scraping)
httpx_client = httpx.AsyncClient(timeout=30, limits=httpx.Limits(
max_connections=30, max_keepalive_connections=20))
@asynccontextmanager
async def api_lifespan(app: FastAPI):
    """Start Playwright and launch a headless Chromium instance for the app's lifetime."""
global playwright, pw_browser
playwright = await async_playwright().start()
pw_browser = await playwright.chromium.launch(headless=True)
yield
await pw_browser.close()
await playwright.stop()
with open("docs/docs.md") as f:
    api_description = f.read()

app = FastAPI(lifespan=api_lifespan, docs_url="/",
              title="SERPent", description=api_description)
# Router for scraping-related endpoints
scrap_router = APIRouter(prefix="/scrap", tags=["scraping"])
# Router for SERP-scraping related endpoints
serp_router = APIRouter(prefix="/serp", tags=["serp scraping"])
# ===================== Search endpoints =====================
@serp_router.post("/search_scholar")
async def search_google_scholar(params: SerpQuery):
"""Queries google scholar for the specified query"""
logging.info(f"Searching Google Scholar for queries: {params.queries}")
results = await asyncio.gather(*[query_google_scholar(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True)
log_gathered_exceptions(results, "google scholar search", params)
# Filter out exceptions and flatten the results
filtered_results = [r for r in results if not isinstance(r, Exception)]
flattened_results = [
item for sublist in filtered_results for item in sublist]
# all queries failed, return the last exception
if len(filtered_results) == 0:
return SerpResults(results=[], error=str(results[-1]))
return SerpResults(results=flattened_results, error=None)
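
# Example client call for the /serp endpoints (a sketch; assumes the server is
# running locally on port 7860 as configured at the bottom of this file, and
# uses the `queries` and `n_results` fields read from SerpQuery above):
#
#   import httpx
#   resp = httpx.post(
#       "http://localhost:7860/serp/search_scholar",
#       json={"queries": ["transformer architectures"], "n_results": 5},
#   )
#   print(resp.json())
#
# The same payload shape applies to the other /serp/search_* routes.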
@serp_router.post("/search_arxiv")
async def search_arxiv(params: SerpQuery):
"""Searches arxiv for the specified queries and returns the found documents."""
logging.info(f"Searching Arxiv for queries: {params.queries}")
results = await asyncio.gather(*[query_arxiv(httpx_client, q, params.n_results) for q in params.queries], return_exceptions=True)
log_gathered_exceptions(results, "arxiv search", params)
filtered_results = [r for r in results if not isinstance(r, Exception)]
flattened_results = [
item for sublist in filtered_results for item in sublist]
if len(filtered_results) == 0:
return SerpResults(results=[], error=str(results[-1]))
return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search_patents")
async def search_patents(params: SerpQuery) -> SerpResults:
"""Searches google patents for the specified queries and returns the found documents."""
logging.info(f"Searching Google Patents for queries: {params.queries}")
results = await asyncio.gather(*[query_google_patents(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True)
log_gathered_exceptions(results, "google patent search", params)
# Filter out exceptions and flatten the results
filtered_results = [r for r in results if not isinstance(r, Exception)]
flattened_results = [
item for sublist in filtered_results for item in sublist]
# all queries failed, return the last exception
if len(filtered_results) == 0:
return SerpResults(results=[], error=str(results[-1]))
return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search_brave")
async def search_brave(params: SerpQuery) -> SerpResults:
"""Searches brave search for the specified queries and returns the found documents."""
logging.info(f"Searching Brave Search for queries: {params.queries}")
results = await asyncio.gather(*[query_brave_search(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True)
log_gathered_exceptions(results, "brave search", params)
# Filter out exceptions and flatten the results
filtered_results = [r for r in results if not isinstance(r, Exception)]
flattened_results = [
item for sublist in filtered_results for item in sublist]
# all queries failed, return the last exception
if len(filtered_results) == 0:
return SerpResults(results=[], error=str(results[-1]))
return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search_bing")
async def search_bing(params: SerpQuery) -> SerpResults:
"""Searches Bing search for the specified queries and returns the found documents."""
logging.info(f"Searching Bing Search for queries: {params.queries}")
results = await asyncio.gather(*[query_bing_search(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True)
log_gathered_exceptions(results, "bing search", params)
# Filter out exceptions and flatten the results
filtered_results = [r for r in results if not isinstance(r, Exception)]
flattened_results = [
item for sublist in filtered_results for item in sublist]
# all queries failed, return the last exception
if len(filtered_results) == 0:
return SerpResults(results=[], error=str(results[-1]))
return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search_duck")
async def search_duck(params: SerpQuery) -> SerpResults:
"""Searches duckduckgo for the specified queries and returns the found documents"""
logging.info(f"Searching DuckDuckGo for queries: {params.queries}")
results = await asyncio.gather(*[query_ddg_search(q, params.n_results) for q in params.queries], return_exceptions=True)
log_gathered_exceptions(results, "duckduckgo search", params)
# Filter out exceptions and flatten the results
filtered_results = [r for r in results if not isinstance(r, Exception)]
flattened_results = [
item for sublist in filtered_results for item in sublist]
# all queries failed, return the last exception
if len(filtered_results) == 0:
return SerpResults(results=[], error=str(results[-1]))
return SerpResults(results=flattened_results, error=None)
@serp_router.post("/search")
async def search(params: SerpQuery):
"""Attempts to search the specified queries using ALL backends"""
results = []
for q in params.queries:
try:
logging.info(f"Querying DDG with query: `{q}`")
res = await query_ddg_search(q, params.n_results)
results.extend(res)
continue
except Exception as e:
logging.error(f"Failed to query DDG with query `{q}`: {e}")
logging.info("Trying with next browser backend.")
try:
logging.info(f"Querying Brave Search with query: `{q}`")
res = await query_brave_search(pw_browser, q, params.n_results)
results.extend(res)
continue
except Exception as e:
logging.error(
f"Failed to query Brave Search with query `{q}`: {e}")
logging.info("Trying with next browser backend.")
try:
logging.info(f"Querying Bing with query: `{q}`")
res = await query_bing_search(pw_browser, q, params.n_results)
results.extend(res)
continue
except Exception as e:
logging.error(f"Failed to query Bing search with query `{q}`: {e}")
logging.info("Trying with next browser backend.")
if len(results) == 0:
return SerpResults(results=[], error="All backends are rate-limited.")
return SerpResults(results=results, error=None)
# =========================== Scraping endpoints ===========================
# TODO: return a proper error response if the patent is not found or scraping fails
@scrap_router.get("/scrap_patent/{patent_id}")
async def scrap_patent(patent_id: str):
"""Scraps the specified patent from Google Patents."""
try:
patent = await scrap_patent_async(httpx_client, f"https://patents.google.com/patent/{patent_id}/en")
return patent
except Exception as e:
logging.warning(f"Failed to scrap patent {patent_id}: {e}")
return None
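
# Example client call (a sketch; the ID segment is a hypothetical Google Patents
# identifier such as US9123456B2, following the patents.google.com URL format):
#
#   import httpx
#   resp = httpx.get("http://localhost:7860/scrap/scrap_patent/US9123456B2")
#   print(resp.json())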
class ScrapPatentsRequest(BaseModel):
    """Request model for scraping multiple patents."""
    patent_ids: list[str] = Field(...,
                                  description="List of patent IDs to scrape")
@scrap_router.post("/scrap_patents_bulk", response_model=PatentScrapBulkResponse)
async def scrap_patents(params: ScrapPatentsRequest) -> PatentScrapBulkResponse:
"""Scraps multiple patents from Google Patents."""
patents = await scrap_patent_bulk_async(httpx_client, params.patent_ids)
return patents
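
# Example bulk request (a sketch; the patent IDs are hypothetical placeholders):
#
#   import httpx
#   resp = httpx.post(
#       "http://localhost:7860/scrap/scrap_patents_bulk",
#       json={"patent_ids": ["US9123456B2", "EP1234567A1"]},
#   )
#   print(resp.json())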
# ===============================================================================
app.include_router(serp_router)
app.include_router(scrap_router)
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)