""" ML Knowledge Base Builder for SentrySearch Builds a production-ready knowledge base from curated ML anomaly detection papers and blog posts. Implements Agentic RAG approach with intelligent content processing. Features: - Real content ingestion from URLs - LLM-powered content enrichment - Persistent ChromaDB storage - Question-like chunk processing - Production-ready error handling """ import os import json import time import random import hashlib from typing import List, Dict, Optional, Tuple from dataclasses import dataclass, asdict from urllib.parse import urlparse import logging from pathlib import Path import requests from bs4 import BeautifulSoup import chromadb from chromadb.config import Settings from anthropic import Anthropic import anthropic import numpy as np from sklearn.feature_extraction.text import TfidfVectorizer # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class MLPaperSource: """Represents a source ML paper or blog post""" title: str url: str company: str year: str description: str ml_techniques: List[str] @dataclass class EnrichedChunk: """Represents a processed and enriched document chunk""" chunk_id: str source_title: str source_url: str company: str year: str original_content: str enriched_content: str # Question-like format ml_techniques: List[str] chunk_summary: str keywords: List[str] chunk_index: int content_hash: str bm25_terms: List[str] = None # Additional search terms for BM25 faq_questions: List[str] = None # FAQ-style questions class ContentExtractor: """Extracts and cleans content from web pages""" def __init__(self, timeout: int = 30): self.timeout = timeout self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36' }) def extract_from_url(self, url: str) -> Optional[str]: """Extract clean text content from a URL""" try: logger.info(f"Extracting content from: {url}") response = self.session.get(url, timeout=self.timeout) response.raise_for_status() # Parse HTML soup = BeautifulSoup(response.content, 'html.parser') # Remove unwanted elements for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']): element.decompose() # Extract main content content = self._extract_main_content(soup) # Clean and normalize text cleaned_content = self._clean_text(content) logger.info(f"Extracted {len(cleaned_content)} characters from {url}") return cleaned_content except requests.RequestException as e: logger.error(f"Request failed for {url}: {e}") return None except Exception as e: logger.error(f"Content extraction failed for {url}: {e}") return None def _extract_main_content(self, soup: BeautifulSoup) -> str: """Extract main content from parsed HTML""" # Try common article selectors content_selectors = [ 'article', '[role="main"]', '.post-content', '.article-content', '.entry-content', '.content', 'main' ] for selector in content_selectors: content_elem = soup.select_one(selector) if content_elem: return content_elem.get_text() # Fallback to body content body = soup.find('body') return body.get_text() if body else soup.get_text() def _clean_text(self, text: str) -> str: """Clean and normalize extracted text""" # Remove extra whitespace lines = [line.strip() for line in text.split('\n')] lines = [line for line in lines if line] # Join lines and normalize spaces cleaned = ' '.join(lines) cleaned = ' '.join(cleaned.split()) return cleaned class ContentEnricher: """Enriches content using LLM-powered 
processing""" def __init__(self, anthropic_client): self.client = anthropic_client def _api_call_with_retry(self, **kwargs): """Make API call with intelligent retry logic using retry-after header""" max_retries = 3 base_delay = 5 for attempt in range(max_retries): try: print(f"DEBUG: Content Enricher API call attempt {attempt + 1}/{max_retries}") return self.client.messages.create(**kwargs) except anthropic.RateLimitError as e: if attempt == max_retries - 1: print(f"DEBUG: Content Enricher rate limit exceeded after {max_retries} attempts") raise e # Check if the error response has retry-after information retry_after = None if hasattr(e, 'response') and e.response: retry_after_header = e.response.headers.get('retry-after') if retry_after_header: try: retry_after = float(retry_after_header) print(f"DEBUG: Content Enricher API provided retry-after: {retry_after} seconds") except (ValueError, TypeError): pass # Use retry-after if available, otherwise exponential backoff if retry_after: delay = retry_after + random.uniform(1, 3) else: delay = base_delay * (2 ** attempt) + random.uniform(1, 5) delay = min(delay, 120) print(f"DEBUG: Content Enricher rate limit hit. Waiting {delay:.1f} seconds before retry {attempt + 2}") time.sleep(delay) except Exception as e: print(f"DEBUG: Content Enricher non-rate-limit error: {e}") raise e def enrich_chunk(self, chunk: str, source: MLPaperSource) -> Dict[str, str]: """Enrich a chunk with summary, keywords, question-like format, and BM25-optimized metadata""" prompt = f""" Analyze this text chunk from a machine learning anomaly detection paper/blog and provide: 1. QUESTION_FORMAT: Rewrite the chunk content as if it's answering questions about the ML approach 2. SUMMARY: A 2-line summary of what this chunk covers 3. KEYWORDS: 5-8 relevant technical keywords (comma-separated) 4. BM25_TERMS: Additional search terms for BM25 retrieval (comma-separated, include variations, synonyms, acronyms) 5. FAQ_QUESTIONS: 2-3 potential questions this chunk could answer (pipe-separated) Source Context: - Company: {source.company} - ML Techniques: {', '.join(source.ml_techniques)} - Year: {source.year} Text Chunk: {chunk[:1500]} Format your response as: QUESTION_FORMAT: [rewritten content] SUMMARY: [summary] KEYWORDS: [keywords] BM25_TERMS: [search terms with variations] FAQ_QUESTIONS: [question1|question2|question3] """ try: response = self._api_call_with_retry( model="claude-sonnet-4-20250514", max_tokens=800, messages=[{"role": "user", "content": prompt}] ) # Safe access to response content if not response.content or len(response.content) == 0: raise ValueError("Empty response from content enrichment API") if not hasattr(response.content[0], 'text'): raise ValueError("Response content missing text attribute") content = response.content[0].text.strip() return self._parse_enrichment_response(content) except Exception as e: logger.error(f"Content enrichment failed: {e}") # Return fallback enrichment return { 'question_format': chunk, 'summary': f"Content about {source.ml_techniques[0]} implementation at {source.company}", 'keywords': ', '.join(source.ml_techniques + [source.company.lower(), 'anomaly detection']), 'bm25_terms': ', '.join(source.ml_techniques + [source.company.lower(), 'ml', 'detection', 'analysis']), 'faq_questions': f"How does {source.company} implement {source.ml_techniques[0]}?|What is {source.ml_techniques[0]} used for?" 


class DocumentProcessor:
    """Processes documents into enriched chunks"""

    def __init__(self, content_enricher: ContentEnricher, chunk_size: int = 800):
        self.enricher = content_enricher
        self.chunk_size = chunk_size

    def process_document(self, source: MLPaperSource, content: str) -> List[EnrichedChunk]:
        """Process a document into enriched chunks"""
        if not content or len(content) < 100:
            logger.warning(f"Content too short for {source.title}")
            return []

        # Create chunks
        chunks = self._create_chunks(content, source)

        # Enrich each chunk
        enriched_chunks = []
        for i, chunk_content in enumerate(chunks):
            # Generate content hash for deduplication
            content_hash = hashlib.md5(chunk_content.encode()).hexdigest()

            # Enrich with LLM
            enrichment = self.enricher.enrich_chunk(chunk_content, source)

            chunk = EnrichedChunk(
                chunk_id=f"{source.company}_{source.year}_{i}_{content_hash[:8]}",
                source_title=source.title,
                source_url=source.url,
                company=source.company,
                year=source.year,
                original_content=chunk_content,
                enriched_content=enrichment['question_format'],
                ml_techniques=source.ml_techniques,
                chunk_summary=enrichment['summary'],
                keywords=enrichment['keywords'].split(', ') if enrichment['keywords'] else [],
                chunk_index=i,
                content_hash=content_hash
            )

            # Add BM25-specific metadata to chunk
            chunk.bm25_terms = enrichment.get('bm25_terms', '').split(', ') if enrichment.get('bm25_terms') else []
            chunk.faq_questions = enrichment.get('faq_questions', '').split('|') if enrichment.get('faq_questions') else []

            enriched_chunks.append(chunk)

            # Rate limiting for API calls
            time.sleep(0.5)

        logger.info(f"Processed {len(enriched_chunks)} chunks for {source.title}")
        return enriched_chunks

    def _create_chunks(self, content: str, source: MLPaperSource) -> List[str]:
        """Create overlapping chunks from content"""
        chunks = []
        overlap = self.chunk_size // 4  # 25% overlap

        for i in range(0, len(content), self.chunk_size - overlap):
            chunk = content[i:i + self.chunk_size]

            # Skip very short chunks
            if len(chunk) < 200:
                continue

            # Try to break at sentence boundaries
            if i + self.chunk_size < len(content):
                last_period = chunk.rfind('.')
                if last_period > len(chunk) * 0.7:  # If period is in last 30%
                    chunk = chunk[:last_period + 1]

            chunks.append(chunk.strip())

        return chunks
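

# Chunking arithmetic sketch (illustrative): with the default chunk_size of 800 and a
# 25% overlap of 200 characters, the window advances by 600 characters per chunk, so a
# 5,000-character article produces start offsets 0, 600, 1200, ..., 4800 (windows
# shorter than 200 characters are skipped by _create_chunks). The helper below is
# hypothetical and only mirrors that stride calculation.
def _example_chunk_offsets(content_length: int = 5000, chunk_size: int = 800) -> List[int]:
    overlap = chunk_size // 4       # 25% overlap, as in DocumentProcessor._create_chunks
    stride = chunk_size - overlap   # 600 characters with the defaults
    return list(range(0, content_length, stride))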


class KnowledgeBaseStorage:
    """Manages persistent storage of the knowledge base"""

    def __init__(self, storage_path: str = "./ml_knowledge_base"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(exist_ok=True)

        # Initialize ChromaDB with persistent storage
        self.chroma_client = chromadb.PersistentClient(
            path=str(self.storage_path / "chroma_db")
        )

        self.collection_name = "ml_anomaly_detection"
        self.collection = None
        self._initialize_collection()

    def _initialize_collection(self):
        """Initialize or get existing collection"""
        try:
            # Try to get existing collection
            self.collection = self.chroma_client.get_collection(self.collection_name)
            logger.info(f"Loaded existing collection with {self.collection.count()} documents")
        except Exception:
            # Create new collection
            self.collection = self.chroma_client.create_collection(
                name=self.collection_name,
                metadata={"description": "ML Anomaly Detection Knowledge Base"}
            )
            logger.info("Created new collection")

    def add_chunks(self, chunks: List[EnrichedChunk]) -> bool:
        """Add enriched chunks to the knowledge base"""
        try:
            if not chunks:
                return True

            # Prepare data for ChromaDB
            documents = []
            metadatas = []
            ids = []

            for chunk in chunks:
                # Create enriched document text
                document_text = f"""
Title: {chunk.source_title}
Company: {chunk.company}
Year: {chunk.year}
ML Techniques: {', '.join(chunk.ml_techniques)}
Keywords: {', '.join(chunk.keywords)}
Summary: {chunk.chunk_summary}

Content: {chunk.enriched_content}
""".strip()

                documents.append(document_text)
                metadatas.append({
                    'source_title': chunk.source_title,
                    'source_url': chunk.source_url,
                    'company': chunk.company,
                    'year': chunk.year,
                    'ml_techniques': '|'.join(chunk.ml_techniques),
                    'keywords': '|'.join(chunk.keywords),
                    'chunk_summary': chunk.chunk_summary,
                    'chunk_index': chunk.chunk_index,
                    'content_hash': chunk.content_hash,
                    'bm25_terms': '|'.join(chunk.bm25_terms) if chunk.bm25_terms else '',
                    'faq_questions': '|'.join(chunk.faq_questions) if chunk.faq_questions else ''
                })
                ids.append(chunk.chunk_id)

            # Add to ChromaDB
            self.collection.add(
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )

            # Save chunk details as JSON backup
            self._save_chunks_backup(chunks)

            logger.info(f"Added {len(chunks)} chunks to knowledge base")
            return True

        except Exception as e:
            logger.error(f"Failed to add chunks to knowledge base: {e}")
            return False

    def _save_chunks_backup(self, chunks: List[EnrichedChunk]):
        """Save chunk details as JSON backup"""
        backup_file = self.storage_path / "chunks_backup.jsonl"

        with open(backup_file, 'a', encoding='utf-8') as f:
            for chunk in chunks:
                f.write(json.dumps(asdict(chunk), ensure_ascii=False) + '\n')

    def get_stats(self) -> Dict:
        """Get knowledge base statistics"""
        try:
            count = self.collection.count()

            # Get unique companies and years
            if count > 0:
                results = self.collection.get(include=['metadatas'])
                companies = set()
                years = set()
                ml_techniques = set()

                for metadata in results['metadatas']:
                    companies.add(metadata.get('company', ''))
                    years.add(metadata.get('year', ''))
                    techniques = metadata.get('ml_techniques', '').split('|')
                    ml_techniques.update([t for t in techniques if t])

                return {
                    'total_chunks': count,
                    'companies': sorted(list(companies)),
                    'years': sorted(list(years)),
                    'ml_techniques': sorted(list(ml_techniques)),
                    'storage_path': str(self.storage_path)
                }
            else:
                return {
                    'total_chunks': 0,
                    'companies': [],
                    'years': [],
                    'ml_techniques': [],
                    'storage_path': str(self.storage_path)
                }

        except Exception as e:
            logger.error(f"Failed to get stats: {e}")
            return {'error': str(e)}

    def search(self, query: str, n_results: int = 10) -> List[Dict]:
        """Search the knowledge base"""
        try:
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results,
                include=['documents', 'metadatas', 'distances']
            )

            search_results = []
            for i, doc in enumerate(results['documents'][0]):
                search_results.append({
                    'document': doc,
                    'metadata': results['metadatas'][0][i],
                    'distance': results['distances'][0][i],
                    'score': 1 / (1 + results['distances'][0][i])  # Convert distance to similarity
                })

            return search_results

        except Exception as e:
            logger.error(f"Search failed: {e}")
            return []
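

# Query sketch (illustrative): because ChromaDB persists to disk under storage_path, a
# separate process can reopen the knowledge base and search it without re-ingesting any
# sources. The helper name, query string, and result count below are examples only.
def _example_query_existing_kb(query: str = "isolation forest for abuse detection") -> List[Dict]:
    kb = KnowledgeBaseStorage(storage_path="./ml_knowledge_base")
    return kb.search(query, n_results=5)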


def get_curated_ml_sources() -> List[MLPaperSource]:
    """Get the curated list of ML anomaly detection sources"""
    sources = [
        MLPaperSource(
            title="Detecting Performance Anomalies in External Firmware Deployments",
            url="https://netflixtechblog.com/detecting-performance-anomalies-in-external-firmware-deployments-ed41b1bfcf46",
            company="Netflix",
            year="2019",
            description="Netflix's approach to detecting anomalies in firmware performance using ML",
            ml_techniques=["statistical_analysis", "anomaly_detection", "performance_monitoring"]
        ),
        MLPaperSource(
            title="Detecting and Preventing Abuse on LinkedIn using Isolation Forests",
            url="https://engineering.linkedin.com/blog/2019/isolation-forest",
            company="LinkedIn",
            year="2019",
            description="LinkedIn's implementation of isolation forests for abuse detection",
            ml_techniques=["isolation_forest", "unsupervised_learning", "abuse_detection"]
        ),
        MLPaperSource(
            title="How Does Spam Protection Work on Stack Exchange?",
            url="https://stackoverflow.blog/2020/06/25/how-does-spam-protection-work-on-stack-exchange/",
            company="Stack Exchange",
            year="2020",
            description="Stack Exchange's ML-based spam detection system",
            ml_techniques=["text_classification", "nlp", "spam_detection"]
        ),
        MLPaperSource(
            title="Blocking Slack Invite Spam With Machine Learning",
            url="https://slack.engineering/blocking-slack-invite-spam-with-machine-learning/",
            company="Slack",
            year="2020",
            description="Slack's ML approach to preventing invite spam",
            ml_techniques=["classification", "feature_engineering", "spam_detection"]
        ),
        MLPaperSource(
            title="Cloudflare Bot Management: Machine Learning and More",
            url="https://blog.cloudflare.com/cloudflare-bot-management-machine-learning-and-more/",
            company="Cloudflare",
            year="2020",
            description="Cloudflare's ML-powered bot detection and management",
            ml_techniques=["behavioral_analysis", "traffic_analysis", "bot_detection"]
        ),
        MLPaperSource(
            title="Graph for Fraud Detection",
            url="https://engineering.grab.com/graph-for-fraud-detection",
            company="Grab",
            year="2022",
            description="Grab's graph-based approach to fraud detection",
            ml_techniques=["graph_ml", "fraud_detection", "network_analysis"]
        ),
        MLPaperSource(
            title="Machine Learning for Fraud Detection in Streaming Services",
            url="https://netflixtechblog.com/machine-learning-for-fraud-detection-in-streaming-services-b0b4ef3be3f6",
            company="Netflix",
            year="2023",
            description="Netflix's ML approach to detecting fraud in streaming services",
            ml_techniques=["fraud_detection", "streaming_analytics", "behavioral_analysis"]
        ),
        MLPaperSource(
            title="Data Generation and Sampling Strategies",
            url="https://blog.cloudflare.com/data-generation-and-sampling-strategies/",
            company="Cloudflare",
            year="2023",
            description="Cloudflare's data generation and sampling strategies for ML training",
            ml_techniques=["data_generation", "sampling", "training_data"]
        ),
        MLPaperSource(
            title="Machine Learning Mobile Traffic Bots",
            url="https://blog.cloudflare.com/machine-learning-mobile-traffic-bots/",
            company="Cloudflare",
            year="2023",
            description="Cloudflare's ML approach to detecting mobile traffic bots",
            ml_techniques=["bot_detection", "mobile_traffic", "behavioral_analysis"]
        ),
        MLPaperSource(
            title="Project Radar: Intelligent Early Fraud Detection",
            url="https://www.uber.com/blog/project-radar-intelligent-early-fraud-detection/",
            company="Uber",
            year="2023",
            description="Uber's Project Radar for intelligent early fraud detection",
            ml_techniques=["fraud_detection", "early_detection", "real_time_ml"]
        )
    ]

    return sources


def main():
    """Main function to build the ML knowledge base"""
    # Initialize components
    api_key = os.getenv('ANTHROPIC_API_KEY')
    if not api_key:
        logger.error("ANTHROPIC_API_KEY environment variable not set")
        return

    print("šŸ”Ø Building ML Anomaly Detection Knowledge Base")
    print("=" * 50)

    # Initialize components
    anthropic_client = Anthropic(api_key=api_key)
    content_extractor = ContentExtractor()
    content_enricher = ContentEnricher(anthropic_client)
    document_processor = DocumentProcessor(content_enricher)
    knowledge_base = KnowledgeBaseStorage()

    # Get current stats
    current_stats = knowledge_base.get_stats()
    print(f"šŸ“Š Current knowledge base: {current_stats['total_chunks']} chunks")

    # Get sources to process
    sources = get_curated_ml_sources()
    print(f"šŸ“š Processing {len(sources)} ML sources...")

    # Process each source
    total_chunks_added = 0
    successful_sources = 0

    for i, source in enumerate(sources, 1):
        print(f"\nšŸ”„ [{i}/{len(sources)}] Processing: {source.title}")
        print(f"   Company: {source.company} | Year: {source.year}")

        # Extract content
        content = content_extractor.extract_from_url(source.url)
        if not content:
            print(f"   āŒ Failed to extract content")
            continue

        print(f"   šŸ“ Extracted {len(content):,} characters")

        # Process into chunks
        chunks = document_processor.process_document(source, content)
        if not chunks:
            print(f"   āŒ No chunks generated")
            continue

        print(f"   🧩 Generated {len(chunks)} chunks")

        # Add to knowledge base
        if knowledge_base.add_chunks(chunks):
            total_chunks_added += len(chunks)
            successful_sources += 1
            print(f"   āœ… Added to knowledge base")
        else:
            print(f"   āŒ Failed to add to knowledge base")

    # Final stats
    print(f"\nšŸŽ‰ Knowledge Base Build Complete!")
    print("=" * 50)
    print(f"Sources processed: {successful_sources}/{len(sources)}")
    print(f"Total chunks added: {total_chunks_added}")

    final_stats = knowledge_base.get_stats()
    print(f"Final knowledge base size: {final_stats['total_chunks']} chunks")
    print(f"Companies: {', '.join(final_stats['companies'])}")
    print(f"Years: {', '.join(final_stats['years'])}")
    print(f"Storage location: {final_stats['storage_path']}")

    # Test search
    print(f"\nšŸ” Testing search functionality...")
    test_queries = [
        "How does Netflix detect performance anomalies?",
        "What ML techniques work for fraud detection?",
        "Isolation forest implementation details"
    ]

    for query in test_queries:
        results = knowledge_base.search(query, n_results=3)
        print(f"\nQuery: '{query}'")
        print(f"Results: {len(results)} found")
        if results:
            top_result = results[0]
            print(f"Top match: {top_result['metadata']['company']} - {top_result['metadata']['source_title'][:60]}...")
            print(f"Score: {top_result['score']:.3f}")


if __name__ == "__main__":
    main()