File size: 16,059 Bytes
45e1f81 a324812 45e1f81 a324812 45e1f81 31334e0 45e1f81 31334e0 45e1f81 a324812 45e1f81 a324812 45e1f81 a324812 45e1f81 a324812 45e1f81 a324812 45e1f81 a324812 45e1f81 a324812 45e1f81 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 |
# RAG_Library_2.py
# Description: This script contains the main RAG pipeline function and related functions for the RAG pipeline.
#
# Import necessary modules and functions
import configparser
import logging
import os
from typing import Dict, Any, List, Optional
# Local Imports
from App_Function_Libraries.RAG.ChromaDB_Library import process_and_store_content, vector_search, chroma_client
from App_Function_Libraries.Web_Scraping.Article_Extractor_Lib import scrape_article
from App_Function_Libraries.DB.DB_Manager import search_db, fetch_keywords_for_media
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
#
# 3rd-Party Imports
import openai
#
########################################################################################################################
#
# Functions:
# Initialize OpenAI client (adjust this based on your API key management)
openai.api_key = "your-openai-api-key"
# Get the directory of the current script
current_dir = os.path.dirname(os.path.abspath(__file__))
# Construct the path to the config file
config_path = os.path.join(current_dir, 'Config_Files', 'config.txt')
# Read the config file
config = configparser.ConfigParser()
# Read the configuration file
config.read('config.txt')
# RAG pipeline function for web scraping
# def rag_web_scraping_pipeline(url: str, query: str, api_choice=None) -> Dict[str, Any]:
# try:
# # Extract content
# try:
# article_data = scrape_article(url)
# content = article_data['content']
# title = article_data['title']
# except Exception as e:
# logging.error(f"Error scraping article: {str(e)}")
# return {"error": "Failed to scrape article", "details": str(e)}
#
# # Store the article in the database and get the media_id
# try:
# media_id = add_media_to_database(url, title, 'article', content)
# except Exception as e:
# logging.error(f"Error adding article to database: {str(e)}")
# return {"error": "Failed to store article in database", "details": str(e)}
#
# # Process and store content
# collection_name = f"article_{media_id}"
# try:
# # Assuming you have a database object available, let's call it 'db'
# db = get_database_connection()
#
# process_and_store_content(
# database=db,
# content=content,
# collection_name=collection_name,
# media_id=media_id,
# file_name=title,
# create_embeddings=True,
# create_contextualized=True,
# api_name=api_choice
# )
# except Exception as e:
# logging.error(f"Error processing and storing content: {str(e)}")
# return {"error": "Failed to process and store content", "details": str(e)}
#
# # Perform searches
# try:
# vector_results = vector_search(collection_name, query, k=5)
# fts_results = search_db(query, ["content"], "", page=1, results_per_page=5)
# except Exception as e:
# logging.error(f"Error performing searches: {str(e)}")
# return {"error": "Failed to perform searches", "details": str(e)}
#
# # Combine results with error handling for missing 'content' key
# all_results = []
# for result in vector_results + fts_results:
# if isinstance(result, dict) and 'content' in result:
# all_results.append(result['content'])
# else:
# logging.warning(f"Unexpected result format: {result}")
# all_results.append(str(result))
#
# context = "\n".join(all_results)
#
# # Generate answer using the selected API
# try:
# answer = generate_answer(api_choice, context, query)
# except Exception as e:
# logging.error(f"Error generating answer: {str(e)}")
# return {"error": "Failed to generate answer", "details": str(e)}
#
# return {
# "answer": answer,
# "context": context
# }
#
# except Exception as e:
# logging.error(f"Unexpected error in rag_pipeline: {str(e)}")
# return {"error": "An unexpected error occurred", "details": str(e)}
# RAG Search with keyword filtering
def enhanced_rag_pipeline(query: str, api_choice: str, keywords: str = None) -> Dict[str, Any]:
try:
# Load embedding provider from config, or fallback to 'openai'
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
# Log the provider used
logging.debug(f"Using embedding provider: {embedding_provider}")
# Process keywords if provided
keyword_list = [k.strip().lower() for k in keywords.split(',')] if keywords else []
logging.debug(f"enhanced_rag_pipeline - Keywords: {keyword_list}")
# Fetch relevant media IDs based on keywords if keywords are provided
relevant_media_ids = fetch_relevant_media_ids(keyword_list) if keyword_list else None
logging.debug(f"enhanced_rag_pipeline - relevant media IDs: {relevant_media_ids}")
# Perform vector search
vector_results = perform_vector_search(query, relevant_media_ids)
logging.debug(f"enhanced_rag_pipeline - Vector search results: {vector_results}")
# Perform full-text search
fts_results = perform_full_text_search(query, relevant_media_ids)
logging.debug(f"enhanced_rag_pipeline - Full-text search results: {fts_results}")
# Combine results
all_results = vector_results + fts_results
# FIXME - Apply Re-Ranking of results here
apply_re_ranking = False
if apply_re_ranking:
# Implement re-ranking logic here
pass
# Extract content from results
context = "\n".join([result['content'] for result in all_results[:10]]) # Limit to top 10 results
logging.debug(f"Context length: {len(context)}")
logging.debug(f"Context: {context[:200]}")
# Generate answer using the selected API
answer = generate_answer(api_choice, context, query)
if not all_results:
logging.info(f"No results found. Query: {query}, Keywords: {keywords}")
return {
"answer": "No relevant information based on your query and keywords were found in the database. Your query has been directly passed to the LLM, and here is its answer: \n\n" + answer,
"context": "No relevant information based on your query and keywords were found in the database. The only context used was your query: \n\n" + query
}
return {
"answer": answer,
"context": context
}
except Exception as e:
logging.error(f"Error in enhanced_rag_pipeline: {str(e)}")
return {
"answer": "An error occurred while processing your request.",
"context": ""
}
def generate_answer(api_choice: str, context: str, query: str) -> str:
logging.debug("Entering generate_answer function")
config = load_comprehensive_config()
logging.debug(f"Config sections: {config.sections()}")
prompt = f"Context: {context}\n\nQuestion: {query}"
if api_choice == "OpenAI":
from App_Function_Libraries.Summarization.Summarization_General_Lib import summarize_with_openai
return summarize_with_openai(config['API']['openai_api_key'], prompt, "")
elif api_choice == "Anthropic":
from App_Function_Libraries.Summarization.Summarization_General_Lib import summarize_with_anthropic
return summarize_with_anthropic(config['API']['anthropic_api_key'], prompt, "")
elif api_choice == "Cohere":
from App_Function_Libraries.Summarization.Summarization_General_Lib import summarize_with_cohere
return summarize_with_cohere(config['API']['cohere_api_key'], prompt, "")
elif api_choice == "Groq":
from App_Function_Libraries.Summarization.Summarization_General_Lib import summarize_with_groq
return summarize_with_groq(config['API']['groq_api_key'], prompt, "")
elif api_choice == "OpenRouter":
from App_Function_Libraries.Summarization.Summarization_General_Lib import summarize_with_openrouter
return summarize_with_openrouter(config['API']['openrouter_api_key'], prompt, "")
elif api_choice == "HuggingFace":
from App_Function_Libraries.Summarization.Summarization_General_Lib import summarize_with_huggingface
return summarize_with_huggingface(config['API']['huggingface_api_key'], prompt, "")
elif api_choice == "DeepSeek":
from App_Function_Libraries.Summarization.Summarization_General_Lib import summarize_with_deepseek
return summarize_with_deepseek(config['API']['deepseek_api_key'], prompt, "")
elif api_choice == "Mistral":
from App_Function_Libraries.Summarization.Summarization_General_Lib import summarize_with_mistral
return summarize_with_mistral(config['API']['mistral_api_key'], prompt, "")
elif api_choice == "Local-LLM":
from App_Function_Libraries.Summarization.Local_Summarization_Lib import summarize_with_local_llm
return summarize_with_local_llm(config['API']['local_llm_path'], prompt, "")
elif api_choice == "Llama.cpp":
from App_Function_Libraries.Summarization.Local_Summarization_Lib import summarize_with_llama
return summarize_with_llama(config['API']['llama_api_key'], prompt, "")
elif api_choice == "Kobold":
from App_Function_Libraries.Summarization.Local_Summarization_Lib import summarize_with_kobold
return summarize_with_kobold(config['API']['kobold_api_key'], prompt, "")
elif api_choice == "Ooba":
from App_Function_Libraries.Summarization.Local_Summarization_Lib import summarize_with_oobabooga
return summarize_with_oobabooga(config['API']['ooba_api_key'], prompt, "")
elif api_choice == "TabbyAPI":
from App_Function_Libraries.Summarization.Local_Summarization_Lib import summarize_with_tabbyapi
return summarize_with_tabbyapi(config['API']['tabby_api_key'], prompt, "")
elif api_choice == "vLLM":
from App_Function_Libraries.Summarization.Local_Summarization_Lib import summarize_with_vllm
return summarize_with_vllm(config['API']['vllm_api_key'], prompt, "")
elif api_choice == "ollama":
from App_Function_Libraries.Summarization.Local_Summarization_Lib import summarize_with_ollama
return summarize_with_ollama(config['API']['ollama_api_key'], prompt, "")
else:
raise ValueError(f"Unsupported API choice: {api_choice}")
def perform_vector_search(query: str, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]:
all_collections = chroma_client.list_collections()
vector_results = []
for collection in all_collections:
collection_results = vector_search(collection.name, query, k=5)
filtered_results = [
result for result in collection_results
if relevant_media_ids is None or result['metadata'].get('media_id') in relevant_media_ids
]
vector_results.extend(filtered_results)
return vector_results
def perform_full_text_search(query: str, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]:
fts_results = search_db(query, ["content"], "", page=1, results_per_page=5)
filtered_fts_results = [
{
"content": result['content'],
"metadata": {"media_id": result['id']}
}
for result in fts_results
if relevant_media_ids is None or result['id'] in relevant_media_ids
]
return filtered_fts_results
def fetch_relevant_media_ids(keywords: List[str]) -> List[int]:
relevant_ids = set()
try:
for keyword in keywords:
media_ids = fetch_keywords_for_media(keyword)
relevant_ids.update(media_ids)
except Exception as e:
logging.error(f"Error fetching relevant media IDs: {str(e)}")
return list(relevant_ids)
def filter_results_by_keywords(results: List[Dict[str, Any]], keywords: List[str]) -> List[Dict[str, Any]]:
if not keywords:
return results
filtered_results = []
for result in results:
try:
metadata = result.get('metadata', {})
if metadata is None:
logging.warning(f"No metadata found for result: {result}")
continue
if not isinstance(metadata, dict):
logging.warning(f"Unexpected metadata type: {type(metadata)}. Expected dict.")
continue
media_id = metadata.get('media_id')
if media_id is None:
logging.warning(f"No media_id found in metadata: {metadata}")
continue
media_keywords = fetch_keywords_for_media(media_id)
if any(keyword.lower() in [mk.lower() for mk in media_keywords] for keyword in keywords):
filtered_results.append(result)
except Exception as e:
logging.error(f"Error processing result: {result}. Error: {str(e)}")
return filtered_results
# FIXME: to be implememted
def extract_media_id_from_result(result: str) -> Optional[int]:
# Implement this function based on how you store the media_id in your results
# For example, if it's stored at the beginning of each result:
try:
return int(result.split('_')[0])
except (IndexError, ValueError):
logging.error(f"Failed to extract media_id from result: {result}")
return None
#
#
########################################################################################################################
# Function to preprocess and store all existing content in the database
# def preprocess_all_content(database, create_contextualized=True, api_name="gpt-3.5-turbo"):
# unprocessed_media = get_unprocessed_media()
# total_media = len(unprocessed_media)
#
# for index, row in enumerate(unprocessed_media, 1):
# media_id, content, media_type, file_name = row
# collection_name = f"{media_type}_{media_id}"
#
# logger.info(f"Processing media {index} of {total_media}: ID {media_id}, Type {media_type}")
#
# try:
# process_and_store_content(
# database=database,
# content=content,
# collection_name=collection_name,
# media_id=media_id,
# file_name=file_name or f"{media_type}_{media_id}",
# create_embeddings=True,
# create_contextualized=create_contextualized,
# api_name=api_name
# )
#
# # Mark the media as processed in the database
# mark_media_as_processed(database, media_id)
#
# logger.info(f"Successfully processed media ID {media_id}")
# except Exception as e:
# logger.error(f"Error processing media ID {media_id}: {str(e)}")
#
# logger.info("Finished preprocessing all unprocessed content")
############################################################################################################
#
# ElasticSearch Retriever
# https://github.com/langchain-ai/langchain/tree/44e3e2391c48bfd0a8e6a20adde0b6567f4f43c3/templates/rag-elasticsearch
#
# https://github.com/langchain-ai/langchain/tree/44e3e2391c48bfd0a8e6a20adde0b6567f4f43c3/templates/rag-self-query
#
# End of RAG_Library_2.py
############################################################################################################
|