rag-mcp-server / web_search.py
frascuchon's picture
frascuchon HF Staff
add source code
a854edf
import aiohttp
import certifi
import os
import ssl
from typing import Dict, Any
SERPER_API_KEY = str.strip(os.getenv("SERPER_API_KEY", ""))
AIOHTTP_TIMEOUT = int(os.getenv("AIOHTTP_TIMEOUT", "15"))
if not SERPER_API_KEY:
raise ValueError("SERPER_API_KEY environment variable is not set.")
async def google(q: str, results: int = 5) -> Dict[str, Any]:
url = "https://google.serper.dev/search"
return await fetch_json(url, {
"q": q,
"num": results,
"page": 1,
})
async def fetch_json(url: str, payload: dict) -> Dict[str, Any]:
headers = {
'X-API-KEY': SERPER_API_KEY,
'Content-Type': 'application/json'
}
ssl_context = ssl.create_default_context(cafile=certifi.where())
connector = aiohttp.TCPConnector(ssl=ssl_context)
timeout = aiohttp.ClientTimeout(total=AIOHTTP_TIMEOUT)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
async with session.post(url, headers=headers, json=payload) as response:
response.raise_for_status()
return await response.json()