## Spa Customer Service Chatbot with XML Sitemap-Based Scraping

**Problem Statement**

The goal of this project is to build an LLM-powered assistant for "Benefit Body Spa" that answers customer queries using website content scraped via the site's XML sitemap. The system should cover the spa's services, business hours, contact information, and other relevant details. The solution should also handle both structured and unstructured data, generate embeddings, and provide a user-friendly interface for real-time interactions.

## 1. Setup and Installation

**1.1 Install Required Libraries and Packages**

In [1]:
import os
import pandas as pd
import numpy as np
import requests
from bs4 import BeautifulSoup
import re
import json
import time
from typing import List, Dict, Any
from tqdm.notebook import tqdm
from urllib.parse import urljoin
from IPython.display import display, HTML, Markdown
from datetime import datetime
from sklearn.metrics.pairwise import cosine_similarity

# Install required packages (specifiers are quoted so the shell does not treat ">" as an output redirect)
!pip install -q "google-genai==1.7.0" "beautifulsoup4>=4.12.0" "requests>=2.31.0" "lxml>=4.9.0"

# Import Google Generative AI
from google import genai
from google.genai import types

# Set up API key
from kaggle_secrets import UserSecretsClient
GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
client = genai.Client(api_key=GOOGLE_API_KEY)

**1.2 Project Constants**

In [2]:
# Project constants
PROJECT_NAME = "Benefit Body Spa Customer Service Agent"
WEBSITE_URL = "https://benefitbodyspa.com"
SITEMAP_URL = "https://benefitbodyspa.com/sitemap.xml"
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)
EMBEDDING_MODEL = "models/text-embedding-004"
LLM_MODEL = "gemini-2.0-flash"
VECTOR_DB_PATH = f"{DATA_DIR}/benefit_spa_vector_db.pkl"
SIMILARITY_THRESHOLD = 0.65

In [3]:
# Manual information to supplement website data
SUPPLEMENTAL_INFO = {
    "business_hours": """
    Benefit Body Spa Business Hours:
    Monday to Friday: 10:00 AM - 8:00 PM
    Saturday: 10:00 AM - 5:00 PM
    Sunday: Closed
    """,
    "appointment_policy": """
    Appointment Policy:
    - Please arrive 10-15 minutes before your scheduled appointment time
    - First-time clients should complete registration forms prior to treatment
    - Wear comfortable clothing and avoid jewelry
    - Please inform us of any medical conditions or concerns before treatment
    """,
    "cancellation_policy": """
    Cancellation Policy:
    - Please provide at least 24 hours notice for cancellations or rescheduling
    - Late cancellations (less than 24 hours) may incur a 50% fee
    - No-shows will be charged the full service fee
    - Repeated cancellations may affect future booking privileges
    """
}

## 2. Data Scraping and Preprocessing

In [5]:
# 2.1. XML Sitemap-Based Scraping

def fetch_url(url, retry_count=3, delay=2):
    """Fetch a URL with robust error handling."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    
    for attempt in range(retry_count):
        try:
            print(f"Fetching URL: {url} (attempt {attempt+1}/{retry_count})")
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            print(f"✅ Successfully fetched {url} - {len(response.content)} bytes")
            return response.content
        except requests.exceptions.RequestException as e:
            print(f"❌ Error fetching URL: {url} - {str(e)}")
            if attempt < retry_count - 1:
                sleep_time = delay * (attempt + 1)
                print(f"Retrying in {sleep_time} seconds...")
                time.sleep(sleep_time)
    
    print(f"❌ Failed to fetch URL after {retry_count} attempts: {url}")
    return None

def parse_sitemap_recursive(sitemap_url):
    """Parse sitemap.xml recursively to handle sitemap indexes."""
    print(f"\n🔍 Parsing sitemap: {sitemap_url}")
    content = fetch_url(sitemap_url)
    
    if not content:
        print("❌ Failed to fetch sitemap")
        return []
    
    urls = []
    try:
        soup = BeautifulSoup(content, 'lxml-xml')  # Using lxml for better XML handling
        
        # Check if this is a sitemap index (contains other sitemaps)
        sitemap_tags = soup.find_all('sitemap')
        if sitemap_tags:
            print(f"📑 Found sitemap index with {len(sitemap_tags)} child sitemaps")
            for sitemap_tag in sitemap_tags:
                loc = sitemap_tag.find('loc')
                if loc:
                    child_sitemap_url = loc.text.strip()
                    # Recursively parse child sitemaps
                    child_urls = parse_sitemap_recursive(child_sitemap_url)
                    urls.extend(child_urls)
        
        # Regular sitemap with URLs
        url_tags = soup.find_all('url')
        if url_tags:
            for url_tag in url_tags:
                loc = url_tag.find('loc')
                if loc:
                    page_url = loc.text.strip()
                    
                    # Get last modification date if available
                    lastmod = url_tag.find('lastmod')
                    lastmod_date = lastmod.text if lastmod else None
                    
                    # Get priority if available
                    priority = url_tag.find('priority')
                    priority_value = float(priority.text) if priority else 0.5
                    
                    urls.append({
                        'url': page_url,
                        'lastmod': lastmod_date,
                        'priority': priority_value
                    })
            
            print(f"✅ Found {len(url_tags)} URLs in sitemap")
    
    except Exception as e:
        print(f"❌ Error parsing sitemap: {e}")
    
    return urls

def extract_page_content(url_data):
    """Extract comprehensive content from a page."""
    url = url_data['url']
    content = fetch_url(url)
    
    if not content:
        return {
            "url": url,
            "title": "",
            "content": "",
            "lastmod": url_data.get('lastmod'),
            "priority": url_data.get('priority', 0.5),
            "success": False
        }
    
    try:
        soup = BeautifulSoup(content, 'html.parser')
        
        # Extract title
        title = soup.title.string.strip() if soup.title and soup.title.string else ""
        
        # Extract meta description
        meta_description = ""
        meta_tag = soup.find('meta', attrs={'name': 'description'})
        if meta_tag and meta_tag.get('content'):
            meta_description = meta_tag.get('content', '').strip()
        
        # Extract schema metadata for business info
        structured_data = []
        for script in soup.find_all('script', type='application/ld+json'):
            try:
                if script.string:
                    data = json.loads(script.string)
                    structured_data.append(data)
            except json.JSONDecodeError:
                pass
        
        # Look for business hours, address and contact info in structured data
        business_info = ""
        for data in structured_data:
            if isinstance(data, dict):
                # Look for business hours
                if 'openingHoursSpecification' in data:
                    business_info += "BUSINESS HOURS:\n"
                    hours_data = data['openingHoursSpecification']
                    if isinstance(hours_data, list):
                        for hours in hours_data:
                            day = hours.get('dayOfWeek', '')
                            opens = hours.get('opens', '')
                            closes = hours.get('closes', '')
                            business_info += f"{day}: {opens} - {closes}\n"
                    business_info += "\n"
                
                # Look for address
                if 'address' in data:
                    business_info += "ADDRESS:\n"
                    address = data['address']
                    if isinstance(address, dict):
                        street = address.get('streetAddress', '')
                        city = address.get('addressLocality', '')
                        region = address.get('addressRegion', '')
                        postal = address.get('postalCode', '')
                        business_info += f"{street}, {city}, {region} {postal}\n\n"
                
                # Look for contact info
                if 'telephone' in data:
                    business_info += f"PHONE: {data['telephone']}\n"
                if 'email' in data:
                    business_info += f"EMAIL: {data['email']}\n"
        
        # Remove script and style elements
        for tag in soup(['script', 'style']):
            tag.decompose()
        
        # Extract all visible text with better formatting
        text_parts = []
        
        # Add business info at the beginning if found
        if business_info:
            text_parts.append(business_info)
        
        # Extract headers with emphasis
        for h_tag in soup.find_all(['h1', 'h2', 'h3']):
            header_text = h_tag.get_text(strip=True)
            if header_text:
                text_parts.append(f"SECTION: {header_text}")
        
        # Try multiple content selectors
        selectors = [
            'main', 'article', '.content', '.entry-content',
            'section', '.page-content', '#content',
            '.post-content', '.page', '.main-content'
        ]
        
        content_found = False
        for selector in selectors:
            elements = soup.select(selector)
            if elements:
                for element in elements:
                    # Extract paragraphs
                    paragraphs = element.find_all('p')
                    for p in paragraphs:
                        p_text = p.get_text(strip=True)
                        if p_text and len(p_text) > 10:  # Skip very short paragraphs
                            text_parts.append(p_text)
                    
                    # Extract list items
                    list_items = element.find_all('li')
                    if list_items:
                        for li in list_items:
                            li_text = li.get_text(strip=True)
                            if li_text and len(li_text) > 5:  # Skip very short list items
                                text_parts.append(f"- {li_text}")
                
                content_found = True
                break  # Stop after first successful selector
        
        # If no content found with selectors, use body
        if not content_found:
            body = soup.find('body')
            if body:
                # Remove navigation, header, footer
                for tag in body.find_all(['nav', 'header', 'footer']):
                    tag.decompose()
                
                # Extract paragraphs
                paragraphs = body.find_all('p')
                for p in paragraphs:
                    p_text = p.get_text(strip=True)
                    if p_text and len(p_text) > 10:
                        text_parts.append(p_text)
                
                # Extract list items
                list_items = body.find_all('li')
                for li in list_items:
                    li_text = li.get_text(strip=True)
                    if li_text and len(li_text) > 5:
                        text_parts.append(f"- {li_text}")
        
        # Combine all text with proper formatting
        if meta_description:
            text_parts.insert(0, f"DESCRIPTION: {meta_description}")
        
        full_content = "\n\n".join(text_parts)
        
        # Look for contact information in the content
        contact_patterns = {
            'phone': r'(?:Phone|Tel|Telephone|Call)(?:\s*(?:us|:))?\s*(?:\+\d{1,2}\s*)?(?:\(?\d{3}\)?[\s.-]?)?\d{3}[\s.-]?\d{4}',
            'email': r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
            'hours': r'(?:Hours|Open|We are open)(?:\s*(?:of operation|:))?\s*(?:Monday|Mon|Tuesday|Tue|Wednesday|Wed|Thursday|Thu|Friday|Fri|Saturday|Sat|Sunday|Sun)',
            'address': r'\d+\s+[A-Za-z0-9\s,]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Drive|Dr|Lane|Ln|Court|Ct|Way|Parkway|Pkwy|Plaza|Plz|Square|Sq)\s*,\s*[A-Za-z\s]+,\s*[A-Z]{2}\s*\d{5}'
        }
        
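        for info_type, pattern in contact_patterns.items():
            label = f"{info_type.upper()}:"
            matches = re.findall(pattern, full_content, re.IGNORECASE)
            # Prepend a labelled line only if that label is not already present.
            # (Comparing the lowercase key against upper-cased text never matched.)
            if matches and label not in full_content:
                additional_info = f"{label} {matches[0]}\n"
                full_content = additional_info + full_content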
        
        result = {
            "url": url,
            "title": title,
            "content": full_content,
            "lastmod": url_data.get('lastmod'),
            "priority": url_data.get('priority', 0.5),
            "success": bool(full_content)
        }
        
        print(f"✅ Successfully extracted content from {url}: {len(full_content)} chars")
        return result
    except Exception as e:
        print(f"❌ Error extracting content from {url}: {e}")
        return {
            "url": url,
            "title": "",
            "content": "",
            "lastmod": url_data.get('lastmod'),
            "priority": url_data.get('priority', 0.5),
            "success": False
        }

def scrape_website_from_sitemap(max_urls=None):
    """Scrape website content based on sitemap.xml with detailed logging."""
    print("\n🔍 Starting complete website scraping from sitemap.xml")
    
    # Get all URLs from sitemap
    sitemap_urls = parse_sitemap_recursive(SITEMAP_URL)
    
    if not sitemap_urls:
        print("❌ No URLs found in sitemap.xml")
        return None
    
    # Sort URLs by priority (if available)
    sitemap_urls.sort(key=lambda x: x.get('priority', 0.5), reverse=True)
    
    if max_urls:
        sitemap_urls = sitemap_urls[:max_urls]
        print(f"Limiting to {max_urls} URLs")
    
    print(f"📋 Processing {len(sitemap_urls)} URLs from sitemap")
    
    results = []
    for url_data in tqdm(sitemap_urls, desc="Scraping pages from sitemap"):
        page_data = extract_page_content(url_data)
        if page_data["success"]:
            results.append(page_data)
    
    if not results:
        print("❌ No content extracted from any page")
        return None
    
    df = pd.DataFrame(results)
    df.to_csv(f"{DATA_DIR}/spa_content_from_sitemap.csv", index=False)
    print(f"✅ Successfully scraped {len(df)} pages from sitemap")
    
    # Print a summary of scraped pages
    print("\n📊 Website Scraping Summary:")
    print(f"Total URLs in sitemap: {len(sitemap_urls)}")
    print(f"Successfully scraped: {len(df)} pages")
    print(f"Failed: {len(sitemap_urls) - len(df)} pages")
    
    return df
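
The sitemap handling above can be tried offline. The following is a minimal sketch rather than the notebook's implementation: it swaps BeautifulSoup for the standard library's `xml.etree.ElementTree`, and both the sample XML and the helper name `parse_urlset` are made up for illustration. It keeps the same defaults as the code above (priority falls back to 0.5, entries are sorted by priority).

```python
import xml.etree.ElementTree as ET

# Namespace defined by the sitemaps.org protocol.
NS = {"sm": "http://www.sitemaps.org/schemas/sitemap/0.9"}

# Hypothetical sample sitemap; the URLs are placeholders, not real pages.
SAMPLE_SITEMAP = """<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url>
    <loc>https://example.com/contact</loc>
  </url>
  <url>
    <loc>https://example.com/services</loc>
    <lastmod>2024-01-15</lastmod>
    <priority>0.8</priority>
  </url>
</urlset>"""


def parse_urlset(xml_text):
    """Return one dict per <url> entry, defaulting priority to 0.5."""
    root = ET.fromstring(xml_text)
    entries = []
    for url_el in root.findall("sm:url", NS):
        loc = url_el.findtext("sm:loc", default="", namespaces=NS).strip()
        if not loc:
            continue  # skip entries without a location
        lastmod = url_el.findtext("sm:lastmod", default=None, namespaces=NS)
        priority_text = url_el.findtext("sm:priority", default=None, namespaces=NS)
        try:
            priority = float(priority_text) if priority_text else 0.5
        except ValueError:
            priority = 0.5  # malformed priority: fall back to the default
        entries.append({"url": loc, "lastmod": lastmod, "priority": priority})
    return entries


entries = parse_urlset(SAMPLE_SITEMAP)
entries.sort(key=lambda e: e["priority"], reverse=True)
for entry in entries:
    print(entry)
```

Catching the `ValueError` per entry means one malformed `<priority>` value does not abort parsing of the remaining URLs.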

In [6]:
# 2.2 Text Processing
def clean_text(text):
    """Clean and normalize text."""
    if not text:
        return ""
    
    # Replace multiple spaces, newlines, tabs with single space
    text = re.sub(r'\s+', ' ', text)
    # Remove HTML remnants
    text = re.sub(r'<[^>]+>', '', text)
    # Fix spacing around punctuation
    text = re.sub(r'\s+([.,;:!?])', r'\1', text)
    text = re.sub(r'([.,;:!?])([^\s])', r'\1 \2', text)
    
    return text.strip()

def create_chunks(text, chunk_size=800, overlap=200):
    """Split text into chunks with semantic boundaries."""
    if not text or len(text) < 100:
        return []
    
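    # Clean each paragraph separately so the '\n\n' breaks searched for below survive
    # (clean_text collapses all whitespace, including newlines, into single spaces)
    paragraphs = [clean_text(p) for p in text.split('\n\n')]
    text = '\n\n'.join(p for p in paragraphs if p)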
    chunks = []
    start = 0
    text_len = len(text)
    
    while start < text_len:
        end = min(start + chunk_size, text_len)
        
        # Try to find a good break point
        if end < text_len:
            # Look for semantic boundaries
            section_break = text.find("SECTION:", start, end)
            paragraph_break = text.rfind('\n\n', start, end)
            sentence_break = max(
                text.rfind('. ', start, end),
                text.rfind('? ', start, end),
                text.rfind('! ', start, end)
            )
            
            if section_break != -1 and section_break > start + (chunk_size / 2):
                end = section_break
            elif paragraph_break != -1 and paragraph_break > start + (chunk_size / 2):
                end = paragraph_break + 2
            elif sentence_break != -1 and sentence_break > start + (chunk_size / 4):
                end = sentence_break + 2
        
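        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        
        # Stop once the end of the text is reached; otherwise `start` would creep
        # forward one character at a time and emit many near-duplicate tail chunks
        if end >= text_len:
            break
        
        start = max(start + 1, end - overlap)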
    
    return chunks

def add_supplemental_info():
    """Create chunks from supplemental information."""
    supplemental_chunks = []
    
    for info_type, content in SUPPLEMENTAL_INFO.items():
        chunk_data = {
            'id': f"supp_{info_type}",
            'url': f"{WEBSITE_URL}/{info_type}",
            'title': f"Benefit Body Spa {info_type.replace('_', ' ').title()}",
            'chunk_number': 1,
            'total_chunks': 1,
            'chunk_text': f"Page: {info_type.replace('_', ' ').title()}\nURL: {WEBSITE_URL}/{info_type}\n\n{content}",
            'original_text': content
        }
        supplemental_chunks.append(chunk_data)
    
    return supplemental_chunks

def process_content_to_chunks(df):
    """Process content into chunks with metadata."""
    if df is None or len(df) == 0:
        print("❌ No content to process")
        return None
    
    all_chunks = []
    for i, row in tqdm(df.iterrows(), total=len(df), desc="Creating chunks"):
        url = row['url']
        title = row['title']
        content = row['content']
        
        if not content or len(content) < 100:
            print(f"⚠️ Skipping {url}: content too short")
            continue
        
        chunks = create_chunks(content)
        
        if not chunks:
            print(f"⚠️ No chunks created for {url}")
            continue
        
        for j, chunk in enumerate(chunks):
            chunk_id = f"{i}_{j}"
            context_header = f"Page: {title}\nURL: {url}\nChunk {j+1} of {len(chunks)}\n\n"
            
            chunk_data = {
                'id': chunk_id,
                'url': url,
                'title': title,
                'chunk_number': j+1,
                'total_chunks': len(chunks),
                'chunk_text': context_header + chunk,
                'original_text': chunk
            }
            all_chunks.append(chunk_data)
    
    # Add supplemental info chunks
    supplemental_chunks = add_supplemental_info()
    all_chunks.extend(supplemental_chunks)
    
    if not all_chunks:
        print("❌ No chunks created")
        return None
    
    chunks_df = pd.DataFrame(all_chunks)
    chunks_df.to_csv(f"{DATA_DIR}/spa_chunks.csv", index=False)
    print(f"✅ Created {len(chunks_df)} chunks (including {len(supplemental_chunks)} supplemental)")
    
    return chunks_df
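
The overlap arithmetic in `create_chunks` is easier to see without the boundary search. The sketch below is a simplified, hypothetical helper (`sliding_chunks` is not part of the notebook) that shows only fixed-size windows with overlap, using the same default sizes of 800 and 200 characters, and stops as soon as the final window reaches the end of the text.

```python
def sliding_chunks(text, chunk_size=800, overlap=200):
    """Split text into windows of chunk_size characters that overlap by `overlap`."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunks.append(text[start:end])
        if end == len(text):
            break  # last window reached; stop instead of creeping forward
        start = end - overlap  # step back so neighbouring chunks share context
    return chunks


# Synthetic 2000-character input made of repeating digits.
sample = "".join(str(i % 10) for i in range(2000))
chunks = sliding_chunks(sample)
print([len(c) for c in chunks])  # prints [800, 800, 800]
```

Each window starts 600 characters after the previous one, so the last 200 characters of one chunk reappear at the start of the next.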

## 4. Embedding Generation

In [26]:
# 4. Embedding Generation
class EmbeddingManager:
    """Manage embedding generation with robust error handling."""
    
    def __init__(self, client, model=EMBEDDING_MODEL):
        self.client = client
        self.model = model
        self.dimensions = None
    
    def generate_embeddings(self, texts, batch_size=5):
        """Generate embeddings for texts with batching."""
        all_embeddings = []
        
        # Process in batches
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i+batch_size]
            batch_num = i//batch_size + 1
            total_batches = (len(texts) + batch_size - 1)//batch_size
            
            print(f"Generating embeddings batch {batch_num}/{total_batches}")
            
            # Retry logic for API resilience
            for attempt in range(3):
                try:
                    response = self.client.models.embed_content(
                        model=self.model,
                        contents=batch_texts,
                        config=types.EmbedContentConfig(task_type='semantic_similarity')
                    )
                    
                    batch_embeddings = [e.values for e in response.embeddings]
                    
                    if self.dimensions is None and batch_embeddings:
                        self.dimensions = len(batch_embeddings[0])
                        print(f"✅ Embedding dimensions: {self.dimensions}")
                    
                    all_embeddings.extend(batch_embeddings)
                    
                    # Rate limiting
                    if i + batch_size < len(texts):
                        time.sleep(0.5)
                    
                    break  # Success
                except Exception as e:
                    print(f"❌ Error in batch {batch_num}/{total_batches} (attempt {attempt+1}/3): {e}")
                    if attempt < 2:
                        wait_time = (attempt + 1) * 2
                        print(f"Retrying in {wait_time} seconds...")
                        time.sleep(wait_time)
                    else:
                        print("❌ Failed to generate embeddings for batch. Using zero vectors.")
                        # Add zero vectors as placeholders
                        all_embeddings.extend([[0.0] * (self.dimensions or 768) for _ in range(len(batch_texts))])
        
        return all_embeddings
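
The retry loop inside `generate_embeddings` waits 2 and then 4 seconds between its three attempts. As a rough illustration of that pattern in isolation, here is a small sketch; `call_with_retries` and `flaky_embed` are hypothetical names, and the injectable `sleep` argument exists only so the example runs without actually waiting.

```python
import time


def call_with_retries(func, max_attempts=3, base_delay=2.0, sleep=time.sleep):
    """Call func(); on failure wait base_delay * attempt seconds, then retry."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:  # broad on purpose: this is a generic sketch
            last_error = exc
            if attempt < max_attempts:
                sleep(base_delay * attempt)
    raise last_error


# Hypothetical stand-in for an API call that fails twice before succeeding.
calls = {"count": 0}


def flaky_embed():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return [0.1, 0.2, 0.3]


delays = []
result = call_with_retries(flaky_embed, sleep=delays.append)
print(result, delays)  # prints [0.1, 0.2, 0.3] [2.0, 4.0]
```

Unlike the notebook's loop, this sketch re-raises after the final attempt instead of substituting zero vectors. A zero vector has cosine similarity 0 with every query, so a chunk stored that way can never clear the similarity threshold; which behavior is preferable depends on whether a partial index is acceptable.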

## 5. Vector Database

In [28]:
# 5.1 Vector Database
class VectorDatabase:
    """Vector database for semantic search."""
    
    def __init__(self, client, embedding_model=EMBEDDING_MODEL):
        self.client = client
        self.embedding_model = embedding_model
        self.embedding_manager = EmbeddingManager(client, embedding_model)
        self.chunks = None
        self.embeddings = None
        self.initialized = False
    
    def initialize(self, chunks_df):
        """Initialize the vector database from chunks."""
        print("🔄 Initializing vector database...")
        
        # Extract chunk texts for embedding
        texts = chunks_df['chunk_text'].tolist()
        
        # Generate embeddings
        print(f"📊 Generating embeddings for {len(texts)} chunks")
        embeddings = self.embedding_manager.generate_embeddings(texts)
        
        # Store data
        self.chunks = chunks_df.to_dict('records')
        self.embeddings = np.array(embeddings)
        self.initialized = True
        
        print(f"✅ Vector database initialized with {len(self.chunks)} chunks")
        
        # Save to disk
        self.export_to_pickle()
        
        return self
    
    def search(self, query, top_k=5, threshold=SIMILARITY_THRESHOLD):
        """Search for relevant chunks with improved retrieval."""
        if not self.initialized:
            print("❌ Vector database not initialized")
            return []
        
        # Generate embedding for query
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]
        query_embedding_array = np.array([query_embedding])
        
        # Calculate cosine similarity
        similarities = cosine_similarity(query_embedding_array, self.embeddings)[0]
        
        # Create results with similarity scores
        results = []
        for i, score in enumerate(similarities):
            if score >= threshold:
                result = dict(self.chunks[i])
                result['similarity'] = float(score)
                results.append(result)
        
        # Sort by similarity and get top_k
        results.sort(key=lambda x: x['similarity'], reverse=True)
        results = results[:top_k]
        
        # Special handling for business hours, appointment, and cancellation policy
        if len(results) == 0:
            query_lower = query.lower()
            
            # Check if query is about hours
            if any(term in query_lower for term in ['hour', 'open', 'close', 'time', 'schedule']):
                print("⚠️ Fallback: Using supplemental business hours info")
                for chunk in self.chunks:
                    if 'id' in chunk and 'supp_business_hours' in chunk['id']:
                        results.append({**chunk, 'similarity': 1.0})
            
            # Check if query is about appointments
            elif any(term in query_lower for term in ['appointment', 'arrive', 'early', 'late', 'booking']):
                print("⚠️ Fallback: Using supplemental appointment info")
                for chunk in self.chunks:
                    if 'id' in chunk and 'supp_appointment_policy' in chunk['id']:
                        results.append({**chunk, 'similarity': 1.0})
            
            # Check if query is about cancellation
            elif any(term in query_lower for term in ['cancel', 'reschedule', 'policy']):
                print("⚠️ Fallback: Using supplemental cancellation info")
                for chunk in self.chunks:
                    if 'id' in chunk and 'supp_cancellation_policy' in chunk['id']:
                        results.append({**chunk, 'similarity': 1.0})
        
        print(f"🔍 Found {len(results)} relevant chunks for query: '{query}'")
        return results
    
    def export_to_pickle(self, path=VECTOR_DB_PATH):
        """Export the vector database to a pickle file."""
        if not self.initialized:
            print("❌ Cannot export: vector database not initialized")
            return
        
        data = {
            'chunks': self.chunks,
            'embeddings': self.embeddings,
            'metadata': {
                'model': self.embedding_model,
                'count': len(self.chunks),
                'dimensions': self.embeddings.shape[1],
                'created_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        }
        
        pd.to_pickle(data, path)
        print(f"💾 Vector database exported to {path}")
    
    def import_from_pickle(self, path=VECTOR_DB_PATH):
        """Import the vector database from a pickle file."""
        if os.path.exists(path):
            try:
                data = pd.read_pickle(path)
                self.chunks = data['chunks']
                self.embeddings = data['embeddings']
                self.initialized = True
                
                metadata = data.get('metadata', {})
                print(f"📥 Vector database imported from {path}")
                print(f"   Model: {metadata.get('model', 'unknown')}")
                print(f"   Chunks: {metadata.get('count', len(self.chunks))}")
                print(f"   Dimensions: {metadata.get('dimensions', self.embeddings.shape[1])}")
                print(f"   Created: {metadata.get('created_at', 'unknown')}")
                
                return True
            except Exception as e:
                print(f"❌ Error importing vector database: {e}")
                return False
        else:
            print(f"❌ File not found: {path}")
            return False
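
The core of `VectorDatabase.search` is: score every stored vector against the query, drop scores below the threshold, sort, and keep the top results. The sketch below reproduces that logic on toy three-dimensional vectors in pure Python, so it runs without NumPy or scikit-learn. The helper names `cosine` and `top_k` and the vectors are illustrative assumptions, not part of the notebook.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec, doc_vecs, k=5, threshold=0.65):
    """Return up to k (index, score) pairs with score >= threshold, best first."""
    scored = [(i, cosine(query_vec, v)) for i, v in enumerate(doc_vecs)]
    kept = [(i, s) for i, s in scored if s >= threshold]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return kept[:k]


# Toy "embeddings": two close to the query, one orthogonal, one zero placeholder.
docs = [
    [1.0, 0.0, 0.0],
    [0.9, 0.1, 0.0],
    [0.0, 1.0, 0.0],
    [0.0, 0.0, 0.0],
]
hits = top_k([1.0, 0.0, 0.0], docs, k=2)
for index, score in hits:
    print(index, round(score, 4))
```

The orthogonal vector and the all-zero placeholder both score 0 and are filtered out by the threshold, which is why an empty result list triggers the keyword fallback in `search`.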

## 6. Response Generation

In [9]:
# 6 Response Generation
def generate_response(vector_db, query, history=None):
    """Generate a response using vector database and LLM."""
    if not vector_db or not vector_db.initialized:
        return "I'm sorry, but my knowledge base is not initialized. Please try again later."
    
    # Search for relevant chunks
    results = vector_db.search(query, top_k=3)
    
    # Prepare context from search results
    context = ""
    if results:
        for i, result in enumerate(results):
            context += f"\nInformation {i+1}:\n{result['chunk_text']}\n"
    else:
        # If no results, provide a general response
        return (
            "I'm sorry, but I don't have specific information about that. "
            "For the most accurate and up-to-date information, please contact Benefit Body Spa directly "
            "at (506) 454-5403 or email info@benefitbodyspa.com."
        )
    
    # Prepare the prompt
    system_prompt = """You are a helpful customer service assistant for Benefit Body Spa. 
Answer the user's question based on the information provided. 
If the information doesn't fully address the question, acknowledge what you know and suggest contacting the spa directly for more details.
Keep your responses friendly, professional, and focused on the customer's needs.
Format your response in a clear, easy-to-read manner."""
    
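    user_prompt = (
        f"Here is information about Benefit Body Spa:\n{context}\n\n"
        f"Based on this information, please answer the following question: {query}"
    )
    
    # Generate response. The google-genai SDK takes the system prompt through
    # GenerateContentConfig.system_instruction; `contents` carries the user turn
    # (it does not accept (role, text) tuples).
    response = client.models.generate_content(
        model=LLM_MODEL,
        contents=user_prompt,
        config=types.GenerateContentConfig(system_instruction=system_prompt)
    )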
    
    return response.text
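
To inspect what the model actually receives, the context assembly can be separated from the API call. This is a small sketch under that assumption: `build_prompt` is a hypothetical helper that mirrors the string formatting used in `generate_response`, and the two result dictionaries are invented sample data, not scraped content.

```python
def build_prompt(query, results):
    """Number each retrieved chunk and append the user's question."""
    context = "".join(
        f"\nInformation {i}:\n{r['chunk_text']}\n"
        for i, r in enumerate(results, start=1)
    )
    return (
        f"Here is information about Benefit Body Spa:\n{context}\n\n"
        f"Based on this information, please answer the following question: {query}"
    )


# Invented retrieval results for illustration only.
sample_results = [
    {"chunk_text": "Page: Business Hours\nMonday to Friday: 10:00 AM - 8:00 PM"},
    {"chunk_text": "Page: Cancellation Policy\nPlease provide at least 24 hours notice"},
]
prompt = build_prompt("What are your business hours?", sample_results)
print(prompt)
```

Printing the prompt before sending it makes it easier to tell retrieval problems (wrong chunks) apart from generation problems (right chunks, poor answer).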

## 7. Main Functions

In [29]:
# 7. Main Functions
def create_knowledge_base(from_scratch=False):
    """Create and initialize the knowledge base."""
    vector_db = VectorDatabase(client)
    
    # Try to load existing database first
    if not from_scratch and os.path.exists(VECTOR_DB_PATH):
        if vector_db.import_from_pickle():
            print("📌 Using existing vector database")
            return vector_db
    
    # Otherwise build from scratch
    print("🔄 Building new vector database...")
    
    # Scrape and process data
    spa_data = scrape_website_from_sitemap()
    
    if spa_data is None or len(spa_data) == 0:
        print("❌ No data scraped. Cannot build vector database.")
        return None
    
    # Process content into chunks
    chunks_df = process_content_to_chunks(spa_data)
    
    if chunks_df is None or len(chunks_df) == 0:
        print("❌ No chunks created. Cannot build vector database.")
        return None
    
    # Initialize database with chunks
    vector_db.initialize(chunks_df)
    
    return vector_db

def test_knowledge_base(vector_db):
    """Test the knowledge base with sample queries."""
    if not vector_db or not vector_db.initialized:
        print("❌ Vector database not initialized")
        return
    
    test_queries = [
        "What massage services do you offer?",
        "How much does a facial cost?",
        "What are your business hours?",
        "Do I need to arrive early for my appointment?",
        "What is your cancellation policy?",
        "Do you offer financing options?",
        "Where are you located?",
        "What treatments do you offer for skin rejuvenation?",
        "Tell me about your Emsculpt treatments",
        "Do you have laser hair removal services?"
    ]
    
    print("\n=== Testing Knowledge Base with Sample Queries ===")
    
    for query in test_queries:
        print(f"\n🔍 Query: {query}")
        
        # Search for relevant chunks
        results = vector_db.search(query, top_k=2)
        
        if results:
            for i, result in enumerate(results):
                print(f"\nResult {i+1} (Similarity: {result['similarity']:.4f}):")
                print(f"Title: {result['title']}")
                print(f"URL: {result['url']}")
                # Print a preview of the content
                content_preview = result['original_text'][:100] + "..." if len(result['original_text']) > 100 else result['original_text']
                print(f"Content: {content_preview}")
                
            # Generate a response
            response = generate_response(vector_db, query)
            print(f"\n💬 Chatbot Response:\n{response}")
        else:
            print("❌ No relevant results found")

def chat_with_spa_agent(vector_db):
    """Interactive chat with the spa agent."""
    if not vector_db or not vector_db.initialized:
        print("❌ Vector database not initialized")
        return
    
    print("\n=== Benefit Body Spa Customer Service Agent ===")
    print("Ask any question about our services, or type 'exit' to quit.\n")
    
    while True:
        query = input("You: ")
        if query.lower() in ['exit', 'quit', 'bye']:
            print("\nThank you for chatting with us!")
            break
        
        response = generate_response(vector_db, query)
        print(f"\nSpa Agent: {response}\n")

def analyze_knowledge_coverage(vector_db):
    """Analyze the coverage of the knowledge base."""
    if not vector_db or not vector_db.initialized:
        print("❌ Vector database not initialized")
        return
    
    # Get statistics about chunks
    pages = set()
    url_chunks = {}
    
    for chunk in vector_db.chunks:
        url = chunk['url']
        pages.add(url)
        
        if url not in url_chunks:
            url_chunks[url] = []
        
        url_chunks[url].append(chunk)
    
    # Print statistics
    print("\n📊 Knowledge Base Coverage Analysis:")
    print(f"Total pages: {len(pages)}")
    print(f"Total chunks: {len(vector_db.chunks)}")
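    # Guard against an empty knowledge base to avoid a ZeroDivisionError
    avg_chunks = len(vector_db.chunks) / len(pages) if pages else 0.0
    print(f"Average chunks per page: {avg_chunks:.2f}")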
    
    # Pages with most chunks
    sorted_pages = sorted(url_chunks.items(), key=lambda x: len(x[1]), reverse=True)
    
    print("\n📑 Pages with Most Content:")
    for url, chunks in sorted_pages[:5]:
        # Find a chunk to get the title
        title = chunks[0]['title'] if chunks else "Unknown"
        print(f"- {title} ({url}): {len(chunks)} chunks")
    
    # Test some important business queries
    important_queries = [
        "What are your hours?",
        "How can I book an appointment?",
        "What's your cancellation policy?",
        "Where are you located?",
        "What types of payment do you accept?",
        "Do you offer gift cards?"
    ]
    
    print("\n🔍 Testing Important Business Queries:")
    coverage_score = 0
    
    for query in important_queries:
        results = vector_db.search(query, top_k=1)
        if results:
            coverage_score += 1
            print(f"✅ '{query}' - Found relevant information")
        else:
            print(f"❌ '{query}' - No relevant information found")
    
    coverage_percentage = (coverage_score / len(important_queries)) * 100
    print(f"\nBusiness Information Coverage: {coverage_percentage:.1f}%")

# Run the main process
def main():
    """Main function to run the entire process."""
    print("🏁 Starting Benefit Body Spa Customer Service Agent")
    
    # Check if we should rebuild from scratch
    rebuild = input("Rebuild knowledge base from scratch? (y/n): ").lower() == 'y'
    
    # Create knowledge base
    vector_db = create_knowledge_base(from_scratch=rebuild)
    
    if not vector_db or not vector_db.initialized:
        print("❌ Failed to create knowledge base")
        return
    
    # Analyze knowledge coverage
    analyze_knowledge_coverage(vector_db)
    
    # Test knowledge base
    test_knowledge_base(vector_db)
    
    # Ask if user wants to start chatting
    start_chat = input("\nStart interactive chat? (y/n): ").lower() == 'y'
    if start_chat:
        chat_with_spa_agent(vector_db)
    
    print("🏁 Process completed")

# Run the main process if executed directly
if __name__ == "__main__":
    main()

🏁 Starting Benefit Body Spa Customer Service Agent


KeyboardInterrupt: Interrupted by user
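
`analyze_knowledge_coverage` counts a query as covered whenever `search` returns anything at all. The sketch below shows a stricter variant that also checks the best similarity score against a threshold. It assumes each result carries a `similarity` field (as printed by `test_knowledge_base`) and reuses the 0.65 value from the project constants; `coverage_report` and `fake_search` are hypothetical names used only for illustration. If `vector_db.search` already filters by threshold internally, this check is redundant.

```python
from typing import Callable, Dict, List


def coverage_report(
    search: Callable[[str], List[Dict]],
    queries: List[str],
    threshold: float = 0.65,
) -> Dict[str, bool]:
    """Map each query to True when its best hit meets the threshold."""
    report = {}
    for query in queries:
        results = search(query)
        # An empty result list yields a best score of 0.0
        best = max((r.get("similarity", 0.0) for r in results), default=0.0)
        report[query] = best >= threshold
    return report


def fake_search(query: str) -> List[Dict]:
    """Hypothetical stand-in for vector_db.search with canned scores."""
    canned = {
        "What are your hours?": [{"similarity": 0.82}],
        "Do you offer gift cards?": [{"similarity": 0.41}],
    }
    return canned.get(query, [])


report = coverage_report(
    fake_search,
    ["What are your hours?", "Do you offer gift cards?", "Where are you located?"],
)
covered = sum(report.values())
print(f"Coverage: {covered}/{len(report)}")
```

In the notebook, passing `lambda q: vector_db.search(q, top_k=1)` as `search` would apply the same check to the real database.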

**7.2 Save Vector DB**

In [31]:
#7.2 Save Vector DB

# Cell 1 - Save Vector DB to persistent storage
import shutil

# Option 1: Save to Google Drive (mount first)
#from google.colab import drive
#drive.mount('/content/drive')
#shutil.copy(VECTOR_DB_PATH, '/content/drive/MyDrive/benefit_spa_vector_db.pkl')

# Option 2: Save to Kaggle Dataset (if running in Kaggle)
#import opendatasets as od
#od.download_kaggle_dataset('bellosabur/benefit-spa-vector-db')

# Option 3: Download directly from notebook environment
from IPython.display import FileLink
FileLink(VECTOR_DB_PATH)  # Creates downloadable link
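
Before copying or downloading the file, it can be worth confirming that the pickle loads back cleanly. The sketch below is a minimal round-trip check under the assumption that the database is an ordinary picklable object; `save_and_verify` is a hypothetical helper, and a toy dictionary written to a temporary directory stands in for the real vector database.

```python
import os
import pickle
import tempfile


def save_and_verify(obj, path):
    """Pickle obj to path, then load it back and return the restored copy."""
    with open(path, "wb") as f:
        pickle.dump(obj, f)
    with open(path, "rb") as f:
        return pickle.load(f)


# Toy stand-in for the real vector database (illustrative data only)
toy_db = {
    "chunks": [{"url": "https://benefitbodyspa.com", "title": "Home"}],
    "initialized": True,
}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy_vector_db.pkl")
    restored = save_and_verify(toy_db, path)
    size_bytes = os.path.getsize(path)

print("Round trip OK:", restored == toy_db)
print("File is non-empty:", size_bytes > 0)
```

Pickle files execute code on load, so a saved database should only be loaded from a location you control.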

**7.3 Deployment Preparation**

In [32]:
# Cell 2 - Deployment Preparation
def export_for_deployment(vector_db):
    """Package components for production deployment"""
    deployment_package = {
        'vector_db_path': VECTOR_DB_PATH,
        'config': {
            'EMBEDDING_MODEL': EMBEDDING_MODEL,
            'LLM_MODEL': LLM_MODEL,
            'SIMILARITY_THRESHOLD': SIMILARITY_THRESHOLD
        },
        'api_example': '''
        # FastAPI Example Endpoint
        from fastapi import FastAPI
        app = FastAPI()
        
        @app.post("/chat")
        async def chat_endpoint(query: str):
            return generate_response(vector_db, query)
        '''
    }
    
    print("Ready for integration with:")
    print("- Jane App: Use FastAPI example to create endpoints")
    print("- POS Systems: Export vector_db.pkl and config to POS environment")
    print("- Web Chat: Build frontend that calls generate_response()")
    return deployment_package

# Execute deployment prep
deployment = export_for_deployment(vector_db)

Ready for integration with:
- Jane App: Use FastAPI example to create endpoints
- POS Systems: Export vector_db.pkl and config to POS environment
- Web Chat: Build frontend that calls generate_response()
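
`export_for_deployment` returns the package as an in-memory dictionary only. One way to hand the settings to another environment is to write the `config` portion to a JSON file, sketched below. The file name `deployment_config.json` and the helpers `write_config` and `read_config` are assumptions for illustration; the values mirror the project constants.

```python
import json
import os
import tempfile


def write_config(config: dict, path: str) -> None:
    """Write the deployment settings as indented JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)


def read_config(path: str) -> dict:
    """Load the deployment settings back from JSON."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


# Values copied from the project constants cell
config = {
    "EMBEDDING_MODEL": "models/text-embedding-004",
    "LLM_MODEL": "gemini-2.0-flash",
    "SIMILARITY_THRESHOLD": 0.65,
}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "deployment_config.json")
    write_config(config, path)
    loaded = read_config(path)

print("Config round trip OK:", loaded == config)
```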


In [None]:
!pip install gradio -q
import gradio as gr

def enhanced_chat(query, history):
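    # Step 1: LLM rewrites the raw input as a single search query
    rephrased = client.models.generate_content(
        model=LLM_MODEL,
        contents=[f"Rewrite this spa customer question as one short search query. Reply with the query only, no explanation or alternatives: {query}"]
    ).text
    # Fall back to the raw input if the model returns no text
    processed_query = (rephrased or query).strip()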
    
    # Step 2: Vector DB search
    results = vector_db.search(processed_query)
    
    # Step 3: LLM generates response
    context = "\n".join([r['chunk_text'] for r in results])
    response = client.models.generate_content(
        model=LLM_MODEL,
        contents=[f"""Answer this spa query concisely: {query} 
                  using only: {context}"""
        ]
    ).text
    
    # Improved follow-up suggestions
    follow_ups = client.models.generate_content(
        model=LLM_MODEL,
        contents=[f"""Generate 2-3 natural follow-up questions about Benefit Body Spa services 
                  based on this response: '{response}'. Format as a bullet list without numbering."""
        ]
    ).text
    
    # Clean up the output: strip bullet markers (including '*') and drop empty lines
    follow_ups = [s for s in (q.strip('•-* \t') for q in follow_ups.split('\n')) if s]
    return response, follow_ups[:3]  # Return response + max 3 suggestions

with gr.Blocks() as demo:
    gr.Markdown("## Benefit Body Spa - LLM-Powered Assistant")
    
    with gr.Row():
        chatbot = gr.Chatbot(height=400, bubble_full_width=False)
        with gr.Accordion("💡 Suggested Questions", open=False):
            followup = gr.HTML()  # Using HTML for better formatting
    
    msg = gr.Textbox(label="Type your question...", placeholder="Ask about services, pricing, or policies")
    btn = gr.Button("Send", variant="primary")
    
    def respond(message, chat_history):
        response, suggestions = enhanced_chat(message, chat_history)
        chat_history.append((message, response))
        
        # Format suggestions as button-styled HTML (display only: the onclick below is a no-op, so clicking does not submit the question)
        suggestions_html = "<div style='margin-top:10px'>" + \
                           "".join([f"""<button style='margin:5px; padding:8px 12px; 
                                       border-radius:5px; cursor:pointer'
                                       onclick='this.innerHTML=this.innerHTML'>
                                       {q}
                                       </button><br>""" 
                                   for q in suggestions]) + \
                           "</div>"
        
        return "", chat_history, suggestions_html
    
    btn.click(respond, [msg, chatbot], [msg, chatbot, followup])
    msg.submit(respond, [msg, chatbot], [msg, chatbot, followup])

demo.launch(share=True, debug=True)
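
The suggestions in `respond` are interpolated into HTML exactly as the model returned them. The sketch below shows one way to escape them first with the standard library's `html.escape`, so that stray `<`, `>`, or `&` characters render as text. `render_suggestions` is a hypothetical helper, and a plain list is used here instead of the styled buttons.

```python
import html
from typing import List


def render_suggestions(suggestions: List[str]) -> str:
    """Return an HTML list with every suggestion escaped for safe display."""
    items = "".join(f"<li>{html.escape(s)}</li>" for s in suggestions)
    return f"<ul style='margin-top:10px'>{items}</ul>"


markup = render_suggestions(
    ["Do you offer <b>facials</b>?", "What are your hours & prices?"]
)
print(markup)
```

The returned string could be passed to the `followup` component in place of `suggestions_html`.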



* Running on local URL:  http://127.0.0.1:7861
* Running on public URL: https://5c38f15ce56f45caf2.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
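
The search log for this cell shows the rephrasing step returning a multi-paragraph answer that was passed to `vector_db.search` verbatim. A prompt alone may not guarantee a one-line reply, so a small guard can help. The sketch below accepts the model's text only when it is a single short line and otherwise falls back to the user's original wording; `pick_search_query` and the 20-word limit are assumptions for illustration.

```python
def pick_search_query(llm_text: str, original: str, max_words: int = 20) -> str:
    """Return a one-line search query, falling back to the original input."""
    # Strip bullet markers, quotes, and markdown emphasis from each line
    lines = [ln.strip(" *•-\"'`") for ln in llm_text.splitlines()]
    lines = [ln for ln in lines if ln]
    if len(lines) == 1 and len(lines[0].split()) <= max_words:
        return lines[0]
    # Multi-line or empty output: the original question is the safer query
    return original.strip()


concise = pick_search_query("Facial price", "price ofn facial")
verbose_reply = "Here are a few ways to rephrase:\n\n**More General:**\n* Facial price"
fallback = pick_search_query(verbose_reply, "price ofn facial")

print(concise)
print(fallback)
```

Falling back to the raw input keeps a typo such as "ofn" in the query, but it avoids embedding several paragraphs of unrelated explanation.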


Generating embeddings batch 1/1
🔍 Found 5 relevant chunks for query: 'Here are a few ways to rephrase the spa service query "price ofn facial" for better search results, depending on what you're looking for:

**More General:**

*   **Facial price**
*   **Cost of facial**
*   **How much does a facial cost?**
*   **Facial prices near me** (If you want local results)

**More Specific (if you have a location in mind):**

*   **Facial cost [city name]**
*   **Price of facial at [spa name]**
*   **[Spa name] facial prices**

**Even More Specific (if you know the type of facial):**

*   **Price of [type of facial]** (e.g., "Price of hydrating facial")
*   **Cost of [type of facial] near me**

**Why these are better:**

*   **Eliminate the typo:** "ofn" is likely a typo and should be corrected to "of".
*   **Clear and concise language:** Using keywords like "price" and "cost" is straightforward.
*   **Local focus:** Adding "near me" or a city name helps narrow down results to your area.
*  