|
import configparser
|
|
import logging
|
|
import sqlite3
|
|
from typing import List, Dict, Any
|
|
|
|
import chromadb
|
|
import requests
|
|
from chromadb import Settings
|
|
|
|
from App_Function_Libraries.Chunk_Lib import improved_chunking_process
|
|
from App_Function_Libraries.DB.DB_Manager import add_media_chunk, update_fts_for_media
|
|
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Module-level configuration --------------------------------------------
# Settings are read once at import time from config.txt; every lookup has a
# fallback so the module still imports when the file or a key is missing.
config = configparser.ConfigParser()
config.read('config.txt')

# Persistent (on-disk) ChromaDB client shared by every function in this
# module; anonymized telemetry is explicitly disabled.
chroma_db_path = config.get('Database', 'chroma_db_path', fallback='chroma_db')
chroma_client = chromadb.PersistentClient(path=chroma_db_path, settings=Settings(anonymized_telemetry=False))

# Embedding backend selection — create_embedding() dispatches on
# embedding_provider ('openai', 'local', or 'huggingface').
embedding_provider = config.get('Embeddings', 'provider', fallback='openai')
embedding_model = config.get('Embeddings', 'model', fallback='text-embedding-3-small')
embedding_api_key = config.get('Embeddings', 'api_key', fallback='')
embedding_api_url = config.get('Embeddings', 'api_url', fallback='')

# Options forwarded verbatim to improved_chunking_process().
chunk_options = {
    'method': config.get('Chunking', 'method', fallback='words'),
    'max_size': config.getint('Chunking', 'max_size', fallback=400),
    'overlap': config.getint('Chunking', 'overlap', fallback=200),
    'adaptive': config.getboolean('Chunking', 'adaptive', fallback=False),
    'multi_level': config.getboolean('Chunking', 'multi_level', fallback=False),
    'language': config.get('Chunking', 'language', fallback='english')
}
|
|
|
|
|
|
def auto_update_chroma_embeddings(media_id: int, content: str):
    """
    Automatically update ChromaDB embeddings when a new item is ingested into the SQLite database.

    :param media_id: The ID of the newly ingested media item
    :param content: The content of the newly ingested media item
    """
    collection_name = f"media_{media_id}"
    collection = chroma_client.get_or_create_collection(name=collection_name)

    # Probe only the first chunk id: process_and_store_content() assigns
    # chunk ids sequentially, so its presence implies embeddings exist.
    # (The previous code requested one id per *character* of content and then
    # tested len() of the returned dict — a dict with its fixed result keys
    # is always truthy, so new embeddings were never created.)
    existing = collection.get(ids=[f"{media_id}_chunk_0"])

    if existing['ids']:
        logging.info(f"Embeddings already exist for media ID {media_id}, skipping...")
    else:
        process_and_store_content(content, collection_name, media_id)
        logging.info(f"Updated ChromaDB embeddings for media ID: {media_id}")
|
|
|
|
|
|
|
|
def process_and_store_content(content: str, collection_name: str, media_id: int):
    """
    Chunk *content*, embed each chunk, and persist everything.

    Embeddings land in the ChromaDB collection *collection_name*; the raw
    chunks are mirrored into the SQLite media-chunk table, and the FTS index
    for the media item is refreshed afterwards.
    """
    chunks = improved_chunking_process(content, chunk_options)

    chunk_texts = []
    chunk_ids = []
    for index, chunk in enumerate(chunks):
        chunk_texts.append(chunk['text'])
        chunk_ids.append(f"{media_id}_chunk_{index}")

    chunk_embeddings = [create_embedding(chunk_text) for chunk_text in chunk_texts]

    store_in_chroma(collection_name, chunk_texts, chunk_embeddings, chunk_ids)

    # Mirror each chunk (with its character span and Chroma id) into SQLite.
    for index, chunk in enumerate(chunks):
        add_media_chunk(media_id, chunk['text'], chunk['start'], chunk['end'], chunk_ids[index])

    update_fts_for_media(media_id)
|
|
|
|
|
|
def store_in_chroma(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str]):
    """
    Add parallel lists of documents, embeddings, and ids to the named
    ChromaDB collection, creating the collection if it does not exist.
    """
    target = chroma_client.get_or_create_collection(name=collection_name)
    target.add(documents=texts, embeddings=embeddings, ids=ids)
|
|
|
|
|
|
def vector_search(collection_name: str, query: str, k: int = 10) -> List[str]:
    """
    Embed *query* and return the *k* nearest document texts from the named
    collection. Raises if the collection does not exist.
    """
    collection = chroma_client.get_collection(name=collection_name)
    hits = collection.query(query_embeddings=[create_embedding(query)], n_results=k)
    # query() groups results per query embedding; we sent exactly one,
    # so its documents live in slot 0.
    return hits['documents'][0]
|
|
|
|
|
|
def create_embedding(text: str) -> List[float]:
    """
    Embed *text* with the provider configured in config.txt.

    Providers:
      - 'openai':      delegates to get_openai_embeddings().
      - 'local':       POSTs to embedding_api_url with a bearer token.
      - 'huggingface': runs a local transformers model and mean-pools the
                       last hidden state over the sequence dimension.

    :param text: The text to embed.
    :return: The embedding vector as a list of floats.
    :raises ValueError: If embedding_provider is not one of the above.
    """
    if embedding_provider == 'openai':
        return get_openai_embeddings(text, embedding_model)

    if embedding_provider == 'local':
        response = requests.post(
            embedding_api_url,
            json={"text": text, "model": embedding_model},
            headers={"Authorization": f"Bearer {embedding_api_key}"}
        )
        # Fail loudly on HTTP errors instead of mis-parsing an error body
        # (the original went straight to response.json()['embedding']).
        response.raise_for_status()
        return response.json()['embedding']

    if embedding_provider == 'huggingface':
        from transformers import AutoTokenizer, AutoModel
        import torch

        # Cache the tokenizer/model on the function object so repeated calls
        # don't reload the weights from disk every time; invalidate if the
        # configured model name changes.
        cache = getattr(create_embedding, '_hf_cache', None)
        if cache is None or cache[0] != embedding_model:
            tokenizer = AutoTokenizer.from_pretrained(embedding_model)
            model = AutoModel.from_pretrained(embedding_model)
            create_embedding._hf_cache = (embedding_model, tokenizer, model)
        else:
            _, tokenizer, model = cache

        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)

        # Mean-pool token embeddings into a single vector for the text.
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()

    raise ValueError(f"Unsupported embedding provider: {embedding_provider}")
|
|
|
|
|
|
def create_all_embeddings(api_choice: str, model_or_url: str) -> str:
    """
    Embed every media item from the database into the shared
    'all_content_embeddings' collection, skipping items already embedded.

    :param api_choice: 'openai' to use create_openai_embedding(); anything
                       else uses the llama.cpp HTTP endpoint.
    :param model_or_url: OpenAI model name, or the llama.cpp API URL.
    :return: Human-readable status (or error) message for the caller/UI.
    """
    try:
        all_content = get_all_content_from_database()

        if not all_content:
            return "No content found in the database."

        texts_to_embed = []
        embeddings_to_store = []
        ids_to_store = []
        collection_name = "all_content_embeddings"
        collection = chroma_client.get_or_create_collection(name=collection_name)

        for content_item in all_content:
            media_id = content_item['id']
            text = content_item['content']

            # collection.get() always returns a dict with its result keys, so
            # the dict itself is always truthy — test the 'ids' list instead.
            # (The original tested the dict, which skipped every item.)
            existing = collection.get(ids=[f"doc_{media_id}"])
            if existing['ids']:
                logging.info(f"Embedding already exists for media ID {media_id}, skipping...")
                continue

            if api_choice == "openai":
                embedding = create_openai_embedding(text, model_or_url)
            else:
                embedding = create_llamacpp_embedding(text, model_or_url)

            texts_to_embed.append(text)
            embeddings_to_store.append(embedding)
            ids_to_store.append(f"doc_{media_id}")

        # Only call add() when there is something new to store.
        if texts_to_embed and embeddings_to_store:
            store_in_chroma(collection_name, texts_to_embed, embeddings_to_store, ids_to_store)

        return "Embeddings created and stored successfully for all new content."
    except Exception as e:
        logging.error(f"Error during embedding creation: {str(e)}")
        return f"Error: {str(e)}"
|
|
|
|
|
|
def create_openai_embedding(text: str, model: str) -> List[float]:
    """
    Embed *text* with the given OpenAI *model*.

    The API key is resolved inside get_openai_embeddings(); the original's
    config['API']['openai_api_key'] lookup was dead (assigned, never used)
    and has been removed.

    :param text: The text to embed.
    :param model: OpenAI embedding model name.
    :return: The embedding vector as a list of floats.
    """
    return get_openai_embeddings(text, model)
|
|
|
|
|
|
def create_llamacpp_embedding(text: str, api_url: str) -> List[float]:
    """
    POST *text* to a llama.cpp embedding endpoint and return the vector.

    :param text: The text to embed.
    :param api_url: Full URL of the llama.cpp embedding API.
    :raises Exception: If the endpoint answers with a non-200 status.
    """
    response = requests.post(api_url, json={"input": text})
    # Guard clause: surface the server's error text on any non-OK status.
    if response.status_code != 200:
        raise Exception(f"Error from Llama.cpp API: {response.text}")
    return response.json()['embedding']
|
|
|
|
|
|
def get_all_content_from_database() -> List[Dict[str, Any]]:
    """
    Retrieve all media content from the database that requires embedding.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries, each containing the media ID, content, title, and other relevant fields.
    """
    try:
        from App_Function_Libraries.DB.DB_Manager import db
        with db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, content, title, author, type
                FROM Media
                WHERE is_trash = 0 -- Exclude items marked as trash
            """)
            rows = cursor.fetchall()

            # Zip each row tuple against its column names to build dicts.
            columns = ('id', 'content', 'title', 'author', 'type')
            return [dict(zip(columns, row)) for row in rows]

    except sqlite3.Error as e:
        logging.error(f"Error retrieving all content from database: {e}")
        from App_Function_Libraries.DB.SQLite_DB import DatabaseError
        raise DatabaseError(f"Error retrieving all content from database: {e}")
|
|
|
|
|
|
def store_in_chroma_with_citation(collection_name: str, texts: List[str], embeddings: List[List[float]], ids: List[str], sources: List[str]):
    """
    Like store_in_chroma(), but attaches a per-document 'source' metadata
    entry so results can be cited back to their origin.
    """
    target = chroma_client.get_or_create_collection(name=collection_name)
    citation_metadata = [{'source': source} for source in sources]
    target.add(documents=texts, embeddings=embeddings, ids=ids, metadatas=citation_metadata)
|
|
|
|
|
|
def check_embedding_status(selected_item):
    """
    Report whether an embedding exists for the item picked in the UI.

    :param selected_item: UI string of the form "<label> (...)"; the part
                          before '(' is used as the lookup key.
    :return: (status message, embedding preview) tuple for the UI.
    """
    if not selected_item:
        return "Please select an item", ""
    # NOTE(review): this extracts the text before '(' — create_new_embedding
    # treats that text as a *title*, yet ids are stored as "doc_<media id>".
    # Confirm the dropdown actually encodes the id here.
    item_id = selected_item.split('(')[0].strip()
    collection = chroma_client.get_or_create_collection(name="all_content_embeddings")
    # Embedding vectors are not returned by collection.get() unless requested
    # via `include`; without it result['embeddings'] is None and the original
    # subscript raised a TypeError.
    result = collection.get(ids=[f"doc_{item_id}"], include=["embeddings"])
    if result['ids']:
        embedding = result['embeddings'][0]
        embedding_preview = str(embedding[:50])
        return f"Embedding exists for item: {item_id}", f"Embedding preview: {embedding_preview}..."
    else:
        return f"No embedding found for item: {item_id}", ""
|
|
|
|
|
|
def create_new_embedding(selected_item, api_choice, openai_model, llamacpp_url):
    """
    Create and store an embedding for one item picked from the UI.

    :param selected_item: UI string "<title> (...)"; the part before '(' is
                          matched against item titles from the database.
    :param api_choice: Kept for interface compatibility. Both branches of the
                       original called create_embedding(), which follows the
                       provider configured in config.txt — so api_choice,
                       openai_model and llamacpp_url are currently unused.
    :return: Status (or error) message for the UI.
    """
    if not selected_item:
        return "Please select an item"
    title = selected_item.split('(')[0].strip()
    items = get_all_content_from_database()
    item = next((item for item in items if item['title'] == title), None)
    if not item:
        return f"Item not found: {title}"

    try:
        # The original if/else executed the identical call in both branches;
        # collapsed to a single call. Provider selection happens inside
        # create_embedding() via the module config.
        embedding = create_embedding(item['content'])

        collection_name = "all_content_embeddings"
        store_in_chroma(collection_name, [item['content']], [embedding], [f"doc_{item['id']}"])
        return f"New embedding created and stored for item: {title}"
    except Exception as e:
        return f"Error creating embedding: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|