|
from dotenv import load_dotenv |
|
# Populate os.environ from a .env file (python-dotenv). This runs before the
# remaining imports and before the NEBIUS_API_KEY check further down,
# presumably so that libraries imported below can also see the variables at
# import time — TODO confirm that ordering is actually required.
load_dotenv()
|
import os |
|
import chromadb |
|
import re |
|
import logging |
|
from sqlalchemy import create_engine |
|
from uuid import uuid4 |
|
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, Settings |
|
from llama_index.vector_stores.chroma import ChromaVectorStore |
|
from llama_index.embeddings.nebius import NebiusEmbedding |
|
from llama_index.core import SQLDatabase |
|
from llama_index.core.schema import TextNode |
|
|
|
|
|
# Configure root logging so INFO-level progress messages are shown.
logging.basicConfig(level=logging.INFO)

# --- Filesystem locations (all relative to the current working directory) ---

# SQLite database whose tables are indexed into the schema knowledge base.
DATABASE_PATH = os.path.join('data', 'sales_database.db')

# Schema documentation files live together under knowledge_base/schema/.
SCHEMA_DIR = os.path.join('knowledge_base', 'schema')
DATA_DICTIONARY_PATH, SALES_SCHEMA_SQL_PATH = (
    os.path.join(SCHEMA_DIR, file_name)
    for file_name in ('data_dictionary.md', 'sales_schema.sql')
)

# Markdown file with the business KPI definitions for the KPI agent.
KPI_DEFINITIONS_PATH = os.path.join(
    'knowledge_base',
    'business_glossary',
    'kpi_definitions.md',
)

# Persistent ChromaDB directories, one per agent knowledge base.
CHROMA_DB_SCHEMA_PATH, CHROMA_DB_KPI_PATH = (
    os.path.join('.', dir_name)
    for dir_name in ('chroma_db_schema', 'chroma_db_kpi')
)
|
|
|
|
|
# Fail fast when the Nebius API key is unusable. A key that is present but
# empty or whitespace-only (e.g. a bare `NEBIUS_API_KEY=` line in .env) is
# treated the same as an unset variable rather than failing later at the API.
if not os.environ.get("NEBIUS_API_KEY", "").strip():
    logging.critical("NEBIUS_API_KEY environment variable not set. Please set it before running this script.")
    raise ValueError("NEBIUS_API_KEY environment variable not set. Please set it before running this script.")
|
|
|
|
|
print("\n--- Initializing Nebius AI Embedding Model ---")

# Create the embedding client and smoke-test it with a single request, so that a
# bad key, model name or endpoint fails here instead of midway through indexing.
try:
    embed_model = NebiusEmbedding(
        api_key=os.environ["NEBIUS_API_KEY"],
        model_name="BAAI/bge-en-icl",
        api_base="https://api.studio.nebius.com/v1/",
    )

    test_string = "This is a test string to generate an embedding for diagnostic purposes."
    test_embedding = embed_model.get_text_embedding(test_string)

    # A usable embedding is a non-empty list of numbers. Emptiness is checked
    # explicitly because all() over an empty list is True.
    embedding_is_valid = (
        isinstance(test_embedding, list)
        and len(test_embedding) > 0
        and all(isinstance(x, (int, float)) for x in test_embedding)
    )
    if not embedding_is_valid:
        logging.critical(f"FATAL ERROR: NebiusEmbedding returned invalid output for test string. Type: {type(test_embedding)}, Value: {test_embedding[:10] if isinstance(test_embedding, list) else test_embedding}")
        raise ValueError("NebiusEmbedding test failed: returned invalid embedding. Cannot proceed with RAG index creation.")

    logging.info(f"Nebius AI Embedding Model initialized and tested successfully. Embedding length: {len(test_embedding)}")

except Exception as e:
    # The ValueError raised above also passes through here, so every failure
    # gets the same configuration hints before being re-raised unchanged.
    logging.critical(f"UNRECOVERABLE ERROR during Nebius AI Embedding Model initialization or testing: {e}")
    print(f"\n!!!! UNRECOVERABLE ERROR: {e} !!!!")
    print("Please check your NEBIUS_API_KEY, model_name, and api_base configuration carefully.")
    raise

print("--- Nebius AI Embedding Model setup complete. Proceeding to Knowledge Base setup. ---")
|
|
|
|
|
# Matches one table section of the data dictionary:
#   ## Table: `table_name`
#   **Purpose:** description ...
_TABLE_SECTION_RE = re.compile(
    r"^##[ \t]+Table:[ \t]*`(\w+)`[ \t]*\n"  # level-2 "Table" heading at line start
    r"\s*\*\*Purpose:\*\*[ \t]*"             # optional blank lines, then "**Purpose:**"
    r"(.*?)"                                 # description body (may span lines, may be empty)
    r"(?=\n##[ \t]|\Z)",                     # ends at the next level-2 heading or end of file
    re.MULTILINE | re.DOTALL,
)


def parse_data_dictionary_md(md_path: str) -> dict:
    """Parse per-table descriptions from a markdown data dictionary.

    The file is expected to contain sections of the form::

        ## Table: `table_name`
        **Purpose:** free-form description...

    Each description runs from the text after ``**Purpose:**`` up to the next
    level-2 (``## ``) heading or the end of the file. Column lists and notes
    inside a table section are therefore kept, but trailing non-table sections
    (e.g. ``## Relationships``) are not attached to the last table. Blank lines
    between the heading and the ``**Purpose:**`` line are tolerated.

    Args:
        md_path: Path to the markdown file.

    Returns:
        Mapping of table name to its whitespace-stripped description. The
        mapping is empty if the path is not an existing regular file (an
        error is logged) or if no table sections match.
    """
    if not os.path.isfile(md_path):
        logging.error(f"Data dictionary file not found: {md_path}")
        return {}

    with open(md_path, 'r', encoding='utf-8') as f:
        content = f.read()

    table_descriptions = {}
    for match in _TABLE_SECTION_RE.finditer(content):
        table_name = match.group(1).strip()
        purpose = match.group(2).strip()
        table_descriptions[table_name] = purpose
        logging.info(f"Parsed table: {table_name}, Purpose length: {len(purpose)}")

    return table_descriptions
|
|
|
|
|
print("\n--- Setting up Schema Retriever Agent's Knowledge Base (chroma_db_schema) ---")

# Build one embedded node per database table (human description + schema text)
# and replace the contents of the persistent "schema_kb" Chroma collection.
try:
    data_dict_descriptions = parse_data_dictionary_md(DATA_DICTIONARY_PATH)
    logging.info(f"Loaded descriptions for {len(data_dict_descriptions)} tables from data_dictionary.md")

    # sqlite3 creates a new, empty database file when the path does not exist.
    # That would yield zero tables and then wipe the existing schema_kb below,
    # so a missing database is treated as a hard error instead.
    if not os.path.isfile(DATABASE_PATH):
        raise FileNotFoundError(f"Database file not found: {DATABASE_PATH}")

    engine = create_engine(f"sqlite:///{DATABASE_PATH}")
    sql_database = SQLDatabase(engine)
    logging.info(f"Connected to database: {DATABASE_PATH}")

    all_table_names = sql_database.get_usable_table_names()
    if not all_table_names:
        logging.warning("WARNING: No tables found in the database. Schema KB will be empty.")

    schema_nodes = []
    for table_name in all_table_names:
        ddl = sql_database.get_single_table_info(table_name)
        human_description = data_dict_descriptions.get(table_name, "No specific description available for this table.")

        combined_context = (
            f"Table Name: {table_name}\n"
            f"Description: {human_description}\n"
            f"Table Schema (DDL):\n{ddl if ddl else ''}"
        )

        node_embedding = embed_model.get_text_embedding(combined_context)
        # Reject both a missing and an empty vector; neither is usable for retrieval.
        if not node_embedding:
            raise ValueError(f"Failed to generate embedding for schema table: {table_name}")

        schema_nodes.append(TextNode(text=combined_context, embedding=node_embedding, id_=table_name, metadata={"table_name": table_name, "source": "data_dictionary"}))

    # All table info has been read; release pooled database connections.
    engine.dispose()

    chroma_client_schema = chromadb.PersistentClient(path=CHROMA_DB_SCHEMA_PATH)
    chroma_collection_schema = chroma_client_schema.get_or_create_collection(name="schema_kb")

    # Full re-index: remove whatever a previous run stored before adding fresh nodes.
    # Embeddings are all computed above first, so an embedding failure leaves the
    # existing collection untouched.
    existing_ids = chroma_collection_schema.get()["ids"]
    if existing_ids:
        logging.info(f"Clearing {len(existing_ids)} existing items from schema_kb before re-indexing.")
        chroma_collection_schema.delete(ids=existing_ids)

    logging.info(f"Adding {len(schema_nodes)} schema nodes to ChromaDB directly...")
    if schema_nodes:
        chroma_collection_schema.add(
            documents=[node.text for node in schema_nodes],
            embeddings=[node.embedding for node in schema_nodes],
            metadatas=[node.metadata for node in schema_nodes],
            ids=[node.id_ for node in schema_nodes],
        )
    logging.info(f"Schema knowledge base indexed and persisted to {CHROMA_DB_SCHEMA_PATH}")

except Exception as e:
    logging.exception("Error setting up Schema KB:")
    print(f"Error setting up Schema KB: {e}")
    print("Please ensure your database is created and accessible and data_dictionary.md is correctly formatted.")
    print("If the error persists, there might be a deeper compatibility issue. See above for more detailed embedding checks.")
    raise
|
|
|
|
|
|
|
print("\n--- Setting up KPI Answering Agent's Knowledge Base (chroma_db_kpi) ---")

# Embed the KPI definitions file and replace the contents of the persistent
# "kpi_kb" Chroma collection.
try:
    kpi_docs = SimpleDirectoryReader(input_files=[KPI_DEFINITIONS_PATH]).load_data()
    logging.info(f"Loaded {len(kpi_docs)} documents for KPI Agent.")

    # Same for every node, so compute once outside the loop.
    kpi_source_name = os.path.basename(KPI_DEFINITIONS_PATH)

    # NOTE(review): each loaded document is embedded whole (no chunking); confirm
    # the KPI file stays within the embedding model's input limit.
    kpi_nodes = []
    for doc in kpi_docs:
        doc_text = doc.get_content()
        node_embedding = embed_model.get_text_embedding(doc_text)
        # Reject both a missing and an empty vector; neither is usable for retrieval.
        if not node_embedding:
            raise ValueError(f"Failed to generate embedding for KPI document: {doc.id_}")

        kpi_nodes.append(TextNode(text=doc_text, embedding=node_embedding, id_=str(uuid4()), metadata={"source_file": kpi_source_name, "doc_type": "kpi_definition"}))

    chroma_client_kpi = chromadb.PersistentClient(path=CHROMA_DB_KPI_PATH)
    chroma_collection_kpi = chroma_client_kpi.get_or_create_collection(name="kpi_kb")

    # Full re-index: node ids are random per run, so previous entries must be
    # removed explicitly or they would accumulate as duplicates.
    existing_ids = chroma_collection_kpi.get()["ids"]
    if existing_ids:
        logging.info(f"Clearing {len(existing_ids)} existing items from kpi_kb before re-indexing.")
        chroma_collection_kpi.delete(ids=existing_ids)

    logging.info(f"Adding {len(kpi_nodes)} KPI nodes to ChromaDB directly...")
    if kpi_nodes:
        chroma_collection_kpi.add(
            documents=[node.text for node in kpi_nodes],
            embeddings=[node.embedding for node in kpi_nodes],
            metadatas=[node.metadata for node in kpi_nodes],
            ids=[node.id_ for node in kpi_nodes],
        )
    logging.info(f"KPI knowledge base indexed and persisted to {CHROMA_DB_KPI_PATH}")

except Exception as e:
    logging.exception("Error setting up KPI KB:")
    print(f"Error setting up KPI KB: {e}")
    print("Please ensure your 'kpi_definitions.md' file exists and contains valid text content.")
    print("If the error persists, there might be a deeper compatibility issue. See above for more detailed embedding checks.")
    raise
|
|
|
|
|
# Final summary for the operator: one status line, then the index locations.
print(
    "\nKnowledge base setup complete for RAG agents!",
    f"Indices are saved in '{CHROMA_DB_SCHEMA_PATH}' and '{CHROMA_DB_KPI_PATH}' directories.",
    sep="\n",
)