# Spaces: Running
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings | |
from llama_index.llms.anthropic import Anthropic | |
from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
from llama_index.core.node_parser import SentenceSplitter | |
from llama_index.core.callbacks import CallbackManager, LlamaDebugHandler | |
from llama_index.core import StorageContext, load_index_from_storage | |
import logging | |
import os | |
from dotenv import load_dotenv | |
import time | |
from typing import Optional, Dict, Any | |
from tqdm import tqdm | |
# Set up logging to track what the chatbot is doing
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Disable tokenizer parallelism warnings
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Create a directory for storing the index.
# exist_ok=True makes this a single race-free call: no separate existence
# check that another process could invalidate before the directory is made.
INDEX_DIR = "index"
os.makedirs(INDEX_DIR, exist_ok=True)
class Chatbot:
    """Document question-answering chatbot built on LlamaIndex and Claude.

    Typical use, as done by ``main()``: construct the object (which sets up
    the LLM, the embedding model and the global LlamaIndex ``Settings``),
    call ``load_documents()``, ``create_index()`` and
    ``initialize_query_engine()`` once, then call ``query()`` per question.

    Recognised ``config`` keys (all optional): ``api_key``, ``model``,
    ``temperature``, ``max_tokens``, ``embedding_model``, ``device``,
    ``embed_batch_size``, ``chunk_size``, ``chunk_overlap``, ``max_retries``
    and ``retry_delay``.
    """

    # Files that StorageContext.persist() writes and that
    # load_index_from_storage() needs in order to restore an index.
    _PERSISTED_INDEX_FILES = ("docstore.json", "index_store.json")

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize the chatbot with configuration.

        Args:
            config: Optional settings dictionary; missing keys fall back to
                the defaults used in the individual methods.

        Raises:
            ValueError: If no API key is found in the environment or config.
            Exception: Anything raised while building the components.
        """
        # Set up basic variables and load configuration
        self.config = config or {}
        self.api_key = self._get_api_key()
        self.index = None          # set by create_index()
        self.query_engine = None   # set by initialize_query_engine()
        self.llm = None            # set by _initialize_components()
        self.embed_model = None    # set by _initialize_components()

        # Set up debugging tools to help track any issues
        self.debug_handler = LlamaDebugHandler(print_trace_on_end=True)
        self.callback_manager = CallbackManager([self.debug_handler])

        # Set up all the components needed for the chatbot
        self._initialize_components()

    def _get_api_key(self) -> str:
        """Get API key from environment or config.

        The ``ANTHROPIC_API_KEY`` environment variable (after loading any
        ``.env`` file) takes precedence over ``config["api_key"]``.

        Raises:
            ValueError: If neither source provides a non-empty key.
        """
        load_dotenv()
        api_key = os.getenv("ANTHROPIC_API_KEY") or self.config.get("api_key")
        if not api_key:
            raise ValueError("API key not found in environment or config")
        return api_key

    def _initialize_components(self):
        """Build the LLM and embedding model and install them in ``Settings``.

        Any failure is logged and re-raised unchanged.
        """
        try:
            # Set up the language model (Claude) with our settings
            logger.info("Setting up Claude language model...")
            self.llm = Anthropic(
                api_key=self.api_key,
                model=self.config.get("model", "claude-3-7-sonnet-20250219"),
                temperature=self.config.get("temperature", 0.1),
                max_tokens=self.config.get("max_tokens", 2048),  # Allow for longer responses
            )

            # Set up the model that converts text into numbers (embeddings)
            logger.info("Setting up text embedding model...")
            self.embed_model = HuggingFaceEmbedding(
                model_name=self.config.get("embedding_model", "sentence-transformers/all-MiniLM-L6-v2"),
                device=self.config.get("device", "cpu"),
                embed_batch_size=self.config.get("embed_batch_size", 8),
            )

            # Configure all the settings for the chatbot
            logger.info("Configuring chatbot settings...")
            Settings.embed_model = self.embed_model
            Settings.text_splitter = SentenceSplitter(
                chunk_size=self.config.get("chunk_size", 1024),
                chunk_overlap=self.config.get("chunk_overlap", 100),
                paragraph_separator="\n\n",
            )
            Settings.llm = self.llm
            Settings.callback_manager = self.callback_manager
            logger.info("Components initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing components: {e}")
            raise

    def _run_with_retries(self, operation, failure_prefix: str) -> Any:
        """Call ``operation()`` and retry it when it raises.

        The number of attempts and the pause between them come from the
        ``max_retries`` (default 3) and ``retry_delay`` (default 1 second)
        config keys. At least one attempt is always made.

        Args:
            operation: Zero-argument callable to execute.
            failure_prefix: Start of the error message logged when the last
                attempt fails, e.g. ``"Failed to load documents"``.

        Returns:
            Whatever ``operation()`` returns on its first successful call.

        Raises:
            Exception: The exception from the final failed attempt.
        """
        max_retries = max(1, int(self.config.get("max_retries", 3)))
        retry_delay = self.config.get("retry_delay", 1)
        for attempt in range(1, max_retries + 1):
            try:
                return operation()
            except Exception as e:
                if attempt < max_retries:
                    logger.warning(f"Attempt {attempt} failed: {e}. Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                else:
                    logger.error(f"{failure_prefix} after {max_retries} attempts: {e}")
                    raise

    def load_documents(self, data_dir: str = "data"):
        """Load documents with retry logic.

        Args:
            data_dir: Directory passed to ``SimpleDirectoryReader``.

        Returns:
            The documents returned by ``SimpleDirectoryReader.load_data()``.

        Raises:
            Exception: The last loading error once all attempts have failed.
        """
        def _load():
            logger.info(f"Loading documents from {data_dir}...")
            documents = SimpleDirectoryReader(data_dir).load_data()
            logger.info(f"Loaded {len(documents)} documents")
            return documents

        return self._run_with_retries(_load, "Failed to load documents")

    @staticmethod
    def _has_persisted_index(persist_dir: str) -> bool:
        """Return True if ``persist_dir`` holds a previously persisted index.

        ``StorageContext.persist()`` writes ``docstore.json`` and
        ``index_store.json`` (it never writes ``index.json``), so those are
        the files whose presence is checked.
        """
        return all(
            os.path.isfile(os.path.join(persist_dir, name))
            for name in Chatbot._PERSISTED_INDEX_FILES
        )

    def create_index(self, documents):
        """Load the persisted index if one exists, otherwise build and save it.

        Args:
            documents: Documents to index; ignored when a persisted index is
                found in ``INDEX_DIR``.

        Raises:
            Exception: Any error from loading, building or persisting the
                index (logged, then re-raised).
        """
        try:
            # Reuse the index from a previous run to avoid re-embedding
            if self._has_persisted_index(INDEX_DIR):
                logger.info("Loading existing index...")
                storage_context = StorageContext.from_defaults(persist_dir=INDEX_DIR)
                self.index = load_index_from_storage(storage_context)
                logger.info("Index loaded successfully")
                return

            # Create a new index if none exists
            logger.info("Creating new index...")
            with tqdm(total=1, desc="Creating searchable index") as pbar:
                self.index = VectorStoreIndex.from_documents(documents)
                # Save the index
                self.index.storage_context.persist(persist_dir=INDEX_DIR)
                pbar.update(1)
            logger.info("Index created and saved successfully")
        except Exception as e:
            logger.error(f"Error creating/loading index: {e}")
            raise

    def initialize_query_engine(self):
        """Create the query engine from the current index.

        Raises:
            RuntimeError: If ``create_index()`` has not produced an index yet.
            Exception: Any error from ``as_query_engine()`` (logged, then
                re-raised).
        """
        # Fail with a clear message instead of an AttributeError on None
        if self.index is None:
            raise RuntimeError("Index is not available; call create_index() first")
        try:
            # Set up the system that will handle questions
            logger.info("Initializing query engine...")
            self.query_engine = self.index.as_query_engine()
            logger.info("Query engine initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing query engine: {e}")
            raise

    def query(self, query_text: str) -> str:
        """Execute a query with error handling and retries.

        Args:
            query_text: The question to ask about the indexed documents.

        Returns:
            The query engine's response converted to ``str``.

        Raises:
            RuntimeError: If ``initialize_query_engine()`` has not been called.
            Exception: The last query error once all attempts have failed.
        """
        # A missing engine is a usage error; retrying cannot fix it
        if self.query_engine is None:
            raise RuntimeError("Query engine is not initialized; call initialize_query_engine() first")

        def _ask() -> str:
            logger.info(f"Executing query: {query_text}")
            print("\nThinking...", end="", flush=True)
            response = self.query_engine.query(query_text)
            print(" Done!")
            logger.info("Query executed successfully")
            return str(response)

        return self._run_with_retries(_ask, "Failed to execute query")

    def cleanup(self):
        """Clean up resources.

        Currently this only logs; errors are logged and not re-raised so
        that cleanup never masks an earlier failure.
        """
        try:
            logger.info("Cleaning up resources...")
            logger.info("Cleanup completed successfully")
        except Exception as e:
            logger.error(f"Error during cleanup: {e}")
def main():
    """Run the interactive command-line chatbot.

    Builds a ``Chatbot`` with the default configuration, loads and indexes
    the documents, then answers questions read from standard input until the
    user types ``exit``, ``quit`` or ``bye``, closes stdin, or presses
    Ctrl+C. A question that cannot be answered is reported and the loop
    continues. Setup failures are logged, and ``cleanup()`` always runs if
    the chatbot was created.
    """
    # Set up all the configuration settings for the chatbot
    config = {
        "model": "claude-3-7-sonnet-20250219",
        "temperature": 0.1,
        "max_tokens": 2048,  # Allow for longer responses
        "embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
        "device": "cpu",
        "embed_batch_size": 8,
        "chunk_size": 1024,
        "chunk_overlap": 100,
    }
    exit_words = ("exit", "quit", "bye")
    chatbot = None
    try:
        # Create and set up the chatbot
        print("\nInitializing chatbot...")
        chatbot = Chatbot(config)
        # Load the documents we want to analyze
        documents = chatbot.load_documents()
        # Create a searchable index from the documents
        chatbot.create_index(documents)
        # Set up the system that will handle questions
        chatbot.initialize_query_engine()

        print("\nChatbot is ready! You can ask questions about your documents.")
        print("Type 'exit' to quit.")
        print("-" * 50)

        while True:
            # Get user input; a closed stdin (Ctrl+D or end of piped input)
            # is treated as a normal request to leave
            try:
                question = input("\nYour question: ").strip()
            except EOFError:
                print("\nGoodbye!")
                break

            # Check if user wants to exit
            if question.lower() in exit_words:
                print("\nGoodbye!")
                break

            # Skip empty questions
            if not question:
                continue

            # Get and print the response. A failed question must not end the
            # session; query() has already logged the error after retrying.
            try:
                response = chatbot.query(question)
            except Exception as e:
                print(f"\nSorry, I could not answer that: {e}")
                print("-" * 50)
                continue

            print("\nAnswer:", response)
            print("-" * 50)
    except KeyboardInterrupt:
        # Ctrl+C during setup or a query: exit without a traceback
        print("\nGoodbye!")
    except Exception as e:
        logger.error(f"Error in main: {e}")
    finally:
        # Clean up when we're done
        if chatbot is not None:
            chatbot.cleanup()
# Start the interactive chatbot only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()