# Filtir / fetch_evidence.py
import requests
from bs4 import BeautifulSoup
import json
import json5
import argparse
from pathlib import Path
import multiprocessing as mp
from zsvision.zs_multiproc import starmap_with_kwargs
from pipeline_paths import PIPELINE_PATHS
from datetime import datetime
import urllib.robotparser
import urllib.parse
import urllib.error
from utils import get_google_search_results
import time
from random import randint
from fake_useragent import UserAgent
from newspaper import Article, Config
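
# Illustrative invocations (the flags mirror parse_args() below; input and output
# locations come from pipeline_paths.PIPELINE_PATHS and must exist in your setup):
#   python fetch_evidence.py --num_search_results_to_keep 3 --processes 4
#   python fetch_evidence.py --refresh --limit 10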


def can_scrape(url, user_agent="*"):
    """Return True if the robots.txt of the given parsed URL permits fetching it."""
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(f"{url.scheme}://{url.netloc}/robots.txt")
    # be conservative - if we can't find robots.txt, don't scrape
    try:
        rp.read()
        ok_to_scrape = rp.can_fetch(user_agent, url.geturl())
    except urllib.error.URLError:
        ok_to_scrape = False
    return ok_to_scrape
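
# Example call (hypothetical URL, shown for illustration only) - pass an already
# parsed URL together with the user-agent name your scraper identifies itself as:
#   can_scrape(urllib.parse.urlparse("https://example.com/article"), user_agent="MyScraper")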


def fetch_search_results_to_gather_evidence(
    args,
    idx: int,
    total: int,
    search_results_dest_path: Path,
    queryset: list,
):
    """Fetch Google search results for each query in the queryset, download the
    pages that permit scraping, and write the gathered evidence to a JSON file."""
    user_agent = UserAgent()
    config = Config()
    config.fetch_images = False

    print(f"Query {idx}/{total}")
    search_results_dest_path.parent.mkdir(exist_ok=True, parents=True)

    # check if we already have search_results for this title
    if search_results_dest_path.exists() and not args.refresh:
        print(f"Found existing search results at {search_results_dest_path}, skipping")
        return 0

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.63 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }

    # we assume some sites won't permit scraping, so fetch a few extra results to allow for skips
    num_results = args.num_search_results_to_keep + 5

    for item in queryset:
        if item["search_query"] == "no suitable query":
            item["search_results"] = []
            continue
        search_results = get_google_search_results(
            query_str=item["search_query"], num_results=num_results
        )
        if search_results == [{"Result": "No good Google Search Result was found"}]:
            item["search_results"] = []
            continue
        parsed_results = []
        for search_result in search_results:
            if not can_scrape(
                urllib.parse.urlparse(search_result["link"]), user_agent="MyScraper"
            ):
                print(
                    f"Skipping {search_result['link']} because it doesn't permit scraping"
                )
                continue
            try:
                config.browser_user_agent = user_agent.random
                article = Article(search_result["link"], language="en", config=config)
                article.download()
                article.parse()
                text = article.text
            except Exception as e:
                print(f"Error parsing article: {e}, trying with requests.get...")
                try:
                    response = requests.get(
                        search_result["link"], timeout=15, headers=headers
                    )
                    html = response.text
                    soup = BeautifulSoup(html, features="html.parser")
                    text = soup.get_text()
                except Exception as exception:
                    print(f"Error parsing article: {exception}")
                    raise exception
            search_result["text"] = text
            parsed_results.append(search_result)
            if len(parsed_results) == args.num_search_results_to_keep:
                break
        item["search_results"] = parsed_results

    # update the queryset with new information
    date_str = datetime.now().strftime("%Y-%m-%d")
    results = {"documents": queryset, "dates": {"search_results_fetched": date_str}}
    print(
        f"Writing web pages for search results for {len(queryset)} queries to {search_results_dest_path}"
    )
    with open(search_results_dest_path, "w") as f:
        f.write(json.dumps(results, indent=4, sort_keys=True))
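
# Sketch of the data flow, inferred from the code above (illustrative, not a fixed schema):
# each queryset is a list of dicts carrying a "search_query" string; this function attaches
# a "search_results" list (the fields returned by get_google_search_results, at least
# "link", plus the scraped "text") and writes a JSON file of the form:
#   {
#       "documents": [...the queryset with search_results attached...],
#       "dates": {"search_results_fetched": "YYYY-MM-DD"}
#   }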


def main():
    args = parse_args()
    search_query_paths = list(
        PIPELINE_PATHS["search_queries_for_evidence"].glob("**/*.json")
    )
    if args.limit:
        print(f"Limited to {args.limit} search querysets")
        search_query_paths = search_query_paths[: args.limit]

    kwarg_list = []
    for idx, search_query_path in enumerate(search_query_paths):
        rel_path = search_query_path.relative_to(
            PIPELINE_PATHS["search_queries_for_evidence"]
        )
        dest_path = PIPELINE_PATHS["google_search_results_evidence"] / rel_path
        if dest_path.exists() and not args.refresh:
            print(f"For {search_query_path}, found results at {dest_path}, skipping")
            continue
        with open(search_query_path, "r") as f:
            queryset = json.load(f)
        kwarg_list.append(
            {
                "idx": idx,
                "total": len(search_query_paths),
                "search_results_dest_path": dest_path,
                "args": args,
                "queryset": queryset,
            }
        )

    # provide the total number of queries to each process
    for kwargs in kwarg_list:
        kwargs["total"] = len(kwarg_list)

    if args.processes == 1:  # single process
        for kwargs in kwarg_list:
            fetch_search_results_to_gather_evidence(**kwargs)
    else:  # multiprocess
        func = fetch_search_results_to_gather_evidence
        with mp.Pool(processes=args.processes) as pool:
            starmap_with_kwargs(pool=pool, func=func, kwargs_iter=kwarg_list)


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--model", default="gpt-3.5-turbo", choices=["gpt-4", "gpt-3.5-turbo"]
    )
    parser.add_argument("--limit", default=0, type=int)
    parser.add_argument("--refresh", action="store_true")
    parser.add_argument("--num_search_results_to_keep", type=int, default=3)
    parser.add_argument("--processes", type=int, default=1)
    return parser.parse_args()


if __name__ == "__main__":
    main()