"""RAG string query engine over a Pinecone vector index using LlamaIndex.

Wires together an OpenAI LLM + embedding model, a Pinecone-backed vector
store, a custom retriever, and a custom query engine that formats retrieved
context into a QA prompt and completes it with the LLM.
"""

import os
from typing import Any, List, Optional, Tuple

from dotenv import load_dotenv
from llama_index.core import (
    PromptTemplate,
    QueryBundle,
    VectorStoreIndex,
    get_response_synthesizer,
)
from llama_index.core.query_engine import CustomQueryEngine
from llama_index.core.response_synthesizers import BaseSynthesizer
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai.base import OpenAI
from llama_index.vector_stores.pinecone import PineconeVectorStore
from pinecone import Pinecone, PodSpec

load_dotenv()

llm = OpenAI(model="gpt-4-0125-preview", temperature=0.7)
# SECURITY FIX: the Pinecone API key was hard-coded in source. Read it from
# the environment (load_dotenv() above already loads a local .env file).
# NOTE(review): the previously committed key must be treated as compromised
# and rotated.
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
embed_model = OpenAIEmbedding(model="text-embedding-3-small")
synthesizer = get_response_synthesizer(response_mode="compact")
index = pc.Index("test-index")

qa_prompt = PromptTemplate(
    "Context \n"
    "---------------------\n"
    "{context_str}\n"
    "---------------------\n"
    "Given the context and without prior knowledge answer the user query.\n"
    "User Query: {query_str}\n"
    "Answer: ",
)


class PineconeRetriever(BaseRetriever):
    """Retriever that queries a Pinecone vector store with OpenAI embeddings."""

    def __init__(
        self,
        vector_store: PineconeVectorStore,
        embed_model: Any,
        query_mode: str = "default",
    ) -> None:
        """Store the vector store, embedding model, and query mode.

        Args:
            vector_store: Pinecone-backed vector store to query.
            embed_model: embedding model exposing ``get_query_embedding``.
            query_mode: VectorStoreQuery mode (default ``"default"``).
        """
        self._vector_store = vector_store
        self._embed_model = embed_model
        self._query_mode = query_mode
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle, k_value: int = 2) -> List[NodeWithScore]:
        """Embed the query and return the top-``k_value`` scored nodes.

        Accepts either a ``QueryBundle`` or a plain query string — callers in
        this file pass a plain string.
        """
        print("Yup we are using the custom retriever")
        # Normalize to the raw query string; get_query_embedding expects str.
        if isinstance(query_bundle, QueryBundle):
            query_text = query_bundle.query_str
        else:
            query_text = query_bundle
        # BUG FIX: use the instance's embed model (previously referenced the
        # module-level global) and embed the query string, not the bundle.
        query_embedding = self._embed_model.get_query_embedding(query_text)
        vector_store_query = VectorStoreQuery(
            query_embedding=query_embedding,
            similarity_top_k=k_value,
            mode=self._query_mode,
        )
        query_result = self._vector_store.query(vector_store_query)
        nodes_with_scores: List[NodeWithScore] = []
        # Loop variable renamed from ``index`` — it shadowed the module-level
        # Pinecone index handle.
        for i, node in enumerate(query_result.nodes):
            score: Optional[float] = None
            if query_result.similarities is not None:
                score = query_result.similarities[i]
            nodes_with_scores.append(NodeWithScore(node=node, score=score))
        return nodes_with_scores


class RAGStringQueryEngine(CustomQueryEngine):
    """RAG String Query Engine."""

    retriever: PineconeRetriever
    response_synthesizer: BaseSynthesizer
    llm: OpenAI
    qa_prompt: PromptTemplate

    def custom_query(self, query_str: str, k_value: int = 2) -> Tuple[str, str]:
        """Retrieve context, fill the QA prompt, and complete with the LLM.

        Returns:
            A ``(response_text, formatted_prompt)`` tuple.
        """
        nodes = self.retriever._retrieve(query_str, k_value)
        context_str = "\n\n".join(n.node.get_content() for n in nodes)
        # CONSISTENCY FIX: format the engine's own prompt attribute instead of
        # the module-level global (identical behavior with the wiring below).
        qa_prompt_str = self.qa_prompt.format(
            context_str=context_str,
            query_str=query_str,
        )
        print("QA Prompt STR: ", qa_prompt_str)
        response = self.llm.complete(qa_prompt_str)
        return str(response), qa_prompt_str

    def print_retrieved_nodes(self, query_str: str, k_value: int = 2) -> None:
        """Debug helper: print the score and content of each retrieved node."""
        # BUG FIX: _retrieve requires k_value; the original call omitted it
        # and raised TypeError. A default keeps the old call signature valid.
        nodes = self.retriever._retrieve(query_str, k_value)
        for n in nodes:
            print("Node Score: ", n.score)
            print(n.node.get_content())


vector_store = PineconeVectorStore(pinecone_index=index)
vector_index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
retriever = PineconeRetriever(
    vector_store=vector_store,
    embed_model=embed_model,
    query_mode="default",
)
query_engine = RAGStringQueryEngine(
    retriever=retriever,
    response_synthesizer=synthesizer,
    llm=llm,
    qa_prompt=qa_prompt,
)