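# Hugging Face Hub file-view residue, kept as comments for provenance:
#   uploader: khointn
#   commit message: "Upload folder using huggingface_hub"
#   commit: 5a67683 (verified)
#   page links: raw / history / blame
#   file size: 5.26 kB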
import logging
import tempfile
from pathlib import Path
from typing import AnyStr, BinaryIO
from llama_index import ServiceContext, StorageContext
from llama_index.node_parser import SentenceWindowNodeParser
from app.components.embedding.component import EmbeddingComponent
from app.components.ingest.component import get_ingestion_component
from app.components.llm.component import LLMComponent
from app.components.node_store.component import NodeStoreComponent
from app.components.vector_store.component import VectorStoreComponent
from app.server.ingest.schemas import IngestedDoc
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class IngestService:
    """Feed files, text and binary streams to the ingestion component.

    The service also lists the documents referenced by the doc store and
    deletes ingested documents through the ingestion component.
    """

    def __init__(
        self,
        llm_component: LLMComponent,
        vector_store_component: VectorStoreComponent,
        embedding_component: EmbeddingComponent,
        node_store_component: NodeStoreComponent,
    ) -> None:
        """Build the storage context, service context and ingestion component.

        :param llm_component: provides the ``llm`` used by the service context
        :param vector_store_component: provides the ``vector_store``
        :param embedding_component: provides the ``embedding_model``
        :param node_store_component: provides ``doc_store`` and ``index_store``
        """
        self.llm_service = llm_component
        self.storage_context = StorageContext.from_defaults(
            vector_store=vector_store_component.vector_store,
            docstore=node_store_component.doc_store,
            index_store=node_store_component.index_store,
        )
        node_parser = SentenceWindowNodeParser.from_defaults()
        self.ingest_service_context = ServiceContext.from_defaults(
            llm=self.llm_service.llm,
            embed_model=embedding_component.embedding_model,
            node_parser=node_parser,
            # Embeddings done early in the pipeline of node transformations, right
            # after the node parsing
            transformations=[node_parser, embedding_component.embedding_model],
        )
        self.ingest_component = get_ingestion_component(
            self.storage_context, self.ingest_service_context
        )

    def _ingest_data(self, file_name: str, file_data: AnyStr) -> list[IngestedDoc]:
        """Write in-memory data to a temporary file and ingest that file.

        :param file_name: name passed on to :meth:`ingest_file`
        :param file_data: ``bytes`` are written verbatim; anything else is
            written as text via ``str(file_data)``
        :returns: whatever :meth:`ingest_file` returns for the temporary file
        """
        logger.debug("Got file data of size=%s to ingest", len(file_data))
        # llama-index mainly supports reading from files, so
        # we have to create a tmp file to read for it to work
        # delete=False to avoid a Windows 11 permission error.
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            path_to_tmp = Path(tmp.name)
        # The handle is closed as soon as the name is known, so nothing keeps
        # the file open while it is rewritten and read back below.
        try:
            if isinstance(file_data, bytes):
                path_to_tmp.write_bytes(file_data)
            else:
                # No encoding is passed, so the platform default applies.
                # NOTE(review): confirm the downstream reader decodes with the
                # same default before pinning an explicit encoding here.
                path_to_tmp.write_text(str(file_data))
            return self.ingest_file(file_name, path_to_tmp)
        finally:
            # missing_ok: a file that is already gone must not mask the
            # result or the exception coming out of the ingestion itself.
            path_to_tmp.unlink(missing_ok=True)

    def ingest_file(self, file_name: str, file_data: Path) -> list[IngestedDoc]:
        """Ingest the file at ``file_data`` under the name ``file_name``.

        :param file_name: name handed to the ingestion component
        :param file_data: path of the file to ingest
        :returns: one ``IngestedDoc`` per document returned by the component
        """
        logger.info("Ingesting file_name=%s", file_name)
        documents = self.ingest_component.ingest(file_name, file_data)
        logger.info("Finished ingestion file_name=%s", file_name)
        return [IngestedDoc.from_document(document) for document in documents]

    def ingest_text(self, file_name: str, text: str) -> list[IngestedDoc]:
        """Ingest a string by routing it through a temporary file.

        :param file_name: name to ingest the text under
        :param text: text content to ingest
        :returns: the ingested documents
        """
        logger.debug("Ingesting text data with file_name=%s", file_name)
        return self._ingest_data(file_name, text)

    def ingest_bin_data(
        self, file_name: str, raw_file_data: BinaryIO
    ) -> list[IngestedDoc]:
        """Read a binary stream to its end and ingest the resulting bytes.

        :param file_name: name to ingest the data under
        :param raw_file_data: stream whose ``read()`` result is ingested
        :returns: the ingested documents
        """
        logger.debug("Ingesting binary data with file_name=%s", file_name)
        file_data = raw_file_data.read()
        return self._ingest_data(file_name, file_data)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[IngestedDoc]:
        """Ingest several ``(file_name, path)`` pairs in one component call.

        :param files: pairs of file name and path to ingest
        :returns: one ``IngestedDoc`` per document returned by the component
        """
        # Built once and reused by both log records.
        file_names = [entry[0] for entry in files]
        logger.info("Ingesting file_names=%s", file_names)
        documents = self.ingest_component.bulk_ingest(files)
        logger.info("Finished ingestion file_name=%s", file_names)
        return [IngestedDoc.from_document(document) for document in documents]

    def list_ingested(self) -> list[IngestedDoc]:
        """List the documents referenced by the nodes in the doc store.

        A ``ValueError`` raised while reading the doc store is logged as a
        warning and the documents collected so far are returned.

        :returns: one ``IngestedDoc`` per distinct ``ref_doc_id``, in the order
            the ids are first seen while iterating ``docstore.docs``
        """
        ingested_docs: list[IngestedDoc] = []
        try:
            docstore = self.storage_context.docstore
            # dict.fromkeys de-duplicates like a set but keeps first-seen
            # order, so the listing does not depend on string hash ordering.
            ingested_docs_ids = dict.fromkeys(
                node.ref_doc_id
                for node in docstore.docs.values()
                if node.ref_doc_id is not None
            )
            for doc_id in ingested_docs_ids:
                ref_doc_info = docstore.get_ref_doc_info(ref_doc_id=doc_id)
                doc_metadata = None
                if ref_doc_info is not None and ref_doc_info.metadata is not None:
                    doc_metadata = IngestedDoc.curate_metadata(ref_doc_info.metadata)
                ingested_docs.append(
                    IngestedDoc(
                        object="ingest.document",
                        doc_id=doc_id,
                        doc_metadata=doc_metadata,
                    )
                )
        except ValueError:
            # Best effort: keep whatever was collected before the failure.
            logger.warning("Got an exception when getting list of docs", exc_info=True)
        logger.debug("Found count=%s ingested documents", len(ingested_docs))
        return ingested_docs

    def delete(self, doc_id: str) -> None:
        """Delete an ingested document.

        :param doc_id: id of the document to delete
        :raises ValueError: if the document does not exist
        """
        logger.info(
            "Deleting the ingested document=%s in the doc and index store", doc_id
        )
        self.ingest_component.delete(doc_id)