Spaces:
Running
Running
import asyncio | |
import os | |
import tempfile | |
import gradio as gr | |
from textwrap import dedent | |
from agno.agent import Agent | |
from agno.tools.mcp import MCPTools | |
from agno.models.nebius import Nebius | |
from mcp import ClientSession | |
from mcp.client.sse import sse_client | |
from dotenv import load_dotenv | |
import base64 | |
import difflib | |
import re | |
import subprocess | |
import sys | |
import shutil | |
import time | |
import aiohttp | |
import logging | |
import signal | |
import socket | |
import json | |
# Настройка логирования | |
logging.basicConfig( | |
level=logging.DEBUG, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
def clean_and_validate_json(raw_text: str) -> str: | |
""" | |
Агрессивно очищает и валидирует JSON из текста | |
""" | |
logger.debug(f"Попытка очистки JSON из текста длиной {len(raw_text)}") | |
# Убираем все возможные блоки думания и комментариев | |
cleaned = re.sub(r"<think>.*?</think>", "", raw_text, flags=re.DOTALL) | |
cleaned = re.sub(r"```json\s*", "", cleaned) | |
cleaned = re.sub(r"```\s*", "", cleaned) | |
cleaned = cleaned.strip() | |
# Ищем JSON объект от первой { до соответствующей } | |
start_idx = cleaned.find("{") | |
if start_idx == -1: | |
return raw_text | |
# Найдем соответствующую закрывающую скобку | |
bracket_count = 0 | |
end_idx = -1 | |
for i in range(start_idx, len(cleaned)): | |
if cleaned[i] == '{': | |
bracket_count += 1 | |
elif cleaned[i] == '}': | |
bracket_count -= 1 | |
if bracket_count == 0: | |
end_idx = i | |
break | |
if end_idx == -1: | |
return raw_text | |
# Извлекаем JSON часть | |
json_part = cleaned[start_idx:end_idx + 1] | |
try: | |
# Проверяем, является ли это валидным JSON | |
json.loads(json_part) | |
logger.debug("JSON успешно очищен и валидирован") | |
return json_part | |
except json.JSONDecodeError as e: | |
logger.debug(f"Ошибка при валидации очищенного JSON: {e}") | |
return raw_text | |
def standardize_mcp_response(response_text: str, server_name: str) -> str: | |
""" | |
Стандартизирует ответы от разных MCP серверов в единый формат | |
""" | |
try: | |
# Сначала пытаемся парсить как JSON | |
parsed = json.loads(response_text) | |
# Для Circle Test - ответы могут приходить в формате {"results": [...]} | |
if server_name == "circle_test": | |
if isinstance(parsed, dict) and "results" in parsed: | |
return json.dumps(parsed, ensure_ascii=False) | |
elif isinstance(parsed, list): | |
# Если пришел список результатов без обертки | |
standardized = {"results": parsed} | |
return json.dumps(standardized, ensure_ascii=False) | |
# Для Bandit - ответы могут приходить в формате {"results": [...], "metrics": {...}} | |
elif server_name == "bandit": | |
if isinstance(parsed, dict): | |
return json.dumps(parsed, ensure_ascii=False) | |
elif isinstance(parsed, list): | |
# Если пришел список результатов без обертки | |
standardized = {"results": parsed} | |
return json.dumps(standardized, ensure_ascii=False) | |
# Для других серверов - возвращаем как есть, если JSON валидный | |
return json.dumps(parsed, ensure_ascii=False) | |
except json.JSONDecodeError: | |
# Если не JSON, возвращаем оригинальную строку | |
logger.debug(f"Ответ от {server_name} не является валидным JSON") | |
return response_text | |
def extract_json_payload(raw: str) -> str: | |
""" | |
Извлекает первый JSON объект {...} из строки, удаляя Markdown-разметку и дополнительный текст. | |
""" | |
logger.debug(f"Исходная строка для извлечения JSON (длина: {len(raw)}): {raw[:500]}...") | |
# Убираем блоки <think> | |
raw = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL).strip() | |
# Убираем markdown блоки | |
raw = re.sub(r"```json\s*", "", raw) | |
raw = re.sub(r"```", "", raw) | |
# Убираем возможные лишние символы в начале и конце | |
raw = raw.strip() | |
logger.debug(f"После очистки markdown (длина: {len(raw)}): {raw[:500]}...") | |
# Ищем первый '{' и соответствующую ему закрывающую '}' | |
start = raw.find("{") | |
if start == -1: | |
logger.warning("Не найден открывающий символ '{'") | |
return raw | |
# Ищем соответствующую закрывающую скобку | |
bracket_count = 0 | |
end = -1 | |
for i in range(start, len(raw)): | |
if raw[i] == '{': | |
bracket_count += 1 | |
elif raw[i] == '}': | |
bracket_count -= 1 | |
if bracket_count == 0: | |
end = i | |
break | |
if end == -1: | |
logger.warning("Не найден соответствующий закрывающий символ '}'") | |
return raw | |
json_candidate = raw[start:end + 1] | |
logger.debug(f"Извлеченный JSON кандидат (длина: {len(json_candidate)}): {json_candidate[:500]}...") | |
# Проверяем валидность JSON перед возвратом | |
try: | |
parsed = json.loads(json_candidate) | |
logger.debug("JSON успешно распарсен в extract_json_payload") | |
return json_candidate | |
except json.JSONDecodeError as e: | |
logger.warning(f"Извлеченный JSON невалидный: {str(e)}") | |
logger.debug(f"Проблемный JSON: {json_candidate}") | |
# Пытаемся вернуть полную строку без обработки | |
try: | |
# Проверяем, может быть вся строка уже является валидным JSON | |
json.loads(raw) | |
logger.debug("Полная строка является валидным JSON") | |
return raw | |
except json.JSONDecodeError: | |
logger.warning("Даже полная строка не является валидным JSON, возвращаем как есть") | |
return raw | |
# Глобальные переменные для переиспользования сессий | |
MCP_WRAPPERS = {} | |
# Загружаем переменные окружения из .env файла | |
load_dotenv() | |
api_key = os.getenv("NEBIUS_API_KEY") | |
if not api_key: | |
raise ValueError("NEBIUS_API_KEY not found in .env file") | |
# Конфигурация MCP серверов (теперь все в одном контейнере) | |
BANDIT_PORT = os.getenv('BANDIT_INTERNAL_PORT', '7861') | |
DETECT_SECRETS_PORT = os.getenv('DETECT_SECRETS_INTERNAL_PORT', '7862') | |
PIP_AUDIT_PORT = os.getenv('PIP_AUDIT_INTERNAL_PORT', '7863') | |
CIRCLE_TEST_PORT = os.getenv('CIRCLE_TEST_INTERNAL_PORT', '7864') | |
SEMGREP_PORT = os.getenv('SEMGREP_INTERNAL_PORT', '7865') | |
MCP_SERVERS = { | |
"bandit": { | |
"url": f"http://localhost:{BANDIT_PORT}/gradio_api/mcp/sse", | |
"description": "Python code security analysis", | |
"port": int(BANDIT_PORT) | |
}, | |
"detect_secrets": { | |
"url": f"http://localhost:{DETECT_SECRETS_PORT}/gradio_api/mcp/sse", | |
"description": "Secret detection in code", | |
"port": int(DETECT_SECRETS_PORT) | |
}, | |
"pip_audit": { | |
"url": f"http://localhost:{PIP_AUDIT_PORT}/gradio_api/mcp/sse", | |
"description": "Python package vulnerability scanning", | |
"port": int(PIP_AUDIT_PORT) | |
}, | |
"circle_test": { | |
"url": f"http://localhost:{CIRCLE_TEST_PORT}/gradio_api/mcp/sse", | |
"description": "Security policy compliance checking", | |
"port": int(CIRCLE_TEST_PORT) | |
}, | |
"semgrep": { | |
"url": f"http://localhost:{SEMGREP_PORT}/gradio_api/mcp/sse", | |
"description": "Advanced static code analysis", | |
"port": int(SEMGREP_PORT) | |
} | |
} | |
def check_port(port: int) -> bool: | |
"""Проверяет доступность порта""" | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
try: | |
result = sock.connect_ex(('127.0.0.1', port)) | |
return result == 0 | |
finally: | |
sock.close() | |
def signal_handler(signum, frame): | |
"""Обработчик сигналов для корректного завершения""" | |
logger.info("Получен сигнал завершения, закрываем серверы...") | |
sys.exit(0) | |
# Регистрируем обработчики сигналов | |
signal.signal(signal.SIGINT, signal_handler) | |
signal.signal(signal.SIGTERM, signal_handler) | |
def generate_simple_diff(original_content: str, updated_content: str, file_path: str) -> str: | |
""" | |
Генерирует простой diff между оригинальным и обновленным содержимым | |
""" | |
diff_lines = list(difflib.unified_diff( | |
original_content.splitlines(keepends=True), | |
updated_content.splitlines(keepends=True), | |
fromfile=f"{file_path} (original)", | |
tofile=f"{file_path} (modified)", | |
n=3 | |
)) | |
if not diff_lines: | |
return "No changes detected." | |
added_lines = sum(1 for l in diff_lines if l.startswith("+") and not l.startswith("+++")) | |
removed_lines = sum(1 for l in diff_lines if l.startswith("-") and not l.startswith("---")) | |
diff_content = "".join(diff_lines) | |
stats = f"\n📊 Changes: +{added_lines} additions, -{removed_lines} deletions" | |
return diff_content + stats | |
async def check_server_availability(url: str, max_retries: int = 5, delay: float = 5.0) -> bool: | |
"""Проверяет доступность MCP сервера с увеличенными таймаутами""" | |
logger.debug(f"Проверка доступности сервера: {url}") | |
for i in range(max_retries): | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.get(url, timeout=30) as response: | |
if response.status == 200: | |
logger.info(f"Сервер {url} доступен") | |
return True | |
except Exception as e: | |
logger.warning(f"Попытка {i+1}/{max_retries} не удалась: {str(e)}") | |
await asyncio.sleep(delay) | |
logger.error(f"Сервер {url} недоступен после {max_retries} попыток") | |
return False | |
async def init_all_tools(): | |
"""Инициализирует все MCP инструменты один раз при старте приложения""" | |
global MCP_WRAPPERS | |
try: | |
# Создаем SSE клиенты для каждого сервера | |
for name, cfg in MCP_SERVERS.items(): | |
async with sse_client(cfg["url"]) as (read, write): | |
async with ClientSession(read, write) as session: | |
MCP_WRAPPERS[name] = MCPTools(session=session) | |
logger.info("Все MCP инструменты успешно инициализированы") | |
except Exception as e: | |
logger.error(f"Ошибка при инициализации MCP инструментов: {str(e)}") | |
raise | |
async def run_mcp_agent(message, server_name): | |
"""Запускает агента для конкретного MCP сервера""" | |
logger.info(f"Запуск MCP агента для {server_name}") | |
if not api_key: | |
logger.error("Nebius API key не найден в .env файле") | |
return "Error: Nebius API key not found in .env file" | |
if server_name not in MCP_SERVERS: | |
logger.error(f"Неизвестный MCP сервер: {server_name}") | |
return f"Error: Unknown MCP server {server_name}" | |
if server_name not in MCP_WRAPPERS: | |
logger.error(f"MCP инструмент {server_name} не инициализирован") | |
return f"Error: MCP tool {server_name} not initialized" | |
try: | |
# Получаем инструмент из кэша | |
mcp_tools = MCP_WRAPPERS[server_name] | |
# Создаем агента | |
agent = Agent( | |
tools=[mcp_tools], | |
instructions=dedent(f"""\ | |
You are an intelligent security assistant with access to MCP tools for {server_name}. | |
IMPORTANT INSTRUCTIONS: | |
1. Use the appropriate MCP tool to analyze the provided code | |
2. Return ONLY the raw JSON result from the MCP tool | |
3. Do NOT add any explanations, commentary, or additional formatting | |
4. Do NOT wrap the result in markdown code blocks | |
5. Do NOT add any text before or after the JSON | |
6. If the tool returns a "results" field, return the complete response including that field | |
The JSON output should be clean and parseable without any modifications. | |
"""), | |
markdown=False, # Отключаем Markdown для получения чистого JSON | |
show_tool_calls=True, | |
model=Nebius( | |
id="Qwen/Qwen3-30B-A3B-fast", | |
api_key=api_key | |
) | |
) | |
# Форматируем сообщение | |
formatted_message = f"Analyze this code using {server_name}: {message}" | |
# Запускаем анализ | |
response = await agent.arun(formatted_message) | |
logger.info(f"Успешное выполнение для {server_name}") | |
logger.debug(f"Полный ответ агента для {server_name}: {response.content}") | |
# Сначала пробуем агрессивную очистку | |
cleaned_response = clean_and_validate_json(response.content) | |
# Если агрессивная очистка не помогла, используем обычную функцию | |
if cleaned_response == response.content: | |
cleaned_response = extract_json_payload(response.content) | |
# Стандартизируем ответ для конкретного сервера | |
standardized_response = standardize_mcp_response(cleaned_response, server_name) | |
logger.debug(f"Стандартизированный ответ для {server_name}: {standardized_response}") | |
return standardized_response | |
except Exception as e: | |
logger.error(f"Ошибка выполнения {server_name}: {str(e)}") | |
# Возвращаем ошибку в формате JSON для консистентности | |
error_response = { | |
"success": False, | |
"error": f"Error running {server_name}: {str(e)}", | |
"results": {} | |
} | |
return json.dumps(error_response, ensure_ascii=False) | |
async def run_fix_agent(message): | |
"""Запускает агента для исправления кода""" | |
if not api_key: | |
return "Error: Nebius API key not found in .env file" | |
agent = Agent( | |
tools=[], | |
instructions=dedent("""\ | |
You are an intelligent code refactoring assistant. | |
Based on the vulnerabilities detected, propose a corrected version of the code. | |
Return only the full updated source code, without any additional commentary or markup. | |
"""), | |
markdown=False, | |
show_tool_calls=False, | |
model=Nebius( | |
id="Qwen/Qwen3-30B-A3B-fast", | |
api_key=api_key | |
) | |
) | |
try: | |
response = await agent.arun(message) | |
return response.content | |
except Exception as e: | |
return f"Error proposing fixes: {e}" | |
async def process_file(file_obj, custom_checks, selected_servers): | |
"""Обрабатывает файл с помощью выбранных MCP серверов""" | |
if not file_obj: | |
return "", "", "" | |
try: | |
# Сохраняем загруженный файл | |
temp_dir = tempfile.gettempdir() | |
file_path = os.path.join(temp_dir, file_obj.name) | |
# Получаем содержимое файла из объекта Gradio | |
with open(file_obj.name, 'r', encoding='utf-8') as f: | |
file_content = f.read() | |
with open(file_path, "w", encoding='utf-8') as f: | |
f.write(file_content) | |
# Подготавливаем сообщение для всех серверов | |
if custom_checks: | |
user_message = ( | |
f"Please analyze this code for {custom_checks}, " | |
f"using the most comprehensive settings available:\n\n{file_content}" | |
) | |
else: | |
user_message = ( | |
f"Please perform a full vulnerability and security analysis on this code, " | |
f"selecting the highest intensity settings:\n\n{file_content}" | |
) | |
# Запускаем все анализаторы параллельно | |
tasks = { | |
server: asyncio.create_task(run_mcp_agent(user_message, server)) | |
for server in selected_servers | |
} | |
# Собираем результаты | |
raw_outputs = { | |
server: re.sub(r"<think>.*?</think>", "", await task, flags=re.DOTALL).strip() | |
for server, task in tasks.items() | |
} | |
# Преобразуем raw_outputs в читаемый текст для Markdown | |
formatted_results = [] | |
for name, raw in raw_outputs.items(): | |
logger.debug(f"Обработка результата для {name}, длина: {len(raw)}") | |
logger.debug(f"Полное содержимое для {name}: {raw}") | |
try: | |
# Пытаемся распарсить JSON | |
parsed_data = json.loads(raw) | |
logger.debug(f"JSON успешно распарсен для {name}") | |
# Извлекаем результаты если они есть | |
if isinstance(parsed_data, dict) and 'results' in parsed_data: | |
display_data = parsed_data['results'] | |
logger.debug(f"Извлечены results для {name}") | |
else: | |
display_data = parsed_data | |
logger.debug(f"Используются сырые данные для {name}") | |
# Форматируем для отображения | |
formatted_json = json.dumps(display_data, indent=2, ensure_ascii=False) | |
formatted_results.append(f"### {name.upper()}:\n```json\n{formatted_json}\n```") | |
except json.JSONDecodeError as e: | |
# Если JSON некорректный, показываем как есть | |
logger.warning(f"Не удалось распарсить JSON для {name}: {str(e)}") | |
logger.debug(f"Позиция ошибки: {getattr(e, 'pos', 'неизвестно')}") | |
logger.debug(f"Длина строки: {len(raw)}") | |
# Пытаемся найти проблемную область | |
if hasattr(e, 'pos') and e.pos: | |
start_pos = max(0, e.pos - 50) | |
end_pos = min(len(raw), e.pos + 50) | |
problem_area = raw[start_pos:end_pos] | |
logger.debug(f"Проблемная область вокруг позиции {e.pos}: {repr(problem_area)}") | |
# Проверяем на наличие скрытых символов | |
has_non_printable = any(ord(c) < 32 and c not in '\n\r\t' for c in raw) | |
if has_non_printable: | |
logger.warning(f"Обнаружены непечатаемые символы в ответе для {name}") | |
formatted_results.append(f"### {name.upper()} (Raw output):\n```\n{raw}\n```") | |
except Exception as e: | |
# Любые другие ошибки | |
logger.error(f"Ошибка обработки результата для {name}: {str(e)}") | |
formatted_results.append(f"### {name.upper()} (Error):\n```\nОшибка обработки: {str(e)}\n```") | |
markdown_output = "\n\n".join(formatted_results) | |
# Читаем оригинальный код | |
with open(file_path, 'r', encoding='utf-8') as f_in: | |
orig_code = f_in.read() | |
# Подготавливаем промпт для исправлений | |
orig_name = os.path.basename(file_path) | |
fix_prompt = f"""Below is the full source code of '{orig_name}': | |
```python | |
{orig_code} | |
``` | |
Please generate a corrected version of this code, addressing all security vulnerabilities. Return only the full updated source code.""" | |
# Получаем исправленный код | |
fixed_code = await run_fix_agent(fix_prompt) | |
# Очищаем код от блоков <think> | |
cleaned_code = re.sub(r"<think>.*?</think>", "", fixed_code, flags=re.DOTALL).strip() | |
# Генерируем diff | |
diff_text = generate_simple_diff(orig_code, cleaned_code, orig_name) | |
return markdown_output, diff_text, cleaned_code | |
except Exception as e: | |
logger.error(f"Ошибка при обработке файла: {str(e)}") | |
return f"❌ Произошла ошибка: {str(e)}", "", "" | |
async def check_all_servers(): | |
"""Проверяет доступность всех MCP серверов""" | |
unavailable_servers = [] | |
for server_name, config in MCP_SERVERS.items(): | |
if not check_port(config["port"]): | |
unavailable_servers.append(f"{server_name} (порт {config['port']})") | |
return unavailable_servers | |
def process_file_sync(file_obj, custom_checks, selected_servers): | |
"""Синхронная обертка для process_file""" | |
return asyncio.run(process_file(file_obj, custom_checks, selected_servers)) | |
# Создаем интерфейс Gradio | |
with gr.Blocks(title="VulnBuster - AI Security Agent", theme=gr.themes.Soft()) as demo: | |
gr.Markdown(""" | |
# 🛡️ VulnBuster - AI Security Agent | |
**Intelligent automated code security auditing through orchestrated MCP services** | |
VulnBuster demonstrates an **agentic approach** to vulnerability scanning by combining multiple security tools in a single, intelligent interface. The AI agent automatically analyzes code using various scanners, correlates findings, and provides AI-powered remediation suggestions. | |
## 🎯 Key Features | |
- **🤖 AI Agent Orchestration**: Intelligent coordination of multiple MCP security scanners | |
- **⚡ Real-time Analysis**: Upload code → Multi-tool scanning → AI-powered fixes | |
- **🧠 Context-Aware**: Agent understands scan results and provides meaningful insights | |
- **🔄 Automated Workflow**: From vulnerability detection to code remediation | |
## 🛠️ Integrated Security Tools | |
| Tool | Purpose | Detects | | |
|------|---------|---------| | |
| **🔒 Bandit** | Python security analysis | Hardcoded passwords, SQL injection, shell injection | | |
| **🔍 Detect Secrets** | Secret detection | API keys, tokens, credentials with entropy analysis | | |
| **🛡️ Semgrep** | Multi-language analysis | Advanced patterns, custom rules, 20+ languages | | |
| **📦 Pip Audit** | Dependency scanning | CVE vulnerabilities, supply chain security | | |
| **📋 Circle Test** | Policy compliance (White Circle API) | 12 security policies, code quality standards | | |
## 📊 Agent Demo for Track 3 | |
This Space showcases **intelligent agent capabilities**: | |
- Automatic tool selection based on code type | |
- Cross-tool correlation of security findings | |
- AI-powered vulnerability prioritization | |
- Automated fix generation with context understanding | |
## 📖 Full Documentation | |
**For detailed information about each security tool, examples, and technical architecture, please read the [README.md](https://huggingface.co/spaces/Agents-MCP-Hackathon/VulnBuster/blob/main/README.md) file.** | |
--- | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
file_input = gr.File( | |
label="Upload a code file", | |
file_types=[".py", ".js", ".java", ".go", ".rb"] | |
) | |
custom_checks = gr.Textbox( | |
label="Enter specific checks or tools to use (optional)", | |
placeholder="e.g., SQL injection, shell injection, detect secrets" | |
) | |
server_checkboxes = gr.CheckboxGroup( | |
choices=list(MCP_SERVERS.keys()), | |
value=list(MCP_SERVERS.keys()), | |
label="Select MCP Servers" | |
) | |
scan_button = gr.Button("Run Scan", variant="primary") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
analysis_output = gr.Markdown(label="Security Analysis Results") | |
diff_output = gr.Textbox(label="Proposed Code Fixes", lines=10) | |
fixed_code_output = gr.Code(label="Fixed Code", language="python") | |
download_button = gr.File(label="Download corrected file") | |
def update_download_button(fixed_code): | |
if fixed_code: | |
temp_dir = tempfile.gettempdir() | |
fixed_path = os.path.join(temp_dir, "fixed_code.py") | |
with open(fixed_path, "w") as f: | |
f.write(fixed_code) | |
return fixed_path | |
return None | |
scan_button.click( | |
fn=process_file_sync, | |
inputs=[file_input, custom_checks, server_checkboxes], | |
outputs=[analysis_output, diff_output, fixed_code_output] | |
).then( | |
fn=update_download_button, | |
inputs=[fixed_code_output], | |
outputs=[download_button] | |
) | |
if __name__ == "__main__": | |
try: | |
# Инициализируем все MCP инструменты при старте | |
asyncio.run(init_all_tools()) | |
logger.info("Запуск Security Tools MCP Agent...") | |
# Получаем настройки сервера из переменных окружения | |
server_name = os.getenv("GRADIO_SERVER_NAME", "0.0.0.0") | |
server_port = int(os.getenv("GRADIO_SERVER_PORT", "7860")) | |
demo.launch( | |
server_name=server_name, | |
server_port=server_port, | |
share=False | |
) | |
except Exception as e: | |
logger.error(f"Ошибка запуска приложения: {str(e)}") | |
sys.exit(1) |