File size: 17,985 Bytes
1dbe6fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e36de10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
import gradio as gr
from huggingface_hub import HfApi, list_spaces
import time
from typing import List, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed

@dataclass
class MCPSpace:
    id: str
    title: str
    author: str
    likes: int
    status: str
    url: str
    description: str
    sdk: str
    last_modified: str
    created_at: str

class MCPSpaceFinder:
    def __init__(self):
        self.api = HfApi()
        self.all_mcp_spaces_cache = None  # Cache for ALL MCP spaces
        self.running_spaces_cache = None  # Separate cache for running/building spaces
        self.cache_timestamp = None
        self.cache_duration = 300  # 5 minutes cache
        
    def get_space_status(self, space_id: str) -> str:
        """Get the current runtime status of a space."""
        try:
            runtime = self.api.get_space_runtime(space_id)
            if hasattr(runtime, 'stage'):
                return runtime.stage
            return "unknown"
        except Exception:
            return "error"
    
    def process_space_batch(self, spaces_batch) -> List[MCPSpace]:
        """Process a batch of spaces to get their runtime status."""
        processed_spaces = []
        
        for space in spaces_batch:
            try:
                space_id = space.id
                
                # Get space status -- an expensive operation (separate request per space)
                status = self.get_space_status(space_id)
                
                processed_space = MCPSpace(
                    id=space_id,
                    title=getattr(space, 'title', space_id.split('/')[-1]) or space_id.split('/')[-1],
                    author=space.author,
                    likes=getattr(space, 'likes', 0),
                    status=status,
                    url=f"https://huggingface.co/spaces/{space_id}",
                    description=getattr(space, 'description', 'No description available') or 'No description available',
                    sdk=getattr(space, 'sdk', 'unknown'),
                    last_modified=str(getattr(space, 'last_modified', 'unknown')),
                    created_at=str(getattr(space, 'created_at', 'unknown'))
                )
                processed_spaces.append(processed_space)
                
            except Exception as e:
                print(f"Error processing space {getattr(space, 'id', 'unknown')}: {e}")
                continue
                
        return processed_spaces
    
    def find_all_mcp_spaces(self, check_status: bool = False) -> List[MCPSpace]:
        """
        Find ALL MCP-enabled spaces, optionally checking their runtime status.
        
        Args:
            check_status: If True, fetch runtime status (slower). If False, return all spaces without status check.
        
        Returns:
            List of MCPSpace objects
        """
        
        # Check cache first
        if not check_status and self.all_mcp_spaces_cache is not None:
            if self.cache_timestamp and time.time() - self.cache_timestamp < self.cache_duration:
                return self.all_mcp_spaces_cache
        
        if check_status and self.running_spaces_cache is not None:
            if self.cache_timestamp and time.time() - self.cache_timestamp < self.cache_duration:
                return self.running_spaces_cache
        
        print("Fetching ALL MCP spaces from HuggingFace Hub...")
        
        # Get ALL spaces with the mcp-server tag
        # Using limit=None or a very high limit to ensure we get everything
        try:
            # First try with no limit (gets all)
            spaces = list(list_spaces(
                filter="mcp-server",
                sort="likes",
                direction=-1,
                limit=None,  # Get ALL spaces
                full=True
            ))
        except Exception as e:
            print(f"Failed with limit=None, trying with high limit: {e}")
            # Fallback to high limit if None doesn't work
            spaces = list(list_spaces(
                filter="mcp-server",
                sort="likes", 
                direction=-1,
                limit=10000,  # Very high limit to ensure we get all the relevant spaces
                full=True
            ))
        
        print(f"Found {len(spaces)} total spaces with mcp-server tag")
        
        if not check_status:
            # Quick mode: Don't check runtime status, just return basic info
            all_spaces = []
            for space in spaces:
                try:
                    processed_space = MCPSpace(
                        id=space.id,
                        title=getattr(space, 'title', space.id.split('/')[-1]) or space.id.split('/')[-1],
                        author=space.author,
                        likes=getattr(space, 'likes', 0),
                        status="not_checked",  # We're not checking status in quick mode
                        url=f"https://huggingface.co/spaces/{space.id}",
                        description=getattr(space, 'description', 'No description available') or 'No description available',
                        sdk=getattr(space, 'sdk', 'unknown'),
                        last_modified=str(getattr(space, 'last_modified', 'unknown')),
                        created_at=str(getattr(space, 'created_at', 'unknown'))
                    )
                    all_spaces.append(processed_space)
                except Exception as e:
                    print(f"Error processing space: {e}")
                    continue
            
            # Sort by likes
            all_spaces.sort(key=lambda x: x.likes, reverse=True)
            
            # Cache the results
            self.all_mcp_spaces_cache = all_spaces
            self.cache_timestamp = time.time()
            
            return all_spaces
        
        else:
            # Full mode: Check runtime status -- will be slow
            print("Checking runtime status for spaces...")
            
            # Process spaces in batches using ThreadPoolExecutor for status checking
            batch_size = 50
            space_batches = [spaces[i:i + batch_size] for i in range(0, len(spaces), batch_size)]
            
            all_processed_spaces = []
            
            with ThreadPoolExecutor(max_workers=5) as executor:
                future_to_batch = {
                    executor.submit(self.process_space_batch, batch): batch 
                    for batch in space_batches
                }
                
                for future in as_completed(future_to_batch):
                    try:
                        batch_results = future.result()
                        all_processed_spaces.extend(batch_results)
                    except Exception as e:
                        print(f"Error processing batch: {e}")
            
            # Filter to only include running or building spaces if desired
            running_building_spaces = [
                space for space in all_processed_spaces 
                if space.status in ["RUNNING", "RUNNING_BUILDING", "BUILDING"]
            ]
            
            # Sort by likes descending
            running_building_spaces.sort(key=lambda x: x.likes, reverse=True)
            
            # Debug: Count by status
            status_counts = {}
            for space in all_processed_spaces:
                status_counts[space.status] = status_counts.get(space.status, 0) + 1
            
            print(f"Status breakdown: {status_counts}")
            print(f"Found {len(running_building_spaces)} running/building spaces out of {len(all_processed_spaces)} total")
            
            # Cache the results
            self.running_spaces_cache = running_building_spaces
            self.cache_timestamp = time.time()
            
            return running_building_spaces
    
    def find_mcp_spaces(self, limit: int = None, only_running: bool = False) -> List[MCPSpace]:
        """
        Backward compatible method that can either:
        1. Get ALL MCP spaces (default behavior, matching SA)
        2. Get only running/building spaces (original FA behavior)
        
        Args:
            limit: Optional limit on number of spaces to return
            only_running: If True, only return running/building spaces (requires status check)
        """
        if only_running:
            spaces = self.find_all_mcp_spaces(check_status=True)
        else:
            spaces = self.find_all_mcp_spaces(check_status=False)
        
        if limit and limit < len(spaces):
            return spaces[:limit]
        return spaces

# Initialize the finder
finder = MCPSpaceFinder()

def get_mcp_spaces_list():
    """Get the list of ALL MCP spaces for the dropdown."""
    # Get ALL spaces, not just running ones (matching SA behavior)
    spaces = finder.find_mcp_spaces(only_running=False)
    
    # Create options for dropdown - format as username/spacename
    options = []
    for space in spaces:
        label = space.id  # This is already in format "username/spacename"
        options.append((label, space.id))
    
    return options, len(spaces)

def display_space_info(space_id):
    """Display detailed information about the selected space."""
    if not space_id:
        return "Please select a space from the dropdown."
    
    # First check the all spaces cache
    spaces = finder.all_mcp_spaces_cache or finder.find_mcp_spaces(only_running=False)
    selected_space = next((s for s in spaces if s.id == space_id), None)
    
    if not selected_space:
        return f"Space {space_id} not found in cache."
    
    # Get fresh status if not already checked
    if selected_space.status == "not_checked":
        selected_space.status = finder.get_space_status(space_id)
    
    # Get status emoji and description
    status_emoji = "🟒" if selected_space.status in ["RUNNING", "RUNNING_BUILDING"] else "🟑" if selected_space.status in ["BUILDING"] else "πŸ”΄"
    status_description = {
        "RUNNING": "Ready to use",
        "RUNNING_BUILDING": "Running (rebuilding)",
        "BUILDING": "Building - please wait",
        "STOPPED": "Stopped/Sleeping",
        "PAUSED": "Paused",
        "error": "Error getting status",
        "unknown": "Status unknown",
        "not_checked": "Status not checked"
    }.get(selected_space.status, selected_space.status)
    
    # Format the information
    info = f"""
# {selected_space.title}
**Author:** {selected_space.author}  
**Likes:** ❀️ {selected_space.likes}  
**Status:** {status_emoji} {status_description}  
**SDK:** {selected_space.sdk}  
**Created:** {selected_space.created_at}  
**Last Modified:** {selected_space.last_modified}  
**URL:** [{selected_space.url}]({selected_space.url})
**Description:**  
{selected_space.description}
---
## πŸ”§ MCP Configuration
### For VSCode/Cursor/Claude Code (Recommended)
```json
{{
  "servers": {{
    "{selected_space.id.replace('/', '-')}": {{
      "url": "{selected_space.url}/gradio_api/mcp/sse"
    }}
  }}
}}
```
### For Claude Desktop
```json
{{
  "mcpServers": {{
    "{selected_space.id.replace('/', '-')}": {{
      "command": "npx",
      "args": [
        "mcp-remote", 
        "{selected_space.url}/gradio_api/mcp/sse"
      ]
    }}
  }}
}}
```
### Alternative: Use HF MCP Space Server
```json
{{
  "mcpServers": {{
    "hf-spaces": {{
      "command": "npx",
      "args": [
        "-y", 
        "@llmindset/mcp-hfspace",
        "{selected_space.id}"
      ]
    }}
  }}
}}
```
---
**Note:** {status_description}{"" if selected_space.status in ["RUNNING", "RUNNING_BUILDING"] else " - The space may need to be started before use."}
    """.strip()
    
    return info

def show_all_spaces(filter_running: bool = False):
    """
    Display information about MCP spaces.
    
    Args:
        filter_running: If True, only show running/building spaces. If False, show all.
    """
    spaces = finder.find_mcp_spaces(only_running=filter_running)
    
    total_spaces = len(spaces)
    
    # Create summary markdown
    filter_text = "Running/Building" if filter_running else "All"
    summary = f"""
# πŸ“Š {filter_text} MCP Servers Summary
**Available MCP Servers:** {total_spaces} {"(running or building)" if filter_running else "(all statuses)"}  
**Sorted by:** Popularity (likes descending)
Browse all MCP servers: [https://huggingface.co/spaces?filter=mcp-server](https://huggingface.co/spaces?filter=mcp-server)
---
"""
    
    # Create DataFrame data
    df_data = []
    for i, space in enumerate(spaces, 1):
        if filter_running:
            status_emoji = "🟒" if space.status == "RUNNING" else "🟑" if space.status == "RUNNING_BUILDING" else "πŸ”Ά"
        else:
            status_emoji = "❓"  # Unknown status for non-filtered view
            
        desc_short = (space.description[:80] + "...") if len(space.description) > 80 else space.description
        
        df_data.append([
            i,  # Rank
            space.id,  # Show as username/spacename format
            space.author,
            space.likes,
            f"{status_emoji} {space.status if filter_running else 'Not checked'}",
            desc_short,
            f"[πŸ”— Open]({space.url})"  # Clickable link
        ])
    
    return summary, df_data

# Create the Gradio interface with toggle for showing all vs running spaces
def create_interface():
    with gr.Blocks(title="HuggingFace MCP Server Browser") as demo:
        gr.Markdown("# πŸ€– HuggingFace MCP Server Browser")
        gr.Markdown("Discover **ALL Model Context Protocol (MCP)** servers on HuggingFace Spaces")
        
        with gr.Row():
            with gr.Column(scale=3):
                # Get initial options and count
                initial_options, total_count = get_mcp_spaces_list()
                
                dropdown = gr.Dropdown(
                    choices=initial_options,
                    label=f"πŸ€– MCP Servers ({total_count} total available)",
                    info="All MCP servers on HuggingFace, sorted by popularity",
                    value=None
                )
                
            with gr.Column(scale=1):
                refresh_btn = gr.Button("πŸ”„ Refresh", variant="secondary")
                filter_toggle = gr.Checkbox(label="Show only running", value=False)
                summary_btn = gr.Button("πŸ“Š Show All", variant="primary")
        
        # Use Markdown for better formatting instead of Textbox
        output_md = gr.Markdown(
            value=f"""
## Welcome! 
Select an MCP server above to view configuration details.
**Total MCP servers found:** {total_count}  
**Browse all:** [https://huggingface.co/spaces?filter=mcp-server](https://huggingface.co/spaces?filter=mcp-server)

ℹ️ **Note:** This browser now shows ALL MCP spaces by default. Enable "Show only running" to filter for active spaces (slower).
            """,
            visible=True
        )
        
        # Add DataFrame for clean table display
        output_df = gr.DataFrame(
            headers=["Rank", "Space ID", "Author", "Likes", "Status", "Description", "Link"],
            datatype=["number", "str", "str", "number", "str", "str", "markdown"],
            visible=False,
            wrap=True,
        )
        
        # Event handlers
        def handle_dropdown_change(space_id):
            if space_id:
                info = display_space_info(space_id)
                return gr.Markdown(value=info, visible=True), gr.DataFrame(visible=False)
            return gr.Markdown(visible=True), gr.DataFrame(visible=False)
        
        def handle_show_all(filter_running):
            summary, df_data = show_all_spaces(filter_running)
            return gr.Markdown(value=summary, visible=True), gr.DataFrame(value=df_data, visible=True)
        
        def handle_refresh(filter_running):
            # Clear cache to force refresh
            finder.all_mcp_spaces_cache = None
            finder.running_spaces_cache = None
            finder.cache_timestamp = None
            
            if filter_running:
                # This will be slower as it checks status
                options, total_count = [(s.id, s.id) for s in finder.find_mcp_spaces(only_running=True)], len(finder.find_mcp_spaces(only_running=True))
                label_text = f"πŸ€– Running MCP Servers ({total_count} available)"
            else:
                options, total_count = get_mcp_spaces_list()
                label_text = f"πŸ€– MCP Servers ({total_count} total available)"
            
            return (
                gr.Dropdown(choices=options, value=None, label=label_text),
                gr.Markdown(value=f"""
## Welcome! 
Select an MCP server above to view configuration details.
**MCP servers found:** {total_count}  
**Filter:** {"Running/Building only" if filter_running else "All spaces"}
**Browse all:** [https://huggingface.co/spaces?filter=mcp-server](https://huggingface.co/spaces?filter=mcp-server)
                """, visible=True),
                gr.DataFrame(visible=False)
            )
        
        dropdown.change(
            fn=handle_dropdown_change,
            inputs=[dropdown],
            outputs=[output_md, output_df]
        )
        
        summary_btn.click(
            fn=handle_show_all,
            inputs=[filter_toggle],
            outputs=[output_md, output_df]
        )
        
        refresh_btn.click(
            fn=handle_refresh,
            inputs=[filter_toggle],
            outputs=[dropdown, output_md, output_df]
        )
        
        filter_toggle.change(
            fn=handle_refresh,
            inputs=[filter_toggle],
            outputs=[dropdown, output_md, output_df]
        )
    
    return demo

# Create and launch the interface
if __name__ == "__main__":
    demo = create_interface()
    demo.launch(mcp_server=True)