Spaces:
Sleeping
Sleeping
File size: 11,292 Bytes
aa1db93 cfbac61 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
import configparser
import logging
import sqlite3
from typing import List, Dict, Any
import chromadb
import requests
from chromadb import Settings
from App_Function_Libraries.Chunk_Lib import improved_chunking_process
from App_Function_Libraries.DB.DB_Manager import add_media_chunk, update_fts_for_media
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
#######################################################################################################################
#
# Functions for ChromaDB
# Get ChromaDB settings
# Load configuration
# Parse config.txt (path is relative to the current working directory).
# ConfigParser.read() skips files it cannot open, which is why every lookup
# below supplies a fallback value.
config = configparser.ConfigParser()
config.read('config.txt')
# Directory of the persistent Chroma store; defaults to 'chroma_db'.
chroma_db_path = config.get('Database', 'chroma_db_path', fallback='chroma_db')
# Client used by the functions below; created at import time with telemetry disabled.
chroma_client = chromadb.PersistentClient(path=chroma_db_path, settings=Settings(anonymized_telemetry=False))
import os
# NOTE(review): this variable is set after chromadb has already been imported and
# the client constructed. If it is meant to influence how protobuf is loaded during
# that import, it presumably needs to be set earlier - confirm.
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"
# Get embedding settings
# provider selects the branch taken in create_embedding(); model, api_key and
# api_url are read by the provider-specific branches there.
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
embedding_model = config.get('Embeddings', 'model', fallback='text-embedding-3-small')
embedding_api_key = config.get('Embeddings', 'api_key', fallback='')
embedding_api_url = config.get('Embeddings', 'api_url', fallback='')
# Get chunking options
# Passed unchanged to improved_chunking_process() by process_and_store_content().
# NOTE(review): max_size/overlap are presumably counted in units of 'method'
# (words by default) - confirm against Chunk_Lib.
chunk_options = {
    'method': config.get('Chunking', 'method', fallback='words'),
    'max_size': config.getint('Chunking', 'max_size', fallback=400),
    'overlap': config.getint('Chunking', 'overlap', fallback=200),
    'adaptive': config.getboolean('Chunking', 'adaptive', fallback=False),
    'multi_level': config.getboolean('Chunking', 'multi_level', fallback=False),
    'language': config.get('Chunking', 'language', fallback='english')
}
def auto_update_chroma_embeddings(media_id: int, content: str):
    """
    Automatically update ChromaDB embeddings when a new item is ingested into the SQLite database.

    Each media item gets its own collection named ``media_<media_id>``. If that
    collection already holds the first chunk of this item, the item is treated as
    embedded and nothing is done; otherwise the content is chunked, embedded and
    stored via process_and_store_content().

    :param media_id: The ID of the newly ingested media item
    :param content: The content of the newly ingested media item
    """
    collection_name = f"media_{media_id}"
    # Initialize or get the ChromaDB collection
    collection = chroma_client.get_or_create_collection(name=collection_name)

    # Chunk IDs are numbered from 0 (see process_and_store_content), so probing
    # chunk 0 is enough. The previous probe built one ID per *character* of
    # `content`, and then tested the truthiness of the returned dict, which is
    # never empty -- so the "already exists" branch was always taken.
    existing_embeddings = collection.get(ids=[f"{media_id}_chunk_0"])
    if existing_embeddings and existing_embeddings.get('ids'):
        logging.info(f"Embeddings already exist for media ID {media_id}, skipping...")
        return

    # Process and store content if embeddings do not already exist
    process_and_store_content(content, collection_name, media_id)
    logging.info(f"Updated ChromaDB embeddings for media ID: {media_id}")
# Chunk the content, embed every chunk, then persist to ChromaDB and SQLite
def process_and_store_content(content: str, collection_name: str, media_id: int):
    """
    Split ``content`` into chunks, embed them, and record them in both stores.

    :param content: Full text of the media item
    :param collection_name: ChromaDB collection that receives the chunk embeddings
    :param media_id: ID of the media item; used as the prefix of every chunk ID
    """
    # Chunking is driven by the module-level chunk_options
    pieces = improved_chunking_process(content, chunk_options)
    piece_texts = [piece['text'] for piece in pieces]

    # One embedding per chunk, in chunk order
    piece_vectors = list(map(create_embedding, piece_texts))

    # Chunk IDs have the form "<media_id>_chunk_<index>", index starting at 0
    piece_ids = [f"{media_id}_chunk_{position}" for position, _ in enumerate(piece_texts)]

    # Vector store first ...
    store_in_chroma(collection_name, piece_texts, piece_vectors, piece_ids)

    # ... then one SQLite row per chunk, keyed by the same chunk ID
    for piece, piece_id in zip(pieces, piece_ids):
        add_media_chunk(media_id, piece['text'], piece['start'], piece['end'], piece_id)

    # Refresh the full-text-search index for this media item
    update_fts_for_media(media_id)
# Add documents together with their embeddings to a ChromaDB collection
def store_in_chroma(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str]):
    """
    Add ``texts`` and their ``embeddings`` to the named collection under ``ids``.

    The collection is created if it does not exist yet. The three lists are
    passed to ``collection.add`` as given.

    :param collection_name: Name of the target ChromaDB collection
    :param texts: Documents to store
    :param embeddings: Embedding vectors for the documents
    :param ids: IDs under which the documents are stored
    """
    target_collection = chroma_client.get_or_create_collection(name=collection_name)
    target_collection.add(ids=ids, embeddings=embeddings, documents=texts)
# Nearest-neighbour lookup of a text query in a ChromaDB collection
def vector_search(collection_name: str, query: str, k: int = 10) -> List[str]:
    """
    Embed ``query`` and return the documents of its closest matches.

    :param collection_name: Existing ChromaDB collection to search
    :param query: Query text; embedded with create_embedding()
    :param k: Number of results requested from ChromaDB
    :return: The ``documents`` entry of the query result for the single query sent
    """
    query_vector = create_embedding(query)
    hits = chroma_client.get_collection(name=collection_name).query(
        query_embeddings=[query_vector],
        n_results=k,
    )
    # Exactly one query embedding was sent, so take the first result group
    matched_documents = hits['documents'][0]
    return matched_documents
def create_embedding(text: str) -> List[float]:
    """
    Create an embedding for ``text`` with the provider selected in the configuration.

    The module-level ``embedding_provider`` chooses the backend:

    * ``'openai'``      - delegates to get_openai_embeddings() with ``embedding_model``.
    * ``'local'``       - POSTs ``{"text", "model"}`` as JSON to ``embedding_api_url`` with a
      bearer token and returns the ``embedding`` field of the JSON reply.
    * ``'huggingface'`` - loads ``embedding_model`` through transformers and returns the
      mean of the last hidden state.

    :param text: Text to embed
    :return: The embedding as a list of floats
    :raises ValueError: if ``embedding_provider`` is none of the values above
    :raises requests.HTTPError: if the local embedding endpoint answers with an error status
    """
    # The settings are only read here, so no `global` declaration is needed.
    if embedding_provider == 'openai':
        return get_openai_embeddings(text, embedding_model)

    if embedding_provider == 'local':
        response = requests.post(
            embedding_api_url,
            json={"text": text, "model": embedding_model},
            headers={"Authorization": f"Bearer {embedding_api_key}"}
        )
        # Surface HTTP failures as such instead of as a KeyError / JSON decode
        # error on the error body (create_llamacpp_embedding checks status too).
        response.raise_for_status()
        return response.json()['embedding']

    if embedding_provider == 'huggingface':
        # Imported here so transformers/torch are only needed for this provider
        from transformers import AutoTokenizer, AutoModel
        import torch

        # NOTE(review): tokenizer and model are re-loaded on every call; consider
        # caching them if this provider is used for many chunks.
        tokenizer = AutoTokenizer.from_pretrained(embedding_model)
        model = AutoModel.from_pretrained(embedding_model)
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)
        # Use the mean of the last hidden state as the sentence embedding
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()  # Convert to list for consistency

    raise ValueError(f"Unsupported embedding provider: {embedding_provider}")
def create_all_embeddings(api_choice: str, model_or_url: str) -> str:
    """
    Create embeddings for every database item that has none yet.

    Items come from get_all_content_from_database(); each one is stored in the
    ``all_content_embeddings`` collection under the ID ``doc_<media id>``. Items
    whose ID is already present in the collection are skipped.

    :param api_choice: ``"openai"`` (any letter case) to use the OpenAI helper;
        any other value uses the Llama.cpp helper
    :param model_or_url: OpenAI model name, or the Llama.cpp endpoint URL
    :return: A status message. Errors are logged and returned as ``"Error: ..."``
        rather than raised.
    """
    try:
        all_content = get_all_content_from_database()
        if not all_content:
            return "No content found in the database."

        texts_to_embed = []
        embeddings_to_store = []
        ids_to_store = []
        collection_name = "all_content_embeddings"
        # Accept "openai" / "OpenAI" alike
        use_openai = str(api_choice).lower() == "openai"

        # Initialize or get the ChromaDB collection
        collection = chroma_client.get_or_create_collection(name=collection_name)

        for content_item in all_content:
            media_id = content_item['id']
            text = content_item['content']
            doc_id = f"doc_{media_id}"

            # collection.get() returns a dict that is never empty, so the dict
            # itself is always truthy; the 'ids' list is what tells us whether
            # the document is actually stored.
            embedding_exists = collection.get(ids=[doc_id])
            if embedding_exists and embedding_exists.get('ids'):
                logging.info(f"Embedding already exists for media ID {media_id}, skipping...")
                continue  # Skip if embedding already exists

            # Create the embedding
            if use_openai:
                embedding = create_openai_embedding(text, model_or_url)
            else:  # Llama.cpp
                embedding = create_llamacpp_embedding(text, model_or_url)

            # Collect the text, embedding, and ID for batch storage
            texts_to_embed.append(text)
            embeddings_to_store.append(embedding)
            ids_to_store.append(doc_id)

        # Store all new embeddings in ChromaDB (the three lists grow in lockstep)
        if ids_to_store:
            store_in_chroma(collection_name, texts_to_embed, embeddings_to_store, ids_to_store)

        return "Embeddings created and stored successfully for all new content."
    except Exception as e:
        logging.error(f"Error during embedding creation: {str(e)}")
        return f"Error: {str(e)}"
def create_openai_embedding(text: str, model: str) -> List[float]:
    """
    Create an embedding for ``text`` with the given OpenAI model.

    Thin wrapper around get_openai_embeddings(). The former lookup of
    ``config['API']['openai_api_key']`` was removed: its value was never passed
    on, so its only effect was a KeyError when that config entry is missing.

    :param text: Text to embed
    :param model: OpenAI embedding model name
    :return: The embedding returned by get_openai_embeddings()
    """
    return get_openai_embeddings(text, model)
def create_llamacpp_embedding(text: str, api_url: str) -> List[float]:
    """
    Request an embedding for ``text`` from a Llama.cpp HTTP endpoint.

    :param text: Text to embed; sent as the ``input`` field of a JSON POST body
    :param api_url: URL of the embedding endpoint
    :return: The ``embedding`` field of the JSON reply
    :raises Exception: if the endpoint answers with any status other than 200
    """
    reply = requests.post(api_url, json={"input": text})
    # Guard clause: anything but 200 is reported with the raw response body
    if reply.status_code != 200:
        raise Exception(f"Error from Llama.cpp API: {reply.text}")
    return reply.json()['embedding']
def get_all_content_from_database() -> List[Dict[str, Any]]:
    """
    Fetch every Media row that is not marked as trash.

    Returns:
        List[Dict[str, Any]]: One dictionary per row with the keys
        ``id``, ``content``, ``title``, ``author`` and ``type``.

    Raises:
        DatabaseError: if the underlying SQLite call fails.
    """
    try:
        # Imported here rather than at module level
        from App_Function_Libraries.DB.DB_Manager import db

        with db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, content, title, author, type
                FROM Media
                WHERE is_trash = 0 -- Exclude items marked as trash
            """)
            rows = cursor.fetchall()

        # Turn each 5-column row into a dictionary, column order as in the SELECT
        all_content = []
        for row in rows:
            row_id, row_content, row_title, row_author, row_type = row
            all_content.append({
                'id': row_id,
                'content': row_content,
                'title': row_title,
                'author': row_author,
                'type': row_type,
            })
        return all_content
    except sqlite3.Error as e:
        logging.error(f"Error retrieving all content from database: {e}")
        from App_Function_Libraries.DB.SQLite_DB import DatabaseError
        raise DatabaseError(f"Error retrieving all content from database: {e}")
def store_in_chroma_with_citation(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str], sources: List[str]):
    """
    Add documents to a ChromaDB collection, tagging each with its source.

    Works like store_in_chroma() but additionally attaches a ``{'source': ...}``
    metadata entry built from ``sources``, one per element.

    :param collection_name: Name of the target collection (created if missing)
    :param texts: Documents to store
    :param embeddings: Embedding vectors for the documents
    :param ids: IDs under which the documents are stored
    :param sources: Source/citation string for each document
    """
    # One metadata dict per source string, in the same order
    citation_metadata = []
    for origin in sources:
        citation_metadata.append({'source': origin})

    target_collection = chroma_client.get_or_create_collection(name=collection_name)
    target_collection.add(
        ids=ids,
        embeddings=embeddings,
        documents=texts,
        metadatas=citation_metadata,
    )
def check_embedding_status(selected_item):
    """
    Report whether an embedding is stored for the selected item.

    The text before the first "(" in ``selected_item`` is used as the item
    identifier and looked up as ``doc_<identifier>`` in the
    ``all_content_embeddings`` collection.

    NOTE(review): create_new_embedding() treats the same prefix as the item's
    *title* but stores under ``doc_<database id>``. Unless the prefix is the
    database id, this lookup will not find embeddings created there - confirm
    the format of ``selected_item``.

    :param selected_item: Selection string; may be empty/None
    :return: ``(status message, embedding preview)``; the preview is ``""`` when
        nothing is selected or no embedding is found
    """
    if not selected_item:
        return "Please select an item", ""

    item_id = selected_item.split('(')[0].strip()
    collection = chroma_client.get_or_create_collection(name="all_content_embeddings")
    # Chroma's get() leaves embeddings out of the result unless they are
    # requested explicitly; without `include` the indexing below fails.
    result = collection.get(ids=[f"doc_{item_id}"], include=["embeddings"])

    if result['ids']:
        embedding = result['embeddings'][0]
        embedding_preview = str(embedding[:50])  # Convert first 50 elements to string
        return f"Embedding exists for item: {item_id}", f"Embedding preview: {embedding_preview}..."
    else:
        return f"No embedding found for item: {item_id}", ""
def create_new_embedding(selected_item, api_choice, openai_model, llamacpp_url):
    """
    Create an embedding for one selected item and store it in ChromaDB.

    The text before the first "(" in ``selected_item`` is matched against the
    ``title`` of the rows returned by get_all_content_from_database(). The
    embedding is stored in ``all_content_embeddings`` under ``doc_<item id>``.

    Previously both branches of the provider check called create_embedding(),
    so ``api_choice``, ``openai_model`` and ``llamacpp_url`` had no effect. The
    branches now call the provider-specific helpers, matching
    create_all_embeddings().

    :param selected_item: Selection string; may be empty/None
    :param api_choice: ``"OpenAI"`` (any letter case) selects OpenAI; any other
        value selects Llama.cpp
    :param openai_model: OpenAI embedding model name (used for OpenAI)
    :param llamacpp_url: Llama.cpp embedding endpoint URL (used otherwise)
    :return: A status message; embedding/storage errors are returned as text,
        not raised
    """
    if not selected_item:
        return "Please select an item"

    item_id = selected_item.split('(')[0].strip()
    candidates = get_all_content_from_database()
    item = next((candidate for candidate in candidates if candidate['title'] == item_id), None)
    if not item:
        return f"Item not found: {item_id}"

    try:
        if str(api_choice).lower() == "openai":
            embedding = create_openai_embedding(item['content'], openai_model)
        else:  # Llama.cpp
            embedding = create_llamacpp_embedding(item['content'], llamacpp_url)

        collection_name = "all_content_embeddings"
        store_in_chroma(collection_name, [item['content']], [embedding], [f"doc_{item['id']}"])
        return f"New embedding created and stored for item: {item_id}"
    except Exception as e:
        return f"Error creating embedding: {str(e)}"
#
# End of Functions for ChromaDB
####################################################################################################################### |