import time from typing import List import logfire from llama_index.core import QueryBundle from llama_index.core.retrievers import BaseRetriever, VectorIndexRetriever from llama_index.core.schema import NodeWithScore, TextNode from llama_index.postprocessor.cohere_rerank import CohereRerank class CustomRetriever(BaseRetriever): """Custom retriever that performs both semantic search and hybrid search.""" def __init__( self, vector_retriever: VectorIndexRetriever, document_dict: dict, ) -> None: """Init params.""" self._vector_retriever = vector_retriever self._document_dict = document_dict super().__init__() def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]: """Retrieve nodes given query.""" # LlamaIndex adds "\ninput is " to the query string query_bundle.query_str = query_bundle.query_str.replace("\ninput is ", "") query_bundle.query_str = query_bundle.query_str.rstrip() logfire.info(f"Retrieving 10 nodes with string: '{query_bundle}'") start = time.time() nodes = self._vector_retriever.retrieve(query_bundle) duration = time.time() - start logfire.info(f"Retrieving nodes took {duration:.2f}s") # Filter out nodes with the same ref_doc_id def filter_nodes_by_unique_doc_id(nodes): unique_nodes = {} for node in nodes: doc_id = node.node.ref_doc_id if doc_id is not None and doc_id not in unique_nodes: unique_nodes[doc_id] = node return list(unique_nodes.values()) nodes = filter_nodes_by_unique_doc_id(nodes) logfire.info( f"Number of nodes after filtering the ones with same ref_doc_id: {len(nodes)}" ) logfire.info(f"Nodes retrieved: {nodes}") nodes_context = [] for node in nodes: # print("Node ID\t", node.node_id) # print("Title\t", node.metadata["title"]) # print("Text\t", node.text) # print("Score\t", node.score) # print("Metadata\t", node.metadata) # print("-_" * 20) if node.score < 0.2: continue if node.metadata["retrieve_doc"] == True: # print("This node will be replaced by the document") doc = self._document_dict[node.node.ref_doc_id] # print(doc.text) new_node = NodeWithScore( node=TextNode(text=doc.text, metadata=node.metadata), # type: ignore score=node.score, ) nodes_context.append(new_node) else: nodes_context.append(node) try: reranker = CohereRerank(top_n=5, model="rerank-english-v3.0") nodes_context = reranker.postprocess_nodes(nodes_context, query_bundle) nodes_filtered = [] for node in nodes_context: if node.score < 0.10: # type: ignore continue else: nodes_filtered.append(node) logfire.info(f"Cohere raranking to {len(nodes_filtered)} nodes") return nodes_filtered except Exception as e: logfire.error(f"Error reranking nodes with Cohere: {e}") return nodes_context