import asyncio
import os
import time

import aiohttp
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Load the Brave Search API key from a .env file.
load_dotenv()
brave_key = os.getenv("BRAVE_KEY")
# print(f"Brave Key: {brave_key}")

# Cap on how many characters of each scraped page are kept.
MAX_SCRAPED_LEN = 1024


def fetch_urls(response):
    """Extract the result URLs from a Brave web-search API response."""
    urls = []
    results_dict = response.json()
    # print(results_dict)
    for res in results_dict['web']['results']:
        urls.append(res['url'])
    return urls


async def fetch_content(session, url):
    """Download one page and return its visible text, or None on failure."""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                content = await async_remove_tags(await response.read())
                return content
    except Exception as e:
        print(f"Error fetching content from {url}: {e}")
    return None


async def fetch_all(urls):
    """Fetch every URL concurrently within a single aiohttp session."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_content(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
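
# Note: with return_exceptions=True, asyncio.gather returns exception objects
# instead of raising them. fetch_content already traps its own errors and
# returns None, so in practice the results hold page text or None. A minimal
# sketch of driving fetch_all directly (the URL is illustrative only):
#
#     pages = asyncio.run(fetch_all(["https://example.com"]))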


def fetch_context(query):
    """Run a Brave web search and return truncated text from the top results."""
    url = "https://api.search.brave.com/res/v1/web/search"
    api_key = brave_key
    headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip",
        "X-Subscription-Token": api_key
    }
    total_content = []
    params = {
        "q": query,
        "count": 4
    }
    # Send an HTTP GET request to the search engine
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        urls = fetch_urls(response)
        # Reuse the current event loop if one is available; otherwise create one.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        results = loop.run_until_complete(fetch_all(urls))
        # Keep at most MAX_SCRAPED_LEN characters of each fetched page.
        for content in results:
            if content and not isinstance(content, Exception):
                total_content.append(content[:MAX_SCRAPED_LEN])
    else:
        print("Failed to fetch real-time data. Status code:", response.status_code)
    return total_content
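
# Design note: depending on the Python version, asyncio.get_event_loop() can
# raise a RuntimeError (and is deprecated for this use) when no loop is set on
# the current thread, hence the fallback to new_event_loop() above. If this
# module were only ever called from synchronous code, a simpler entry point
# would be asyncio.run(fetch_all(urls)).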


# Async-friendly wrapper; delegates to the synchronous remove_tags below.
async def async_remove_tags(html):
    return remove_tags(html)


def remove_tags(html):
    """Parse HTML, drop <style> and <script> tags, and return the visible text."""
    soup = BeautifulSoup(html, "html.parser")
    for data in soup(['style', 'script']):
        # Remove the tag and its contents from the tree.
        data.decompose()
    # Join the remaining stripped text fragments.
    return ' '.join(soup.stripped_strings)
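
# A quick illustration of remove_tags on a made-up snippet (hypothetical
# input, shown for clarity only):
#
#     >>> remove_tags("<html><script>x()</script><p>Hello</p> <p>world</p></html>")
#     'Hello world'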


def fetch_images(query):
    """Run a Brave image search and return thumbnail URLs that respond with 200."""
    url = "https://api.search.brave.com/res/v1/images/search"
    api_key = brave_key
    headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip",
        "X-Subscription-Token": api_key
    }
    # Treat a comma-separated query as a single "a + b" search term.
    titles = [" + ".join(query.split(','))]
    url_list = []
    for q in titles:
        params = {
            "q": q,
            "count": 10
        }
        print(f"Image Query: {q}")
        tries = 3
        for _ in range(tries):
            # Send an HTTP GET request to the search engine
            response = requests.get(url, headers=headers, params=params)
            try:
                if response.status_code == 200:
                    results_dict = response.json()
                    # Collect the thumbnail URL of each image result.
                    urls = []
                    for res in results_dict['results']:
                        urls.append(res['thumbnail']['src'])
                    # Keep only thumbnails that can actually be fetched.
                    for img_url in urls:
                        try:
                            img_response = requests.get(img_url)
                            if img_response.status_code == 200:
                                url_list.append(img_url)
                        except requests.RequestException:
                            print(f"Invalid url : {img_url}")
                    break  # Got a result, exit the retry loop
                else:
                    print("Failed to fetch real-time data. Status code:", response.status_code)
            except Exception as e:
                print(f"Can't retrieve: {e}")
    return url_list


if __name__ == "__main__":
    query = "Suggest 3 books by Enid Blyton"
    start_ts = time.time()
    total_content = fetch_context(query)
    for c in total_content:
        print("=" * 100)
        print(c)
        print("=" * 100)
    end_ts = time.time()
    print(f"Time taken {end_ts - start_ts} seconds")