import os
import asyncio
import time
from typing import Optional
from datetime import datetime

import httpx
import trafilatura
import gradio as gr
from dateutil import parser as dateparser
from limits import parse
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter

from analytics import record_request, last_n_days_df, last_n_days_avg_time_df

# Configuration
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search"
SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news"
HEADERS = {"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"}

# Rate limiting
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
rate_limit = parse("360/hour")
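
# Note: MemoryStorage keeps its counters in process memory, so the "global"
# moving-window limit above is enforced per process. A multi-replica deployment
# (an assumption, not something this Space does) would need a shared backend
# such as limits' Redis storage to enforce a single shared limit.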
async def search_web(
    query: str, search_type: str = "search", num_results: Optional[int] = 4
) -> str:
    """
    Search the web for information or fresh news, returning extracted content.

    This tool can perform two types of searches:
    - "search" (default): General web search for diverse, relevant content from various sources
    - "news": Specifically searches for fresh news articles and breaking stories

    Use "news" mode when looking for:
    - Breaking news or very recent events
    - Time-sensitive information
    - Current affairs and latest developments
    - Today's/this week's happenings

    Use "search" mode (default) for:
    - General information and research
    - Technical documentation or guides
    - Historical information
    - Diverse perspectives from various sources

    Args:
        query (str): The search query. This is REQUIRED. Examples: "apple inc earnings",
            "climate change 2024", "AI developments"
        search_type (str): Type of search. This is OPTIONAL. Default is "search".
            Options: "search" (general web search) or "news" (fresh news articles).
            Use "news" for time-sensitive, breaking news content.
        num_results (int): Number of results to fetch. This is OPTIONAL. Default is 4.
            Range: 1-20. More results = more context but longer response time.

    Returns:
        str: Formatted text containing extracted content with metadata (title,
            source, date, URL, and main text) for each result, separated by dividers.
            Returns an error message if the API key is missing or the search fails.

    Examples:
        - search_web("OpenAI GPT-5", "news") - Get 4 fresh news articles about OpenAI (default count)
        - search_web("python tutorial", "search") - Get 4 general results about Python (default count)
        - search_web("stock market today", "news", 10) - Get 10 news articles about today's market
        - search_web("machine learning basics") - Get 4 general search results (all defaults)
    """
    start_time = time.time()

    if not SERPER_API_KEY:
        await record_request(None, num_results)  # Record even failed requests
        return "Error: SERPER_API_KEY environment variable is not set. Please set it to use this tool."

    # Validate and constrain num_results
    if num_results is None:
        num_results = 4
    num_results = max(1, min(20, num_results))

    # Validate search_type
    if search_type not in ["search", "news"]:
        search_type = "search"
    try:
        # Check rate limit
        if not await limiter.hit(rate_limit, "global"):
            print(f"[{datetime.now().isoformat()}] Rate limit exceeded")
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return "Error: Rate limit exceeded. Please try again later (limit: 360 requests per hour)."
        # Select endpoint based on search type
        endpoint = (
            SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
        )

        # Prepare payload
        payload = {"q": query, "num": num_results}
        if search_type == "news":
            payload["type"] = "news"
            payload["page"] = 1
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=HEADERS, json=payload)
            if resp.status_code != 200:
                duration = time.time() - start_time
                await record_request(duration, num_results)
                return f"Error: Search API returned status {resp.status_code}. Please check your API key and try again."

        # Extract results based on search type
        if search_type == "news":
            results = resp.json().get("news", [])
        else:
            results = resp.json().get("organic", [])

        if not results:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"No {search_type} results found for query: '{query}'. Try a different search term or search type."

        # Fetch HTML content concurrently
        urls = [r["link"] for r in results]
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            tasks = [client.get(u) for u in urls]
            responses = await asyncio.gather(*tasks, return_exceptions=True)
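            # return_exceptions=True keeps failed fetches as Exception objects in
            # `responses` rather than raising, so one dead URL doesn't cancel the
            # remaining downloads; the loop below simply skips those entries.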
        # Extract and format content
        chunks = []
        successful_extractions = 0

        for meta, response in zip(results, responses):
            if isinstance(response, Exception):
                continue

            # Extract main text content
            body = trafilatura.extract(
                response.text, include_formatting=False, include_comments=False
            )
            if not body:
                continue

            successful_extractions += 1
            print(
                f"[{datetime.now().isoformat()}] Successfully extracted content from {meta['link']}"
            )

            # Format the chunk based on search type
            if search_type == "news":
                # News results have date and source
                try:
                    date_str = meta.get("date", "")
                    if date_str:
                        date_iso = dateparser.parse(date_str, fuzzy=True).strftime(
                            "%Y-%m-%d"
                        )
                    else:
                        date_iso = "Unknown"
                except Exception:
                    date_iso = "Unknown"

                chunk = (
                    f"## {meta['title']}\n"
                    f"**Source:** {meta.get('source', 'Unknown')} "
                    f"**Date:** {date_iso}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )
            else:
                # Search results don't have date/source but have a domain
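                # The positional split below grabs the host from the URL;
                # urllib.parse.urlsplit(...).netloc would be a more defensive
                # alternative, but Serper links should be well-formed URLs.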
                domain = meta["link"].split("/")[2].replace("www.", "")
                chunk = (
                    f"## {meta['title']}\n"
                    f"**Domain:** {domain}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )

            chunks.append(chunk)
        if not chunks:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"Found {len(results)} {search_type} results for '{query}', but couldn't extract readable content from any of them. The websites might be blocking automated access."

        result = "\n---\n".join(chunks)
        summary = f"Successfully extracted content from {successful_extractions} out of {len(results)} {search_type} results for query: '{query}'\n\n---\n\n"
        print(
            f"[{datetime.now().isoformat()}] Extraction complete: {successful_extractions}/{len(results)} successful for query '{query}'"
        )

        # Record successful request with duration
        duration = time.time() - start_time
        await record_request(duration, num_results)
        return summary + result

    except Exception as e:
        # Record failed request with duration
        duration = time.time() - start_time
        await record_request(duration, num_results)
        return f"Error occurred while searching: {str(e)}. Please try again or check your query."
# Create Gradio interface
with gr.Blocks(title="Web Search MCP Server") as demo:
    gr.HTML(
        """
        <div style="background-color: rgba(59, 130, 246, 0.1); border: 1px solid rgba(59, 130, 246, 0.3); border-radius: 8px; padding: 12px; margin-bottom: 16px; text-align: center;">
            <p style="color: rgb(59, 130, 246); margin: 0; font-size: 14px; font-weight: 500;">
                🤝 Community resource — please use responsibly to keep this service available for everyone
            </p>
        </div>
        """
    )
gr.Markdown("# 🔍 Web Search MCP Server") | |
with gr.Tabs(): | |
with gr.Tab("App"): | |
gr.Markdown( | |
""" | |
This MCP server provides web search capabilities to LLMs. It can perform general web searches | |
or specifically search for fresh news articles, extracting the main content from results. | |
**⚡ Speed-Focused:** Optimized to complete the entire search process - from query to | |
fully extracted web content - in under 2 seconds. Check out the Analytics tab | |
to see real-time performance metrics. | |
**Search Types:** | |
- **General Search**: Diverse results from various sources (blogs, docs, articles, etc.) | |
- **News Search**: Fresh news articles and breaking stories from news sources | |
**Note:** This interface is primarily designed for MCP tool usage by LLMs, but you can | |
also test it manually below. | |
""" | |
) | |
gr.HTML( | |
""" | |
<div style="margin-bottom: 24px;"> | |
<a href="https://huggingface.co/spaces/victor/websearch?view=api"> | |
<img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/use-with-mcp-lg-dark.svg" | |
alt="Use with MCP" | |
style="height: 36px;"> | |
</a> | |
</div> | |
""", | |
padding=0, | |
) | |
            with gr.Row():
                with gr.Column(scale=3):
                    query_input = gr.Textbox(
                        label="Search Query",
                        placeholder='e.g. "OpenAI news", "climate change 2024", "AI developments"',
                        info="Required: Enter your search query",
                    )
                with gr.Column(scale=1):
                    search_type_input = gr.Radio(
                        choices=["search", "news"],
                        value="search",
                        label="Search Type",
                        info="Choose search type",
                    )

            with gr.Row():
                num_results_input = gr.Slider(
                    minimum=1,
                    maximum=20,
                    value=4,
                    step=1,
                    label="Number of Results",
                    info="Optional: How many results to fetch (default: 4)",
                )

            search_button = gr.Button("Search", variant="primary")

            output = gr.Textbox(
                label="Extracted Content",
                lines=25,
                max_lines=50,
                info="The extracted article content will appear here",
            )

            # Add examples
            gr.Examples(
                examples=[
                    ["OpenAI GPT-5 latest developments", "news", 5],
                    ["React hooks useState", "search", 4],
                    ["Tesla stock price today", "news", 6],
                    ["Apple Vision Pro reviews", "search", 4],
                    ["best Italian restaurants NYC", "search", 4],
                ],
                inputs=[query_input, search_type_input, num_results_input],
                outputs=output,
                fn=search_web,
                cache_examples=False,
            )
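            # cache_examples=False keeps Gradio from invoking search_web at build
            # time, which would spend Serper quota just to pre-render the examples.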
with gr.Tab("Analytics"): | |
gr.Markdown("## Community Usage Analytics") | |
gr.Markdown( | |
"Track daily request counts and average response times from all community users." | |
) | |
with gr.Row(): | |
with gr.Column(): | |
requests_plot = gr.BarPlot( | |
value=last_n_days_df( | |
14 | |
), # Show only last 14 days for better visibility | |
x="date", | |
y="count", | |
title="Daily Request Count", | |
tooltip=["date", "count"], | |
height=350, | |
x_label_angle=-45, # Rotate labels to prevent overlap | |
container=False, | |
) | |
with gr.Column(): | |
avg_time_plot = gr.BarPlot( | |
value=last_n_days_avg_time_df(14), # Show only last 14 days | |
x="date", | |
y="avg_time", | |
title="Average Request Time (seconds)", | |
tooltip=["date", "avg_time", "request_count"], | |
height=350, | |
x_label_angle=-45, | |
container=False, | |
) | |
    search_button.click(
        fn=search_web,
        inputs=[query_input, search_type_input, num_results_input],
        outputs=output,
        api_name=False,  # Hide this endpoint from API & MCP
    )
    # Load fresh analytics data when the page loads
    demo.load(
        fn=lambda: (last_n_days_df(14), last_n_days_avg_time_df(14)),
        outputs=[requests_plot, avg_time_plot],
        api_name=False,
    )

    # Expose search_web as the only MCP tool
    gr.api(search_web, api_name="search_web")
if __name__ == "__main__":
    # Launch with MCP server enabled
    # The MCP endpoint will be available at: http://localhost:7860/gradio_api/mcp/sse
    demo.launch(mcp_server=True, show_api=True)
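
# A hypothetical MCP client configuration pointing at this server (the exact
# schema depends on the client) might look like:
#
#   {
#       "mcpServers": {
#           "websearch": {"url": "http://localhost:7860/gradio_api/mcp/sse"}
#       }
#   }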