MiniCheck-Flan-T5-Large / web_retrieval.py
Liyan06
add web search
1104bf8
raw
history blame
5.3 kB
# Some functions are adapted from https://github.com/yuxiaw/Factcheck-GPT
import concurrent.futures
import itertools
import re
from time import time
from typing import List, Optional, Tuple

import bs4
import numpy as np
import requests
def is_tag_visible(element: bs4.element.PageElement) -> bool:
    """Determine whether a parsed HTML node holds text that is visible.

    Args:
        element: A BeautifulSoup node to check the visibility of
            (for example a text node yielded by ``soup.find_all(string=True)``).

    Returns:
        False if the node is an HTML comment, or if its direct parent is one
        of the non-rendered tags (``style``, ``script``, ``head``, ``title``,
        ``meta``) or the ``[document]`` root; True otherwise.
    """
    # Comments are never rendered, wherever they sit in the tree, so check
    # them first without touching ``element.parent``.
    if isinstance(element, bs4.element.Comment):
        return False

    # Text whose direct parent is one of these tags is not shown to a reader.
    invisible_parents = (
        "style",
        "script",
        "head",
        "title",
        "meta",
        "[document]",
    )
    return element.parent.name not in invisible_parents
def scrape_url(url: str, timeout: float = 3) -> Tuple[Optional[str], str]:
    """Scrape a URL and return all of its visible text.

    Args:
        url: URL of webpage to scrape.
        timeout: Timeout passed to the ``requests.get`` call.

    Returns:
        web_text: The visible text of the scraped page with runs of
            whitespace collapsed to single spaces, or None if the request
            failed (including non-2xx responses) or the page could not be
            parsed.
        url: URL input, returned unchanged so results can be matched back
            to their source.
    """
    # Fetch the page. Any request-level failure means "no text for this URL".
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        return None, url

    # Extract every text node and keep only the visible ones.
    try:
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        text_nodes = soup.find_all(string=True)
        # Build the list inside the ``try``: a lazy ``filter`` object would
        # defer the ``is_tag_visible`` calls until after this block, where
        # an exception would no longer be caught.
        visible_text = [node for node in text_nodes if is_tag_visible(node)]
    except Exception:
        # Deliberate best-effort: a page that cannot be parsed is skipped.
        return None, url

    # Concatenate all the text, then normalise spacing: ``split()`` with no
    # arguments drops leading/trailing whitespace and collapses inner runs.
    web_text = " ".join(" ".join(visible_text).split())
    return web_text, url
def search_google(query: str, num_web_pages: int = 10, timeout: int = 6, save_url: str = '') -> List[str]:
    """Search the query using Google and collect the URLs found in the results.

    Args:
        query: Search query as plain (not URL-encoded) text.
        num_web_pages: The number of web pages to request. Results are
            fetched in steps of 10 via the ``start`` parameter, so one
            request is made per started block of 10.
        timeout: Timeout passed to each ``requests.get`` call.
        save_url: Path to save returned urls, such as 'urls.txt'. Nothing is
            written when this is empty.

    Returns:
        search_results: The distinct ``http(s)`` URLs found in the result
            pages, in order of first appearance.

    Raises:
        requests.exceptions.RequestException: Propagated unchanged if a
            request to Google fails.
    """
    # Google returns different pages depending on the agent device, so send
    # a desktop user-agent.
    user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0"
    headers = {'User-Agent': user_agent}

    # ``hl`` sets the Google interface language and ``lr`` the preferred
    # language of the results. Both are set to English, otherwise many
    # translated pages that cannot be opened correctly are returned.
    lang = "en"

    urls = []
    for start in range(0, num_web_pages, 10):
        # ``start`` is Google's result offset: result page 2 -> start=10.
        # Passing the values through ``params`` lets requests URL-encode
        # them, so spaces, '&', '#', etc. in the query cannot corrupt the URL.
        params = {
            "q": query,
            "lr": "lang_{}".format(lang),
            "hl": lang,
            "start": start,
        }
        response = requests.get(
            "https://www.google.com/search",
            params=params,
            headers=headers,
            timeout=timeout,
        )
        # Collect every absolute link in the returned HTML.
        urls += re.findall(r'href="(https?://.*?)"', response.text)

    # Remove repeated URLs while keeping first-seen order, so the result is
    # deterministic and the page order of the links is preserved.
    urls = list(dict.fromkeys(urls))

    # Optionally save all URLs into a text file, one per line.
    if save_url:
        with open(save_url, 'w', encoding="utf-8") as file:
            file.writelines(url + '\n' for url in urls)

    return urls
def order_doc_score_url(used_chunk, support_prob_per_chunk, urls, allow_duplicated_urls=False):
    """Order the documents, scores, and URLs by score in descending order.

    Args:
        used_chunk: A sequence of sequences of docs; entry ``i`` holds the
            docs that belong to ``urls[i]``.
        support_prob_per_chunk: A sequence of sequences of scores, flattened
            and paired position-by-position with the flattened docs.
        urls: One URL per entry of ``used_chunk``; each URL is repeated once
            for every doc in its entry.
        allow_duplicated_urls:
            - If False, only the highest scored doc per URL is returned
              (with its score and URL).
            - If True, every doc is returned (with its score and URL).

    Returns:
        ``(ranked_docs, scores, ranked_urls)`` sorted by score, highest
        first. The three sequences are tuples when ``allow_duplicated_urls``
        is True and lists when it is False. All three are empty when there
        are no docs or no scores.
    """
    # Flatten the nested per-URL lists into parallel flat lists.
    flattened_docs = [doc for chunk in used_chunk for doc in chunk]
    flattened_scores = [score for chunk in support_prob_per_chunk for score in chunk]

    # Nothing to rank: return empty sequences instead of failing on the
    # ``zip(*...)`` unpacking (or on ``np.repeat`` with empty repeats).
    if not flattened_docs or not flattened_scores:
        if allow_duplicated_urls:
            return (), (), ()
        return [], [], []

    # Repeat each URL once per doc so it lines up with the flattened docs.
    # ``tolist()`` converts the NumPy scalars back to plain Python objects.
    docs_per_url = [len(chunk) for chunk in used_chunk]
    repeated_urls = np.repeat(urls, docs_per_url).tolist()

    # Sort (doc, score, url) triples by score, highest first. The sort is
    # stable, so ties keep their original relative order.
    ranked_doc_score_url = sorted(
        zip(flattened_docs, flattened_scores, repeated_urls),
        key=lambda triple: triple[1],
        reverse=True,
    )

    if allow_duplicated_urls:
        # Unzip the sorted triples into three parallel tuples.
        ranked_docs, scores, ranked_urls = zip(*ranked_doc_score_url)
        return ranked_docs, scores, ranked_urls

    # The triples are already sorted, so the first time a URL is seen is its
    # highest scored doc; later occurrences are dropped.
    ranked_docs = []
    scores = []
    ranked_urls = []
    seen_urls = set()
    for doc, score, url in ranked_doc_score_url:
        if url in seen_urls:
            continue
        seen_urls.add(url)
        ranked_docs.append(doc)
        scores.append(score)
        ranked_urls.append(url)

    return ranked_docs, scores, ranked_urls