| |
|
|
import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, List, Optional, Sequence

from mcp import Tool
from mcp.server import Server
from mcp.types import (
    TextContent,
    ImageContent,
    EmbeddedResource,
    LoggingLevel,
)
import requests
|
|
| |
# Configure logging once at import time so every message emitted by this
# module (startup, per-call, errors) is visible at INFO level and above.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kali-mcp-server")

# Base URL of the remote Kali API. The KALI_API_URL environment variable
# overrides the built-in default.
KALI_API_URL = os.getenv("KALI_API_URL", "https://sauserla-kali.hf.space")
|
|
class KaliMCPServer:
    """HTTP client for the remote Kali Linux tools API.

    Every public method returns the JSON object decoded from the API
    response, or a ``{"error": "<message>"}`` dict when the request, the
    HTTP status, or the decoding of the body fails. Request-level failures
    are reported through that dict rather than raised.
    """

    # Request timeouts in seconds. Command execution goes through POST
    # (see execute_command) and is given a much longer budget than GETs.
    GET_TIMEOUT = 30
    POST_TIMEOUT = 300

    def __init__(self, api_url: str = KALI_API_URL):
        """Bind the client to *api_url*; trailing slashes are stripped.

        Args:
            api_url: Base URL of the Kali API. Endpoints passed to
                ``_call_api`` are appended to it verbatim.
        """
        self.api_url = api_url.rstrip("/")
        # One Session is reused for all calls made through this client.
        self.session = requests.Session()
        logger.info("Initialized Kali MCP Server with API: %s", self.api_url)

    def _call_api(
        self,
        endpoint: str,
        method: str = "GET",
        data: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send one request to the Kali API and decode its JSON body.

        Args:
            endpoint: Path appended to the base URL (e.g. ``"/health"``).
            method: ``"POST"`` (case-insensitive) sends *data* as a JSON
                body; any other value results in a GET request.
            data: JSON-serialisable payload for POST requests.

        Returns:
            The decoded JSON object, or ``{"error": "<message>"}`` if the
            request failed, the server answered with an HTTP error status,
            the body was not valid JSON, or the JSON was not an object.
        """
        url = f"{self.api_url}{endpoint}"

        try:
            if method.upper() == "POST":
                response = self.session.post(
                    url, json=data, timeout=self.POST_TIMEOUT
                )
            else:
                response = self.session.get(url, timeout=self.GET_TIMEOUT)

            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions
            # whose JSONDecodeError is not a RequestException subclass.
            # Fall back to the exception type so the "error" value is never
            # an empty (falsy) string that callers would read as success.
            message = str(e) or type(e).__name__
            logger.error("API call failed: %s", message)
            return {"error": message}

        if not isinstance(payload, dict):
            # Callers rely on dict access (.get); report anything else as
            # an error instead of letting it fail later.
            message = (
                f"Unexpected response type from {endpoint}: "
                f"{type(payload).__name__}"
            )
            logger.error("API call failed: %s", message)
            return {"error": message}

        return payload

    def get_capabilities(self) -> Dict[str, Any]:
        """Fetch the payload of the ``/mcp/capabilities`` endpoint."""
        return self._call_api("/mcp/capabilities")

    def execute_command(self, command: str) -> Dict[str, Any]:
        """POST *command* to ``/api/command`` and return the API's reply."""
        data = {"command": command}
        return self._call_api("/api/command", "POST", data)

    def health_check(self) -> Dict[str, Any]:
        """Fetch the payload of the ``/health`` endpoint."""
        return self._call_api("/health")
|
|
| |
# Module-level singletons shared by the tool handlers and main().

# HTTP client for the Kali API, built with the default URL (KALI_API_URL).
kali_server = KaliMCPServer()

# MCP server instance on which the tool functions below are registered.
# NOTE(review): the tools register through ``server.tool()``; confirm that
# the installed mcp SDK's ``Server`` class actually provides that decorator
# (it is documented for FastMCP) - TODO verify against the SDK version in use.
server = Server("kali-mcp-server")
|
|
@server.tool()
async def execute_command(command: str) -> str:
    """Execute arbitrary commands on Kali Linux

    Args:
        command: The shell command to execute on the Kali Linux system

    Returns:
        Command execution results including stdout, stderr, and exit code
    """
    logger.info("Executing command: %s", command)

    try:
        # kali_server.execute_command performs a blocking HTTP request that
        # may take minutes. Run it in a worker thread so the event loop can
        # keep servicing other MCP traffic in the meantime.
        # NOTE(review): overlapping tool calls now use kali_server's shared
        # requests.Session from several threads - confirm that is acceptable.
        result = await asyncio.to_thread(kali_server.execute_command, command)

        # The API client reports failures as {"error": "..."}.
        if result.get("error"):
            return f"Error: {result['error']}"

        # Only include the streams that actually produced output.
        output = []
        if result.get("stdout"):
            output.append(f"STDOUT:\n{result['stdout']}")
        if result.get("stderr"):
            output.append(f"STDERR:\n{result['stderr']}")

        status = "SUCCESS" if result.get("success") else "FAILED"
        output.append(f"Exit Code: {result.get('return_code', 'unknown')}")
        output.append(f"Status: {status}")

        if result.get("timed_out"):
            output.append("WARNING: Command timed out after 180 seconds")

        return "\n\n".join(output)

    except Exception as e:
        # Tool boundary: never let an unexpected failure escape to the MCP
        # layer; log it with its traceback and return a readable message.
        logger.exception("Tool execution error: %s", e)
        return f"Execution failed: {str(e)}"
|
|
@server.tool()
async def server_health() -> str:
    """Check the health status of the Kali Linux server and available tools

    Returns:
        Server health information and tool availability status
    """
    try:
        # kali_server.health_check performs a blocking HTTP request; run it
        # in a worker thread so the event loop is not stalled while waiting.
        # NOTE(review): overlapping tool calls now use kali_server's shared
        # requests.Session from several threads - confirm that is acceptable.
        health = await asyncio.to_thread(kali_server.health_check)

        # The API client reports failures as {"error": "..."}.
        if health.get("error"):
            return f"Health check failed: {health['error']}"

        output = [
            f"Status: {health.get('status', 'unknown')}",
            f"Message: {health.get('message', '')}",
            "All Essential Tools Available: "
            f"{health.get('all_essential_tools_available', False)}",
        ]

        # Optional per-tool availability map; rendered one line per tool.
        tools_status = health.get("tools_status", {})
        if tools_status:
            output.append("\nTool Status:")
            for tool, available in tools_status.items():
                status_icon = "✅" if available else "❌"
                output.append(f"  {status_icon} {tool}")

        return "\n".join(output)

    except Exception as e:
        # Tool boundary: never let an unexpected failure escape to the MCP
        # layer; log it with its traceback and return a readable message.
        logger.exception("Health check error: %s", e)
        return f"Health check failed: {str(e)}"
|
|
async def main():
    """Verify the Kali API is reachable, then serve MCP over stdio.

    The process exits with status 1 if the startup health probe reports an
    error or raises; otherwise the server runs on the stdio streams until
    ``server.run`` returns.
    """
    # Imported here rather than at module top, as in the original layout.
    from mcp.server.stdio import stdio_server

    logger.info("Starting Kali Linux MCP Server")

    # Startup probe: refuse to start when the backing API cannot be reached.
    try:
        probe = kali_server.health_check()
        failure = probe.get("error")
        if not failure:
            logger.info("✅ Successfully connected to Kali API server")
        else:
            logger.error(f"Failed to connect to Kali API: {failure}")
            sys.exit(1)
    except Exception as exc:
        # sys.exit raises SystemExit, which is not an Exception subclass,
        # so the exit above is not intercepted by this handler.
        logger.error(f"Connection test failed: {exc!s}")
        sys.exit(1)

    async with stdio_server() as streams:
        incoming, outgoing = streams
        init_options = server.create_initialization_options()
        await server.run(incoming, outgoing, init_options)
|
|
# Script entry point: only start the server when executed directly, not
# when the module is imported.
if __name__ == "__main__":
    asyncio.run(main())