import os, os.path

# to search the web for results
from urllib.parse import urlparse, quote
import requests
from duckduckgo_search import DDGS

# to present web search results in a table
import pandas as pd

# to get document chunks, embed them and build the vector databases
# 2 vector databases are built: one for keyword search (BM25), one for semantic search (Chroma)
from langchain.document_loaders import WebBaseLoader, PyPDFLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.retrievers import BM25Retriever

# for saving the bm25 retriever
# pickle is not for production, just for the prototype
import pickle

# number of top search results to return from DuckDuckGo
top_n_results = 10

# a Chroma vector store will be set up for every combination of the chunk sizes and overlaps below
# building up all the vector stores will take a long time
chunk_sizes = [500, 600, 700, 800, 900, 1000, 1250, 1500, 1750, 2000, 2250, 2500, 2750, 3000]
chunk_overlaps = [50, 100, 150, 200]


################################ Search the Web ################################
## Use DuckDuckGo to search for the top N results for each country's ESG policies
# Loop through each country and save the top N results by searching for
# "{country} sustainability esg newest updated public policy document government"
# After some experimentation the search phrase above seems to give the best results
# for the most recent ESG policies as it contains all the necessary keywords, but it can be changed.
# Store the relevant links in a dictionary; the links are mostly HTML or PDF.

def duckduckgo_scrape(country, search_term, n_search_results):
    all_links = []
    with DDGS() as ddgs:
        results = ddgs.text(search_term, max_results=n_search_results)
        for result in results:
            result['country'] = country
            all_links.append(result)

    # collect the scraped links into a DataFrame
    df_links = pd.DataFrame(all_links).rename(columns={
        'title': 'Title',
        'href': 'url',
        'body': 'Summarized Body',
        'country': 'Country'
    })
    # save the scraped links into a csv
    df_links.to_csv("duck_duck_go_scraped_links.csv")

    return all_links, df_links


################################ Load the Documents ################################
## For every search result returned by DuckDuckGo for each country above,
## scrape the web using the url and convert the page to documents using Langchain loaders:
# PDF documents: if the link from the search result points to a PDF document,
#   save the PDF permanently in local storage in the folder called 'pdfs',
#   then use PyPDFLoader to convert it to raw documents.
# HTML documents: if the link is just a HTML page, use Langchain WebBaseLoader to convert it to raw documents.
# Metadata: add the country to the metadata; this is an important step as RetrievalQA needs it for filtering later.
#   For PDFs, Langchain will use the local path as the source, so we change it back to the online path.
# Save all the documents into a list called "all_documents".

# for adding country metadata
def add_country_metadata(docs, country):
    for doc in docs:
        doc.metadata['country'] = country
    return docs

# for adding source url metadata
def add_url_metadata(docs, url):
    for doc in docs:
        doc.metadata['source'] = url
    return docs
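# Example usage (a minimal sketch, not part of the original pipeline): loop through a list
# of countries and scrape the top N results for each one using the search phrase described
# above. The helper name and the default country list are hypothetical illustrations.
def example_scrape_all_countries(countries=("Singapore", "Malaysia", "Indonesia")):
    scraped_links = []
    for country in countries:
        search_term = f"{country} sustainability esg newest updated public policy document government"
        links, df_links = duckduckgo_scrape(country, search_term, top_n_results)
        scraped_links.extend(links)    # accumulate the link dictionaries across countries
    return scraped_links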
# If the link from the search result points to a PDF document,
# save the PDF permanently in local storage in the folder called 'pdfs',
# then use PyPDFLoader to convert it to raw documents.
def pdf_loader(url, country):
    try:
        try:
            response = requests.get(url)
        except:
            # sometimes there is an ssl error and the page is actually served over http://
            url = url.replace("https://", "http://")
            response = requests.get(url)

        # create a pdf directory to save the pdfs locally
        pdf_dir = f"pdfs/{country}"
        if not os.path.exists(pdf_dir):
            os.makedirs(pdf_dir)

        pdf_filename = f"{pdf_dir}/{url.split('/')[-1]}"
        with open(pdf_filename, 'wb') as f:    # save the pdf locally first
            f.write(response.content)

        loader = PyPDFLoader(pdf_filename)     # then use the langchain loader to load it
        raw_pdf_documents = loader.load()
        raw_pdf_documents = add_country_metadata(raw_pdf_documents, country)
        # the pdf source metadata will be populated by Langchain as the local path;
        # we do not want this, so we change it back to the original path on the web instead
        raw_pdf_documents = add_url_metadata(raw_pdf_documents, url)
        return raw_pdf_documents
    except Exception as e:
        print(f"Failed to load for {url} {e}")

# Same as above but for a pdf in a local directory
def pdf_loader_local(pdf_filename, country):
    try:
        loader = PyPDFLoader(pdf_filename)     # use the langchain loader to load it
        raw_pdf_documents = loader.load()
        raw_pdf_documents = add_country_metadata(raw_pdf_documents, country)
        return raw_pdf_documents
    except Exception as e:
        print(f"Failed to load for {pdf_filename} {e}")
        return False

# If the link is just a HTML page, use Langchain WebBaseLoader to convert it to raw documents.
def html_loader(url, country):
    try:
        loader = WebBaseLoader(url)
        raw_html_documents = loader.load()
        raw_html_documents = add_country_metadata(raw_html_documents, country)
        return raw_html_documents
    except Exception as e:
        print(f"Failed to load for {url} {e}")

def process_links_load_documents(all_links):
    all_documents = []    # store all the documents
    for link in all_links:
        country = link['country']
        title = link['title']
        url = link['href']
        url = url.replace(" ", "%20")    # encode spaces in the url e.g. %20

        # if the url points to a PDF document
        if url.endswith('.pdf') or (('.pdf' in url) and ('blob' in url)):
            print(f"{country}: Loading PDF from {url}")
            docs = pdf_loader(url, country)
            if docs is not None:    # if there was an error, docs will be None
                if isinstance(docs, list):
                    all_documents.extend(docs)
                else:
                    all_documents.append(docs)
                #print(docs)
        # if the url is just a HTML page
        else:
            print(f"{country}: Loading HTML from {url}")
            docs = html_loader(url, country)
            if docs is not None:    # if there was an error, docs will be None
                if isinstance(docs, list):
                    all_documents.extend(docs)
                else:
                    all_documents.append(docs)
                #print(docs)

    # the documents contain a lot of \n, perform some cleaning
    for document in all_documents:
        document.page_content = document.page_content.replace('\n', '')

    return all_documents
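# Example usage (a minimal sketch, not part of the original pipeline): convert the links
# scraped above into Langchain documents and peek at the metadata that will later be used
# for country filtering. The helper name is a hypothetical illustration.
def example_load_and_inspect(all_links):
    all_documents = process_links_load_documents(all_links)
    print(f"Loaded {len(all_documents)} documents")
    for doc in all_documents[:3]:
        # every document should carry the country and the original web url in its metadata
        print(doc.metadata.get('country'), doc.metadata.get('source'))
    return all_documents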
################################ Set Up Chroma Vector Store ################################
# This is for semantic search.
# In the configuration at the top, we define all the chunk sizes and overlaps that we are interested in.
# A Chroma vector store will be set up for each configuration and persisted in a different directory.
# These vector stores can be accessed in the main app later.
# The time taken to get the embeddings for every document chunk can be very long.
# Note: if we are using much more data than fits in RAM, or when in production,
# it is better to initialize a separate vector store on a server (Postgres, or online solutions
# like Pinecone) and push the document chunks to it bit by bit.

def setup_chromadb_vectorstore(hf_embeddings, all_documents, chunk_size, chunk_overlap, country):
    chromadb_dir = "chromadb"
    if not os.path.exists(chromadb_dir):
        os.makedirs(chromadb_dir)

    print(f"Processing Chunk Size: {chunk_size}, Chunk Overlap: {chunk_overlap}")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    split_documents = text_splitter.split_documents(all_documents)

    persist_directory = f"{chromadb_dir}/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_"
    # build the vector database using Chroma and persist it in a local directory
    chroma_db = Chroma.from_documents(split_documents, hf_embeddings,
                                      persist_directory=persist_directory)
    chroma_db.persist()

    return True    # to let the user know this process is done


################################ Set Up BM25 Retriever ################################
# This is for keyword search.
# BM25 is a keyword-based algorithm that performs well on queries containing keywords,
# without capturing the semantic meaning of the query terms, so there is no need to embed
# the text with HuggingFaceEmbeddings and it is relatively fast to set up.
# We will use it in combination with the Chroma vector store retriever in our application later,
# with an ensemble retriever to re-rank the results.
# The retriever is just a small file, so we store it using pickle; for production this is still not recommended.

def setup_bm25_retriever(all_documents, chunk_size, chunk_overlap, country):
    bm25_dir = "bm25"
    if not os.path.exists(bm25_dir):
        os.makedirs(bm25_dir)

    print(f"Processing Chunk Size: {chunk_size}, Chunk Overlap: {chunk_overlap}, Country: {country}")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    split_documents = text_splitter.split_documents(all_documents)
    split_documents = [doc for doc in split_documents if doc.metadata['country'] == country]

    bm25_retriever = BM25Retriever.from_documents(split_documents)

    filename = f"{bm25_dir}/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_.pickle"
    with open(filename, 'wb') as handle:
        pickle.dump(bm25_retriever, handle)

    return True    # to let the user know this process is done
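# Example usage (a minimal sketch, not part of the original pipeline): build the embeddings
# once, then set up a Chroma vector store and a BM25 retriever for every combination of chunk
# size and overlap defined in the configuration at the top. The helper name and the embedding
# model name are assumptions; any sentence-transformers model can be used instead.
def example_build_all_stores(all_documents, countries):
    hf_embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    for chunk_size in chunk_sizes:
        for chunk_overlap in chunk_overlaps:
            for country in countries:
                setup_chromadb_vectorstore(hf_embeddings, all_documents, chunk_size, chunk_overlap, country)
                setup_bm25_retriever(all_documents, chunk_size, chunk_overlap, country)
    return True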