File size: 3,857 Bytes
d907837 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
from abc import ABC, ABCMeta, abstractmethod
from contextlib import asynccontextmanager
import logging
from typing import Literal, Optional
from httpx import AsyncClient
from pydantic import BaseModel, Field
from playwright.async_api import Browser, BrowserContext, Page
from asyncio import Semaphore
# ========================== Schemas ==========================
class SerpQuery(BaseModel):
"""Model for SERP query"""
query: str = Field(
..., description="The query to search for")
n_results: int = Field(
10, description="Number of results to return for each query. Valid values are 10, 25, 50 and 100")
sort_by: Literal["relevance",
"date"] = Field(default="relevance", description="How to sort search results.")
class SerpResultItem(BaseModel):
"""Model for a single SERP result item"""
title: str = Field(..., description="Title of the search result")
href: str = Field(..., description="URL of the search result")
body: Optional[str] = Field(
None, description="Snippet of the search result")
content_slug: Optional[str] = Field(
None, description="Content slug of the search result. A slug that encodes the content type and URL that can be used to fetch the full content later")
class Config:
extra = "allow" # Allow additional fields in the result item
# =============================== Base classes ===============================
class SERPBackendBase(ABC):
"""Base class for SERP scrapping backends"""
def __init__(self):
pass
@property
@abstractmethod
def name(self) -> str:
"""Name of the backend. Used for identification in slugs"""
pass
@property
@abstractmethod
def category(self) -> Literal["general", "patent", "scholar"]:
"""Content category that the backend provides. Used for search_auto """
pass
@abstractmethod
async def query(self, query: SerpQuery, client: AsyncClient) -> list[SerpResultItem]:
"""Perform a SERP query and return results"""
pass
class PlaywrightSerpBackendBase(SERPBackendBase):
"""Base class for SERP scrapping backends using Playwright"""
def __init__(self):
pass
async def query(self, query: SerpQuery, client: AsyncClient) -> list[SerpResultItem]:
"""Perform a SERP query and return results using Playwright"""
raise NotImplementedError("query_page method must be used instead")
@abstractmethod
async def query_serp_page(self, browser: Browser, query: SerpQuery) -> list[SerpResultItem]:
"""Perform a SERP query using Playwright and return results"""
pass
async def query_serp_backend(backend: SERPBackendBase, query: SerpQuery, client: AsyncClient, browser: Browser) -> list[SerpResultItem]:
"""Queries the given backend with the given SERP query."""
logging.info(f"Querying {backend.name} with {query}")
if isinstance(backend, PlaywrightSerpBackendBase):
return await backend.query_serp_page(browser, query)
else:
return await backend.query(query, client)
def get_backends_doc(backends: list[SERPBackendBase]) -> str:
"""Retrieves all the available backends and builds a list for doc"""
doc_str = "### Available SERP Backends \n\n\n "
for backend in backends:
doc_str += f" \n\n `{backend.name}` - category: `{backend.category}`"
return doc_str
@asynccontextmanager
async def playwright_open_page(browser: Browser, sema: Semaphore):
"""Context manager for playwright pages"""
# Acquire the concurrency semaphore
await sema.acquire()
context: BrowserContext = await browser.new_context()
page: Page = await context.new_page()
try:
yield page
finally:
await page.close()
await context.close()
sema.release()
|