oceansweep's picture
Update App_Function_Libraries/RAG_Libary_2.py
fddcafb verified
raw
history blame
28.1 kB
# Import necessary modules and functions
import configparser
from typing import Dict, Any
# Local Imports
#from App_Function_Libraries.ChromaDB_Library import process_and_store_content, vector_search, chroma_client
from Article_Extractor_Lib import scrape_article
from SQLite_DB import search_db, db
# 3rd-Party Imports
#import openai
# Initialize OpenAI client (adjust this based on your API key management)
#openai.api_key = "your-openai-api-key"
# Main RAG pipeline function
def rag_pipeline(url: str, query: str, api_choice=None) -> Dict[str, Any]:
# Extract content
# article_data = scrape_article(url)
# content = article_data['content']
# Process and store content
# collection_name = "article_" + str(hash(url))
# process_and_store_content(content, collection_name)
# Perform searches
# vector_results = vector_search(collection_name, query, k=5)
# fts_results = search_db(query, ["content"], "", page=1, results_per_page=5)
# Combine results
# all_results = vector_results + [result['content'] for result in fts_results]
# context = "\n".join(all_results)
# Generate answer using the selected API
# answer = generate_answer(api_choice, context, query)
# return {
# "answer": answer,
# "context": context
# }
pass
config = configparser.ConfigParser()
config.read('config.txt')
def generate_answer(api_choice: str, context: str, query: str) -> str:
prompt = f"Context: {context}\n\nQuestion: {query}"
if api_choice == "OpenAI":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_openai
return summarize_with_openai(config['API']['openai_api_key'], prompt, "")
elif api_choice == "Anthropic":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_anthropic
return summarize_with_anthropic(config['API']['anthropic_api_key'], prompt, "")
elif api_choice == "Cohere":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_cohere
return summarize_with_cohere(config['API']['cohere_api_key'], prompt, "")
elif api_choice == "Groq":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_groq
return summarize_with_groq(config['API']['groq_api_key'], prompt, "")
elif api_choice == "OpenRouter":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_openrouter
return summarize_with_openrouter(config['API']['openrouter_api_key'], prompt, "")
elif api_choice == "HuggingFace":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_huggingface
return summarize_with_huggingface(config['API']['huggingface_api_key'], prompt, "")
elif api_choice == "DeepSeek":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_deepseek
return summarize_with_deepseek(config['API']['deepseek_api_key'], prompt, "")
elif api_choice == "Mistral":
from App_Function_Libraries.Summarization_General_Lib import summarize_with_mistral
return summarize_with_mistral(config['API']['mistral_api_key'], prompt, "")
elif api_choice == "Local-LLM":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_local_llm
return summarize_with_local_llm(config['API']['local_llm_path'], prompt, "")
elif api_choice == "Llama.cpp":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_llama
return summarize_with_llama(config['API']['llama_api_key'], prompt, "")
elif api_choice == "Kobold":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_kobold
return summarize_with_kobold(config['API']['kobold_api_key'], prompt, "")
elif api_choice == "Ooba":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_oobabooga
return summarize_with_oobabooga(config['API']['ooba_api_key'], prompt, "")
elif api_choice == "TabbyAPI":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_tabbyapi
return summarize_with_tabbyapi(config['API']['tabby_api_key'], prompt, "")
elif api_choice == "vLLM":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_vllm
return summarize_with_vllm(config['API']['vllm_api_key'], prompt, "")
elif api_choice == "ollama":
from App_Function_Libraries.Local_Summarization_Lib import summarize_with_ollama
return summarize_with_ollama(config['API']['ollama_api_key'], prompt, "")
else:
raise ValueError(f"Unsupported API choice: {api_choice}")
# Function to preprocess and store all existing content in the database
#def preprocess_all_content():
# with db.get_connection() as conn:
# cursor = conn.cursor()
# cursor.execute("SELECT id, content FROM Media")
# for row in cursor.fetchall():
# process_and_store_content(row[1], f"media_{row[0]}")
# Function to perform RAG search across all stored content
def rag_search(query: str, api_choice: str) -> Dict[str, Any]:
# Perform vector search across all collections
# all_collections = chroma_client.list_collections()
# vector_results = []
# for collection in all_collections:
# vector_results.extend(vector_search(collection.name, query, k=2))
# Perform FTS search
# fts_results = search_db(query, ["content"], "", page=1, results_per_page=10)
# Combine results
# all_results = vector_results + [result['content'] for result in fts_results]
# context = "\n".join(all_results[:10]) # Limit to top 10 results
# Generate answer using the selected API
# answer = generate_answer(api_choice, context, query)
# return {
# "answer": answer,
# "context": context
# }
pass
# Example usage:
# 1. Initialize the system:
# create_tables(db) # Ensure FTS tables are set up
# preprocess_all_content() # Process and store all existing content
# 2. Perform RAG on a specific URL:
# result = rag_pipeline("https://example.com/article", "What is the main topic of this article?")
# print(result['answer'])
# 3. Perform RAG search across all content:
# result = rag_search("What are the key points about climate change?")
# print(result['answer'])
##################################################################################################################
# RAG Pipeline 1
#0.62 0.61 0.75 63402.0
# from langchain_openai import ChatOpenAI
#
# from langchain_community.document_loaders import WebBaseLoader
# from langchain_openai import OpenAIEmbeddings
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# from langchain_chroma import Chroma
#
# from langchain_community.retrievers import BM25Retriever
# from langchain.retrievers import ParentDocumentRetriever
# from langchain.storage import InMemoryStore
# import os
# from operator import itemgetter
# from langchain import hub
# from langchain_core.output_parsers import StrOutputParser
# from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda
# from langchain.retrievers import MergerRetriever
# from langchain.retrievers.document_compressors import DocumentCompressorPipeline
# def rag_pipeline():
# try:
# def format_docs(docs):
# return "\n".join(doc.page_content for doc in docs)
#
# llm = ChatOpenAI(model='gpt-4o-mini')
#
# loader = WebBaseLoader('https://en.wikipedia.org/wiki/European_debt_crisis')
# docs = loader.load()
#
# embedding = OpenAIEmbeddings(model='text-embedding-3-large')
#
# splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=200)
# splits = splitter.split_documents(docs)
# c = Chroma.from_documents(documents=splits, embedding=embedding,
# collection_name='testindex-ragbuilder-1724657573', )
# retrievers = []
# retriever = c.as_retriever(search_type='mmr', search_kwargs={'k': 10})
# retrievers.append(retriever)
# retriever = BM25Retriever.from_documents(docs)
# retrievers.append(retriever)
#
# parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=600)
# splits = parent_splitter.split_documents(docs)
# store = InMemoryStore()
# retriever = ParentDocumentRetriever(vectorstore=c, docstore=store, child_splitter=splitter,
# parent_splitter=parent_splitter)
# retriever.add_documents(docs)
# retrievers.append(retriever)
# retriever = MergerRetriever(retrievers=retrievers)
# prompt = hub.pull("rlm/rag-prompt")
# rag_chain = (
# RunnableParallel(context=retriever, question=RunnablePassthrough())
# .assign(context=itemgetter("context") | RunnableLambda(format_docs))
# .assign(answer=prompt | llm | StrOutputParser())
# .pick(["answer", "context"]))
# return rag_chain
# except Exception as e:
# print(f"An error occurred: {e}")
##To get the answer and context, use the following code
# res=rag_pipeline().invoke("your prompt here")
# print(res["answer"])
# print(res["context"])
############################################################################################################
############################################################################################################
# RAG Pipeline 2
#0.6 0.73 0.68 3125.0
# from langchain_openai import ChatOpenAI
#
# from langchain_community.document_loaders import WebBaseLoader
# from langchain_openai import OpenAIEmbeddings
# from langchain.text_splitter import RecursiveCharacterTextSplitter
# from langchain_chroma import Chroma
# from langchain.retrievers.multi_query import MultiQueryRetriever
# from langchain.retrievers import ParentDocumentRetriever
# from langchain.storage import InMemoryStore
# from langchain_community.document_transformers import EmbeddingsRedundantFilter
# from langchain.retrievers.document_compressors import LLMChainFilter
# from langchain.retrievers.document_compressors import EmbeddingsFilter
# from langchain.retrievers import ContextualCompressionRetriever
# import os
# from operator import itemgetter
# from langchain import hub
# from langchain_core.output_parsers import StrOutputParser
# from langchain_core.runnables import RunnablePassthrough, RunnableParallel, RunnableLambda
# from langchain.retrievers import MergerRetriever
# from langchain.retrievers.document_compressors import DocumentCompressorPipeline
# def rag_pipeline():
# try:
# def format_docs(docs):
# return "\n".join(doc.page_content for doc in docs)
#
# llm = ChatOpenAI(model='gpt-4o-mini')
#
# loader = WebBaseLoader('https://en.wikipedia.org/wiki/European_debt_crisis')
# docs = loader.load()
#
# embedding = OpenAIEmbeddings(model='text-embedding-3-large')
#
# splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=200)
# splits = splitter.split_documents(docs)
# c = Chroma.from_documents(documents=splits, embedding=embedding,
# collection_name='testindex-ragbuilder-1724650962', )
# retrievers = []
# retriever = MultiQueryRetriever.from_llm(c.as_retriever(search_type='similarity', search_kwargs={'k': 10}),
# llm=llm)
# retrievers.append(retriever)
#
# parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=600)
# splits = parent_splitter.split_documents(docs)
# store = InMemoryStore()
# retriever = ParentDocumentRetriever(vectorstore=c, docstore=store, child_splitter=splitter,
# parent_splitter=parent_splitter)
# retriever.add_documents(docs)
# retrievers.append(retriever)
# retriever = MergerRetriever(retrievers=retrievers)
# arr_comp = []
# arr_comp.append(EmbeddingsRedundantFilter(embeddings=embedding))
# arr_comp.append(LLMChainFilter.from_llm(llm))
# pipeline_compressor = DocumentCompressorPipeline(transformers=arr_comp)
# retriever = ContextualCompressionRetriever(base_retriever=retriever, base_compressor=pipeline_compressor)
# prompt = hub.pull("rlm/rag-prompt")
# rag_chain = (
# RunnableParallel(context=retriever, question=RunnablePassthrough())
# .assign(context=itemgetter("context") | RunnableLambda(format_docs))
# .assign(answer=prompt | llm | StrOutputParser())
# .pick(["answer", "context"]))
# return rag_chain
# except Exception as e:
# print(f"An error occurred: {e}")
##To get the answer and context, use the following code
# res=rag_pipeline().invoke("your prompt here")
# print(res["answer"])
# print(res["context"])
############################################################################################################
# Plain bm25 retriever
# class BM25Retriever(BaseRetriever):
# """`BM25` retriever without Elasticsearch."""
#
# vectorizer: Any
# """ BM25 vectorizer."""
# docs: List[Document] = Field(repr=False)
# """ List of documents."""
# k: int = 4
# """ Number of documents to return."""
# preprocess_func: Callable[[str], List[str]] = default_preprocessing_func
# """ Preprocessing function to use on the text before BM25 vectorization."""
#
# class Config:
# arbitrary_types_allowed = True
#
# @classmethod
# def from_texts(
# cls,
# texts: Iterable[str],
# metadatas: Optional[Iterable[dict]] = None,
# bm25_params: Optional[Dict[str, Any]] = None,
# preprocess_func: Callable[[str], List[str]] = default_preprocessing_func,
# **kwargs: Any,
# ) -> BM25Retriever:
# """
# Create a BM25Retriever from a list of texts.
# Args:
# texts: A list of texts to vectorize.
# metadatas: A list of metadata dicts to associate with each text.
# bm25_params: Parameters to pass to the BM25 vectorizer.
# preprocess_func: A function to preprocess each text before vectorization.
# **kwargs: Any other arguments to pass to the retriever.
#
# Returns:
# A BM25Retriever instance.
# """
# try:
# from rank_bm25 import BM25Okapi
# except ImportError:
# raise ImportError(
# "Could not import rank_bm25, please install with `pip install "
# "rank_bm25`."
# )
#
# texts_processed = [preprocess_func(t) for t in texts]
# bm25_params = bm25_params or {}
# vectorizer = BM25Okapi(texts_processed, **bm25_params)
# metadatas = metadatas or ({} for _ in texts)
# docs = [Document(page_content=t, metadata=m) for t, m in zip(texts, metadatas)]
# return cls(
# vectorizer=vectorizer, docs=docs, preprocess_func=preprocess_func, **kwargs
# )
#
# @classmethod
# def from_documents(
# cls,
# documents: Iterable[Document],
# *,
# bm25_params: Optional[Dict[str, Any]] = None,
# preprocess_func: Callable[[str], List[str]] = default_preprocessing_func,
# **kwargs: Any,
# ) -> BM25Retriever:
# """
# Create a BM25Retriever from a list of Documents.
# Args:
# documents: A list of Documents to vectorize.
# bm25_params: Parameters to pass to the BM25 vectorizer.
# preprocess_func: A function to preprocess each text before vectorization.
# **kwargs: Any other arguments to pass to the retriever.
#
# Returns:
# A BM25Retriever instance.
# """
# texts, metadatas = zip(*((d.page_content, d.metadata) for d in documents))
# return cls.from_texts(
# texts=texts,
# bm25_params=bm25_params,
# metadatas=metadatas,
# preprocess_func=preprocess_func,
# **kwargs,
# )
#
# def _get_relevant_documents(
# self, query: str, *, run_manager: CallbackManagerForRetrieverRun
# ) -> List[Document]:
# processed_query = self.preprocess_func(query)
# return_docs = self.vectorizer.get_top_n(processed_query, self.docs, n=self.k)
# return return_docs
############################################################################################################
############################################################################################################
# ElasticSearch BM25 Retriever
# class ElasticSearchBM25Retriever(BaseRetriever):
# """`Elasticsearch` retriever that uses `BM25`.
#
# To connect to an Elasticsearch instance that requires login credentials,
# including Elastic Cloud, use the Elasticsearch URL format
# https://username:password@es_host:9243. For example, to connect to Elastic
# Cloud, create the Elasticsearch URL with the required authentication details and
# pass it to the ElasticVectorSearch constructor as the named parameter
# elasticsearch_url.
#
# You can obtain your Elastic Cloud URL and login credentials by logging in to the
# Elastic Cloud console at https://cloud.elastic.co, selecting your deployment, and
# navigating to the "Deployments" page.
#
# To obtain your Elastic Cloud password for the default "elastic" user:
#
# 1. Log in to the Elastic Cloud console at https://cloud.elastic.co
# 2. Go to "Security" > "Users"
# 3. Locate the "elastic" user and click "Edit"
# 4. Click "Reset password"
# 5. Follow the prompts to reset the password
#
# The format for Elastic Cloud URLs is
# https://username:password@cluster_id.region_id.gcp.cloud.es.io:9243.
# """
#
# client: Any
# """Elasticsearch client."""
# index_name: str
# """Name of the index to use in Elasticsearch."""
#
# @classmethod
# def create(
# cls, elasticsearch_url: str, index_name: str, k1: float = 2.0, b: float = 0.75
# ) -> ElasticSearchBM25Retriever:
# """
# Create a ElasticSearchBM25Retriever from a list of texts.
#
# Args:
# elasticsearch_url: URL of the Elasticsearch instance to connect to.
# index_name: Name of the index to use in Elasticsearch.
# k1: BM25 parameter k1.
# b: BM25 parameter b.
#
# Returns:
#
# """
# from elasticsearch import Elasticsearch
#
# # Create an Elasticsearch client instance
# es = Elasticsearch(elasticsearch_url)
#
# # Define the index settings and mappings
# settings = {
# "analysis": {"analyzer": {"default": {"type": "standard"}}},
# "similarity": {
# "custom_bm25": {
# "type": "BM25",
# "k1": k1,
# "b": b,
# }
# },
# }
# mappings = {
# "properties": {
# "content": {
# "type": "text",
# "similarity": "custom_bm25", # Use the custom BM25 similarity
# }
# }
# }
#
# # Create the index with the specified settings and mappings
# es.indices.create(index=index_name, mappings=mappings, settings=settings)
# return cls(client=es, index_name=index_name)
#
# def add_texts(
# self,
# texts: Iterable[str],
# refresh_indices: bool = True,
# ) -> List[str]:
# """Run more texts through the embeddings and add to the retriever.
#
# Args:
# texts: Iterable of strings to add to the retriever.
# refresh_indices: bool to refresh ElasticSearch indices
#
# Returns:
# List of ids from adding the texts into the retriever.
# """
# try:
# from elasticsearch.helpers import bulk
# except ImportError:
# raise ImportError(
# "Could not import elasticsearch python package. "
# "Please install it with `pip install elasticsearch`."
# )
# requests = []
# ids = []
# for i, text in enumerate(texts):
# _id = str(uuid.uuid4())
# request = {
# "_op_type": "index",
# "_index": self.index_name,
# "content": text,
# "_id": _id,
# }
# ids.append(_id)
# requests.append(request)
# bulk(self.client, requests)
#
# if refresh_indices:
# self.client.indices.refresh(index=self.index_name)
# return ids
#
# def _get_relevant_documents(
# self, query: str, *, run_manager: CallbackManagerForRetrieverRun
# ) -> List[Document]:
# query_dict = {"query": {"match": {"content": query}}}
# res = self.client.search(index=self.index_name, body=query_dict)
#
# docs = []
# for r in res["hits"]["hits"]:
# docs.append(Document(page_content=r["_source"]["content"]))
# return docs
############################################################################################################
############################################################################################################
# Multi Query Retriever
# class MultiQueryRetriever(BaseRetriever):
# """Given a query, use an LLM to write a set of queries.
#
# Retrieve docs for each query. Return the unique union of all retrieved docs.
# """
#
# retriever: BaseRetriever
# llm_chain: Runnable
# verbose: bool = True
# parser_key: str = "lines"
# """DEPRECATED. parser_key is no longer used and should not be specified."""
# include_original: bool = False
# """Whether to include the original query in the list of generated queries."""
#
# @classmethod
# def from_llm(
# cls,
# retriever: BaseRetriever,
# llm: BaseLanguageModel,
# prompt: BasePromptTemplate = DEFAULT_QUERY_PROMPT,
# parser_key: Optional[str] = None,
# include_original: bool = False,
# ) -> "MultiQueryRetriever":
# """Initialize from llm using default template.
#
# Args:
# retriever: retriever to query documents from
# llm: llm for query generation using DEFAULT_QUERY_PROMPT
# prompt: The prompt which aims to generate several different versions
# of the given user query
# include_original: Whether to include the original query in the list of
# generated queries.
#
# Returns:
# MultiQueryRetriever
# """
# output_parser = LineListOutputParser()
# llm_chain = prompt | llm | output_parser
# return cls(
# retriever=retriever,
# llm_chain=llm_chain,
# include_original=include_original,
# )
#
# async def _aget_relevant_documents(
# self,
# query: str,
# *,
# run_manager: AsyncCallbackManagerForRetrieverRun,
# ) -> List[Document]:
# """Get relevant documents given a user query.
#
# Args:
# query: user query
#
# Returns:
# Unique union of relevant documents from all generated queries
# """
# queries = await self.agenerate_queries(query, run_manager)
# if self.include_original:
# queries.append(query)
# documents = await self.aretrieve_documents(queries, run_manager)
# return self.unique_union(documents)
#
# async def agenerate_queries(
# self, question: str, run_manager: AsyncCallbackManagerForRetrieverRun
# ) -> List[str]:
# """Generate queries based upon user input.
#
# Args:
# question: user query
#
# Returns:
# List of LLM generated queries that are similar to the user input
# """
# response = await self.llm_chain.ainvoke(
# {"question": question}, config={"callbacks": run_manager.get_child()}
# )
# if isinstance(self.llm_chain, LLMChain):
# lines = response["text"]
# else:
# lines = response
# if self.verbose:
# logger.info(f"Generated queries: {lines}")
# return lines
#
# async def aretrieve_documents(
# self, queries: List[str], run_manager: AsyncCallbackManagerForRetrieverRun
# ) -> List[Document]:
# """Run all LLM generated queries.
#
# Args:
# queries: query list
#
# Returns:
# List of retrieved Documents
# """
# document_lists = await asyncio.gather(
# *(
# self.retriever.ainvoke(
# query, config={"callbacks": run_manager.get_child()}
# )
# for query in queries
# )
# )
# return [doc for docs in document_lists for doc in docs]
#
# def _get_relevant_documents(
# self,
# query: str,
# *,
# run_manager: CallbackManagerForRetrieverRun,
# ) -> List[Document]:
# """Get relevant documents given a user query.
#
# Args:
# query: user query
#
# Returns:
# Unique union of relevant documents from all generated queries
# """
# queries = self.generate_queries(query, run_manager)
# if self.include_original:
# queries.append(query)
# documents = self.retrieve_documents(queries, run_manager)
# return self.unique_union(documents)
#
# def generate_queries(
# self, question: str, run_manager: CallbackManagerForRetrieverRun
# ) -> List[str]:
# """Generate queries based upon user input.
#
# Args:
# question: user query
#
# Returns:
# List of LLM generated queries that are similar to the user input
# """
# response = self.llm_chain.invoke(
# {"question": question}, config={"callbacks": run_manager.get_child()}
# )
# if isinstance(self.llm_chain, LLMChain):
# lines = response["text"]
# else:
# lines = response
# if self.verbose:
# logger.info(f"Generated queries: {lines}")
# return lines
#
# def retrieve_documents(
# self, queries: List[str], run_manager: CallbackManagerForRetrieverRun
# ) -> List[Document]:
# """Run all LLM generated queries.
#
# Args:
# queries: query list
#
# Returns:
# List of retrieved Documents
# """
# documents = []
# for query in queries:
# docs = self.retriever.invoke(
# query, config={"callbacks": run_manager.get_child()}
# )
# documents.extend(docs)
# return documents
#
# def unique_union(self, documents: List[Document]) -> List[Document]:
# """Get unique Documents.
#
# Args:
# documents: List of retrieved Documents
#
# Returns:
# List of unique retrieved Documents
# """
# return _unique_documents(documents)
############################################################################################################
############################################################################################################
# ElasticSearch Retriever
# https://github.com/langchain-ai/langchain/tree/44e3e2391c48bfd0a8e6a20adde0b6567f4f43c3/templates/rag-elasticsearch
#
# https://github.com/langchain-ai/langchain/tree/44e3e2391c48bfd0a8e6a20adde0b6567f4f43c3/templates/rag-self-query