Spaces:
Running
Running
import aiohttp | |
import certifi | |
import os | |
import ssl | |
from typing import Dict, Any | |
SERPER_API_KEY = str.strip(os.getenv("SERPER_API_KEY", "")) | |
AIOHTTP_TIMEOUT = int(os.getenv("AIOHTTP_TIMEOUT", "15")) | |
if not SERPER_API_KEY: | |
raise ValueError("SERPER_API_KEY environment variable is not set.") | |
async def google(q: str, results: int = 5) -> Dict[str, Any]: | |
url = "https://google.serper.dev/search" | |
return await fetch_json(url, { | |
"q": q, | |
"num": results, | |
"page": 1, | |
}) | |
async def fetch_json(url: str, payload: dict) -> Dict[str, Any]: | |
headers = { | |
'X-API-KEY': SERPER_API_KEY, | |
'Content-Type': 'application/json' | |
} | |
ssl_context = ssl.create_default_context(cafile=certifi.where()) | |
connector = aiohttp.TCPConnector(ssl=ssl_context) | |
timeout = aiohttp.ClientTimeout(total=AIOHTTP_TIMEOUT) | |
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: | |
async with session.post(url, headers=headers, json=payload) as response: | |
response.raise_for_status() | |
return await response.json() | |