|
|
|
# Cap on the number of DuckDuckGo results fetched by DDG() below.
MAX_RESULTS = 10
|
|
|
|
|
import asyncio |
|
import logging |
|
import warnings |
|
from typing import Dict, Generator, Optional |
|
import nest_asyncio |
|
import json |
|
import logging |
|
import sys |
|
from collections import deque |
|
from datetime import datetime, timezone |
|
from decimal import Decimal |
|
from itertools import cycle |
|
from typing import AsyncGenerator, Deque, Dict, Optional, Set, Tuple |
|
from curl_cffi import requests |
|
from docstring_inheritance import GoogleDocstringInheritanceMeta |
|
from lxml import html |
|
import json |
|
import re |
|
from html import unescape |
|
from typing import Optional |
|
from urllib.parse import unquote |
|
from dataclasses import dataclass |
|
from typing import Dict, Optional |
|
from random import randint |
|
|
|
|
|
class DuckDuckGoSearchException(Exception):
    """Raised for DuckDuckGo search failures: rate limits, HTTP errors, and response-parsing problems."""
|
|
|
|
|
@dataclass
class MapsResult:
    """Represents a result from the maps search."""

    # Place name and postal address.
    title: Optional[str] = None
    address: Optional[str] = None
    country_code: Optional[str] = None
    # NOTE(review): annotated str, but maps() assigns raw API coordinate
    # values — these may actually be numeric; confirm before relying on str.
    latitude: Optional[str] = None
    longitude: Optional[str] = None
    # Business website URL and free-text description.
    url: Optional[str] = None
    desc: Optional[str] = None
    phone: Optional[str] = None
    image: Optional[str] = None
    # Listing URL taken from the API's "url" field.
    source: Optional[str] = None
    # Opening-hours mapping as provided by the API.
    hours: Optional[Dict[str, str]] = None
    category: Optional[str] = None
    # Social-media profile URLs built in maps() from the API's *_id fields.
    facebook: Optional[str] = None
    instagram: Optional[str] = None
    twitter: Optional[str] = None
|
|
|
|
|
# Matches markers like '506-00.js' in a URL; _aget_url treats this as a
# rate-limit signal.
REGEX_500_IN_URL = re.compile(r"(?:\d{3}-\d{2}\.js)")
# Matches any HTML tag (non-greedy) so _normalize can strip markup.
REGEX_STRIP_TAGS = re.compile("<.*?>")
# Captures the vqd token from raw response bytes, quoted or unquoted.
REGEX_VQD = re.compile(rb"""vqd=['"]?([^&"']+)""")
|
|
|
|
|
def _extract_vqd(html_bytes: bytes, keywords: str) -> str:
    """Extract the vqd token from a raw response body.

    Args:
        html_bytes: raw HTML/JS response bytes.
        keywords: the originating query, included in the error message.

    Returns:
        The vqd token string. (Never None: the original Optional[str]
        annotation was wrong — this either returns a str or raises.)

    Raises:
        DuckDuckGoSearchException: if the token cannot be extracted.
    """
    try:
        match = REGEX_VQD.search(html_bytes)
        if match:
            return match.group(1).decode()
    except Exception:
        # Fall through to the explicit error below (e.g. decode failure).
        pass
    raise DuckDuckGoSearchException(
        f"_extract_vqd() {keywords=} Could not extract vqd.")
|
|
|
|
|
def _text_extract_json(html_bytes: bytes, keywords: str) -> Optional[str]: |
|
"""text(backend="api") -> extract json from html.""" |
|
try: |
|
start = html_bytes.index(b"DDG.pageLayout.load('d',") + 24 |
|
end = html_bytes.index(b");DDG.duckbar.load(", start) |
|
data = html_bytes[start:end] |
|
return json.loads(data) |
|
except Exception as ex: |
|
raise DuckDuckGoSearchException( |
|
f"_text_extract_json() {keywords=} {type(ex).__name__}: {ex}") from ex |
|
|
|
|
|
def _is_500_in_url(url: str) -> bool:
    """Report whether the url embeds a rate-limit marker like '506-00.js'."""
    return REGEX_500_IN_URL.search(url) is not None
|
|
|
|
|
def _normalize(raw_html: str) -> str:
    """Strip HTML tags from the raw_html string and unescape entities."""
    if not raw_html:
        return ""
    return unescape(REGEX_STRIP_TAGS.sub("", raw_html))
|
|
|
|
|
def _normalize_url(url: str) -> str: |
|
"""Unquote URL and replace spaces with '+'.""" |
|
return unquote(url.replace(" ", "+")) if url else "" |
|
|
|
|
|
# Module-level logger; NOTE(review): rebound to a "DDGS" logger later in this
# file, so AsyncDDGS methods log through whichever binding is current.
logger = logging.getLogger("duckduckgo_search.AsyncDDGS")

if sys.platform.lower().startswith("win"):
    # Switch to the selector event loop on Windows — presumably required by
    # the async HTTP client used here; TODO confirm.
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
|
|
|
|
|
class AsyncDDGS(metaclass=GoogleDocstringInheritanceMeta):
    """webscout_search async class to get search results from duckduckgo.com."""

    def __init__(self, headers=None, proxies=None, timeout=10) -> None:
        """Initialize the AsyncDDGS object.

        Args:
            headers (dict, optional): Dictionary of headers for the HTTP client. Defaults to None.
            proxies (Union[dict, str], optional): Proxies for the HTTP client (can be dict or str). Defaults to None.
            timeout (int, optional): Timeout value for the HTTP client. Defaults to 10.
        """
        # Bug fix: previously the caller's `headers` argument was silently
        # discarded. Honor it when given; otherwise default to a random
        # numeric User-Agent as before.
        if headers is None:
            headers = {'User-Agent': f'{randint(0, 1000000)}'}
        self.proxies = proxies if proxies and isinstance(proxies, dict) else {
            "http": proxies,
            "https": proxies
        }
        self._asession = requests.AsyncSession(headers=headers,
                                               proxies=self.proxies,
                                               timeout=timeout,
                                               impersonate="chrome")
        self._asession.headers["Referer"] = "https://duckduckgo.com/"

    async def __aenter__(self) -> "AsyncDDGS":
        """A context manager method that is called when entering the 'with' statement."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Closes the session.

        Bug fix: returning `self._asession.close()` directly meant that if
        close() is a coroutine it was never awaited, and a truthy __aexit__
        return value silently suppresses exceptions. Await when needed and
        return None explicitly.
        """
        close_result = self._asession.close()
        if asyncio.iscoroutine(close_result):
            await close_result
        return None

    async def _aget_url(self, method: str, url: str,
                        **kwargs) -> Optional[bytes]:
        """Perform an HTTP request and return the response body bytes.

        Returns the body on HTTP 200, None for other non-error statuses.
        (Annotation fixed: this returns bytes, not a Response object.)

        Raises:
            DuckDuckGoSearchException: on HTTP errors or rate limiting.
        """
        try:
            # Fresh random User-Agent per request.
            headers = {'User-Agent': f'{randint(0, 1000000)}'}
            resp = await self._asession.request(method,
                                                url,
                                                stream=True,
                                                **kwargs,
                                                headers=headers)
            resp.raise_for_status()
            resp_content = await resp.acontent()
            logger.debug(
                f"_aget_url() {url} {resp.status_code} {resp.http_version} {resp.elapsed} {len(resp_content)}"
            )
            # A 'NNN-NN.js' marker in the final URL or a 202 status both
            # indicate DDG is rate limiting us.
            if _is_500_in_url(str(resp.url)) or resp.status_code == 202:
                raise DuckDuckGoSearchException("Ratelimit")
            if resp.status_code == 200:
                return resp_content
        except Exception as ex:
            raise DuckDuckGoSearchException(
                f"_aget_url() {url} {type(ex).__name__}: {ex}") from ex
        return None

    async def _aget_vqd(self, keywords: str) -> Optional[str]:
        """Get vqd value for a search query."""
        resp_content = await self._aget_url("POST",
                                            "https://duckduckgo.com",
                                            data={"q": keywords})
        if resp_content:
            return _extract_vqd(resp_content, keywords)

    async def text(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        backend: str = "api",
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """DuckDuckGo text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m, y. Defaults to None.
            backend: api, html, lite. Defaults to api.
                api - collect data from https://duckduckgo.com,
                html - collect data from https://html.duckduckgo.com,
                lite - collect data from https://lite.duckduckgo.com.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.

        Raises:
            DuckDuckGoSearchException: if backend is not one of api/html/lite.
        """
        if backend == "api":
            results = self._text_api(keywords, region, safesearch, timelimit,
                                     max_results)
        elif backend == "html":
            results = self._text_html(keywords, region, safesearch, timelimit,
                                      max_results)
        elif backend == "lite":
            results = self._text_lite(keywords, region, timelimit, max_results)
        else:
            # Bug fix: an unknown backend previously raised NameError on the
            # unbound `results` variable.
            raise DuckDuckGoSearchException(
                f"text() invalid backend={backend}. Use 'api', 'html' or 'lite'.")

        async for result in results:
            yield result

    async def _text_api(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m, y. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.
        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        payload = {
            "q": keywords,
            "kl": region,
            "l": region,
            "bing_market": region,
            "s": "0",
            "df": timelimit,
            "vqd": vqd,
            "sp": "0",
        }
        safesearch = safesearch.lower()
        if safesearch == "moderate":
            payload["ex"] = "-1"
        elif safesearch == "off":
            payload["ex"] = "-2"
        elif safesearch == "on":
            payload["p"] = "1"

        # `cache` both deduplicates hrefs and counts results toward max_results.
        cache = set()
        for _ in range(11):
            resp_content = await self._aget_url("GET",
                                                "https://links.duckduckgo.com/d.js",
                                                params=payload)
            if resp_content is None:
                return

            page_data = _text_extract_json(resp_content, keywords)
            if page_data is None:
                return

            result_exists, next_page_url = False, None
            for row in page_data:
                href = row.get("u", None)
                if href and href not in cache and href != f"http://www.google.com/search?q={keywords}":
                    cache.add(href)
                    body = _normalize(row["a"])
                    if body:
                        result_exists = True
                        yield {
                            "title": _normalize(row["t"]),
                            "href": _normalize_url(href),
                            "body": body,
                        }
                        if max_results and len(cache) >= max_results:
                            return
                else:
                    # Rows without a usable href carry the next-page link.
                    next_page_url = row.get("n", None)
            if max_results is None or result_exists is False or next_page_url is None:
                return
            payload["s"] = next_page_url.split("s=")[1].split("&")[0]

    async def _text_html(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m, y. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.
        """
        assert keywords, "keywords is mandatory"

        self._asession.headers["Referer"] = "https://html.duckduckgo.com/"
        safesearch_base = {"on": 1, "moderate": -1, "off": -2}
        payload = {
            "q": keywords,
            "s": "0",
            "kl": region,
            "p": safesearch_base[safesearch.lower()],
            "df": timelimit,
        }
        cache: Set[str] = set()
        for _ in range(11):
            resp_content = await self._aget_url("POST",
                                                "https://html.duckduckgo.com/html",
                                                data=payload)
            if resp_content is None:
                return

            tree = html.fromstring(resp_content)
            if tree.xpath('//div[@class="no-results"]/text()'):
                return

            result_exists = False
            for e in tree.xpath('//div[contains(@class, "results_links")]'):
                href = e.xpath('.//a[contains(@class, "result__a")]/@href')
                href = href[0] if href else None
                if (href and href not in cache
                        and href != f"http://www.google.com/search?q={keywords}"
                        and not href.startswith("https://duckduckgo.com/y.js?ad_domain")):
                    cache.add(href)
                    title = e.xpath('.//a[contains(@class, "result__a")]/text()')
                    body = e.xpath('.//a[contains(@class, "result__snippet")]//text()')
                    result_exists = True
                    yield {
                        "title": _normalize(title[0]) if title else None,
                        "href": _normalize_url(href),
                        "body": _normalize("".join(body)) if body else None,
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            next_page = tree.xpath('.//div[@class="nav-link"]')
            next_page = next_page[-1] if next_page else None
            if next_page is None:
                return

            # The next-page form's hidden inputs become the next POST payload.
            names = next_page.xpath('.//input[@type="hidden"]/@name')
            values = next_page.xpath('.//input[@type="hidden"]/@value')
            payload = {n: v for n, v in zip(names, values)}

    async def _text_lite(
        self,
        keywords: str,
        region: str = "wt-wt",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout text search generator. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            timelimit: d, w, m, y. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with search results.
        """
        assert keywords, "keywords is mandatory"

        self._asession.headers["Referer"] = "https://lite.duckduckgo.com/"
        payload = {
            "q": keywords,
            "s": "0",
            "o": "json",
            "api": "d.js",
            "kl": region,
            "df": timelimit,
        }
        cache: Set[str] = set()
        for _ in range(11):
            resp_content = await self._aget_url("POST",
                                                "https://lite.duckduckgo.com/lite/",
                                                data=payload)
            if resp_content is None:
                return

            if b"No more results." in resp_content:
                return

            tree = html.fromstring(resp_content)

            result_exists = False
            # Lite results come as groups of 4 <tr> rows:
            # 1=title/link, 2=snippet, 3=misc (yield point), 4=separator.
            data = zip(cycle(range(1, 5)), tree.xpath("//table[last()]//tr"))
            for i, e in data:
                if i == 1:
                    href = e.xpath(".//a//@href")
                    href = href[0] if href else None
                    if (href is None or href in cache
                            or href == f"http://www.google.com/search?q={keywords}"
                            or href.startswith("https://duckduckgo.com/y.js?ad_domain")):
                        # Skip the remaining 3 rows of this result group.
                        [next(data, None) for _ in range(3)]
                    else:
                        cache.add(href)
                        title = e.xpath(".//a//text()")[0]
                elif i == 2:
                    body = e.xpath(".//td[@class='result-snippet']//text()")
                    body = "".join(body).strip()
                elif i == 3:
                    result_exists = True
                    yield {
                        "title": _normalize(title),
                        "href": _normalize_url(href),
                        "body": _normalize(body),
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            next_page_s = tree.xpath(
                "//form[./input[contains(@value, 'ext')]]/input[@name='s']/@value")
            if not next_page_s:
                return
            payload["s"] = next_page_s[0]
            payload["vqd"] = _extract_vqd(resp_content, keywords)

    async def images(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        size: Optional[str] = None,
        color: Optional[str] = None,
        type_image: Optional[str] = None,
        layout: Optional[str] = None,
        license_image: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout images search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: Day, Week, Month, Year. Defaults to None.
            size: Small, Medium, Large, Wallpaper. Defaults to None.
            color: color, Monochrome, Red, Orange, Yellow, Green, Blue,
                Purple, Pink, Brown, Black, Gray, Teal, White. Defaults to None.
            type_image: photo, clipart, gif, transparent, line.
                Defaults to None.
            layout: Square, Tall, Wide. Defaults to None.
            license_image: any (All Creative Commons), Public (PublicDomain),
                Share (Free to Share and Use), ShareCommercially (Free to Share and Use Commercially),
                Modify (Free to Modify, Share, and Use), ModifyCommercially (Free to Modify, Share, and
                Use Commercially). Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with image search results.
        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        safesearch_base = {"on": 1, "moderate": 1, "off": -1}
        timelimit = f"time:{timelimit}" if timelimit else ""
        size = f"size:{size}" if size else ""
        color = f"color:{color}" if color else ""
        type_image = f"type:{type_image}" if type_image else ""
        layout = f"layout:{layout}" if layout else ""
        license_image = f"license:{license_image}" if license_image else ""
        payload = {
            "l": region,
            "o": "json",
            "q": keywords,
            "vqd": vqd,
            "f":
            f"{timelimit},{size},{color},{type_image},{layout},{license_image}",
            "p": safesearch_base[safesearch.lower()],
        }

        cache = set()
        for _ in range(10):
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/i.js",
                                                params=payload)
            if resp_content is None:
                return
            try:
                resp_json = json.loads(resp_content)
            except Exception:
                return
            page_data = resp_json.get("results", None)
            if page_data is None:
                return

            result_exists = False
            for row in page_data:
                image_url = row.get("image", None)
                if image_url and image_url not in cache:
                    cache.add(image_url)
                    result_exists = True
                    yield {
                        "title": row["title"],
                        "image": _normalize_url(image_url),
                        "thumbnail": _normalize_url(row["thumbnail"]),
                        "url": _normalize_url(row["url"]),
                        "height": row["height"],
                        "width": row["width"],
                        "source": row["source"],
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            # Renamed from `next` to avoid shadowing the builtin.
            next_url = resp_json.get("next", None)
            if next_url is None:
                return
            payload["s"] = next_url.split("s=")[-1].split("&")[0]

    async def videos(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        resolution: Optional[str] = None,
        duration: Optional[str] = None,
        license_videos: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout videos search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m. Defaults to None.
            resolution: high, standart. Defaults to None.
            duration: short, medium, long. Defaults to None.
            license_videos: creativeCommon, youtube. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with videos search results
        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        safesearch_base = {"on": 1, "moderate": -1, "off": -2}
        timelimit = f"publishedAfter:{timelimit}" if timelimit else ""
        resolution = f"videoDefinition:{resolution}" if resolution else ""
        duration = f"videoDuration:{duration}" if duration else ""
        license_videos = f"videoLicense:{license_videos}" if license_videos else ""
        payload = {
            "l": region,
            "o": "json",
            "s": 0,
            "q": keywords,
            "vqd": vqd,
            "f": f"{timelimit},{resolution},{duration},{license_videos}",
            "p": safesearch_base[safesearch.lower()],
        }

        cache = set()
        for _ in range(10):
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/v.js",
                                                params=payload)
            if resp_content is None:
                return
            try:
                resp_json = json.loads(resp_content)
            except Exception:
                return
            page_data = resp_json.get("results", None)
            if page_data is None:
                return

            result_exists = False
            for row in page_data:
                if row["content"] not in cache:
                    cache.add(row["content"])
                    result_exists = True
                    yield row
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            next_url = resp_json.get("next", None)
            if next_url is None:
                return
            payload["s"] = next_url.split("s=")[-1].split("&")[0]

    async def news(
        self,
        keywords: str,
        region: str = "wt-wt",
        safesearch: str = "moderate",
        timelimit: Optional[str] = None,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout news search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".
            safesearch: on, moderate, off. Defaults to "moderate".
            timelimit: d, w, m. Defaults to None.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with news search results.
        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        safesearch_base = {"on": 1, "moderate": -1, "off": -2}
        payload = {
            "l": region,
            "o": "json",
            "noamp": "1",
            "q": keywords,
            "vqd": vqd,
            "p": safesearch_base[safesearch.lower()],
            "df": timelimit,
            "s": 0,
        }

        cache = set()
        for _ in range(10):
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/news.js",
                                                params=payload)
            if resp_content is None:
                return
            try:
                resp_json = json.loads(resp_content)
            except Exception:
                return
            page_data = resp_json.get("results", None)
            if page_data is None:
                return

            result_exists = False
            for row in page_data:
                if row["url"] not in cache:
                    cache.add(row["url"])
                    image_url = row.get("image", None)
                    result_exists = True
                    yield {
                        "date": datetime.fromtimestamp(row["date"],
                                                       timezone.utc).isoformat(),
                        "title": row["title"],
                        "body": _normalize(row["excerpt"]),
                        "url": _normalize_url(row["url"]),
                        "image": _normalize_url(image_url) if image_url else None,
                        "source": row["source"],
                    }
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None or result_exists is False:
                return
            next_url = resp_json.get("next", None)
            if next_url is None:
                return
            payload["s"] = next_url.split("s=")[-1].split("&")[0]

    async def answers(
            self, keywords: str) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout instant answers. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.

        Yields:
            dict with instant answers results.
        """
        assert keywords, "keywords is mandatory"

        payload = {
            "q": f"what is {keywords}",
            "format": "json",
        }

        resp_content = await self._aget_url("GET",
                                            "https://api.duckduckgo.com/",
                                            params=payload)
        # Bug fix: previously yielded a bare None and then kept going,
        # passing None into json.loads. Stop cleanly instead.
        if resp_content is None:
            return
        try:
            page_data = json.loads(resp_content)
        except Exception:
            page_data = None

        if page_data:
            answer = page_data.get("AbstractText", None)
            url = page_data.get("AbstractURL", None)
            if answer:
                yield {
                    "icon": None,
                    "text": answer,
                    "topic": None,
                    "url": url,
                }

        # Second query: related topics for the bare keywords.
        payload = {
            "q": f"{keywords}",
            "format": "json",
        }
        resp_content = await self._aget_url("GET",
                                            "https://api.duckduckgo.com/",
                                            params=payload)
        if resp_content is None:
            return
        try:
            page_data = json.loads(resp_content).get("RelatedTopics", None)
        except Exception:
            page_data = None

        if page_data:
            for row in page_data:
                topic = row.get("Name", None)
                if not topic:
                    icon = row["Icon"].get("URL", None)
                    yield {
                        "icon": f"https://duckduckgo.com{icon}" if icon else None,
                        "text": row["Text"],
                        "topic": None,
                        "url": row["FirstURL"],
                    }
                else:
                    for subrow in row["Topics"]:
                        icon = subrow["Icon"].get("URL", None)
                        yield {
                            "icon": f"https://duckduckgo.com{icon}" if icon else None,
                            "text": subrow["Text"],
                            "topic": topic,
                            "url": subrow["FirstURL"],
                        }

    async def suggestions(
            self,
            keywords: str,
            region: str = "wt-wt") -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout suggestions. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query.
            region: wt-wt, us-en, uk-en, ru-ru, etc. Defaults to "wt-wt".

        Yields:
            dict with suggestions results.
        """
        assert keywords, "keywords is mandatory"

        payload = {
            "q": keywords,
            "kl": region,
        }
        resp_content = await self._aget_url("GET",
                                            "https://duckduckgo.com/ac",
                                            params=payload)
        # Bug fix: was `yield None` followed by json.loads(None).
        if resp_content is None:
            return
        try:
            page_data = json.loads(resp_content)
            for r in page_data:
                yield r
        except Exception:
            pass

    async def maps(
        self,
        keywords: str,
        place: Optional[str] = None,
        street: Optional[str] = None,
        city: Optional[str] = None,
        county: Optional[str] = None,
        state: Optional[str] = None,
        country: Optional[str] = None,
        postalcode: Optional[str] = None,
        latitude: Optional[str] = None,
        longitude: Optional[str] = None,
        radius: int = 0,
        max_results: Optional[int] = None,
    ) -> AsyncGenerator[Dict[str, Optional[str]], None]:
        """webscout maps search. Query params: https://duckduckgo.com/params.

        Args:
            keywords: keywords for query
            place: if set, the other parameters are not used. Defaults to None.
            street: house number/street. Defaults to None.
            city: city of search. Defaults to None.
            county: county of search. Defaults to None.
            state: state of search. Defaults to None.
            country: country of search. Defaults to None.
            postalcode: postalcode of search. Defaults to None.
            latitude: geographic coordinate (north-south position). Defaults to None.
            longitude: geographic coordinate (east-west position); if latitude and
                longitude are set, the other parameters are not used. Defaults to None.
            radius: expand the search square by the distance in kilometers. Defaults to 0.
            max_results: max number of results. If None, returns results only from the first response. Defaults to None.

        Yields:
            dict with maps search results
        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd(keywords)

        # Establish the initial bounding box: either a degenerate box at the
        # given coordinates, or geocode the address via Nominatim.
        if latitude and longitude:
            lat_t = Decimal(latitude.replace(",", "."))
            lat_b = Decimal(latitude.replace(",", "."))
            lon_l = Decimal(longitude.replace(",", "."))
            lon_r = Decimal(longitude.replace(",", "."))
            if radius == 0:
                radius = 1
        else:
            if place:
                params: Dict[str, Optional[str]] = {
                    "q": place,
                    "polygon_geojson": "0",
                    "format": "jsonv2",
                }
            else:
                params = {
                    "street": street,
                    "city": city,
                    "county": county,
                    "state": state,
                    "country": country,
                    "postalcode": postalcode,
                    "polygon_geojson": "0",
                    "format": "jsonv2",
                }
            try:
                resp_content = await self._aget_url(
                    "GET",
                    "https://nominatim.openstreetmap.org/search.php",
                    params=params,
                )
                # Bug fix: was `yield None` followed by json.loads(None).
                if resp_content is None:
                    return

                coordinates = json.loads(resp_content)[0]["boundingbox"]
                lat_t, lon_l = Decimal(coordinates[1]), Decimal(coordinates[2])
                lat_b, lon_r = Decimal(coordinates[0]), Decimal(coordinates[3])
            except Exception as ex:
                logger.debug(
                    f"ddg_maps() keywords={keywords} {type(ex).__name__} {ex}")
                return

        # Expand the box by `radius` km (~0.008983 degrees per km).
        lat_t += Decimal(radius) * Decimal(0.008983)
        lat_b -= Decimal(radius) * Decimal(0.008983)
        lon_l -= Decimal(radius) * Decimal(0.008983)
        lon_r += Decimal(radius) * Decimal(0.008983)
        logger.debug(f"bbox coordinates\n{lat_t} {lon_l}\n{lat_b} {lon_r}")

        work_bboxes: Deque[Tuple[Decimal, Decimal, Decimal, Decimal]] = deque()
        work_bboxes.append((lat_t, lon_l, lat_b, lon_r))

        cache = set()
        while work_bboxes:
            lat_t, lon_l, lat_b, lon_r = work_bboxes.pop()
            params = {
                "q": keywords,
                "vqd": vqd,
                "tg": "maps_places",
                "rt": "D",
                "mkexp": "b",
                "wiki_info": "1",
                "is_requery": "1",
                "bbox_tl": f"{lat_t},{lon_l}",
                "bbox_br": f"{lat_b},{lon_r}",
                "strict_bbox": "1",
            }
            resp_content = await self._aget_url("GET",
                                                "https://duckduckgo.com/local.js",
                                                params=params)
            if resp_content is None:
                return
            try:
                page_data = json.loads(resp_content).get("results", [])
            except Exception:
                return
            if page_data is None:
                return

            for res in page_data:
                result = MapsResult()
                result.title = res["name"]
                result.address = res["address"]
                # Deduplicate by title+address across bboxes.
                if f"{result.title} {result.address}" in cache:
                    continue
                else:
                    cache.add(f"{result.title} {result.address}")
                    result.country_code = res["country_code"]
                    result.url = _normalize_url(res["website"])
                    result.phone = res["phone"]
                    result.latitude = res["coordinates"]["latitude"]
                    result.longitude = res["coordinates"]["longitude"]
                    result.source = _normalize_url(res["url"])
                    if res["embed"]:
                        result.image = res["embed"].get("image", "")
                        result.desc = res["embed"].get("description", "")
                    result.hours = res["hours"]
                    result.category = res["ddg_category"]
                    result.facebook = f"www.facebook.com/profile.php?id={x}" if (
                        x := res["facebook_id"]) else None
                    result.instagram = f"https://www.instagram.com/{x}" if (
                        x := res["instagram_id"]) else None
                    result.twitter = f"https://twitter.com/{x}" if (
                        x := res["twitter_id"]) else None
                    yield result.__dict__
                    if max_results and len(cache) >= max_results:
                        return
            if max_results is None:
                return

            # A full page (>=15 hits) suggests more results exist: split the
            # bbox into quadrants and keep searching.
            if len(page_data) >= 15:
                lat_middle = (lat_t + lat_b) / 2
                lon_middle = (lon_l + lon_r) / 2
                bbox1 = (lat_t, lon_l, lat_middle, lon_middle)
                bbox2 = (lat_t, lon_middle, lat_middle, lon_r)
                bbox3 = (lat_middle, lon_l, lat_b, lon_middle)
                bbox4 = (lat_middle, lon_middle, lat_b, lon_r)
                work_bboxes.extendleft([bbox1, bbox2, bbox3, bbox4])

    async def translate(self,
                        keywords: str,
                        from_: Optional[str] = None,
                        to: str = "en") -> Optional[Dict[str, Optional[str]]]:
        """webscout translate.

        Args:
            keywords: string or a list of strings to translate
            from_: translate from (defaults automatically). Defaults to None.
            to: what language to translate. Defaults to "en".

        Returns:
            dict with translated keywords.
        """
        assert keywords, "keywords is mandatory"

        vqd = await self._aget_vqd("translate")

        payload = {
            "vqd": vqd,
            "query": "translate",
            "to": to,
        }
        if from_:
            payload["from"] = from_

        resp_content = await self._aget_url(
            "POST",
            "https://duckduckgo.com/translation.js",
            params=payload,
            data=keywords.encode(),
        )
        if resp_content is None:
            return None
        try:
            page_data = json.loads(resp_content)
            page_data["original"] = keywords
        except Exception:
            page_data = None
        return page_data
|
|
|
|
|
# NOTE(review): this rebinds the module-global `logger` that AsyncDDGS's
# methods also reference, so all of their logging now goes to this logger.
logger = logging.getLogger("duckduckgo_search.DDGS")
# Patch asyncio to allow run_until_complete inside an already-running loop,
# which the sync DDGS wrapper below relies on.
nest_asyncio.apply()
|
|
|
|
|
class DDGS(AsyncDDGS):
    """Synchronous wrapper around AsyncDDGS.

    Drives the parent's async generators to completion on an event loop;
    nest_asyncio (applied at module import) permits re-entrant use.
    """

    def __init__(self, headers=None, proxies=None, timeout=10):
        if asyncio.get_event_loop().is_running():
            warnings.warn(
                "DDGS running in an async loop. This may cause errors. Use AsyncDDGS instead.",
                stacklevel=2)
        super().__init__(headers, proxies, timeout)
        self._loop = asyncio.get_event_loop()

    def __enter__(self) -> "DDGS":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Bug fix: create_task() only schedules the coroutine — in sync usage
        # the loop is not running, so the session was never actually closed.
        # Run the async cleanup to completion instead.
        self._loop.run_until_complete(self.__aexit__(exc_type, exc_val, exc_tb))

    def _iter_over_async(self, async_gen):
        """Iterate over an async generator, blocking on each item."""
        while True:
            try:
                yield self._loop.run_until_complete(async_gen.__anext__())
            except StopAsyncIteration:
                break

    def text(self, *args,
             **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        async_gen = super().text(*args, **kwargs)
        return self._iter_over_async(async_gen)

    def images(self, *args,
               **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        async_gen = super().images(*args, **kwargs)
        return self._iter_over_async(async_gen)

    def videos(self, *args,
               **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        async_gen = super().videos(*args, **kwargs)
        return self._iter_over_async(async_gen)

    def news(self, *args,
             **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        async_gen = super().news(*args, **kwargs)
        return self._iter_over_async(async_gen)

    def answers(self, *args,
                **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        async_gen = super().answers(*args, **kwargs)
        return self._iter_over_async(async_gen)

    def suggestions(self, *args,
                    **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        async_gen = super().suggestions(*args, **kwargs)
        return self._iter_over_async(async_gen)

    def maps(self, *args,
             **kwargs) -> Generator[Dict[str, Optional[str]], None, None]:
        async_gen = super().maps(*args, **kwargs)
        return self._iter_over_async(async_gen)

    def translate(self, *args, **kwargs) -> Optional[Dict[str, Optional[str]]]:
        # translate() is a plain coroutine, not an async generator.
        async_coro = super().translate(*args, **kwargs)
        return self._loop.run_until_complete(async_coro)
|
|
|
|
|
|
|
|
|
|
|
|
|
def Gemini(messages, model):
    """Send the conversation to a Gemini model and return the reply text.

    Appends the model's reply to *messages* in place (as a "model"-role
    entry) so the caller's history stays current.

    Args:
        messages: mutable list of Gemini-style message dicts ("parts"/"role").
        model: object exposing generate_content(messages) and returning a
            response with a .text attribute.

    Returns:
        str: the model's reply text.
    """
    response = model.generate_content(messages)

    messages.append({
        "parts": [
            {
                "text": response.text
            }
        ],
        "role": "model"})
    # (Removed a stray no-op `messages` expression statement.)
    return response.text
|
|
|
|
|
from rich import print |
|
from time import time as t |
|
|
|
|
|
|
|
import requests as rq |
|
from bs4 import BeautifulSoup |
|
|
|
|
|
# CSS class names of Google answer-box widgets probed by Online_Scraper,
# in priority order; the first class that matches wins.
classes=["Ab33Nc","zCubwf","hgKElc","LTKOO sY7ric","Z0LcW","vk_bk","gsrt vk_bk FzvWSb YwPhnf","pclqee","tw-Data-text tw-text-small tw-ta",
"IZ6rdc","O5uR6d LTKOO","vlzY6d","webanswers-webanswers_table__webanswers-table",
"dDoNo ikb4Bb gsrt","sXLaOe","LWkfKe","VQF4g","qv3Wpe","kno-rdesc"]

# Desktop Chrome User-Agent string sent with every scrape request.
useragent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36'

# Shared HTTP session so repeated scrapes reuse connections.
sess = rq.session()
|
|
|
|
|
def Online_Scraper(query, PRINT=True):
    """Scrape a Google results page for an answer-box snippet.

    Probes each CSS class in the module-level `classes` list and returns the
    text of the first matching element.

    Args:
        query: search query; " + " / " - " are rewritten to words so they are
            not treated as search operators.
        PRINT: when True, print which CSS class produced the hit.

    Returns:
        Optional[str]: the extracted answer text, or None if nothing matched.
    """
    query = query.replace(" + ", " plus ")
    query = query.replace(" - ", " minus ")
    URL = "https://www.google.co.in/search?q=" + query
    headers = {'User-Agent': useragent}

    page = sess.get(URL, headers=headers)
    soup = BeautifulSoup(page.content, 'html.parser')

    for i in classes:
        # Explicit None check instead of the previous try/except-pass, which
        # silently swallowed every exception.
        element = soup.find(class_=i)
        if element is None:
            continue
        result = element.get_text()
        if PRINT:
            print(f"by class {i}")
        return result
    return None
|
|
|
def DDG(query):
    """Run a DuckDuckGo text search and keep only results with a body.

    Args:
        query: search keywords.

    Returns:
        list[dict]: up to MAX_RESULTS result dicts whose "body" is not None.
    """
    with DDGS() as ddgs:
        results = ddgs.text(query, max_results=MAX_RESULTS)
        # `is not None` instead of `!= None` (identity check for None).
        results = [i for i in results if i["body"] is not None]
        return results
|
|
|
|
|
def RealTimeGemini(query: str, messages: Optional[list] = None, model=None):
    """Answer *query* with Gemini, grounded in freshly scraped web results.

    Tries the Google answer-box scraper first, falling back to a DuckDuckGo
    text search, then prepends the results to the conversation as a
    user/model exchange before asking the model.

    Bug fixes vs. the original: the `messages` list literal contained a
    duplicated closing fragment that made this function a SyntaxError; the
    mutable default argument is replaced with a None sentinel; the bare
    `except:` is narrowed.

    Args:
        query: the user's question (non-empty string).
        messages: prior Gemini-style conversation history. Defaults to None
            (treated as an empty history).
        model: Gemini model object, forwarded to Gemini().

    Returns:
        dict: search results, timings, the query, and the model response.
    """
    assert query, "Query is required"
    assert isinstance(query, str), "Query must be a string"
    if messages is None:
        messages = []
    print(messages)
    realquery = query
    ReturnObj = {}

    C = t()
    results = Online_Scraper(realquery)
    if results is None:
        try:
            results = DDG(realquery)
        except Exception:
            results = "No results found"

    ReturnObj["DDGSResults"] = results
    ReturnObj["DDGSExecutionTime"] = t() - C
    ReturnObj["Query"] = realquery
    ReturnObj["SearchQuery"] = query

    C = t()
    # Prepend the scraped context as a fake user/model exchange so the model
    # treats it as already-acknowledged background information.
    messages = [{
        "parts": [
            {
                "text": f"```{str(results)}```\n *real time information you can use to reply"
            }
        ],
        "role": "user"
    }, {
        "parts": [
            {
                "text": "ok i know its websearch results i will tell you whenevery you ask me about it"
            }
        ],
        "role": "model"
    }] + messages

    messages.append({
        "parts": [
            {
                "text": query
            }
        ],
        "role": "user"
    })
    responce = Gemini(messages, model)

    ReturnObj["GeminiResponce"] = responce
    ReturnObj["GeminiExecutionTime"] = t() - C

    return ReturnObj
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|