Spaces:
Runtime error
Runtime error
| """Question-answering with sources over an index.""" | |
| from typing import Any, Dict, List | |
| from langchain_core.pydantic_v1 import Field | |
| from langchain_core.retrievers import BaseRetriever | |
| from langchain.callbacks.manager import ( | |
| AsyncCallbackManagerForChainRun, | |
| CallbackManagerForChainRun, | |
| ) | |
| from langchain.chains.combine_documents.stuff import StuffDocumentsChain | |
| from langchain.chains.qa_with_sources.base import BaseQAWithSourcesChain | |
| from langchain.docstore.document import Document | |
class RetrievalQAWithSourcesChain(BaseQAWithSourcesChain):
    """Question-answering with sources over an index.

    Documents are fetched from ``retriever`` using the value stored under
    ``question_key`` in the chain inputs and, optionally, trimmed so that
    their combined token count stays within ``max_tokens_limit``.
    """

    retriever: BaseRetriever = Field(exclude=True)
    """Index to connect to."""
    reduce_k_below_max_tokens: bool = False
    """Reduce the number of results to return from store based on tokens limit"""
    max_tokens_limit: int = 3375
    """Restrict the docs to return from store based on tokens,
    enforced only for StuffDocumentChain and if reduce_k_below_max_tokens is to true"""

    def _reduce_tokens_below_limit(self, docs: List[Document]) -> List[Document]:
        """Drop trailing documents until the total token count fits the limit.

        Trimming only happens when ``reduce_k_below_max_tokens`` is true and
        the combine-documents chain is a ``StuffDocumentsChain``; otherwise
        a shallow copy of ``docs`` is returned unchanged.

        Args:
            docs: Retrieved documents, in the order the retriever returned them.

        Returns:
            A new list holding the leading documents whose summed token count
            does not exceed ``max_tokens_limit`` (possibly empty).
        """
        num_docs = len(docs)

        if self.reduce_k_below_max_tokens and isinstance(
            self.combine_documents_chain, StuffDocumentsChain
        ):
            tokens = [
                self.combine_documents_chain.llm_chain._get_num_tokens(doc.page_content)
                for doc in docs
            ]
            token_count = sum(tokens)
            # The ``num_docs > 0`` guard stops the index from going negative
            # (and wrapping around to the end of ``tokens``) when the limit is
            # misconfigured as a negative number.
            while num_docs > 0 and token_count > self.max_tokens_limit:
                num_docs -= 1
                token_count -= tokens[num_docs]

        return docs[:num_docs]

    def _get_docs(
        self, inputs: Dict[str, Any], *, run_manager: CallbackManagerForChainRun
    ) -> List[Document]:
        """Retrieve documents for the question found in ``inputs``.

        Args:
            inputs: Chain inputs; the question is read from ``question_key``.
            run_manager: Callback manager whose child is passed to the retriever.

        Returns:
            The retrieved documents after optional token-based trimming.
        """
        question = inputs[self.question_key]
        docs = self.retriever.get_relevant_documents(
            question, callbacks=run_manager.get_child()
        )
        return self._reduce_tokens_below_limit(docs)

    async def _aget_docs(
        self, inputs: Dict[str, Any], *, run_manager: AsyncCallbackManagerForChainRun
    ) -> List[Document]:
        """Asynchronously retrieve documents for the question found in ``inputs``.

        Args:
            inputs: Chain inputs; the question is read from ``question_key``.
            run_manager: Async callback manager whose child is passed to the
                retriever.

        Returns:
            The retrieved documents after optional token-based trimming.
        """
        question = inputs[self.question_key]
        docs = await self.retriever.aget_relevant_documents(
            question, callbacks=run_manager.get_child()
        )
        return self._reduce_tokens_below_limit(docs)

    # NOTE(review): declared as a property because the base ``Chain`` is
    # expected to read ``_chain_type`` as an attribute (e.g. when building the
    # serialized ``_type`` field) — confirm against the installed langchain.
    @property
    def _chain_type(self) -> str:
        """Return the chain type."""
        return "retrieval_qa_with_sources_chain"