Spaces:
Running
Running
import asyncio | |
from contextlib import asynccontextmanager | |
from typing import Optional | |
from fastapi import APIRouter, FastAPI | |
from fastapi.routing import APIRouter | |
import httpx | |
from pydantic import BaseModel, Field | |
from playwright.async_api import async_playwright, Browser, BrowserContext, Page | |
import logging | |
import uvicorn | |
from scrap import PatentScrapBulkResponse, scrap_patent_async, scrap_patent_bulk_async | |
from serp import SerpQuery, SerpResults, query_arxiv, query_bing_search, query_brave_search, query_ddg_search, query_google_patents, query_google_scholar | |
from utils import log_gathered_exceptions | |
# Configure the root logger once at import time: INFO and above, with every
# record prefixed by timestamp, level and the emitting file:line.
logging.basicConfig(
    format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
# Playwright driver handle and the shared browser instance. Both start out as
# None and are assigned by `api_lifespan` when the application starts up.
playwright = None
pw_browser: Optional[Browser] = None

# Shared HTTP client used by the httpx-based helpers (timeout=30, at most 30
# connections of which up to 20 may be kept alive).
# NOTE(review): nothing in the visible code closes this client on shutdown.
httpx_client = httpx.AsyncClient(
    timeout=30,
    limits=httpx.Limits(max_connections=30, max_keepalive_connections=20),
)
@asynccontextmanager
async def api_lifespan(app: FastAPI):
    """Manage the shared Playwright browser for the lifetime of the application.

    On entry the Playwright driver is started and a headless Chromium browser
    is launched; both are published through the module-level `playwright` and
    `pw_browser` globals. On exit the browser is closed and the driver stopped.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None. Control returns to the framework while the application serves.
    """
    global playwright, pw_browser
    playwright = await async_playwright().start()
    try:
        pw_browser = await playwright.chromium.launch(headless=True)
        try:
            yield
        finally:
            # Close the browser even if an exception is thrown into the
            # generator at the yield point.
            await pw_browser.close()
    finally:
        # Stop the driver even when the browser failed to launch or to close,
        # so no driver process is left behind.
        await playwright.stop()
# Load the API description up front inside a context manager so the file
# handle is closed deterministically, and decode it as UTF-8 rather than with
# the platform default encoding.
with open("docs/docs.md", encoding="utf-8") as _docs_file:
    _api_description = _docs_file.read()

# Application instance; the interactive documentation is served at the root
# path and start-up/shutdown is delegated to `api_lifespan`.
app = FastAPI(
    lifespan=api_lifespan,
    docs_url="/",
    title="SERPent",
    description=_api_description,
)
# Document-scrapping endpoints live under the /scrap prefix.
scrap_router = APIRouter(
    prefix="/scrap",
    tags=["scrapping"],
)

# Search-engine-result (SERP) scrapping endpoints live under the /serp prefix.
serp_router = APIRouter(
    prefix="/serp",
    tags=["serp scrapping"],
)
# ===================== Search endpoints ===================== | |
async def search_google_scholar(params: SerpQuery) -> SerpResults:
    """Searches Google Scholar for the specified queries and returns the found documents.

    Every entry of `params.queries` is run concurrently through the shared
    Playwright browser and is passed `params.n_results`. Queries that fail are
    handed to `log_gathered_exceptions` and left out of the response; `error`
    is only set (to the last failure) when every query failed. An empty list
    of queries yields an empty result with no error.
    """
    logging.info(f"Searching Google Scholar for queries: {params.queries}")
    outcomes = await asyncio.gather(
        *(query_google_scholar(pw_browser, q, params.n_results)
          for q in params.queries),
        return_exceptions=True,
    )
    log_gathered_exceptions(outcomes, "google scholar search", params)

    # gather(return_exceptions=True) can also hand back BaseException
    # instances (e.g. CancelledError), so filter on BaseException to keep
    # them out of the flattening step below.
    successes = [o for o in outcomes if not isinstance(o, BaseException)]

    # All queries failed: report the last exception. The `outcomes` check
    # keeps an empty query list from indexing into an empty result list.
    if outcomes and not successes:
        return SerpResults(results=[], error=str(outcomes[-1]))

    flattened = [item for hits in successes for item in hits]
    return SerpResults(results=flattened, error=None)
async def search_arxiv(params: SerpQuery) -> SerpResults:
    """Searches arxiv for the specified queries and returns the found documents.

    Every entry of `params.queries` is run concurrently through the shared
    httpx client and is passed `params.n_results`. Queries that fail are
    handed to `log_gathered_exceptions` and left out of the response; `error`
    is only set (to the last failure) when every query failed. An empty list
    of queries yields an empty result with no error.
    """
    logging.info(f"Searching Arxiv for queries: {params.queries}")
    outcomes = await asyncio.gather(
        *(query_arxiv(httpx_client, q, params.n_results)
          for q in params.queries),
        return_exceptions=True,
    )
    log_gathered_exceptions(outcomes, "arxiv search", params)

    # gather(return_exceptions=True) can also hand back BaseException
    # instances (e.g. CancelledError), so filter on BaseException to keep
    # them out of the flattening step below.
    successes = [o for o in outcomes if not isinstance(o, BaseException)]

    # All queries failed: report the last exception. The `outcomes` check
    # keeps an empty query list from indexing into an empty result list.
    if outcomes and not successes:
        return SerpResults(results=[], error=str(outcomes[-1]))

    flattened = [item for hits in successes for item in hits]
    return SerpResults(results=flattened, error=None)
async def search_patents(params: SerpQuery) -> SerpResults:
    """Searches google patents for the specified queries and returns the found documents.

    Every entry of `params.queries` is run concurrently through the shared
    Playwright browser and is passed `params.n_results`. Queries that fail are
    handed to `log_gathered_exceptions` and left out of the response; `error`
    is only set (to the last failure) when every query failed. An empty list
    of queries yields an empty result with no error.
    """
    logging.info(f"Searching Google Patents for queries: {params.queries}")
    outcomes = await asyncio.gather(
        *(query_google_patents(pw_browser, q, params.n_results)
          for q in params.queries),
        return_exceptions=True,
    )
    log_gathered_exceptions(outcomes, "google patent search", params)

    # gather(return_exceptions=True) can also hand back BaseException
    # instances (e.g. CancelledError), so filter on BaseException to keep
    # them out of the flattening step below.
    successes = [o for o in outcomes if not isinstance(o, BaseException)]

    # All queries failed: report the last exception. The `outcomes` check
    # keeps an empty query list from indexing into an empty result list.
    if outcomes and not successes:
        return SerpResults(results=[], error=str(outcomes[-1]))

    flattened = [item for hits in successes for item in hits]
    return SerpResults(results=flattened, error=None)
async def search_brave(params: SerpQuery) -> SerpResults:
    """Searches brave search for the specified queries and returns the found documents.

    Every entry of `params.queries` is run concurrently through the shared
    Playwright browser and is passed `params.n_results`. Queries that fail are
    handed to `log_gathered_exceptions` and left out of the response; `error`
    is only set (to the last failure) when every query failed. An empty list
    of queries yields an empty result with no error.
    """
    logging.info(f"Searching Brave Search for queries: {params.queries}")
    outcomes = await asyncio.gather(
        *(query_brave_search(pw_browser, q, params.n_results)
          for q in params.queries),
        return_exceptions=True,
    )
    log_gathered_exceptions(outcomes, "brave search", params)

    # gather(return_exceptions=True) can also hand back BaseException
    # instances (e.g. CancelledError), so filter on BaseException to keep
    # them out of the flattening step below.
    successes = [o for o in outcomes if not isinstance(o, BaseException)]

    # All queries failed: report the last exception. The `outcomes` check
    # keeps an empty query list from indexing into an empty result list.
    if outcomes and not successes:
        return SerpResults(results=[], error=str(outcomes[-1]))

    flattened = [item for hits in successes for item in hits]
    return SerpResults(results=flattened, error=None)
async def search_bing(params: SerpQuery) -> SerpResults:
    """Searches Bing search for the specified queries and returns the found documents.

    Every entry of `params.queries` is run concurrently through the shared
    Playwright browser and is passed `params.n_results`. Queries that fail are
    handed to `log_gathered_exceptions` and left out of the response; `error`
    is only set (to the last failure) when every query failed. An empty list
    of queries yields an empty result with no error.
    """
    logging.info(f"Searching Bing Search for queries: {params.queries}")
    outcomes = await asyncio.gather(
        *(query_bing_search(pw_browser, q, params.n_results)
          for q in params.queries),
        return_exceptions=True,
    )
    log_gathered_exceptions(outcomes, "bing search", params)

    # gather(return_exceptions=True) can also hand back BaseException
    # instances (e.g. CancelledError), so filter on BaseException to keep
    # them out of the flattening step below.
    successes = [o for o in outcomes if not isinstance(o, BaseException)]

    # All queries failed: report the last exception. The `outcomes` check
    # keeps an empty query list from indexing into an empty result list.
    if outcomes and not successes:
        return SerpResults(results=[], error=str(outcomes[-1]))

    flattened = [item for hits in successes for item in hits]
    return SerpResults(results=flattened, error=None)
async def search_duck(params: SerpQuery) -> SerpResults:
    """Searches duckduckgo for the specified queries and returns the found documents

    Every entry of `params.queries` is run concurrently and is passed
    `params.n_results`. Queries that fail are handed to
    `log_gathered_exceptions` and left out of the response; `error` is only
    set (to the last failure) when every query failed. An empty list of
    queries yields an empty result with no error.
    """
    logging.info(f"Searching DuckDuckGo for queries: {params.queries}")
    outcomes = await asyncio.gather(
        *(query_ddg_search(q, params.n_results) for q in params.queries),
        return_exceptions=True,
    )
    log_gathered_exceptions(outcomes, "duckduckgo search", params)

    # gather(return_exceptions=True) can also hand back BaseException
    # instances (e.g. CancelledError), so filter on BaseException to keep
    # them out of the flattening step below.
    successes = [o for o in outcomes if not isinstance(o, BaseException)]

    # All queries failed: report the last exception. The `outcomes` check
    # keeps an empty query list from indexing into an empty result list.
    if outcomes and not successes:
        return SerpResults(results=[], error=str(outcomes[-1]))

    flattened = [item for hits in successes for item in hits]
    return SerpResults(results=flattened, error=None)
async def search(params: SerpQuery) -> SerpResults:
    """Searches the specified queries, falling back from one backend to the next.

    Queries are processed one after another. Each query is tried against
    DuckDuckGo, then Brave Search, then Bing; the first backend that answers
    without raising supplies the results for that query and the remaining
    backends are skipped. `error` is only set when at least one query was
    attempted and no backend answered any of them; an empty query list, or
    backends that answered with zero hits, yield an empty result with no error.
    """
    # (label used in the info log, label used in the error log, query call),
    # listed in fallback order. The two labels are kept separate because the
    # existing log messages name Bing differently on attempt and on failure.
    backends = (
        ("DDG", "DDG",
         lambda q: query_ddg_search(q, params.n_results)),
        ("Brave Search", "Brave Search",
         lambda q: query_brave_search(pw_browser, q, params.n_results)),
        ("Bing", "Bing search",
         lambda q: query_bing_search(pw_browser, q, params.n_results)),
    )

    results = []
    answered = 0   # queries for which some backend returned without raising
    exhausted = 0  # queries for which every backend raised

    for q in params.queries:
        for info_label, error_label, run_query in backends:
            try:
                logging.info(f"Querying {info_label} with query: `{q}`")
                res = await run_query(q)
                results.extend(res)
            except Exception as e:
                logging.error(
                    f"Failed to query {error_label} with query `{q}`: {e}")
                logging.info("Trying with next browser backend.")
                continue
            answered += 1
            break
        else:
            # The inner loop ran out of backends without a successful answer.
            exhausted += 1

    # Only blame the backends when queries were actually attempted and none
    # of them was answered; "no hits" and "no queries" are not failures.
    if exhausted and not answered:
        return SerpResults(results=[], error="All backends are rate-limited.")
    return SerpResults(results=results, error=None)
# =========================== Scrapping endpoints =========================== | |
# TODO: return a proper error response if the patent is not found or scrapping fails | |
async def scrap_patent(patent_id: str):
    """Scraps the specified patent from Google Patents."""
    # English-language Google Patents page for the requested ID.
    url = f"https://patents.google.com/patent/{patent_id}/en"
    try:
        return await scrap_patent_async(httpx_client, url)
    except Exception as exc:
        # Best effort: any failure is logged and reported to the caller as None.
        logging.warning(f"Failed to scrap patent {patent_id}: {exc}")
        return None
class ScrapPatentsRequest(BaseModel):
    """Request model for scrapping multiple patents."""

    # Required field (no default): the IDs of the patents to scrap.
    patent_ids: list[str] = Field(
        ...,
        description="List of patent IDs to scrap",
    )
async def scrap_patents(params: ScrapPatentsRequest) -> PatentScrapBulkResponse:
    """Scraps multiple patents from Google Patents."""
    # Hand the whole batch of IDs to the bulk scraper together with the
    # module-level HTTP client and return its response unchanged.
    return await scrap_patent_bulk_async(httpx_client, params.patent_ids)
# =============================================================================== | |
# Attach both routers to the application.
app.include_router(serp_router)
app.include_router(scrap_router)

# Start the server only when this file is executed as a script, so that
# importing the module (from tests or an external ASGI runner) does not block
# inside uvicorn.run(). Listens on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)