Spaces:
Running
Running
Search Engine Layer
Table of Contents
- Overview
- Supported Search Engines
- Query Optimization
- Multi-Hop Search
- Source Credibility Scoring
- Result Ranking
- Caching & Deduplication
- Configuration
Overview
The Search Engine Layer enables agents to search the web intelligently, optimize queries, perform multi-hop searches, and evaluate source credibility.
Capabilities
- ✅ Multiple search engine APIs (Google, Bing, Brave, DuckDuckGo, Perplexity)
- ✅ Query optimization and rewriting
- ✅ Multi-hop search (search → refine → search again)
- ✅ Source credibility scoring
- ✅ Result ranking and filtering
- ✅ Caching and deduplication
- ✅ Cost tracking
Supported Search Engines
1. Google Search API
Pros:
- Most comprehensive results
- High quality
- Advanced operators support
Cons:
- Requires API key + Custom Search Engine ID
- Costs $5 per 1000 queries after free tier
Configuration:
{
"google": {
"api_key": "YOUR_GOOGLE_API_KEY",
"search_engine_id": "YOUR_CSE_ID",
"region": "us",
"safe_search": True,
"num_results": 10
}
}
Usage:
results = search_engine.search(
query="product reviews for Widget Pro",
engine="google",
num_results=10
)
2. Bing Search API
Pros:
- Good quality results
- Competitive pricing ($7 per 1000 queries)
- News search included
Cons:
- Smaller index than Google
- Less advanced operators
Configuration:
{
"bing": {
"api_key": "YOUR_BING_API_KEY",
"market": "en-US",
"safe_search": "Moderate",
"freshness": None # "Day", "Week", "Month"
}
}
3. Brave Search API
Pros:
- Privacy-focused
- Independent index
- Good pricing ($5 per 1000 queries)
- No tracking
Cons:
- Smaller index
- Newer service
Configuration:
{
"brave": {
"api_key": "YOUR_BRAVE_API_KEY",
"country": "US",
"safe_search": "moderate",
"freshness": None
}
}
4. DuckDuckGo (Free, No API Key)
Pros:
- Completely free
- No API key required
- Privacy-focused
- Good for testing
Cons:
- Rate limited
- Less control over results
- Smaller result set
Usage:
from duckduckgo_search import DDGS
results = DDGS().text(
keywords="web scraping tools",
max_results=10
)
5. Perplexity AI (AI-Powered Search)
Pros:
- Returns AI-summarized answers with citations
- Real-time web access
- Conversational queries
Cons:
- More expensive
- Designed for Q&A, not traditional search
Configuration:
{
"perplexity": {
"api_key": "YOUR_PERPLEXITY_API_KEY",
"model": "pplx-70b-online",
"include_citations": True
}
}
Query Optimization
Query Rewriter
class QueryOptimizer:
    """Rewrite raw search queries into more effective ones.

    The pipeline expands well-known abbreviations, folds in optional
    context qualifiers, and quotes multi-word key terms for
    exact-phrase matching.
    """

    def optimize(self, query: str, context: Dict = None) -> str:
        """Run the full optimization pipeline over *query*."""
        rewritten = self.expand_abbreviations(query)
        if context:
            rewritten = self.add_context(rewritten, context)
        # Stop-word removal is intentionally left disabled; the search
        # operators added below give enough precision on their own.
        return self.add_operators(rewritten)

    def expand_abbreviations(self, query: str) -> str:
        """Replace stand-alone abbreviations with their full forms."""
        expansions = {
            "AI": "artificial intelligence",
            "ML": "machine learning",
            "API": "application programming interface",
            "UI": "user interface",
            "UX": "user experience",
        }
        # \b anchors ensure only whole tokens are replaced, never
        # substrings of longer words (e.g. "AI" inside "MAIL").
        for short_form, long_form in expansions.items():
            query = re.sub(rf'\b{short_form}\b', long_form, query)
        return query

    def add_context(self, query: str, context: Dict) -> str:
        """Append contextual qualifiers (site, year, location) to *query*."""
        parts = [query]
        if context.get('domain'):
            parts.append(f"site:{context['domain']}")
        if context.get('year'):
            parts.append(f"{context['year']}")
        if context.get('location'):
            parts.append(f"{context['location']}")
        return " ".join(parts)

    def add_operators(self, query: str) -> str:
        """Quote multi-word key terms so engines treat them as phrases."""
        key_terms = self.extract_important_terms(query)
        if len(key_terms) <= 1:
            return query
        for term in key_terms:
            # Only multi-word terms benefit from exact-phrase quoting.
            if len(term.split()) > 1:
                query = query.replace(term, f'"{term}"')
        return query
Query Expansion
class QueryExpander:
    """Expand queries with synonyms and related terms."""

    def expand(self, query: str) -> List[str]:
        """Generate up to five unique query variations.

        The original query always comes first, followed by synonym
        swaps, modifier prefixes, and question forms.

        Fix: the original implementation appended a duplicate of the
        query whenever a synonym term did not occur in it (a no-op
        `str.replace`), wasting slots in the 5-item cap. No-op swaps
        are now skipped and the final list is deduplicated while
        preserving order.
        """
        variations = [query]

        # 1. Synonym replacement — skip swaps that change nothing.
        for synonym_set in self.get_synonyms(query):
            for term, synonym in synonym_set:
                varied = query.replace(term, synonym)
                if varied != query:
                    variations.append(varied)

        # 2. Add modifiers
        modifiers = ["best", "top", "review", "comparison", "guide"]
        for modifier in modifiers:
            variations.append(f"{modifier} {query}")

        # 3. Question forms
        variations.extend([
            f"what is {query}",
            f"how to {query}",
            f"why {query}"
        ])

        # Deduplicate (dict preserves insertion order), then cap at 5.
        return list(dict.fromkeys(variations))[:5]
Bad Query Detection
def is_bad_query(query: str) -> bool:
    """Return True when *query* is too weak to send to a search engine.

    A query is rejected when it is a single token, consists entirely of
    stop words, or contains no run of three or more letters.
    """
    # Single-token (or empty) queries are too ambiguous to search well.
    if len(query.split()) < 2:
        return True

    # A query made up purely of stop words carries no signal.
    stop_words = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be'}
    if set(query.lower().split()) <= stop_words:
        return True

    # Require at least one alphabetic run of length >= 3.
    return re.search(r'[a-zA-Z]{3,}', query) is None
Multi-Hop Search
Multi-Hop Strategy
class MultiHopSearch:
    """Iteratively search, analyze, and refine until results satisfy."""

    async def search_multi_hop(
        self,
        initial_query: str,
        max_hops: int = 3
    ) -> MultiHopResult:
        """Run up to *max_hops* search rounds, refining the query between them."""
        hops = []
        query = initial_query

        for _ in range(max_hops):
            hop_results = await self.search(query)
            hops.append(hop_results)

            verdict = self.analyze_results(hop_results)
            if verdict.is_satisfactory:
                # Good enough — stop early, no further refinement needed.
                break

            query = self.refine_query(query, hop_results, verdict)

        return MultiHopResult(
            hops=hops,
            final_query=query,
            best_results=self.rank_all_results(hops)
        )

    def refine_query(
        self,
        original_query: str,
        results: List[SearchResult],
        analysis: ResultAnalysis
    ) -> str:
        """Produce the next-hop query from the previous hop's outcome."""
        # Mine candidate keywords from the top three hits.
        keywords = self.extract_keywords_from_results(results[:3])

        # Too broad -> narrow with a multi-word keyword from the results.
        if analysis.too_broad:
            multiword = [kw for kw in keywords if len(kw.split()) > 1]
            if multiword:
                return f"{original_query} {multiword[0]}"

        # Off-topic -> exclude the offending terms with negative operators.
        if analysis.off_topic_terms:
            exclusions = ' '.join(f"-{term}" for term in analysis.off_topic_terms)
            return f"{original_query} {exclusions}"

        # Nothing found -> fall back to the first synonym expansion.
        if analysis.no_results:
            return self.query_expander.expand(original_query)[0]

        return original_query
Example Multi-Hop Flow
# Hop 1: Initial broad search
query_1 = "best web scraping tools"
results_1 = search(query_1)
# Results: General articles about scraping tools
# Hop 2: Refine to specific use case
query_2 = "best web scraping tools for e-commerce Python"
results_2 = search(query_2)
# Results: More specific, Python-focused
# Hop 3: Add recent constraint
query_3 = "best web scraping tools for e-commerce Python 2026"
results_3 = search(query_3)
# Results: Latest tools with recent reviews
Source Credibility Scoring
Credibility Scorer
class SourceCredibilityScorer:
    """Score the credibility of search result sources."""

    def score(self, url: str, domain: str, result: SearchResult) -> float:
        """Calculate credibility score (0.0 to 1.0).

        Starts from a neutral 0.5 baseline and accumulates weighted
        signals, then clamps the sum into [0, 1].
        """
        total = 0.5  # Neutral baseline before any evidence.
        # 1. Domain reputation (weight 0.3)
        total += self.domain_reputation_score(domain) * 0.3
        # 2. Domain age (weight 0.1)
        total += self.domain_age_score(domain) * 0.1
        # 3. Flat bump for TLS.
        if url.startswith('https://'):
            total += 0.05
        # 4. TLD credibility (weight 0.1)
        total += self.tld_score(domain) * 0.1
        # 5. Snippet quality (weight 0.15)
        total += self.snippet_quality_score(result.snippet) * 0.15
        # 6. Backlinks, if available (weight 0.2)
        total += self.backlink_score(domain) * 0.2
        # 7. Freshness (weight 0.1)
        total += self.freshness_score(result.date_published) * 0.1
        return min(max(total, 0.0), 1.0)

    def domain_reputation_score(self, domain: str) -> float:
        """Score based on known domain reputation; unknowns get 0.5."""
        # Trusted domains
        trusted = {
            'wikipedia.org': 1.0,
            'github.com': 0.95,
            'stackoverflow.com': 0.95,
            'nytimes.com': 0.9,
            'bbc.com': 0.9,
            'reuters.com': 0.9,
            'arxiv.org': 0.95,
            'nature.com': 0.95,
            'sciencedirect.com': 0.9,
        }
        # Known spammy/low-quality domains
        untrusted = {
            'contentvilla.com': 0.1,
            'ehow.com': 0.3,
        }
        if domain in trusted:
            return trusted[domain]
        # Medium trust (0.5) for anything not explicitly listed.
        return untrusted.get(domain, 0.5)

    def tld_score(self, domain: str) -> float:
        """Score based on top-level domain."""
        scores_by_tld = {
            'edu': 0.9,   # Educational institutions
            'gov': 0.95,  # Government
            'org': 0.8,   # Organizations
            'com': 0.6,   # Commercial (neutral)
            'net': 0.6,
            'io': 0.6,
            'info': 0.4,  # Often spammy
            'xyz': 0.3,   # Cheap, often spam
        }
        # Last dot-separated label is treated as the TLD.
        return scores_by_tld.get(domain.rsplit('.', 1)[-1], 0.5)

    def snippet_quality_score(self, snippet: str) -> float:
        """Score snippet quality: penalize clickbait, reward factual tone."""
        quality = 0.5
        # Each clickbait pattern found costs 0.2.
        clickbait_patterns = [
            r'you won\'t believe',
            r'shocking',
            r'one weird trick',
            r'\d+ reasons why',
        ]
        for pattern in clickbait_patterns:
            if re.search(pattern, snippet, re.I):
                quality -= 0.2
        # Evidence-oriented vocabulary earns a one-time bonus.
        if re.search(r'according to|research|study|data|analysis', snippet, re.I):
            quality += 0.2
        return max(0.0, quality)

    def freshness_score(self, date_published: Optional[datetime]) -> float:
        """Score based on content freshness; unknown dates get 0.3."""
        if not date_published:
            return 0.3  # Publication date unknown.
        # NOTE(review): assumes date_published is naive, like
        # datetime.now() — mixing in an aware datetime would raise.
        age_days = (datetime.now() - date_published).days
        # Step-wise decay: newer content scores higher.
        for limit_days, value in ((30, 1.0), (90, 0.8), (365, 0.6), (730, 0.4)):
            if age_days < limit_days:
                return value
        return 0.2
Domain Blacklist
DOMAIN_BLACKLIST = [
    'contentvilla.com',
    'pastebin.com',  # Often scraped/duplicated content
    'scam-detector.com',
    'pinterest.com',  # Image aggregator, not original content
    # Add more as needed
]

def is_blacklisted(url: str) -> bool:
    """Return True if *url*'s host is a blacklisted domain or a subdomain of one.

    Fix: the original used a plain substring test (`blocked in domain`),
    which also rejected unrelated hosts such as ``notpinterest.com``.
    Matching is now exact-host or dot-separated subdomain, and the host
    is lowercased with any credentials/port stripped from the netloc.
    """
    host = urlparse(url).netloc.lower()
    # netloc may carry "user:pass@host:port" — keep only the host part.
    host = host.rsplit('@', 1)[-1].split(':')[0]
    return any(
        host == blocked or host.endswith('.' + blocked)
        for blocked in DOMAIN_BLACKLIST
    )
Result Ranking
Ranking Algorithm
class ResultRanker:
    """Rank search results by relevance and quality."""

    def rank(
        self,
        results: List[SearchResult],
        query: str,
        context: Dict = None
    ) -> List[RankedResult]:
        """Rank results by multiple factors, best first.

        Args:
            results: Raw search results to order.
            query: The query the results were fetched for.
            context: Optional ranking context (e.g. 'seen_domains').

        Returns:
            Results wrapped in RankedResult, sorted by descending score.
        """
        ranked = [
            RankedResult(result=result,
                         score=self.calculate_score(result, query, context))
            for result in results
        ]
        ranked.sort(key=lambda r: r.score, reverse=True)
        return ranked

    def calculate_score(
        self,
        result: SearchResult,
        query: str,
        context: Dict
    ) -> float:
        """Calculate a weighted ranking score for one result.

        Weights: credibility 40%, relevance 35%, freshness 10%,
        engagement 10%, plus a 5% bonus for a not-yet-seen domain.
        """
        score = 0.0
        # 1. Credibility (40%)
        credibility = self.credibility_scorer.score(
            result.url,
            result.domain,
            result
        )
        score += credibility * 0.4
        # 2. Relevance (35%)
        score += self.calculate_relevance(result, query) * 0.35
        # 3. Freshness (10%)
        score += self.credibility_scorer.freshness_score(result.date_published) * 0.1
        # 4. Engagement signals (10%)
        # NOTE(review): assumes result.engagement_score is a float in
        # [0, 1] — confirm against the SearchResult definition.
        score += result.engagement_score * 0.1
        # 5. Diversity bonus (5%): prefer domains not yet shown.
        if context and context.get('seen_domains'):
            if result.domain not in context['seen_domains']:
                score += 0.05
        return score

    def calculate_relevance(self, result: SearchResult, query: str) -> float:
        """Calculate query-result keyword overlap, in [0, 1].

        Simple keyword matching (can be enhanced with embeddings).

        Fix: an empty or whitespace-only query previously raised
        ZeroDivisionError; it now yields 0.0.
        """
        query_terms = set(query.lower().split())
        if not query_terms:
            return 0.0  # Nothing to match against.
        # Check title
        title_terms = set(result.title.lower().split())
        title_overlap = len(query_terms & title_terms) / len(query_terms)
        # Check snippet
        snippet_terms = set(result.snippet.lower().split())
        snippet_overlap = len(query_terms & snippet_terms) / len(query_terms)
        # Title matches are weighted more heavily than snippet matches.
        return 0.6 * title_overlap + 0.4 * snippet_overlap
Caching & Deduplication
Search Result Cache
class SearchCache:
    """In-memory TTL cache for search results, keyed by engine + query."""

    def __init__(self, ttl_seconds: int = 3600):
        # Maps cache key -> (results, insertion timestamp).
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, query: str, engine: str) -> Optional[List[SearchResult]]:
        """Return cached results, or None on a miss or expired entry."""
        key = self.make_key(query, engine)
        entry = self.cache.get(key)
        if entry is None:
            return None
        results, stored_at = entry
        if (datetime.now() - stored_at).total_seconds() < self.ttl:
            return results
        # Entry outlived its TTL — evict lazily on access.
        del self.cache[key]
        return None

    def set(self, query: str, engine: str, results: List[SearchResult]):
        """Store *results* under the normalized (engine, query) key."""
        self.cache[self.make_key(query, engine)] = (results, datetime.now())

    def make_key(self, query: str, engine: str) -> str:
        """Build a normalized cache key: '<engine>:<lowercased query>'."""
        return f"{engine}:{query.lower().strip()}"
Result Deduplication
class ResultDeduplicator:
    """Remove duplicate results across multiple searches."""

    def deduplicate(self, results: List[SearchResult]) -> List[SearchResult]:
        """Drop results whose URL or near-identical title was already seen."""
        kept = []
        urls_seen = set()
        titles_seen = set()

        for candidate in results:
            # URLs are compared without query params/fragments.
            url_key = self.normalize_url(candidate.url)
            if url_key in urls_seen:
                continue

            title_key = candidate.title.lower().strip()
            if self.is_near_duplicate_title(title_key, titles_seen):
                continue

            kept.append(candidate)
            urls_seen.add(url_key)
            titles_seen.add(title_key)

        return kept

    def normalize_url(self, url: str) -> str:
        """Canonicalize a URL: drop query string, fragment, trailing slash."""
        parts = urlparse(url)
        return f"{parts.scheme}://{parts.netloc}{parts.path}".rstrip('/')

    def is_near_duplicate_title(self, title: str, seen_titles: Set[str]) -> bool:
        """Return True when *title* is >85% similar to any seen title."""
        from difflib import SequenceMatcher
        return any(
            SequenceMatcher(None, title, prior).ratio() > 0.85
            for prior in seen_titles
        )
Configuration
Search Engine Settings
// Configuration contract for the search engine layer.
// Per-provider blocks are optional; only configured providers can be used.
interface SearchEngineConfig {
default: 'google' | 'bing' | 'brave' | 'duckduckgo' | 'perplexity';
providers: {
google?: GoogleConfig;
bing?: BingConfig;
brave?: BraveConfig;
duckduckgo?: DuckDuckGoConfig;
perplexity?: PerplexityConfig;
};
// Global settings
maxResults: number; // Default: 10
timeout: number; // Seconds
cacheResults: boolean; // Default: true
cacheTTL: number; // Seconds
// Query optimization
optimizeQueries: boolean; // Default: true
expandQueries: boolean; // Default: false
// Multi-hop
enableMultiHop: boolean; // Default: false
maxHops: number; // Default: 3
// Filtering
filterByCredibility: boolean; // Default: true
minCredibilityScore: number; // Default: 0.4
blacklistedDomains: string[];
// Cost tracking
trackCosts: boolean; // Default: true
dailyQueryLimit: number; // Default: 1000
}
Usage Example
# Initialize search engine
search = SearchEngine(config)
# Simple search
results = await search.search(
query="best Python web scraping libraries",
engine="google",
num_results=10
)
# Optimized search
results = await search.search_optimized(
query="web scraping",
context={"domain": "python.org", "year": 2026},
optimize=True,
filter_credibility=True
)
# Multi-hop search
multi_hop_results = await search.search_multi_hop(
initial_query="web scraping tools",
max_hops=3
)
# Get ranked results
ranked = search.rank_results(
results,
query="web scraping tools",
context={"seen_domains": ["github.com"]}
)
Next: See agents.md for agent architecture.