import os, os.path
# to search the web for results
from urllib.parse import urlparse, quote
import requests
from duckduckgo_search import DDGS
# to present web search results in a table
import pandas as pd
# to get document chunks, embed them and build the vector databases
# 2 vector databases are built, one for keyword search (BM25) and one for semantic search (Chroma)
from langchain.document_loaders import WebBaseLoader, PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.retrievers import BM25Retriever
# for saving the bm25 retriever
# pickle is not for production, just for the prototype
import pickle

# number of top search results to return from DuckDuckGo
top_n_results = 10

# a Chroma vector store will be set up for every combination of the chunk sizes and overlaps below
# building all of these vector stores can take a long time
chunk_sizes = [500, 600, 700, 800, 900, 1000, 1250, 1500, 1750, 2000, 2250, 2500, 2750, 3000]
chunk_overlaps = [50, 100, 150, 200]
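
# The functions below expect a `hf_embeddings` object, but this file never creates one.
# A minimal sketch of how it could be built with the HuggingFaceEmbeddings import above;
# the model name here is an assumption, not necessarily the one the app uses.
def build_hf_embeddings(model_name="sentence-transformers/all-MiniLM-L6-v2"):
    return HuggingFaceEmbeddings(model_name=model_name)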
################################ Search the Web ################################
## Use DuckDuckGo to Search for the Top N Results for Each Country's ESG Policies
# Loop through each country with DuckDuckGo search and save the top N results for the query
# "{country} sustainability esg newest updated public policy document government".
# After some experimentation this search phrase seems to give the best results for the most
# recent ESG policies, as it contains all the necessary keywords, but it can be changed.
# Store the relevant links in a list of dictionaries; the links are mostly HTML or PDF.
def duckduckgo_scrape(country, search_term, n_search_results):
    all_links = []
    with DDGS() as ddgs:
        results = ddgs.text(search_term, max_results=n_search_results)
        for result in results:
            result['country'] = country
            all_links.append(result)
    # collect the scraped links into a dataframe
    df_links = pd.DataFrame(all_links).rename(columns={
        'title': 'Title',
        'href': 'url',
        'body': 'Summarized Body',
        'country': 'Country'
    })
    # save the scraped links into a csv
    df_links.to_csv("duck_duck_go_scraped_links.csv")
    return all_links, df_links
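
# Illustrative sketch (not called in this file): loop through a list of countries as described
# above and collect all the scraped links. The country list here is an assumption.
def example_scrape_all(countries=("Singapore", "Malaysia", "Indonesia")):
    all_links = []
    for country in countries:
        search_term = f"{country} sustainability esg newest updated public policy document government"
        links, _ = duckduckgo_scrape(country, search_term, top_n_results)
        all_links.extend(links)
    return all_links
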
################################ Load the Documents ################################
## For every search result returned by DuckDuckGo for each country above, scrape the web using the url and convert it to documents with Langchain loaders:
# PDF documents: if the link from the search result points to a PDF document,
# save the PDF permanently in local storage in the folder called 'pdfs', then use PyPDFLoader to convert it to raw documents.
# HTML documents: if the link is just an HTML page, use the Langchain WebBaseLoader to convert it to raw documents.
# Metadata: add the country to the metadata; this is an important step as it is needed by RetrievalQA for filtering later on.
# For PDFs, Langchain will use the local path as the source; change it back to the original url on the web.
# Save all the documents into a list called "all_documents".
# for adding country metadata
def add_country_metadata(docs, country):
    for doc in docs:
        doc.metadata['country'] = country
    return docs

# for adding source url metadata
def add_url_metadata(docs, url):
    for doc in docs:
        doc.metadata['source'] = url
    return docs
# If the link from the search result points to a PDF document,
# save the PDF permanently in local storage in the folder called 'pdfs',
# then use PyPDFLoader to convert it to raw documents.
def pdf_loader(url, country):
    try:
        try:
            response = requests.get(url)
        except Exception:
            # sometimes there is an ssl error and the page is actually http://
            url = url.replace("https://", "http://")
            response = requests.get(url)
        # create a pdf directory to save pdfs locally
        pdf_dir = f"pdfs/{country}"
        if not os.path.exists(pdf_dir):
            os.makedirs(pdf_dir)
        pdf_filename = f"{pdf_dir}/{url.split('/')[-1]}"
        with open(pdf_filename, 'wb') as f:  # save the pdf locally first
            f.write(response.content)
        loader = PyPDFLoader(pdf_filename)  # then use the langchain loader to load it
        raw_pdf_documents = loader.load()
        raw_pdf_documents = add_country_metadata(raw_pdf_documents, country)
        # Langchain populates the pdf source metadata with the local path;
        # we do not want this, so we change it back to the original path on the web
        raw_pdf_documents = add_url_metadata(raw_pdf_documents, url)
        return raw_pdf_documents
    except Exception as e:
        print(f"Failed to load for {url} {e}")

# Same as above but for a pdf in a local directory
def pdf_loader_local(pdf_filename, country):
    try:
        loader = PyPDFLoader(pdf_filename)  # use the langchain loader to load it
        raw_pdf_documents = loader.load()
        raw_pdf_documents = add_country_metadata(raw_pdf_documents, country)
        return raw_pdf_documents
    except Exception as e:
        print(f"Failed to load for {pdf_filename} {e}")
        return None  # keep the failure value consistent with the None checks used by callers

# If the link is just an HTML page, use the Langchain WebBaseLoader to convert it to raw documents.
def html_loader(url, country):
    try:
        loader = WebBaseLoader(url)
        raw_html_documents = loader.load()
        raw_html_documents = add_country_metadata(raw_html_documents, country)
        return raw_html_documents
    except Exception as e:
        print(f"Failed to load for {url} {e}")

def process_links_load_documents(all_links):
    all_documents = []  # store all the documents
    for link in all_links:
        country = link['country']
        title = link['title']
        url = link['href']
        url = url.replace(" ", "%20")  # replace spaces with their encoded version, e.g. %20
        # If the url points to a PDF document
        if url.endswith('.pdf') or (('.pdf' in url) and ('blob' in url)):
            print(f"{country}: Loading PDF from {url}")
            docs = pdf_loader(url, country)
            if docs is not None:  # if there was an error, docs will be None
                if isinstance(docs, list):
                    all_documents.extend(docs)
                else:
                    all_documents.append(docs)
        # If the url is just an HTML page
        else:
            print(f"{country}: Loading HTML from {url}")
            docs = html_loader(url, country)
            if docs is not None:  # if there was an error, docs will be None
                if isinstance(docs, list):
                    all_documents.extend(docs)
                else:
                    all_documents.append(docs)
    # the documents contain a lot of \n, so perform some cleaning
    for document in all_documents:
        document.page_content = document.page_content.replace('\n', '')
    return all_documents
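
# Illustrative sketch (not called in this file): chain the scrape and load steps together to
# build the "all_documents" list, using the example_scrape_all() helper sketched earlier.
# The country list is an assumption.
def example_build_documents(countries=("Singapore", "Malaysia", "Indonesia")):
    all_links = example_scrape_all(countries)
    return process_links_load_documents(all_links)
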
################################ Set Up Chroma Vector Store ################################
# This is for semantic search.
# In the configuration section at the top, we define all the chunk sizes and overlaps that we are interested in.
# A Chroma vector store will be set up for each configuration and persisted in its own directory.
# These vector stores can be accessed by the main app later.
# Getting the embeddings for every document chunk can take a very long time.
# Note: if we are using much more data than fits in RAM, or when in production, it is better to
# initialize a separate vector store on a server (Postgres, or online solutions like Pinecone)
# and push the document chunks to it bit by bit.
def setup_chromadb_vectorstore(hf_embeddings, all_documents, chunk_size, chunk_overlap, country):
    chromadb_dir = "chromadb"
    if not os.path.exists(chromadb_dir):
        os.makedirs(chromadb_dir)
    print(f"Processing Chunk Size: {chunk_size}, Chunk Overlap: {chunk_overlap}")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    split_documents = text_splitter.split_documents(all_documents)
    persist_directory = f"{chromadb_dir}/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_"
    # Build the vector database using Chroma and persist it in a local directory
    chroma_db = Chroma.from_documents(split_documents,
                                      hf_embeddings,
                                      persist_directory=persist_directory)
    chroma_db.persist()
    return True  # to let the user know this process is done
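
# Illustrative sketch (not called in this file): build a Chroma store for every chunk size /
# overlap combination defined at the top, and reopen one of the persisted stores later.
# The country value and default chunk settings are assumptions.
def example_build_chroma_stores(hf_embeddings, all_documents, country="Singapore"):
    for chunk_size in chunk_sizes:
        for chunk_overlap in chunk_overlaps:
            setup_chromadb_vectorstore(hf_embeddings, all_documents, chunk_size, chunk_overlap, country)

def example_load_chroma_store(hf_embeddings, country="Singapore", chunk_size=1000, chunk_overlap=100):
    # the directory name matches the naming convention used in setup_chromadb_vectorstore() above
    persist_directory = f"chromadb/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_"
    return Chroma(persist_directory=persist_directory, embedding_function=hf_embeddings)
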
################################ Set Up BM25 Retriever ################################
# This is for keyword search.
# BM25 is a keyword-based algorithm that performs well on queries containing keywords,
# without capturing the semantic meaning of the query terms,
# hence there is no need to embed the text with HuggingFaceEmbeddings and it is relatively fast to set up.
# We will use it in combination with the chroma_db vector store retriever in our application later,
# with an ensemble retriever to re-rank the results.
# The retriever is just a small file, so we store it using pickle, although this is not recommended for production.
def setup_bm25_retriever(all_documents, chunk_size, chunk_overlap, country):
    bm25_dir = "bm25"
    if not os.path.exists(bm25_dir):
        os.makedirs(bm25_dir)
    print(f"Processing Chunk Size: {chunk_size}, Chunk Overlap: {chunk_overlap}, Country: {country}")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    split_documents = text_splitter.split_documents(all_documents)
    split_documents = [doc for doc in split_documents if doc.metadata['country'] == country]
    bm25_retriever = BM25Retriever.from_documents(split_documents)
    filename = f"{bm25_dir}/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_.pickle"
    with open(filename, 'wb') as handle:
        pickle.dump(bm25_retriever, handle)
    return True  # to let the user know this process is done
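
# Illustrative sketch (not called in this file): reload a pickled BM25 retriever and combine it
# with the corresponding Chroma retriever via Langchain's EnsembleRetriever, as described above.
# The equal weights and the default country / chunk settings are assumptions, not values taken
# from this file.
def example_ensemble_retriever(hf_embeddings, country="Singapore", chunk_size=1000, chunk_overlap=100):
    from langchain.retrievers import EnsembleRetriever
    with open(f"bm25/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_.pickle", 'rb') as handle:
        bm25_retriever = pickle.load(handle)
    chroma_db = Chroma(
        persist_directory=f"chromadb/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_",
        embedding_function=hf_embeddings,
    )
    return EnsembleRetriever(
        retrievers=[bm25_retriever, chroma_db.as_retriever()],
        weights=[0.5, 0.5],
    )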