import asyncio
import json
import logging
import re
import sys
import warnings
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from html import unescape
from itertools import cycle
from random import randint
from typing import AsyncGenerator, Deque, Dict, Generator, Optional, Set, Tuple
from urllib.parse import unquote

import nest_asyncio
from curl_cffi import requests
from docstring_inheritance import GoogleDocstringInheritanceMeta
from lxml import html

# Configuration
MAX_RESULTS = 10
class DuckDuckGoSearchException(Exception):
    """Base exception raised for duckduckgo_search errors."""
@dataclass
class MapsResult:
"""Represents a result from the maps search."""
title: Optional[str] = None
address: Optional[str] = None
country_code: Optional[str] = None
latitude: Optional[str] = None
longitude: Optional[str] = None
url: Optional[str] = None
desc: Optional[str] = None
phone: Optional[str] = None
image: Optional[str] = None
source: Optional[str] = None
hours: Optional[Dict[str, str]] = None
category: Optional[str] = None
facebook: Optional[str] = None
instagram: Optional[str] = None
twitter: Optional[str] = None
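# Note: maps() ultimately yields MapsResult().__dict__, a flat dict whose keys
# match the fields above, with None for anything the source response omitted.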
REGEX_500_IN_URL = re.compile(r"(?:\d{3}-\d{2}\.js)")
REGEX_STRIP_TAGS = re.compile("<.*?>")
REGEX_VQD = re.compile(rb"""vqd=['"]?([^&"']+)""")
def _extract_vqd(html_bytes: bytes, keywords: str) -> str:
    """Extract the vqd token from html using a regular expression."""
try:
match = REGEX_VQD.search(html_bytes)
if match:
return match.group(1).decode()
except Exception:
pass
raise DuckDuckGoSearchException(
f"_extract_vqd() {keywords=} Could not extract vqd.")
def _text_extract_json(html_bytes: bytes, keywords: str) -> list:
    """text(backend="api") -> extract the embedded JSON payload from html."""
try:
start = html_bytes.index(b"DDG.pageLayout.load('d',") + 24
end = html_bytes.index(b");DDG.duckbar.load(", start)
data = html_bytes[start:end]
return json.loads(data)
except Exception as ex:
raise DuckDuckGoSearchException(
f"_text_extract_json() {keywords=} {type(ex).__name__}: {ex}") from ex
def _is_500_in_url(url: str) -> bool:
"""Something like '506-00.js' inside the url."""
return bool(REGEX_500_IN_URL.search(url))
def _normalize(raw_html: str) -> str:
"""Strip HTML tags from the raw_html string."""
return unescape(REGEX_STRIP_TAGS.sub("", raw_html)) if raw_html else ""
def _normalize_url(url: str) -> str:
"""Unquote URL and replace spaces with '+'."""
return unquote(url.replace(" ", "+")) if url else ""
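# A quick sketch of the helpers above (input values are illustrative):
#   _normalize("<b>Hello &amp; welcome</b>")   -> "Hello & welcome"
#   _normalize_url("https://example.com/a b")  -> "https://example.com/a+b"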
logger = logging.getLogger("duckduckgo_search.AsyncDDGS")
# curl_cffi raises NotImplementedError under the default Proactor event loop on Windows
# (https://curl-cffi.readthedocs.io/en/latest/faq/), so switch to the selector loop.
if sys.platform.lower().startswith("win"):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
class AsyncDDGS(metaclass=GoogleDocstringInheritanceMeta):
"""webscout_search async class to get search results from duckduckgo.com."""
def __init__(self, headers=None, proxies=None, timeout=10) -> None:
"""Initialize the AsyncDDGS object.
Args:
headers (dict, optional): Dictionary of headers for the HTTP client. Defaults to None.
proxies (Union[dict, str], optional): Proxies for the HTTP client (can be dict or str). Defaults to None.
timeout (int, optional): Timeout value for the HTTP client. Defaults to 10.
"""
        # Default to a random numeric User-Agent; caller-supplied headers take precedence.
        useragent = f"{randint(0, 1000000)}"
        headers = headers or {"User-Agent": useragent}
        self.proxies = proxies if proxies and isinstance(proxies, dict) else {
            "http": proxies,
            "https": proxies,
        }
self._asession = requests.AsyncSession(headers=headers,
proxies=self.proxies,
timeout=timeout,
impersonate="chrome")
self._asession.headers["Referer"] = "https://duckduckgo.com/"
async def __aenter__(self) -> "AsyncDDGS":
"""A context manager method that is called when entering the 'with' statement."""
return self
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the session."""
        await self._asession.close()
    async def _aget_url(self, method: str, url: str,
                        **kwargs) -> Optional[bytes]:
        """Send a request and return the raw response content."""
try:
useragent = f'{randint(0, 1000000)}'
headers = {'User-Agent': useragent}
resp = await self._asession.request(method,
url,
stream=True,
**kwargs,
headers=headers)
resp.raise_for_status()
resp_content = await resp.acontent()
logger.debug(
f"_aget_url() {url} {resp.status_code} {resp.http_version} {resp.elapsed} {len(resp_content)}"
)
if _is_500_in_url(str(resp.url)) or resp.status_code == 202:
raise DuckDuckGoSearchException("Ratelimit")
if resp.status_code == 200:
return resp_content
except Exception as ex:
raise DuckDuckGoSearchException(
f"_aget_url() {url} {type(ex).__name__}: {ex}") from ex
async def _aget_vqd(self, keywords: str) -> Optional[str]:
"""Get vqd value for a search query."""
resp_content = await self._aget_url("POST",
"https://duckduckgo.com",
data={"q": keywords})
if resp_content:
return _extract_vqd(resp_content, keywords)
async def text(
self,
keywords: str,
region: str = "wt-wt",
safesearch: str = "moderate",
timelimit: Optional[str] = None,
backend: str = "api",
max_results: Optional[int] = None,
) -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""DuckDuckGo text search generator. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
safesearch: on, moderate, off. Defaults to "moderate".
timelimit: d, w, m, y. Defaults to None.
backend: api, html, lite. Defaults to api.
api - collect data from https://duckduckgo.com,
html - collect data from https://html.duckduckgo.com,
lite - collect data from https://lite.duckduckgo.com.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Yields:
dict with search results.
"""
if backend == "api":
results = self._text_api(keywords, region, safesearch, timelimit,
max_results)
elif backend == "html":
results = self._text_html(keywords, region, safesearch, timelimit,
max_results)
elif backend == "lite":
results = self._text_lite(keywords, region, timelimit, max_results)
async for result in results:
yield result
async def _text_api(
self,
keywords: str,
region: str = "wt-wt",
safesearch: str = "moderate",
timelimit: Optional[str] = None,
max_results: Optional[int] = None,
) -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""webscout text search generator. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
safesearch: on, moderate, off. Defaults to "moderate".
timelimit: d, w, m, y. Defaults to None.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Yields:
dict with search results.
"""
assert keywords, "keywords is mandatory"
vqd = await self._aget_vqd(keywords)
payload = {
"q": keywords,
"kl": region,
"l": region,
"bing_market": region,
"s": "0",
"df": timelimit,
"vqd": vqd,
# "o": "json",
"sp": "0",
}
safesearch = safesearch.lower()
if safesearch == "moderate":
payload["ex"] = "-1"
elif safesearch == "off":
payload["ex"] = "-2"
elif safesearch == "on": # strict
payload["p"] = "1"
cache = set()
for _ in range(11):
resp_content = await self._aget_url("GET",
"https://links.duckduckgo.com/d.js",
params=payload)
if resp_content is None:
return
page_data = _text_extract_json(resp_content, keywords)
if page_data is None:
return
result_exists, next_page_url = False, None
for row in page_data:
href = row.get("u", None)
if href and href not in cache and href != f"http://www.google.com/search?q={keywords}":
cache.add(href)
body = _normalize(row["a"])
if body:
result_exists = True
yield {
"title": _normalize(row["t"]),
"href": _normalize_url(href),
"body": body,
}
if max_results and len(cache) >= max_results:
return
else:
next_page_url = row.get("n", None)
if max_results is None or result_exists is False or next_page_url is None:
return
payload["s"] = next_page_url.split("s=")[1].split("&")[0]
async def _text_html(
self,
keywords: str,
region: str = "wt-wt",
safesearch: str = "moderate",
timelimit: Optional[str] = None,
max_results: Optional[int] = None,
) -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""webscout text search generator. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
safesearch: on, moderate, off. Defaults to "moderate".
timelimit: d, w, m, y. Defaults to None.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Yields:
dict with search results.
"""
assert keywords, "keywords is mandatory"
self._asession.headers["Referer"] = "https://html.duckduckgo.com/"
safesearch_base = {"on": 1, "moderate": -1, "off": -2}
payload = {
"q": keywords,
"s": "0",
"kl": region,
"p": safesearch_base[safesearch.lower()],
"df": timelimit,
}
cache: Set[str] = set()
for _ in range(11):
resp_content = await self._aget_url("POST",
"https://html.duckduckgo.com/html",
data=payload)
if resp_content is None:
return
tree = html.fromstring(resp_content)
if tree.xpath('//div[@class="no-results"]/text()'):
return
result_exists = False
for e in tree.xpath('//div[contains(@class, "results_links")]'):
href = e.xpath('.//a[contains(@class, "result__a")]/@href')
href = href[0] if href else None
if (href and href not in cache
and href != f"http://www.google.com/search?q={keywords}"
and not href.startswith("https://duckduckgo.com/y.js?ad_domain")):
cache.add(href)
title = e.xpath('.//a[contains(@class, "result__a")]/text()')
body = e.xpath('.//a[contains(@class, "result__snippet")]//text()')
result_exists = True
yield {
"title": _normalize(title[0]) if title else None,
"href": _normalize_url(href),
"body": _normalize("".join(body)) if body else None,
}
if max_results and len(cache) >= max_results:
return
if max_results is None or result_exists is False:
return
next_page = tree.xpath('.//div[@class="nav-link"]')
next_page = next_page[-1] if next_page else None
if next_page is None:
return
names = next_page.xpath('.//input[@type="hidden"]/@name')
values = next_page.xpath('.//input[@type="hidden"]/@value')
payload = {n: v for n, v in zip(names, values)}
async def _text_lite(
self,
keywords: str,
region: str = "wt-wt",
timelimit: Optional[str] = None,
max_results: Optional[int] = None,
) -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""webscout text search generator. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
timelimit: d, w, m, y. Defaults to None.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Yields:
dict with search results.
"""
assert keywords, "keywords is mandatory"
self._asession.headers["Referer"] = "https://lite.duckduckgo.com/"
payload = {
"q": keywords,
"s": "0",
"o": "json",
"api": "d.js",
"kl": region,
"df": timelimit,
}
cache: Set[str] = set()
for _ in range(11):
resp_content = await self._aget_url("POST",
"https://lite.duckduckgo.com/lite/",
data=payload)
if resp_content is None:
return
if b"No more results." in resp_content:
return
tree = html.fromstring(resp_content)
result_exists = False
data = zip(cycle(range(1, 5)), tree.xpath("//table[last()]//tr"))
for i, e in data:
if i == 1:
href = e.xpath(".//a//@href")
href = href[0] if href else None
if (href is None or href in cache
or href == f"http://www.google.com/search?q={keywords}"
or href.startswith("https://duckduckgo.com/y.js?ad_domain")):
[next(data, None) for _ in range(3)] # skip block(i=1,2,3,4)
else:
cache.add(href)
title = e.xpath(".//a//text()")[0]
elif i == 2:
body = e.xpath(".//td[@class='result-snippet']//text()")
body = "".join(body).strip()
elif i == 3:
result_exists = True
yield {
"title": _normalize(title),
"href": _normalize_url(href),
"body": _normalize(body),
}
if max_results and len(cache) >= max_results:
return
if max_results is None or result_exists is False:
return
next_page_s = tree.xpath(
"//form[./input[contains(@value, 'ext')]]/input[@name='s']/@value")
if not next_page_s:
return
payload["s"] = next_page_s[0]
payload["vqd"] = _extract_vqd(resp_content, keywords)
async def images(
self,
keywords: str,
region: str = "wt-wt",
safesearch: str = "moderate",
timelimit: Optional[str] = None,
size: Optional[str] = None,
color: Optional[str] = None,
type_image: Optional[str] = None,
layout: Optional[str] = None,
license_image: Optional[str] = None,
max_results: Optional[int] = None,
) -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""webscout images search. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
safesearch: on, moderate, off. Defaults to "moderate".
timelimit: Day, Week, Month, Year. Defaults to None.
size: Small, Medium, Large, Wallpaper. Defaults to None.
color: color, Monochrome, Red, Orange, Yellow, Green, Blue,
Purple, Pink, Brown, Black, Gray, Teal, White. Defaults to None.
type_image: photo, clipart, gif, transparent, line.
Defaults to None.
layout: Square, Tall, Wide. Defaults to None.
license_image: any (All Creative Commons), Public (PublicDomain),
Share (Free to Share and Use), ShareCommercially (Free to Share and Use Commercially),
Modify (Free to Modify, Share, and Use), ModifyCommercially (Free to Modify, Share, and
Use Commercially). Defaults to None.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Yields:
dict with image search results.
"""
assert keywords, "keywords is mandatory"
vqd = await self._aget_vqd(keywords)
safesearch_base = {"on": 1, "moderate": 1, "off": -1}
timelimit = f"time:{timelimit}" if timelimit else ""
size = f"size:{size}" if size else ""
color = f"color:{color}" if color else ""
type_image = f"type:{type_image}" if type_image else ""
layout = f"layout:{layout}" if layout else ""
license_image = f"license:{license_image}" if license_image else ""
payload = {
"l": region,
"o": "json",
"q": keywords,
"vqd": vqd,
"f":
f"{timelimit},{size},{color},{type_image},{layout},{license_image}",
"p": safesearch_base[safesearch.lower()],
}
cache = set()
for _ in range(10):
resp_content = await self._aget_url("GET",
"https://duckduckgo.com/i.js",
params=payload)
if resp_content is None:
return
try:
resp_json = json.loads(resp_content)
except Exception:
return
page_data = resp_json.get("results", None)
if page_data is None:
return
result_exists = False
for row in page_data:
image_url = row.get("image", None)
if image_url and image_url not in cache:
cache.add(image_url)
result_exists = True
yield {
"title": row["title"],
"image": _normalize_url(image_url),
"thumbnail": _normalize_url(row["thumbnail"]),
"url": _normalize_url(row["url"]),
"height": row["height"],
"width": row["width"],
"source": row["source"],
}
if max_results and len(cache) >= max_results:
return
if max_results is None or result_exists is False:
return
            next_page = resp_json.get("next", None)
            if next_page is None:
                return
            payload["s"] = next_page.split("s=")[-1].split("&")[0]
async def videos(
self,
keywords: str,
region: str = "wt-wt",
safesearch: str = "moderate",
timelimit: Optional[str] = None,
resolution: Optional[str] = None,
duration: Optional[str] = None,
license_videos: Optional[str] = None,
max_results: Optional[int] = None,
) -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""webscout videos search. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
safesearch: on, moderate, off. Defaults to "moderate".
timelimit: d, w, m. Defaults to None.
resolution: high, standart. Defaults to None.
duration: short, medium, long. Defaults to None.
license_videos: creativeCommon, youtube. Defaults to None.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Yields:
dict with videos search results
"""
assert keywords, "keywords is mandatory"
vqd = await self._aget_vqd(keywords)
safesearch_base = {"on": 1, "moderate": -1, "off": -2}
timelimit = f"publishedAfter:{timelimit}" if timelimit else ""
resolution = f"videoDefinition:{resolution}" if resolution else ""
duration = f"videoDuration:{duration}" if duration else ""
license_videos = f"videoLicense:{license_videos}" if license_videos else ""
payload = {
"l": region,
"o": "json",
"s": 0,
"q": keywords,
"vqd": vqd,
"f": f"{timelimit},{resolution},{duration},{license_videos}",
"p": safesearch_base[safesearch.lower()],
}
cache = set()
for _ in range(10):
resp_content = await self._aget_url("GET",
"https://duckduckgo.com/v.js",
params=payload)
if resp_content is None:
return
try:
resp_json = json.loads(resp_content)
except Exception:
return
page_data = resp_json.get("results", None)
if page_data is None:
return
result_exists = False
for row in page_data:
if row["content"] not in cache:
cache.add(row["content"])
result_exists = True
yield row
if max_results and len(cache) >= max_results:
return
if max_results is None or result_exists is False:
return
            next_page = resp_json.get("next", None)
            if next_page is None:
                return
            payload["s"] = next_page.split("s=")[-1].split("&")[0]
async def news(
self,
keywords: str,
region: str = "wt-wt",
safesearch: str = "moderate",
timelimit: Optional[str] = None,
max_results: Optional[int] = None,
) -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""webscout news search. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
safesearch: on, moderate, off. Defaults to "moderate".
timelimit: d, w, m. Defaults to None.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Yields:
dict with news search results.
"""
assert keywords, "keywords is mandatory"
vqd = await self._aget_vqd(keywords)
safesearch_base = {"on": 1, "moderate": -1, "off": -2}
payload = {
"l": region,
"o": "json",
"noamp": "1",
"q": keywords,
"vqd": vqd,
"p": safesearch_base[safesearch.lower()],
"df": timelimit,
"s": 0,
}
cache = set()
for _ in range(10):
resp_content = await self._aget_url("GET",
"https://duckduckgo.com/news.js",
params=payload)
if resp_content is None:
return
try:
resp_json = json.loads(resp_content)
except Exception:
return
page_data = resp_json.get("results", None)
if page_data is None:
return
result_exists = False
for row in page_data:
if row["url"] not in cache:
cache.add(row["url"])
image_url = row.get("image", None)
result_exists = True
yield {
"date": datetime.fromtimestamp(row["date"],
timezone.utc).isoformat(),
"title": row["title"],
"body": _normalize(row["excerpt"]),
"url": _normalize_url(row["url"]),
"image": _normalize_url(image_url) if image_url else None,
"source": row["source"],
}
if max_results and len(cache) >= max_results:
return
if max_results is None or result_exists is False:
return
            next_page = resp_json.get("next", None)
            if next_page is None:
                return
            payload["s"] = next_page.split("s=")[-1].split("&")[0]
async def answers(
self, keywords: str) -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""webscout instant answers. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
Yields:
dict with instant answers results.
"""
assert keywords, "keywords is mandatory"
payload = {
"q": f"what is {keywords}",
"format": "json",
}
resp_content = await self._aget_url("GET",
"https://api.duckduckgo.com/",
params=payload)
        if resp_content is None:
            return
try:
page_data = json.loads(resp_content)
except Exception:
page_data = None
if page_data:
answer = page_data.get("AbstractText", None)
url = page_data.get("AbstractURL", None)
if answer:
yield {
"icon": None,
"text": answer,
"topic": None,
"url": url,
}
# related:
payload = {
"q": f"{keywords}",
"format": "json",
}
resp_content = await self._aget_url("GET",
"https://api.duckduckgo.com/",
params=payload)
        if resp_content is None:
            return
try:
page_data = json.loads(resp_content).get("RelatedTopics", None)
except Exception:
page_data = None
if page_data:
for row in page_data:
topic = row.get("Name", None)
if not topic:
icon = row["Icon"].get("URL", None)
yield {
"icon": f"https://duckduckgo.com{icon}" if icon else None,
"text": row["Text"],
"topic": None,
"url": row["FirstURL"],
}
else:
for subrow in row["Topics"]:
icon = subrow["Icon"].get("URL", None)
yield {
"icon": f"https://duckduckgo.com{icon}" if icon else None,
"text": subrow["Text"],
"topic": topic,
"url": subrow["FirstURL"],
}
async def suggestions(
self,
keywords: str,
region: str = "wt-wt") -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""webscout suggestions. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query.
region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
Yields:
dict with suggestions results.
"""
assert keywords, "keywords is mandatory"
payload = {
"q": keywords,
"kl": region,
}
resp_content = await self._aget_url("GET",
"https://duckduckgo.com/ac",
params=payload)
        if resp_content is None:
            return
try:
page_data = json.loads(resp_content)
for r in page_data:
yield r
except Exception:
pass
async def maps(
self,
keywords: str,
place: Optional[str] = None,
street: Optional[str] = None,
city: Optional[str] = None,
county: Optional[str] = None,
state: Optional[str] = None,
country: Optional[str] = None,
postalcode: Optional[str] = None,
latitude: Optional[str] = None,
longitude: Optional[str] = None,
radius: int = 0,
max_results: Optional[int] = None,
) -> AsyncGenerator[Dict[str, Optional[str]], None]:
"""webscout maps search. Query params: https://duckduckgo.com/params.
Args:
keywords: keywords for query
place: if set, the other parameters are not used. Defaults to None.
street: house number/street. Defaults to None.
city: city of search. Defaults to None.
county: county of search. Defaults to None.
state: state of search. Defaults to None.
country: country of search. Defaults to None.
postalcode: postalcode of search. Defaults to None.
latitude: geographic coordinate (north-south position). Defaults to None.
longitude: geographic coordinate (east-west position); if latitude and
longitude are set, the other parameters are not used. Defaults to None.
radius: expand the search square by the distance in kilometers. Defaults to 0.
max_results: max number of results. If None, returns results only from the first response. Defaults to None.
Yields:
dict with maps search results
"""
assert keywords, "keywords is mandatory"
vqd = await self._aget_vqd(keywords)
# if longitude and latitude are specified, skip the request about bbox to the nominatim api
if latitude and longitude:
lat_t = Decimal(latitude.replace(",", "."))
lat_b = Decimal(latitude.replace(",", "."))
lon_l = Decimal(longitude.replace(",", "."))
lon_r = Decimal(longitude.replace(",", "."))
if radius == 0:
radius = 1
# otherwise request about bbox to nominatim api
else:
if place:
params: Dict[str, Optional[str]] = {
"q": place,
"polygon_geojson": "0",
"format": "jsonv2",
}
else:
params = {
"street": street,
"city": city,
"county": county,
"state": state,
"country": country,
"postalcode": postalcode,
"polygon_geojson": "0",
"format": "jsonv2",
}
try:
resp_content = await self._aget_url(
"GET",
"https://nominatim.openstreetmap.org/search.php",
params=params,
)
                if resp_content is None:
                    return
coordinates = json.loads(resp_content)[0]["boundingbox"]
lat_t, lon_l = Decimal(coordinates[1]), Decimal(coordinates[2])
lat_b, lon_r = Decimal(coordinates[0]), Decimal(coordinates[3])
except Exception as ex:
logger.debug(
f"ddg_maps() keywords={keywords} {type(ex).__name__} {ex}")
return
# if a radius is specified, expand the search square
lat_t += Decimal(radius) * Decimal(0.008983)
lat_b -= Decimal(radius) * Decimal(0.008983)
lon_l -= Decimal(radius) * Decimal(0.008983)
lon_r += Decimal(radius) * Decimal(0.008983)
logger.debug(f"bbox coordinates\n{lat_t} {lon_l}\n{lat_b} {lon_r}")
# сreate a queue of search squares (bboxes)
work_bboxes: Deque[Tuple[Decimal, Decimal, Decimal, Decimal]] = deque()
work_bboxes.append((lat_t, lon_l, lat_b, lon_r))
# bbox iterate
cache = set()
while work_bboxes:
lat_t, lon_l, lat_b, lon_r = work_bboxes.pop()
params = {
"q": keywords,
"vqd": vqd,
"tg": "maps_places",
"rt": "D",
"mkexp": "b",
"wiki_info": "1",
"is_requery": "1",
"bbox_tl": f"{lat_t},{lon_l}",
"bbox_br": f"{lat_b},{lon_r}",
"strict_bbox": "1",
}
resp_content = await self._aget_url("GET",
"https://duckduckgo.com/local.js",
params=params)
if resp_content is None:
return
try:
page_data = json.loads(resp_content).get("results", [])
except Exception:
return
if page_data is None:
return
for res in page_data:
result = MapsResult()
result.title = res["name"]
result.address = res["address"]
if f"{result.title} {result.address}" in cache:
continue
else:
cache.add(f"{result.title} {result.address}")
result.country_code = res["country_code"]
result.url = _normalize_url(res["website"])
result.phone = res["phone"]
result.latitude = res["coordinates"]["latitude"]
result.longitude = res["coordinates"]["longitude"]
result.source = _normalize_url(res["url"])
if res["embed"]:
result.image = res["embed"].get("image", "")
result.desc = res["embed"].get("description", "")
result.hours = res["hours"]
result.category = res["ddg_category"]
result.facebook = f"www.facebook.com/profile.php?id={x}" if (
x := res["facebook_id"]) else None
result.instagram = f"https://www.instagram.com/{x}" if (
x := res["instagram_id"]) else None
result.twitter = f"https://twitter.com/{x}" if (
x := res["twitter_id"]) else None
yield result.__dict__
if max_results and len(cache) >= max_results:
return
if max_results is None:
return
# divide the square into 4 parts and add to the queue
if len(page_data) >= 15:
lat_middle = (lat_t + lat_b) / 2
lon_middle = (lon_l + lon_r) / 2
bbox1 = (lat_t, lon_l, lat_middle, lon_middle)
bbox2 = (lat_t, lon_middle, lat_middle, lon_r)
bbox3 = (lat_middle, lon_l, lat_b, lon_middle)
bbox4 = (lat_middle, lon_middle, lat_b, lon_r)
work_bboxes.extendleft([bbox1, bbox2, bbox3, bbox4])
async def translate(self,
keywords: str,
from_: Optional[str] = None,
to: str = "en") -> Optional[Dict[str, Optional[str]]]:
"""webscout translate.
Args:
keywords: string or a list of strings to translate
from_: translate from (defaults automatically). Defaults to None.
to: what language to translate. Defaults to "en".
Returns:
dict with translated keywords.
"""
assert keywords, "keywords is mandatory"
vqd = await self._aget_vqd("translate")
payload = {
"vqd": vqd,
"query": "translate",
"to": to,
}
if from_:
payload["from"] = from_
resp_content = await self._aget_url(
"POST",
"https://duckduckgo.com/translation.js",
params=payload,
data=keywords.encode(),
)
if resp_content is None:
return None
try:
page_data = json.loads(resp_content)
page_data["original"] = keywords
except Exception:
page_data = None
return page_data
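# Example usage of the async API (a sketch; the query string is illustrative):
#
#     async def _demo():
#         async with AsyncDDGS() as ddgs:
#             async for r in ddgs.text("python", max_results=5):
#                 print(r["title"], r["href"])
#
#     asyncio.run(_demo())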
logger = logging.getLogger("duckduckgo_search.DDGS")
nest_asyncio.apply()
class DDGS(AsyncDDGS):
def __init__(self, headers=None, proxies=None, timeout=10):
if asyncio.get_event_loop().is_running():
warnings.warn(
"DDGS running in an async loop. This may cause errors. Use AsyncDDGS instead.",
stacklevel=2)
super().__init__(headers, proxies, timeout)
self._loop = asyncio.get_event_loop()
def __enter__(self) -> "DDGS":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self._loop.create_task(self.__aexit__(exc_type, exc_val, exc_tb))
def _iter_over_async(self, async_gen):
"""Iterate over an async generator."""
while True:
try:
yield self._loop.run_until_complete(async_gen.__anext__())
except StopAsyncIteration:
break
def text(self, *args,
**kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
async_gen = super().text(*args, **kwargs)
return self._iter_over_async(async_gen)
def images(self, *args,
**kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
async_gen = super().images(*args, **kwargs)
return self._iter_over_async(async_gen)
def videos(self, *args,
**kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
async_gen = super().videos(*args, **kwargs)
return self._iter_over_async(async_gen)
def news(self, *args,
**kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
async_gen = super().news(*args, **kwargs)
return self._iter_over_async(async_gen)
def answers(self, *args,
**kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
async_gen = super().answers(*args, **kwargs)
return self._iter_over_async(async_gen)
def suggestions(self, *args,
**kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
async_gen = super().suggestions(*args, **kwargs)
return self._iter_over_async(async_gen)
def maps(self, *args,
**kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
async_gen = super().maps(*args, **kwargs)
return self._iter_over_async(async_gen)
def translate(self, *args, **kwargs) -> Optional[Dict[str, Optional[str]]]:
async_coro = super().translate(*args, **kwargs)
return self._loop.run_until_complete(async_coro)
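# Example usage of the synchronous wrapper (a sketch; query is illustrative):
#
#     with DDGS() as ddgs:
#         for r in ddgs.text("python", max_results=5):
#             print(r["title"])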
# Generate a response from the Gemini model and append it to the message history.
def Gemini(messages, model):
    response = model.generate_content(messages)
    messages.append({
        "parts": [{"text": response.text}],
        "role": "model",
    })
    return response.text
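# The message format mirrors the google-generativeai content schema (a sketch;
# `model` is assumed to be a configured google.generativeai.GenerativeModel):
#
#     history = [{"parts": [{"text": "Hello"}], "role": "user"}]
#     reply = Gemini(history, model)  # appends the model turn, returns its text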
from rich import print
from time import time as t
# pip install requests
# pip install bs4
import requests as rq
from bs4 import BeautifulSoup

# CSS classes that commonly hold Google's answer boxes (weather, calculator,
# currency, knowledge panel, etc.); these break whenever Google changes its markup.
classes = ["Ab33Nc", "zCubwf", "hgKElc", "LTKOO sY7ric", "Z0LcW", "vk_bk",
           "gsrt vk_bk FzvWSb YwPhnf", "pclqee", "tw-Data-text tw-text-small tw-ta",
           "IZ6rdc", "O5uR6d LTKOO", "vlzY6d", "webanswers-webanswers_table__webanswers-table",
           "dDoNo ikb4Bb gsrt", "sXLaOe", "LWkfKe", "VQF4g", "qv3Wpe", "kno-rdesc"]
useragent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36'
sess = rq.session()
# Scrape an answer snippet from Google search results.
def Online_Scraper(query, PRINT=True):
    query = query.replace(" + ", " plus ").replace(" - ", " minus ")
    headers = {'User-Agent': useragent}
    page = sess.get("https://www.google.co.in/search", params={"q": query}, headers=headers)
    soup = BeautifulSoup(page.content, 'html.parser')
    for i in classes:
        try:
            result = soup.find(class_=i).get_text()
            if PRINT:
                print(f"by class {i}")
            return result
        except Exception:
            pass
    return None
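# Example (a sketch; output depends on Google's current markup):
#
#     snippet = Online_Scraper("capital of France", PRINT=False)
#     if snippet is not None:
#         print(snippet)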
def DDG(query):
    """Run a DuckDuckGo text search, dropping results without a body."""
    with DDGS() as ddgs:
        results = ddgs.text(query, max_results=MAX_RESULTS)
        results = [i for i in results if i["body"] is not None]
        return results
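# Example (a sketch; returns up to MAX_RESULTS dicts with title/href/body keys):
#
#     for hit in DDG("open source llm"):
#         print(hit["href"])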
def RealTimeGemini(query: str, messages: Optional[list] = None, model=None):
    assert query, "Query is required"
    assert isinstance(query, str), "Query must be a string"
    # Avoid the mutable-default-argument pitfall.
    messages = messages or []
    print(messages)
    realquery = query
    ReturnObj = {}
    C = t()
    results = Online_Scraper(realquery)
    if results is None:
        try:
            results = DDG(realquery)
        except Exception:
            results = "No results found"
    # ADD TO RETURN OBJECT
    ReturnObj["DDGSResults"] = results
    ReturnObj["DDGSExecutionTime"] = t() - C
    ReturnObj["Query"] = realquery
    ReturnObj["SearchQuery"] = query
    C = t()
    # Prepend the search results to the conversation as a user/model exchange.
    messages = [
        {
            "parts": [
                {
                    "text": f"```{str(results)}```\n*real-time information you can use to reply"
                }
            ],
            "role": "user",
        },
        {
            "parts": [
                {
                    "text": "OK, I know these are web search results; I will use them whenever you ask me about them."
                }
            ],
            "role": "model",
        },
    ] + messages
    messages.append({
        "parts": [
            {
                "text": query
            }
        ],
        "role": "user",
    })
    response = Gemini(messages, model)
    # ADD TO RETURN OBJECT
    ReturnObj["GeminiResponse"] = response
    ReturnObj["GeminiExecutionTime"] = t() - C
    return ReturnObj
# if __name__ == "__main__":
# while 1:
# a = input("Enter your query: ")
# print(RealTimeGemini(a))
# while 1:
#     X = input("Enter your query: ")
#     C = t()
#     print(Online_Scraper(X))
#     print(t() - C)  # elapsed time