"""
extract documents and store them in a vector db as a collection
"""
import asyncio
import logging
import os
import time
from concurrent.futures import ProcessPoolExecutor
from itertools import chain

from dotenv import load_dotenv
from langchain_core.documents import Document
from langchain_experimental.text_splitter import SemanticChunker
from langchain_milvus import Milvus
from langchain_openai import AzureOpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

from .web_scrape import scrape_main

load_dotenv()
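# Environment variables expected in .env (names mirror the os.getenv() calls
# below; the example URI is only an assumption for a local Milvus instance):
#   AZURE_OPENAI_ENDPOINT  - Azure OpenAI resource endpoint
#   AZURE_OPENAI_API_KEY   - Azure OpenAI API key
#   MILVUS_URI             - Milvus connection URI (e.g. "http://localhost:19530")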
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Embedding model shared by the semantic chunker and the Milvus vector store.
embedding_model = AzureOpenAIEmbeddings(
    azure_deployment="text-embedding-ada-002",
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version="2023-05-15",
)
def get_milvus_vector_store(drop_old: bool = False):
    """Return a Milvus vector store bound to the shared embedding model.

    Pass drop_old=True only when re-ingesting; the retriever path must not
    drop the existing collection.
    """
    return Milvus(
        embedding_function=embedding_model,
        collection_name="test",
        connection_args={"uri": os.getenv("MILVUS_URI")},
        auto_id=True,
        drop_old=drop_old,
        index_params={
            "index_type": "HNSW",
            "metric_type": "COSINE",
            "params": {"M": 8, "efConstruction": 64},
        },
    )
def get_vs_as_retriever():
    return get_milvus_vector_store().as_retriever(
        search_type="similarity", search_kwargs={"k": 5}
    )
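# Minimal retrieval sketch (assumes the "test" collection has already been
# populated by running this module as a script; the query string is only an
# illustration):
#
#     retriever = get_vs_as_retriever()
#     for doc in retriever.invoke("What is email warmup?"):
#         print(doc.metadata.get("source"), doc.page_content[:80])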
URLS_TO_SCRAPE = [
    "https://www.artisan.co",
    "https://www.artisan.co/about",
    "https://www.artisan.co/sales-ai",
    "https://www.artisan.co/ai-sales-agent",
    "https://www.artisan.co/products/linkedin-outreach",
    "https://www.artisan.co/products/email-warmup",
    "https://www.artisan.co/products/sales-automation",
    "https://www.artisan.co/products/email-personalization",
    "https://www.artisan.co/features/email-deliverability",
    "https://help.artisan.co/articles/7415399613-can-i-schedule-when-emails-go-out",
    "https://help.artisan.co/articles/5365244006-what-is-email-warmup",
    "https://help.artisan.co/articles/8442274387-ava-is-sending-strange-messages-from-my-email",
    "https://help.artisan.co/articles/1195138264-is-there-a-limit-to-the-amount-of-leads-i-can-have-in-my-csv-file",
    "https://help.artisan.co/articles/5617649387-help-i-can-t-turn-on-my-campaign",
    "https://help.artisan.co/articles/1048710797-how-does-website-visitor-identification-work",
    "https://help.artisan.co/articles/3886727025-generate-sample-email",
    "https://help.artisan.co/articles/6218358204-running-ava-on-copilot-vs-autopilot",
    "https://help.artisan.co/articles/9265896700-adding-delegates-and-team-members",
    "https://help.artisan.co/articles/2734968853-how-to-create-a-campaign",
    "https://help.artisan.co/articles/7633990298-how-to-integrate-artisan-with-your-crm",
    "https://help.artisan.co/articles/6092562650-how-do-i-upload-a-csv-file-of-my-own-leads",
    "https://help.artisan.co/articles/4356675492-how-do-i-add-variables-to-my-email",
    "https://help.artisan.co/articles/3551943296-how-do-i-request-a-script-tag-for-my-watchtower-campaign",
    "https://help.artisan.co/articles/9602711709-how-do-i-integrate-ava-with-slack",
    "https://www.artisan.co/pricing",
]
def chunk_parent_document(document: Document) -> list[Document]:
    """Split one scraped page semantically, then re-split oversized chunks."""
    semantic_text_splitter = SemanticChunker(
        embeddings=embedding_model, min_chunk_size=100
    )
    # Markdown-aware fallback splitter. The heading/horizontal-rule separators
    # are regex patterns, so is_separator_regex must be enabled; the sentence
    # punctuation is escaped accordingly.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=250,
        separators=[
            "\n#{1,6} ",
            "\n\\*\\*\\*+\n",
            "\n---+\n",
            "\n___+\n",
            "\n\n",
            "\n",
            "\\.",
            "\\?",
            "!",
        ],
        is_separator_regex=True,
    )
    chunked_documents = semantic_text_splitter.split_documents([document])
    # Drop empty or whitespace-only chunks the semantic splitter can produce.
    chunked_documents = [
        chunked_doc_item
        for chunked_doc_item in chunked_documents
        if chunked_doc_item.page_content.strip()
    ]
    final_chunked_documents = []
    for idx, chunked_doc in enumerate(chunked_documents):
        logger.debug("Chunk %d: %d chars", idx, len(chunked_doc.page_content))
        # Semantic chunks have no hard size limit; re-split anything too large
        # to embed comfortably.
        if len(chunked_doc.page_content) > 5000:
            sub_chunked_documents = text_splitter.split_documents([chunked_doc])
            final_chunked_documents.extend(sub_chunked_documents)
        else:
            final_chunked_documents.append(chunked_doc)
    return final_chunked_documents
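# Sketch of the two-stage strategy on a single document (hypothetical
# page_content; SemanticChunker groups sentences by embedding similarity, then
# any chunk over 5,000 characters is re-split recursively):
#
#     page = Document(
#         page_content="## Email Warmup\nAva gradually warms up your inbox...",
#         metadata={"source": "https://www.artisan.co/products/email-warmup"},
#     )
#     for chunk in chunk_parent_document(page):
#         print(len(chunk.page_content))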
async def ingest_urls(chunk_executor: ProcessPoolExecutor, milvus_vector_store: Milvus):
    lc_documents = await scrape_main(URLS_TO_SCRAPE)
    start_time = time.time()
    # CPU-bound chunking runs in worker processes; .map() blocks the event
    # loop, which is acceptable here since nothing else runs concurrently.
    chunked_documents = list(chunk_executor.map(chunk_parent_document, lc_documents))
    chunked_documents = list(chain.from_iterable(chunked_documents))
    end_time = time.time()
    logger.info(f"Time taken to chunk documents: {end_time - start_time} seconds")
    start_time = time.time()
    if chunked_documents:
        _ = milvus_vector_store.add_documents(chunked_documents, batch_size=50)
    logger.info(
        f"Time taken to ingest documents into Milvus: {time.time() - start_time} seconds"
    )
if __name__ == "__main__":
    # Re-ingestion entry point: drop and rebuild the collection from scratch.
    milvus_vector_store = get_milvus_vector_store(drop_old=True)
    with ProcessPoolExecutor() as chunk_executor:
        asyncio.run(ingest_urls(chunk_executor, milvus_vector_store))