Kali / mcp_server.py
SAUSERLA's picture
Upload 5 files
37f9172 verified
#!/usr/bin/env python3
import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, List, Sequence
# MCP SDK imports
from mcp import Tool
from mcp.server import Server
from mcp.types import (
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
# HTTP client for API calls
import requests
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kali-mcp-server")
# Configuration
KALI_API_URL = os.environ.get("KALI_API_URL", "https://sauserla-kali.hf.space")
class KaliMCPServer:
"""MCP Server for Kali Linux tools"""
def __init__(self, api_url: str = KALI_API_URL):
self.api_url = api_url.rstrip('/')
self.session = requests.Session()
logger.info(f"Initialized Kali MCP Server with API: {self.api_url}")
def _call_api(self, endpoint: str, method: str = "GET", data: Dict = None) -> Dict[str, Any]:
"""Make API call to Kali server"""
url = f"{self.api_url}{endpoint}"
try:
if method.upper() == "POST":
response = self.session.post(url, json=data, timeout=300)
else:
response = self.session.get(url, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"API call failed: {str(e)}")
return {"error": str(e)}
def get_capabilities(self) -> Dict[str, Any]:
"""Get MCP tool capabilities"""
return self._call_api("/mcp/capabilities")
def execute_command(self, command: str) -> Dict[str, Any]:
"""Execute a command via the API"""
data = {"command": command}
return self._call_api("/api/command", "POST", data)
def health_check(self) -> Dict[str, Any]:
"""Check server health"""
return self._call_api("/health")
# Global server instance
kali_server = KaliMCPServer()
# MCP Server setup
server = Server("kali-mcp-server")
@server.tool()
async def execute_command(command: str) -> str:
"""Execute arbitrary commands on Kali Linux
Args:
command: The shell command to execute on the Kali Linux system
Returns:
Command execution results including stdout, stderr, and exit code
"""
logger.info(f"Executing command: {command}")
try:
result = kali_server.execute_command(command)
if result.get("error"):
return f"Error: {result['error']}"
output = []
if result.get("stdout"):
output.append(f"STDOUT:\n{result['stdout']}")
if result.get("stderr"):
output.append(f"STDERR:\n{result['stderr']}")
status = "SUCCESS" if result.get("success") else "FAILED"
output.append(f"Exit Code: {result.get('return_code', 'unknown')}")
output.append(f"Status: {status}")
if result.get("timed_out"):
output.append("WARNING: Command timed out after 180 seconds")
return "\n\n".join(output)
except Exception as e:
logger.error(f"Tool execution error: {str(e)}")
return f"Execution failed: {str(e)}"
@server.tool()
async def server_health() -> str:
"""Check the health status of the Kali Linux server and available tools
Returns:
Server health information and tool availability status
"""
try:
health = kali_server.health_check()
if health.get("error"):
return f"Health check failed: {health['error']}"
output = []
output.append(f"Status: {health.get('status', 'unknown')}")
output.append(f"Message: {health.get('message', '')}")
output.append(f"All Essential Tools Available: {health.get('all_essential_tools_available', False)}")
tools_status = health.get('tools_status', {})
if tools_status:
output.append("\nTool Status:")
for tool, available in tools_status.items():
status_icon = "✅" if available else "❌"
output.append(f" {status_icon} {tool}")
return "\n".join(output)
except Exception as e:
logger.error(f"Health check error: {str(e)}")
return f"Health check failed: {str(e)}"
async def main():
"""Main server entry point"""
# Import here to avoid issues if MCP SDK is not available
from mcp.server.stdio import stdio_server
logger.info("Starting Kali Linux MCP Server")
# Test connection on startup
try:
health = kali_server.health_check()
if health.get("error"):
logger.error(f"Failed to connect to Kali API: {health['error']}")
sys.exit(1)
else:
logger.info("✅ Successfully connected to Kali API server")
except Exception as e:
logger.error(f"Connection test failed: {str(e)}")
sys.exit(1)
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())