"""
Extract documents from a fixed set of URLs and store them in a
Milvus vector database as a collection.
"""
from .web_scrape import scrape_main | |
from langchain_openai import AzureOpenAIEmbeddings | |
from langchain_milvus import Milvus | |
import os | |
from dotenv import load_dotenv | |
import logging | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
from langchain_experimental.text_splitter import SemanticChunker | |
from langchain_core.documents import Document | |
from concurrent.futures import ProcessPoolExecutor | |
import asyncio | |
import time | |
from itertools import chain | |
from rich.pretty import pprint | |
# Load environment variables (Azure OpenAI + Milvus credentials) from a .env file.
load_dotenv()
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Shared embedding model: used both for semantic chunking (SemanticChunker)
# and for embedding documents at ingestion time (Milvus store).
embedding_model = AzureOpenAIEmbeddings(
    azure_deployment="text-embedding-ada-002",
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version="2023-05-15",
)
def get_milvus_vector_store():
    """Build a Milvus vector store backed by the shared embedding model.

    NOTE: drop_old=True means the "test" collection is dropped and
    recreated every time this function is called.
    """
    hnsw_index = {
        "index_type": "HNSW",
        "metric_type": "COSINE",
        "params": {"M": 8, "efConstruction": 64},
    }
    return Milvus(
        embedding_function=embedding_model,
        collection_name="test",
        connection_args={"uri": os.getenv("MILVUS_URI")},
        auto_id=True,
        drop_old=True,
        index_params=hnsw_index,
    )
def get_vs_as_retriever():
    """Return a similarity-search retriever (top-5) over the Milvus store."""
    vector_store = get_milvus_vector_store()
    return vector_store.as_retriever(
        search_type="similarity", search_kwargs={"k": 5}
    )
# Fixed set of marketing and help-center pages to scrape and ingest.
URLS_TO_SCRAPE = [
    "https://www.artisan.co",
    "https://www.artisan.co/about",
    "https://www.artisan.co/sales-ai",
    "https://www.artisan.co/ai-sales-agent",
    "https://www.artisan.co/products/linkedin-outreach",
    "https://www.artisan.co/products/email-warmup",
    "https://www.artisan.co/products/sales-automation",
    "https://www.artisan.co/products/email-personalization",
    "https://www.artisan.co/features/email-deliverability",
    "https://help.artisan.co/articles/7415399613-can-i-schedule-when-emails-go-out",
    "https://help.artisan.co/articles/5365244006-what-is-email-warmup",
    "https://help.artisan.co/articles/8442274387-ava-is-sending-strange-messages-from-my-email",
    "https://help.artisan.co/articles/1195138264-is-there-a-limit-to-the-amount-of-leads-i-can-have-in-my-csv-file",
    "https://help.artisan.co/articles/5617649387-help-i-can-t-turn-on-my-campaign",
    "https://help.artisan.co/articles/1048710797-how-does-website-visitor-identification-work",
    "https://help.artisan.co/articles/3886727025-generate-sample-email",
    "https://help.artisan.co/articles/6218358204-running-ava-on-copilot-vs-autopilot",
    "https://help.artisan.co/articles/9265896700-adding-delegates-and-team-members",
    "https://help.artisan.co/articles/2734968853-how-to-create-a-campaign",
    "https://help.artisan.co/articles/7633990298-how-to-integrate-artisan-with-your-crm",
    "https://help.artisan.co/articles/6092562650-how-do-i-upload-a-csv-file-of-my-own-leads",
    "https://help.artisan.co/articles/4356675492-how-do-i-add-variables-to-my-email",
    "https://help.artisan.co/articles/3551943296-how-do-i-request-a-script-tag-for-my-watchtower-campaign",
    "https://help.artisan.co/articles/9602711709-how-do-i-integrate-ava-with-slack",
    "https://www.artisan.co/pricing",
]
def chunk_parent_document(document: Document) -> list[Document]:
    """Split one scraped document into embedding-sized chunks.

    First pass: semantic chunking (embedding-based topic boundaries).
    Second pass: any semantic chunk longer than 5000 characters is
    re-split with a recursive character splitter on markdown-style
    boundaries (headings, horizontal rules, paragraphs, sentences).

    Args:
        document: the parent Document to split.

    Returns:
        A list of non-empty chunk Documents.
    """
    semantic_text_splitter = SemanticChunker(
        embeddings=embedding_model, min_chunk_size=100
    )
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=250,
        separators=[
            "\n#{1,6} ",       # markdown headings
            "\n\\*\\*\\*+\n",  # horizontal rules: ***, ---, ___
            "\n---+\n",
            "\n___+\n",
            "\n\n",            # paragraph breaks
            "\n",
            "\\.",             # sentence-ending punctuation
            "\\?",
            "\\!",
        ],
        # The separators above are regex patterns. Without this flag the
        # splitter escapes them and matches them literally, so the heading
        # and horizontal-rule separators would never fire.
        is_separator_regex=True,
    )
    # Drop empty chunks the semantic splitter may produce.
    chunked_documents = [
        chunk
        for chunk in semantic_text_splitter.split_documents([document])
        if chunk.page_content
    ]
    final_chunked_documents: list[Document] = []
    for chunked_doc in chunked_documents:
        if len(chunked_doc.page_content) > 5000:
            # Oversized semantic chunk: re-split into ~1000-char pieces.
            final_chunked_documents.extend(
                text_splitter.split_documents([chunked_doc])
            )
        else:
            final_chunked_documents.append(chunked_doc)
    return final_chunked_documents
async def ingest_urls(chunk_executor: ProcessPoolExecutor, milvus_vector_store: Milvus):
    """Scrape URLS_TO_SCRAPE, chunk each page in a worker process, and
    ingest all chunks into Milvus.

    Args:
        chunk_executor: process pool used for CPU-bound chunking.
        milvus_vector_store: destination vector store.
    """
    lc_documents = await scrape_main(URLS_TO_SCRAPE)

    start_time = time.perf_counter()
    # Fan chunking out to the worker processes WITHOUT blocking the event
    # loop: ProcessPoolExecutor.map would block this coroutine until every
    # result arrived; run_in_executor + gather keeps the loop responsive.
    loop = asyncio.get_running_loop()
    per_document_chunks = await asyncio.gather(
        *(
            loop.run_in_executor(chunk_executor, chunk_parent_document, doc)
            for doc in lc_documents
        )
    )
    chunked_documents = list(chain.from_iterable(per_document_chunks))
    logger.info(
        "Time taken to chunk documents: %s seconds",
        time.perf_counter() - start_time,
    )

    start_time = time.perf_counter()
    if chunked_documents:
        milvus_vector_store.add_documents(chunked_documents, batch_size=50)
    logger.info(
        "Time taken to ingest documents into Milvus: %s seconds",
        time.perf_counter() - start_time,
    )
if __name__ == "__main__":
    # Use the executor as a context manager so worker processes are shut
    # down cleanly even if scraping or ingestion raises.
    with ProcessPoolExecutor() as chunk_executor:
        milvus_vector_store = get_milvus_vector_store()
        asyncio.run(ingest_urls(chunk_executor, milvus_vector_store))