docker-mcp-server / proxy_script.py
Peter Michael Gits
2nd checkin
795ea6f
#!/usr/bin/env python3
"""
Local MCP proxy script for connecting Claude Desktop to remote HTTP MCP server
Save as: mcp_proxy.py
"""
import asyncio
import json
import sys
import aiohttp
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Your Hugging Face Spaces MCP server URL
MCP_SERVER_URL = "https://pgits-docker-mcp-server.hf.space/mcp"
async def proxy_request(request_data):
"""Proxy MCP request to HTTP server"""
try:
async with aiohttp.ClientSession() as session:
async with session.post(
MCP_SERVER_URL,
json=request_data,
headers={"Content-Type": "application/json"},
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
return await response.json()
else:
logger.error(f"HTTP error: {response.status}")
return {
"jsonrpc": "2.0",
"id": request_data.get("id"),
"error": {
"code": -32603,
"message": f"HTTP error: {response.status}"
}
}
except Exception as e:
logger.error(f"Request failed: {e}")
return {
"jsonrpc": "2.0",
"id": request_data.get("id"),
"error": {
"code": -32603,
"message": f"Connection error: {str(e)}"
}
}
async def main():
"""Main proxy loop"""
logger.info(f"Starting MCP proxy for {MCP_SERVER_URL}")
try:
while True:
try:
# Read from stdin
line = await asyncio.get_event_loop().run_in_executor(
None, sys.stdin.readline
)
if not line:
break
# Parse JSON request
request_data = json.loads(line.strip())
logger.info(f"Proxying request: {request_data.get('method')}")
# Send to remote server
response = await proxy_request(request_data)
# Send response back
print(json.dumps(response))
sys.stdout.flush()
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e}")
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32700,
"message": "Parse error"
}
}
print(json.dumps(error_response))
sys.stdout.flush()
except KeyboardInterrupt:
logger.info("Proxy shutting down")
except Exception as e:
logger.error(f"Proxy error: {e}")
if __name__ == "__main__":
asyncio.run(main())