# +-------------------------------------------------------------+
#
#           Add Bedrock Knowledge Base Context to your LLM calls
#
# +-------------------------------------------------------------+
#  Thank you users! We ❤️ you! - Krrish & Ishaan

import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from litellm._logging import verbose_logger, verbose_proxy_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.custom_prompt_management import CustomPromptManagement
from litellm.llms.bedrock.base_aws_llm import BaseAWSLLM
from litellm.llms.custom_httpx.http_handler import (
    get_async_httpx_client,
    httpxSpecialProvider,
)
from litellm.types.integrations.rag.bedrock_knowledgebase import (
    BedrockKBContent,
    BedrockKBGuardrailConfiguration,
    BedrockKBRequest,
    BedrockKBResponse,
    BedrockKBRetrievalConfiguration,
    BedrockKBRetrievalQuery,
    BedrockKBRetrievalResult,
)
from litellm.types.llms.openai import AllMessageValues, ChatCompletionUserMessage

if TYPE_CHECKING:
    from litellm.litellm_core_utils.litellm_logging import StandardCallbackDynamicParams
else:
    StandardCallbackDynamicParams = Any


class BedrockKnowledgeBaseHook(CustomPromptManagement, BaseAWSLLM):
    """
    Prompt-management hook that adds Bedrock Knowledge Base context to chat calls.

    When a request carries ``vector_store_ids``, each id is queried as a Bedrock
    Knowledge Base (via the ``/knowledgebases/{id}/retrieve`` endpoint) and the
    retrieved text is appended to the messages as a user message.
    """

    CONTENT_PREFIX_STRING = "Context: \n\n"
    # Lower-cased header names whose values carry credentials; masked in debug logs.
    _SENSITIVE_HEADERS = ("authorization", "x-amz-security-token")

    def __init__(
        self,
        **kwargs,
    ):
        self.async_handler = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )

        # store kwargs as optional_params
        self.optional_params = kwargs

        super().__init__(**kwargs)
        BaseAWSLLM.__init__(self)

    async def async_get_chat_completion_prompt(
        self,
        model: str,
        messages: List[AllMessageValues],
        non_default_params: dict,
        prompt_id: Optional[str],
        prompt_variables: Optional[dict],
        dynamic_callback_params: StandardCallbackDynamicParams,
    ) -> Tuple[str, List[AllMessageValues], dict]:
        """
        Retrieves the context from the Bedrock Knowledge Base and appends it to the messages.

        ``vector_store_ids`` is popped from ``non_default_params`` (so it is not
        forwarded to the model). Every id is queried with the same query, derived
        from the caller's last message, and one context message is appended per
        knowledge base that returned text.

        Returns:
            Tuple of (model, messages with any context appended, non_default_params).
            The caller's ``messages`` list itself is not modified.
        """
        vector_store_ids = non_default_params.pop("vector_store_ids", None)
        if not vector_store_ids:
            return model, messages, non_default_params

        # Derive the query once, from the caller's messages. Recomputing it inside
        # the loop would pick up the context message appended by the previous
        # iteration and send "Context: ..." as the query to the next knowledge base.
        query = self._get_kb_query_from_messages(messages)

        # Work on a copy so repeated calls (e.g. retries) with the same list do
        # not accumulate context messages in the caller's object.
        messages = list(messages)
        for vector_store_id in vector_store_ids:
            response = await self.make_bedrock_kb_retrieve_request(
                knowledge_base_id=vector_store_id,
                query=query,
            )
            verbose_logger.debug("Bedrock Knowledge Base Response: %s", response)
            context_message = (
                self.get_chat_completion_message_from_bedrock_kb_response(response)
            )
            if context_message is not None:
                messages.append(context_message)
        return model, messages, non_default_params

    def _get_kb_query_from_messages(self, messages: List[AllMessageValues]) -> str:
        """
        Uses the text `content` field of the last message in the list of messages.

        String content is returned as-is. For list (multi-part) content, the
        non-empty ``text`` values of dict parts are joined with newlines; parts
        without text (e.g. images) are skipped. Returns "" when there is nothing
        to query with.
        """
        if not messages:
            return ""
        last_message_content = messages[-1].get("content", None)
        if isinstance(last_message_content, str):
            return last_message_content
        if isinstance(last_message_content, list):
            texts = [
                item.get("text")
                for item in last_message_content
                if isinstance(item, dict)
            ]
            return "\n".join(text for text in texts if isinstance(text, str) and text)
        return ""

    def _prepare_request(
        self,
        credentials: Any,
        data: BedrockKBRequest,
        optional_params: dict,
        aws_region_name: str,
        api_base: str,
        extra_headers: Optional[dict] = None,
    ) -> Any:
        """
        Prepare a signed AWS request.

        Args:
            credentials: AWS credentials
            data: Request data
            optional_params: Additional parameters (not currently used here)
            aws_region_name: AWS region name
            api_base: Base API URL
            extra_headers: Additional headers

        Returns:
            AWSPreparedRequest: A SigV4-signed, prepared AWS request

        Raises:
            ImportError: if botocore (boto3) is not installed.
        """
        try:
            from botocore.auth import SigV4Auth
            from botocore.awsrequest import AWSRequest
        except ImportError as err:
            raise ImportError(
                "Missing boto3 to call bedrock. Run 'pip install boto3'."
            ) from err

        sigv4 = SigV4Auth(credentials, "bedrock", aws_region_name)
        encoded_data = json.dumps(data).encode("utf-8")
        headers = {"Content-Type": "application/json", **(extra_headers or {})}
        request = AWSRequest(
            method="POST", url=api_base, data=encoded_data, headers=headers
        )
        sigv4.add_auth(request)
        if extra_headers is not None and "Authorization" in extra_headers:
            # prevent sigv4 from overwriting the auth header
            request.headers["Authorization"] = extra_headers["Authorization"]

        return request.prepare()

    @classmethod
    def _redact_headers(cls, headers: Any) -> Dict[str, Any]:
        """Return a plain-dict copy of ``headers`` with credential values masked, for logging."""
        return {
            key: ("<redacted>" if str(key).lower() in cls._SENSITIVE_HEADERS else value)
            for key, value in headers.items()
        }

    async def make_bedrock_kb_retrieve_request(
        self,
        knowledge_base_id: str,
        query: str,
        guardrail_id: Optional[str] = None,
        guardrail_version: Optional[str] = None,
        next_token: Optional[str] = None,
        retrieval_configuration: Optional[BedrockKBRetrievalConfiguration] = None,
    ) -> BedrockKBResponse:
        """
        Make a Bedrock Knowledge Base retrieve request.

        Args:
            knowledge_base_id (str): The unique identifier of the knowledge base to query
            query (str): The query text to search for
            guardrail_id (Optional[str]): The guardrail ID to apply (used only together with guardrail_version)
            guardrail_version (Optional[str]): The version of the guardrail to apply
            next_token (Optional[str]): Token for pagination
            retrieval_configuration (Optional[BedrockKBRetrievalConfiguration]): Configuration for the retrieval process

        Returns:
            BedrockKBResponse: A typed response object containing the retrieval results

        Raises:
            fastapi.HTTPException: if the endpoint responds with a non-200 status.
        """
        from fastapi import HTTPException

        credentials = self.get_credentials()
        aws_region_name = self._get_aws_region_name(
            optional_params=self.optional_params
        )

        # Prepare request data
        request_data: BedrockKBRequest = BedrockKBRequest(
            retrievalQuery=BedrockKBRetrievalQuery(text=query),
        )
        if next_token:
            request_data["nextToken"] = next_token
        if retrieval_configuration:
            request_data["retrievalConfiguration"] = retrieval_configuration
        if guardrail_id and guardrail_version:
            request_data["guardrailConfiguration"] = BedrockKBGuardrailConfiguration(
                guardrailId=guardrail_id, guardrailVersion=guardrail_version
            )
        verbose_logger.debug(
            f"Request Data: {json.dumps(request_data, indent=4, default=str)}"
        )

        # Prepare the request
        api_base = f"https://bedrock-agent-runtime.{aws_region_name}.amazonaws.com/knowledgebases/{knowledge_base_id}/retrieve"

        prepared_request = self._prepare_request(
            credentials=credentials,
            data=request_data,
            optional_params=self.optional_params,
            aws_region_name=aws_region_name,
            api_base=api_base,
        )

        # The signed headers contain the SigV4 signature and possibly a session
        # token; never write those values to logs.
        verbose_proxy_logger.debug(
            "Bedrock Knowledge Base request body: %s, url %s, headers: %s",
            request_data,
            prepared_request.url,
            self._redact_headers(prepared_request.headers),
        )

        response = await self.async_handler.post(
            url=prepared_request.url,
            data=prepared_request.body,  # type: ignore
            headers=prepared_request.headers,  # type: ignore
        )

        verbose_proxy_logger.debug("Bedrock Knowledge Base response: %s", response.text)

        if response.status_code == 200:
            response_data = response.json()
            return BedrockKBResponse(**response_data)

        verbose_proxy_logger.error(
            "Bedrock Knowledge Base: error in response. Status code: %s, response: %s",
            response.status_code,
            response.text,
        )
        raise HTTPException(
            status_code=response.status_code,
            detail={
                "error": "Error calling Bedrock Knowledge Base",
                "response": response.text,
            },
        )

    @staticmethod
    def should_use_prompt_management_hook(non_default_params: Dict) -> bool:
        """Return True when the request carries a non-empty ``vector_store_ids``."""
        if non_default_params.get("vector_store_ids", None):
            return True
        return False

    @staticmethod
    def get_initialized_custom_logger(
        non_default_params: Dict,
    ) -> Optional[CustomLogger]:
        """
        Return an initialized ``bedrock_knowledgebase_hook`` logger when the
        request needs it (see ``should_use_prompt_management_hook``), else None.
        """
        from litellm.litellm_core_utils.litellm_logging import (
            _init_custom_logger_compatible_class,
        )

        if BedrockKnowledgeBaseHook.should_use_prompt_management_hook(
            non_default_params
        ):
            return _init_custom_logger_compatible_class(
                logging_integration="bedrock_knowledgebase_hook",
                internal_usage_cache=None,
                llm_router=None,
            )
        return None

    @staticmethod
    def get_chat_completion_message_from_bedrock_kb_response(
        response: BedrockKBResponse,
    ) -> Optional[ChatCompletionUserMessage]:
        """
        Retrieves the context from the Bedrock Knowledge Base response and returns a ChatCompletionUserMessage object.

        The non-empty ``content.text`` of each retrieval result is joined with a
        blank line and prefixed with ``CONTENT_PREFIX_STRING``. Returns None when
        the response holds no retrieval results or none of them carries text,
        so no empty context message is added.
        """
        retrieval_results: Optional[List[BedrockKBRetrievalResult]] = response.get(
            "retrievalResults", None
        )
        if not retrieval_results:
            return None

        texts: List[str] = []
        for retrieval_result in retrieval_results:
            retrieval_result_content = retrieval_result.get("content", None)
            if not retrieval_result_content:
                continue
            retrieval_result_text = retrieval_result_content.get("text", None)
            if retrieval_result_text:
                texts.append(retrieval_result_text)

        if not texts:
            return None

        # Separate chunks so text from different results does not run together.
        context_string: str = BedrockKnowledgeBaseHook.CONTENT_PREFIX_STRING + "\n\n".join(
            texts
        )
        return ChatCompletionUserMessage(
            role="user",
            content=context_string,
        )