import sys

# Swap in pysqlite3 for the stdlib sqlite3 when available; chromadb needs a
# newer sqlite3 than some hosts provide.
try:
    import pysqlite3

    sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")
except ImportError:
    pass
import asyncio
import logging
import os
from datetime import datetime
from uuid import UUID

import chromadb
from langchain.vectorstores import Chroma
from langchain.schema import Document
from langchain.chains import RetrievalQA
from langchain.embeddings import HuggingFaceBgeEmbeddings
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.chroma import ChromaTranslator
from langchain.text_splitter import RecursiveCharacterTextSplitter

from llm.llmFactory import LLMFactory
import baseInfra.dropbox_handler as dbh
from baseInfra.dbInterface import DbInterface

logger = logging.getLogger("root")
class myChromaTranslator(ChromaTranslator):
    allowed_operators = ["$and", "$or"]
    """Subset of allowed logical operators."""

    allowed_comparators = [
        "$eq",
        "$ne",
        "$gt",
        "$gte",
        "$lt",
        "$lte",
        "$contains",
        "$not_contains",
        "$in",
        "$nin",
    ]
    """Subset of allowed comparators."""
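
# Illustrative only: with the operators above, a structured self-query such as
# "notes or web entries from 2024" would be translated into a Chroma ``where``
# filter shaped roughly like this (field names come from the metadata schema
# defined in ChromaIntf below):
#
#     {"$and": [{"Year": {"$eq": 2024}},
#               {"source": {"$in": ["notes", "web"]}}]}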
class ChromaIntf:
    def __init__(self):
        self.db_interface = DbInterface()
        model_name = "BAAI/bge-large-en-v1.5"
        # normalize_embeddings=True so similarity scores are cosine similarity
        encode_kwargs = {"normalize_embeddings": True}
        self.embedding = HuggingFaceBgeEmbeddings(
            model_name=model_name,
            model_kwargs={"device": "cpu"},
            encode_kwargs=encode_kwargs,
        )
        self.persist_db_directory = "db"
        self.persist_docs_directory = "persistence-docs"
        self.logger_file = "persistence.log"
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(dbh.restoreFolder(self.persist_db_directory))
            loop.run_until_complete(dbh.restoreFolder(self.persist_docs_directory))
        except Exception:
            print("Folders probably don't exist yet; this looks like a brand-new setup")
        # Seed the store with a single test document so the collection exists.
        docs = [
            Document(
                page_content="this is test doc",
                metadata={
                    "timestamp": 1696743148.474055,
                    "ID": "2000-01-01 15:57:11::664165-test",
                    "source": "test",
                },
                id="2000-01-01 15:57:11::664165-test",
            ),
        ]
        self.vectorstore = Chroma.from_documents(
            documents=docs,
            embedding=self.embedding,
            persist_directory=self.persist_db_directory,
        )
        # Metadata fields stored with each entry:
        #   timestamp --> time when added
        #   source    --> notes/references/web/youtube/book/conversation; defaults to conversation
        #   title     --> title of the document; "conversation" when source is conversation, default blank
        #   author    --> defaults to blank
        #   Year / Month / Day / Hour / Minute --> integer parts of the timestamp,
        #       e.g. Year=2024, Month=1, Day=3, Hour=11, Minute=29
        self.metadata_field_info = [
            AttributeInfo(
                name="timestamp",
                description="ISO-format timestamp of when the document was added; should not be used for queries",
                type="str",
            ),
            AttributeInfo(
                name="Year",
                description="Year when the entry was added, in YYYY format",
                type="int",
            ),
            AttributeInfo(
                name="Month",
                description="Month when the entry was added, from 1-12",
                type="int",
            ),
            AttributeInfo(
                name="Day",
                description="Day of the month when the entry was added, from 1-31",
                type="int",
            ),
            AttributeInfo(
                name="Hour",
                description="Hour of the timestamp when the entry was added",
                type="int",
            ),
            AttributeInfo(
                name="Minute",
                description="Minute of the timestamp when the entry was added",
                type="int",
            ),
            AttributeInfo(
                name="source",
                description="Type of entry",
                type="string or list[string]",
            ),
            AttributeInfo(
                name="title",
                description="Title or subject of the entry",
                type="string",
            ),
            AttributeInfo(
                name="author",
                description="Author of the entry",
                type="string",
            ),
        ]
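        # Illustrative only: a fully-populated metadata record matching the
        # schema above would look roughly like this (values are made up):
        #
        #     {"timestamp": "2024-01-03T11:29:00", "Year": 2024, "Month": 1,
        #      "Day": 3, "Hour": 11, "Minute": 29, "source": "notes",
        #      "title": "demo note", "author": "",
        #      "ID": "2024-01-03 11-29-00-demo note"}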
        self.document_content_description = (
            "Information to store for retrieval by an LLM-based chatbot"
        )
        lf = LLMFactory()
        # self.llm = lf.get_llm("executor2")
        self.llm = lf.get_llm("executor3")
        self.retriever = SelfQueryRetriever.from_llm(
            self.llm,
            self.vectorstore,
            self.document_content_description,
            self.metadata_field_info,
            structured_query_translator=ChromaTranslator(),
            verbose=True,
        )
    async def getRelevantDocs(self, query: str, kwargs: dict):
        """Fetch relevant documents; also posts the result to the cache/firebase."""
        print("retriever state", self.retriever.search_kwargs)
        print("retriever state", self.retriever.search_type)
        try:
            for key in kwargs.keys():
                if key == "search_type":
                    self.retriever.search_type = kwargs[key]
                else:
                    self.retriever.search_kwargs[key] = kwargs[key]
        except Exception:
            print("setting search args failed")
        retVal = []
        try:
            retVal = self.retriever.get_relevant_documents(query)
        except Exception:
            logger.exception("Exception occurred:", exc_info=True)
        value = []
        excludeMeta = True
        print(f"retrieved {len(retVal)} documents")
        try:
            # Normal path: items are Document objects.
            for item in retVal:
                if excludeMeta:
                    v = item.page_content + " \n"
                else:
                    v = "Info:" + item.page_content + " "
                    for key in item.metadata.keys():
                        if key != "ID":
                            v += key + ":" + str(item.metadata[key]) + " "
                value.append(v)
            self.db_interface.add_to_cache(input=query, value=value)
        except Exception:
            # Fallback path: items came back as plain dicts instead of Documents.
            for item in retVal:
                if excludeMeta:
                    v = item["page_content"] + " \n"
                else:
                    v = "Info:" + item["page_content"] + " "
                    for key in item["metadata"].keys():
                        if key != "ID":
                            v += key + ":" + str(item["metadata"][key]) + " "
                value.append(v)
            self.db_interface.add_to_cache(input=query, value=value)
        return retVal
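
    # Usage sketch (hypothetical values): any key other than "search_type" is
    # passed straight through as a retriever search parameter, e.g.
    #
    #     await chroma_intf.getRelevantDocs("meeting notes",
    #                                       {"k": 5, "search_type": "mmr"})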
    async def addText(self, inStr: str, metadata):
        # Expected metadata fields (all optional):
        #   timestamp --> time when added
        #   source    --> notes/references/web/youtube/book/conversation; defaults to conversation
        #   title     --> title of the document; "conversation" when source is conversation, default blank
        #   author    --> defaults to blank
        # TODO: Preprocess inStr to remove any html, markdown tags etc.
        metadata = metadata.dict()
        if "timestamp" not in metadata.keys():
            metadata["timestamp"] = datetime.now()
        else:
            metadata["timestamp"] = datetime.fromisoformat(metadata["timestamp"])
        if "source" not in metadata.keys():
            metadata["source"] = "conversation"
        if "title" not in metadata.keys():
            metadata["title"] = ""
        if metadata["source"] == "conversation":
            metadata["title"] = "conversation"
        if "author" not in metadata.keys():
            metadata["author"] = ""
        # TODO: If a url is present in the input, or when splitting is needed, we'll
        # have to change how the ID (and maybe the filename) is formulated.
        metadata["ID"] = (
            metadata["timestamp"].strftime("%Y-%m-%d %H-%M-%S")
            + "-"
            + metadata["title"]
        )
        metadata["Year"] = metadata["timestamp"].year
        metadata["Month"] = metadata["timestamp"].month
        metadata["Day"] = metadata["timestamp"].day
        metadata["Hour"] = metadata["timestamp"].hour
        metadata["Minute"] = metadata["timestamp"].minute
        metadata["timestamp"] = metadata["timestamp"].isoformat()
        print("Metadata is:")
        print(metadata)
        os.makedirs("./docs", exist_ok=True)
        with open("./docs/" + metadata["ID"] + ".txt", "w") as fd:
            fd.write(inStr)
        print("written to file", inStr)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=50,
            length_function=len,
            is_separator_regex=False,
        )
        docs = text_splitter.create_documents([inStr], [metadata])
        # First chunk keeps the base ID; later chunks get an "__N" suffix.
        partNumber = 0
        for doc in docs:
            if partNumber > 0:
                doc.metadata["ID"] += f"__{partNumber}"
            partNumber += 1
            print(f"chunk {partNumber} follows:")
            print(doc)
        try:
            ids = [doc.metadata["ID"] for doc in docs]
            print("ids are:", ids)
            return await self.vectorstore.aadd_documents(docs, ids=ids)
        except Exception:
            logger.exception("exception in adding", exc_info=True)
            return await self.vectorstore.aadd_documents(docs, ids=[metadata["ID"]])
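
    # Illustrative only: an entry titled "demo note" added at 2024-01-03 11:29:00
    # that splits into three chunks would be stored under the ids
    #     "2024-01-03 11-29-00-demo note"
    #     "2024-01-03 11-29-00-demo note__1"
    #     "2024-01-03 11-29-00-demo note__2"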
    async def listDocs(self):
        """Return everything stored in the default langchain collection."""
        collection = self.vectorstore._client.get_collection(
            self.vectorstore._LANGCHAIN_DEFAULT_COLLECTION_NAME,
            embedding_function=self.embedding,
        )
        return collection.get()
    async def persist(self):
        """Flush the vector store to disk and back everything up to Dropbox."""
        self.vectorstore.persist()
        await dbh.backupFile(self.logger_file)
        await dbh.backupFolder(self.persist_db_directory)
        return await dbh.backupFolder(self.persist_docs_directory)
    def _uuid(self, uuid_str: str) -> UUID:
        try:
            return UUID(uuid_str)
        except ValueError:
            raise ValueError(f"Could not parse {uuid_str} as a UUID")
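
# Minimal smoke test, a sketch only: it assumes the BGE model can be downloaded
# and that the llm/baseInfra backends (LLMFactory, Dropbox, DbInterface) are
# reachable. ``_Meta`` is a hypothetical stand-in for whatever pydantic model
# callers normally pass; addText() only needs an object exposing .dict().
if __name__ == "__main__":

    class _Meta:
        def dict(self):
            return {"source": "notes", "title": "demo note"}

    intf = ChromaIntf()  # the constructor drives its own event loop for restores

    async def _demo():
        await intf.addText("hello from the smoke test", _Meta())
        docs = await intf.getRelevantDocs("hello", {"k": 1})
        print(docs)

    asyncio.run(_demo())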