import asyncio
import os
import time

import aiohttp
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

load_dotenv()
brave_key = os.getenv("BRAVE_KEY")

# Maximum number of characters kept from each scraped page
MAX_SCRAPED_LEN = 1024
def fetch_urls(response):
    """Extract the result URLs from a Brave web-search JSON response."""
    results_dict = response.json()
    urls = [res['url'] for res in results_dict['web']['results']]
    return urls
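# The fields accessed above imply a web-search response shaped roughly like
# this (a sketch, not the full Brave schema):
#   {"web": {"results": [{"url": "https://...", ...}, ...]}}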
async def fetch_content(session, url):
    """Download one page and reduce it to its visible text."""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                return await async_remove_tags(await response.read())
    except Exception as e:
        print(f"Error fetching content from {url}: {e}")
    return None
async def fetch_all(urls):
    """Fetch all URLs concurrently over one shared HTTP session."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_content(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
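# fetch_all can also be driven on its own, e.g. (a minimal sketch):
#   texts = asyncio.run(fetch_all(["https://example.com"]))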
def fetch_context(query):
    """Search Brave for `query` and return cleaned text from the top results."""
    url = "https://api.search.brave.com/res/v1/web/search"
    headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip",
        "X-Subscription-Token": brave_key,
    }
    params = {
        "q": query,
        "count": 4,
    }
    total_content = []
    # Send an HTTP GET request to the search API
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        urls = fetch_urls(response)
        # Reuse the current event loop if one exists; otherwise create a
        # fresh one so this stays callable from ordinary synchronous code.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        results = loop.run_until_complete(fetch_all(urls))
        # gather(return_exceptions=True) may yield exception objects, so keep
        # only actual text, truncated to MAX_SCRAPED_LEN characters
        for content in results:
            if isinstance(content, str) and content:
                total_content.append(content[:MAX_SCRAPED_LEN])
    else:
        print("Failed to fetch real-time data. Status code:", response.status_code)
    return total_content
# Function to remove tags. BeautifulSoup parsing is CPU-bound, so the async
# variant does no real I/O; it only exists so fetch_content can await it.
async def async_remove_tags(html):
    return remove_tags(html)

def remove_tags(html):
    """Strip <style>/<script> blocks and return the visible text of `html`."""
    soup = BeautifulSoup(html, "html.parser")
    # Remove style and script tags along with everything inside them
    for data in soup(['style', 'script']):
        data.decompose()
    # Join the remaining visible strings
    return ' '.join(soup.stripped_strings)
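# Example (a sketch):
#   remove_tags("<p>Hello<script>alert(1)</script></p>")  ->  "Hello"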
def fetch_images(query):
    """Search Brave Images for `query` and return reachable thumbnail URLs."""
    api_url = "https://api.search.brave.com/res/v1/images/search"
    headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip",
        "X-Subscription-Token": brave_key,
    }
    # Fold a comma-separated query into a single "a + b + c" image search
    titles = [" + ".join(query.split(','))]
    url_list = []
    for q in titles:
        params = {
            "q": q,
            "count": 10,
        }
        print(f"Image Query: {q}")
        tries = 3
        for _ in range(tries):
            # Send an HTTP GET request to the image search API
            response = requests.get(api_url, headers=headers, params=params)
            try:
                if response.status_code == 200:
                    results_dict = response.json()
                    thumb_urls = [res['thumbnail']['src']
                                  for res in results_dict['results']]
                    # Keep only thumbnails that are actually reachable
                    for thumb_url in thumb_urls:
                        try:
                            if requests.get(thumb_url).status_code == 200:
                                url_list.append(thumb_url)
                        except requests.RequestException:
                            print(f"Invalid url: {thumb_url}")
                    break  # Got a result, exit the retry loop
                else:
                    print("Failed to fetch real-time data. Status code:", response.status_code)
            except Exception as e:
                print(f"Can't retrieve: {e}")
    return url_list
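# The fields accessed above imply an image-search response shaped roughly
# like this (a sketch, not the full Brave schema):
#   {"results": [{"thumbnail": {"src": "https://..."}, ...}, ...]}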
if __name__ == "__main__":
    query = "Suggest 3 books by Enid Blyton"
    start_ts = time.time()
    total_content = fetch_context(query)
    for c in total_content:
        print("=" * 100)
        print(c)
        print("=" * 100)
    end_ts = time.time()
    print(f"Time taken {end_ts - start_ts:.2f} seconds")