import concurrent.futures
import itertools
import re
from time import time
from typing import List, Tuple

import bs4
import numpy as np
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

import spacy
from spacy.cli import download


try:
    nlp = spacy.load("en_core_web_lg")
except OSError:
    # Model is not installed locally; download it once, then load it.
    download("en_core_web_lg")
    nlp = spacy.load("en_core_web_lg")


def extract_entities(text: str) -> List[str]:
    """Extracts the unique named-entity strings found in `text`."""
    doc = nlp(text)
    ents = list({ent.text for ent in doc.ents})
    return ents
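# Usage sketch (illustrative only; the exact entities returned depend on the
# spaCy model loaded above):
#   extract_entities("Apple was founded by Steve Jobs in Cupertino.")
#   -> ["Apple", "Steve Jobs", "Cupertino"] (order not guaranteed, since the
#      entities pass through a set)

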
def is_tag_visible(element: bs4.element.PageElement) -> bool:
    """Determines if an HTML element is visible.

    Args:
        element: A BeautifulSoup element to check the visibility of.
    Returns:
        Whether the element is visible.
    """
    if element.parent.name in [
        "style",
        "script",
        "head",
        "title",
        "meta",
        "[document]",
    ] or isinstance(element, bs4.element.Comment):
        return False
    return True


def scrape_url(url: str, timeout: int = 10) -> Tuple[str, str]:
    """Scrapes a URL for all visible text.

    Args:
        url: URL of the webpage to scrape.
        timeout: Timeout of the requests call, in seconds.
    Returns:
        web_text: The visible text of the scraped URL, or None if the request
            or parsing failed.
        url: The input URL.
    """
    # Fetch the page; treat any request failure (timeout, HTTP error, etc.) as a miss.
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        return None, url

    # Parse the HTML and keep only the text nodes that would be rendered on the page.
    try:
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        texts = soup.find_all(string=True)
        visible_text = filter(is_tag_visible, texts)
    except Exception:
        return None, url

    # Join the fragments and collapse runs of whitespace.
    web_text = " ".join(t.strip() for t in visible_text).strip()
    web_text = " ".join(web_text.split())
    return web_text, url
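# Sketch: fanning scrape_url out over many URLs with the concurrent.futures
# import above. This helper is illustrative only (nothing in this file calls
# it), and the worker count is an arbitrary choice.
def scrape_urls_parallel(urls: List[str], timeout: int = 10, max_workers: int = 8) -> List[Tuple[str, str]]:
    """Scrapes each URL concurrently, returning (web_text, url) pairs for pages that succeeded."""
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(scrape_url, u, timeout) for u in urls]
        for future in concurrent.futures.as_completed(futures):
            web_text, url = future.result()
            if web_text is not None:
                results.append((web_text, url))
    return results

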
def search_google(query: str, num_web_pages: int = 10, timeout: int = 6, save_url: str = "") -> List[str]:
    """Searches the query using Google.

    Args:
        query: Search query.
        num_web_pages: The number of web pages to request.
        timeout: Timeout of the requests call, in seconds.
        save_url: Path to save the returned URLs to, such as 'urls.txt'.
    Returns:
        A list of the top URLs relevant to the query.
    """
    # Spoof a regular browser so Google serves a normal results page.
    USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0"
    headers = {"User-Agent": USER_AGENT}

    # Interface language for the results page.
    lang = "en"

    # Google paginates results 10 per page via the `start` parameter.
    all_urls = []
    for page in range(0, num_web_pages, 10):
        url = f"https://www.google.com/search?q={query}&hl={lang}&start={page}"
        r = requests.get(url, headers=headers, timeout=timeout)

        # Pull outbound links from the raw HTML, dropping Google-internal links and PDFs.
        urls = re.findall('href="(https?://.*?)"', r.text)
        urls = [u for u in urls if "google.com" not in u and ".pdf" not in u]
        all_urls.extend(urls)

    # Deduplicate while preserving the original ranking order.
    all_urls_final = list(dict.fromkeys(all_urls))

    if save_url:
        with open(save_url, "w") as file:
            for u in all_urls_final:
                file.write(u + "\n")
    return all_urls_final
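# Usage sketch (illustrative only; results depend on Google's live HTML, and
# unauthenticated scraping may be rate-limited or blocked):
#   urls = search_google("who founded SpaceX", num_web_pages=10)
#   pages = [scrape_url(u) for u in urls]      # sequential
#   pages = scrape_urls_parallel(urls)         # or the concurrent sketch above

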
def order_doc_score_url(used_chunk, support_prob_per_chunk, urls, allow_duplicated_urls=False):
    """Orders the documents, scores, and URLs by score in descending order.

    Args:
        used_chunk: A list, per URL, of the text chunks taken from that page.
        support_prob_per_chunk: A list, per URL, of the score for each chunk,
            parallel to `used_chunk`.
        urls: The source URL for each entry of `used_chunk`.
        allow_duplicated_urls:
            - If False, returns only the highest-scored chunk per URL, with its
              score and URL.
            - If True, returns all chunks, scores, and URLs.
    Returns:
        ranked_docs, scores, ranked_urls: The chunks, scores, and URLs sorted
        by score in descending order (deduplicated by URL unless
        allow_duplicated_urls is True).
    """
    # Flatten the per-URL lists of chunks and scores into parallel flat lists.
    flattened_docs = [doc for chunk in used_chunk for doc in chunk]
    flattened_scores = [score for chunk in support_prob_per_chunk for score in chunk]

    # Repeat each URL once per chunk taken from it, then pair everything up.
    doc_score_url = list(
        zip(flattened_docs, flattened_scores, np.repeat(urls, [len(chunk) for chunk in used_chunk]))
    )

    # Sort the (chunk, score, url) triples by score, highest first.
    ranked_doc_score_url = sorted(doc_score_url, key=lambda x: x[1], reverse=True)
    ranked_docs, scores, ranked_urls = zip(*ranked_doc_score_url)

    if allow_duplicated_urls:
        return ranked_docs, scores, ranked_urls

    # Keep only the first (i.e. highest-scored) chunk seen for each URL.
    filtered_docs = []
    filtered_scores = []
    filtered_urls = []
    seen_urls = set()
    for doc, score, url in zip(ranked_docs, scores, ranked_urls):
        if url not in seen_urls:
            filtered_docs.append(doc)
            filtered_scores.append(score)
            filtered_urls.append(url)
            seen_urls.add(url)

    return filtered_docs, filtered_scores, filtered_urls
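# Worked example (toy values): two source URLs, each split into chunks that
# were scored elsewhere in the pipeline. With allow_duplicated_urls=False only
# the best-scoring chunk per URL survives.
#
#   used_chunk             = [["chunk a1", "chunk a2"], ["chunk b1"]]
#   support_prob_per_chunk = [[0.2, 0.9], [0.5]]
#   urls                   = ["https://a.example", "https://b.example"]
#
#   order_doc_score_url(used_chunk, support_prob_per_chunk, urls)
#   -> (["chunk a2", "chunk b1"], [0.9, 0.5],
#       ["https://a.example", "https://b.example"])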