Spaces:
Runtime error
Runtime error
import base64 | |
from typing import Dict, Optional | |
from urllib.parse import quote | |
import aiohttp | |
import requests | |
from langchain_core.pydantic_v1 import BaseModel, Extra, Field, root_validator | |
from langchain.utils import get_from_dict_or_env | |
class DataForSeoAPIWrapper(BaseModel): | |
"""Wrapper around the DataForSeo API.""" | |
class Config: | |
"""Configuration for this pydantic object.""" | |
extra = Extra.forbid | |
arbitrary_types_allowed = True | |
default_params: dict = Field( | |
default={ | |
"location_name": "United States", | |
"language_code": "en", | |
"depth": 10, | |
"se_name": "google", | |
"se_type": "organic", | |
} | |
) | |
"""Default parameters to use for the DataForSEO SERP API.""" | |
params: dict = Field(default={}) | |
"""Additional parameters to pass to the DataForSEO SERP API.""" | |
api_login: Optional[str] = None | |
"""The API login to use for the DataForSEO SERP API.""" | |
api_password: Optional[str] = None | |
"""The API password to use for the DataForSEO SERP API.""" | |
json_result_types: Optional[list] = None | |
"""The JSON result types.""" | |
json_result_fields: Optional[list] = None | |
"""The JSON result fields.""" | |
top_count: Optional[int] = None | |
"""The number of top results to return.""" | |
aiosession: Optional[aiohttp.ClientSession] = None | |
"""The aiohttp session to use for the DataForSEO SERP API.""" | |
def validate_environment(cls, values: Dict) -> Dict: | |
"""Validate that login and password exists in environment.""" | |
login = get_from_dict_or_env(values, "api_login", "DATAFORSEO_LOGIN") | |
password = get_from_dict_or_env(values, "api_password", "DATAFORSEO_PASSWORD") | |
values["api_login"] = login | |
values["api_password"] = password | |
return values | |
async def arun(self, url: str) -> str: | |
"""Run request to DataForSEO SERP API and parse result async.""" | |
return self._process_response(await self._aresponse_json(url)) | |
def run(self, url: str) -> str: | |
"""Run request to DataForSEO SERP API and parse result async.""" | |
return self._process_response(self._response_json(url)) | |
def results(self, url: str) -> list: | |
res = self._response_json(url) | |
return self._filter_results(res) | |
async def aresults(self, url: str) -> list: | |
res = await self._aresponse_json(url) | |
return self._filter_results(res) | |
def _prepare_request(self, keyword: str) -> dict: | |
"""Prepare the request details for the DataForSEO SERP API.""" | |
if self.api_login is None or self.api_password is None: | |
raise ValueError("api_login or api_password is not provided") | |
cred = base64.b64encode( | |
f"{self.api_login}:{self.api_password}".encode("utf-8") | |
).decode("utf-8") | |
headers = {"Authorization": f"Basic {cred}", "Content-Type": "application/json"} | |
obj = {"keyword": quote(keyword)} | |
obj = {**obj, **self.default_params, **self.params} | |
data = [obj] | |
_url = ( | |
f"https://api.dataforseo.com/v3/serp/{obj['se_name']}" | |
f"/{obj['se_type']}/live/advanced" | |
) | |
return { | |
"url": _url, | |
"headers": headers, | |
"data": data, | |
} | |
def _check_response(self, response: dict) -> dict: | |
"""Check the response from the DataForSEO SERP API for errors.""" | |
if response.get("status_code") != 20000: | |
raise ValueError( | |
f"Got error from DataForSEO SERP API: {response.get('status_message')}" | |
) | |
return response | |
def _response_json(self, url: str) -> dict: | |
"""Use requests to run request to DataForSEO SERP API and return results.""" | |
request_details = self._prepare_request(url) | |
response = requests.post( | |
request_details["url"], | |
headers=request_details["headers"], | |
json=request_details["data"], | |
) | |
response.raise_for_status() | |
return self._check_response(response.json()) | |
async def _aresponse_json(self, url: str) -> dict: | |
"""Use aiohttp to request DataForSEO SERP API and return results async.""" | |
request_details = self._prepare_request(url) | |
if not self.aiosession: | |
async with aiohttp.ClientSession() as session: | |
async with session.post( | |
request_details["url"], | |
headers=request_details["headers"], | |
json=request_details["data"], | |
) as response: | |
res = await response.json() | |
else: | |
async with self.aiosession.post( | |
request_details["url"], | |
headers=request_details["headers"], | |
json=request_details["data"], | |
) as response: | |
res = await response.json() | |
return self._check_response(res) | |
def _filter_results(self, res: dict) -> list: | |
output = [] | |
types = self.json_result_types if self.json_result_types is not None else [] | |
for task in res.get("tasks", []): | |
for result in task.get("result", []): | |
for item in result.get("items", []): | |
if len(types) == 0 or item.get("type", "") in types: | |
self._cleanup_unnecessary_items(item) | |
if len(item) != 0: | |
output.append(item) | |
if self.top_count is not None and len(output) >= self.top_count: | |
break | |
return output | |
def _cleanup_unnecessary_items(self, d: dict) -> dict: | |
fields = self.json_result_fields if self.json_result_fields is not None else [] | |
if len(fields) > 0: | |
for k, v in list(d.items()): | |
if isinstance(v, dict): | |
self._cleanup_unnecessary_items(v) | |
if len(v) == 0: | |
del d[k] | |
elif k not in fields: | |
del d[k] | |
if "xpath" in d: | |
del d["xpath"] | |
if "position" in d: | |
del d["position"] | |
if "rectangle" in d: | |
del d["rectangle"] | |
for k, v in list(d.items()): | |
if isinstance(v, dict): | |
self._cleanup_unnecessary_items(v) | |
return d | |
def _process_response(self, res: dict) -> str: | |
"""Process response from DataForSEO SERP API.""" | |
toret = "No good search result found" | |
for task in res.get("tasks", []): | |
for result in task.get("result", []): | |
item_types = result.get("item_types") | |
items = result.get("items", []) | |
if "answer_box" in item_types: | |
toret = next( | |
item for item in items if item.get("type") == "answer_box" | |
).get("text") | |
elif "knowledge_graph" in item_types: | |
toret = next( | |
item for item in items if item.get("type") == "knowledge_graph" | |
).get("description") | |
elif "featured_snippet" in item_types: | |
toret = next( | |
item for item in items if item.get("type") == "featured_snippet" | |
).get("description") | |
elif "shopping" in item_types: | |
toret = next( | |
item for item in items if item.get("type") == "shopping" | |
).get("price") | |
elif "organic" in item_types: | |
toret = next( | |
item for item in items if item.get("type") == "organic" | |
).get("description") | |
if toret: | |
break | |
return toret | |