# Some functions are adapted from https://github.com/yuxiaw/Factcheck-GPT

import concurrent.futures
import requests
import bs4
import re
from typing import List, Optional, Tuple
import itertools
import numpy as np
from time import time

from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

import spacy
from spacy.cli import download

try:
    nlp = spacy.load("en_core_web_lg")
except OSError:
    # The model is not installed; download it and retry.
    download("en_core_web_lg")
    nlp = spacy.load("en_core_web_lg")


def extract_entities(text):
    """Returns the unique named-entity strings found in the text."""
    doc = nlp(text)
    ents = list({ent.text for ent in doc.ents})
    return ents


def is_tag_visible(element: bs4.element.PageElement) -> bool:
    """Determines if an HTML element is visible.

    Args:
        element: A BeautifulSoup element to check the visibility of.
    Returns:
        Whether the element is visible.
    """
    if element.parent.name in [
        "style",
        "script",
        "head",
        "title",
        "meta",
        "[document]",
    ] or isinstance(element, bs4.element.Comment):
        return False
    return True


def scrape_url(url: str, timeout: int = 10) -> Tuple[Optional[str], str]:
    """Scrapes a URL for all text information.

    Args:
        url: URL of webpage to scrape.
        timeout: Timeout of the requests call.
    Returns:
        web_text: The visible text of the scraped URL.
        url: URL input.
    """
    # Scrape the URL
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as _:
        return None, url

    # Extract out all text from the tags
    try:
        soup = bs4.BeautifulSoup(response.text, "html.parser")
        texts = soup.find_all(string=True)
        # Filter out invisible text from the page.
        visible_text = filter(is_tag_visible, texts)
    except Exception as _:
        return None, url

    # Returns all the text concatenated as a string.
    web_text = " ".join(t.strip() for t in visible_text).strip()
    # Clean up spacing.
    web_text = " ".join(web_text.split())
    return web_text, url
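

# Illustrative helper (a sketch added here, not part of the original file): scrape
# several URLs in parallel with concurrent.futures, which is imported above but
# otherwise unused in this module. The helper name and defaults are assumptions.
def scrape_urls_parallel(urls: List[str], timeout: int = 10, max_workers: int = 8) -> List[Tuple[Optional[str], str]]:
    """Scrapes each URL concurrently and returns (web_text, url) pairs."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(scrape_url, url, timeout) for url in urls]
        # as_completed yields futures as they finish, so results are not in input order.
        return [future.result() for future in concurrent.futures.as_completed(futures)]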


def search_google(query: str, num_web_pages: int = 10, timeout: int = 6, save_url: str = '') -> List[str]:
    """Searches the query using Google.

    Args:
        query: Search query.
        num_web_pages: The number of web pages to request.
        timeout: Timeout of the requests call, in seconds.
        save_url: Path to save the returned URLs to, such as 'urls.txt'.
    Returns:
        search_results: A list of the top URLs relevant to the query.
    """
    # Set headers: Google returns different pages depending on the agent device,
    # so use a desktop user agent.
    USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0"
    headers = {'User-Agent': USER_AGENT}

    # Language settings:
    #   &hl=XX sets the Google interface language.
    #   &lr=lang_XX sets the preferred language of the search results.
    # Keep the language set to English; otherwise Google returns many translated
    # pages (e.g. to Arabic) that cannot be opened correctly.
    lang = "en"

    # Scrape the Google results pages.
    all_urls = []
    for search_query in [query]:
        for page in range(0, num_web_pages, 10):
            # Google paginates results in steps of 10: results page 2 corresponds to start=10.
            # url = "https://www.google.com/search?q={}&start={}".format(query, page)
            # url = "https://www.google.com/search?q={}&lr=lang_{}&hl={}&start={}".format(search_query, lang, lang, page)
            url = f"https://www.google.com/search?q={search_query}&start={page}"
            r = requests.get(url, headers=headers, timeout=timeout)
            # Collect all URLs with a regular expression, dropping Google-internal
            # links and PDFs. The filtering can be improved (e.g. with TF-IDF) later.
            urls = re.findall('href="(https?://.*?)"', r.text)
            urls = [url for url in urls if 'google.com' not in url and '.pdf' not in url]

            all_urls.extend(urls)
    
    # De-duplicate the URLs while preserving their order.
    all_urls_final = []
    for url in all_urls:
        if url not in all_urls_final:
            all_urls_final.append(url)

    # Save the URLs to a text file if a path is given.
    if save_url:
        with open(save_url, 'w') as file:
            for url in all_urls_final:
                file.write(url + '\n')
    return all_urls_final
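

# Optional hardening sketch (an assumption, not part of the original code): a
# requests.Session with automatic retries, built from HTTPAdapter and Retry,
# which are imported above but otherwise unused. Callers could use
# session.get(...) in place of requests.get(...) on flaky connections.
def build_retrying_session(total_retries: int = 3, backoff_factor: float = 0.5) -> requests.Session:
    """Returns a Session that retries transient HTTP errors with backoff."""
    session = requests.Session()
    retry = Retry(
        total=total_retries,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session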


def order_doc_score_url(used_chunk, support_prob_per_chunk, urls, allow_duplicated_urls=False):
    """
    Orders the documents, scores, and URLs by score in descending order.

    allow_duplicated_urls:
        - If False, only the highest-scored chunk per URL is returned (with its score and URL).
        - If True, all chunks are returned (with their scores and URLs).
    """

    # Flatten the used_chunk and support_prob_per_chunk lists
    flattened_docs = [doc for chunk in used_chunk for doc in chunk]
    flattened_scores = [score for chunk in support_prob_per_chunk for score in chunk]

    # Create a list of tuples containing the doc, score, and corresponding URL
    doc_score_url = list(zip(flattened_docs, flattened_scores, np.repeat(urls, [len(chunk) for chunk in used_chunk])))

    # Sort the list based on the scores in descending order
    ranked_doc_score_url = sorted(doc_score_url, key=lambda x: x[1], reverse=True)

    # Unzip the sorted list to get the ranked docs, scores, and URLs
    ranked_docs, scores, ranked_urls = zip(*ranked_doc_score_url)

    if allow_duplicated_urls:
        return ranked_docs, scores, ranked_urls

    else:
        # Keep only the highest-scored chunk per URL. The list is already sorted
        # by score, so the first occurrence of each URL wins.
        filtered_docs = []
        filtered_scores = []
        filtered_urls = []
        seen_urls = set()

        for doc, score, url in zip(ranked_docs, scores, ranked_urls):
            if url not in seen_urls:
                filtered_docs.append(doc)
                filtered_scores.append(score)
                filtered_urls.append(url)
                seen_urls.add(url)

        return filtered_docs, filtered_scores, filtered_urls
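

# Minimal end-to-end sketch (illustrative only): the query, the single-chunk-per-page
# treatment, and the uniform dummy scores below are assumptions added for
# demonstration; they are not part of the original pipeline.
if __name__ == "__main__":
    query = "Who founded the Wikimedia Foundation?"
    print("Entities in query:", extract_entities(query))

    # Fetch a few search hits and scrape them (scrape_urls_parallel above could
    # be used instead of this sequential loop).
    urls = search_google(query, num_web_pages=10)[:3]
    chunks, chunk_scores, kept_urls = [], [], []
    for web_text, url in (scrape_url(u) for u in urls):
        if web_text:
            # Treat the first 500 characters as one "chunk" with a dummy score of 0.5.
            chunks.append([web_text[:500]])
            chunk_scores.append([0.5])
            kept_urls.append(url)

    if chunks:
        ranked_docs, ranked_scores, ranked_urls = order_doc_score_url(chunks, chunk_scores, kept_urls)
        for doc, score, url in zip(ranked_docs, ranked_scores, ranked_urls):
            print(f"{score:.2f}  {url}  {doc[:80]}...")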