VulnBuster / circle_test_mcp.py
zjkarina's picture
Fix dependency conflicts between smolagents and semgrep
4f21d95
#!/usr/bin/env python3
"""
MCP server for Circle Test - a tool for checking code against security policies
"""
import gradio as gr
import aiohttp
import asyncio
import ssl
import os
from typing import Dict, List
from dotenv import load_dotenv
# Load environment variables via python-dotenv before reading any settings.
load_dotenv()

# Endpoint of the violation-check API, taken from the CIRCLE_API_URL
# environment variable with a placeholder default.
CIRCLE_API_URL = os.environ.get(
    'CIRCLE_API_URL',
    'https://api.example.com/protect/check_violation',
)
async def check_violation(prompt: str, policies: Dict[str, str]) -> Dict:
    """
    Check code for compliance with a set of security policies.

    The code is posted to ``CIRCLE_API_URL`` as a single assistant message
    together with the policies, and the per-policy verdicts from the response
    are mapped back onto the policy descriptions.

    Args:
        prompt (str): Code to check.
        policies (Dict[str, str]): Mapping of policy id to policy description.

    Returns:
        Dict: On success ``{"success": True, "results": {...}}`` where every
        entry holds the policy text under ``"policy"`` and ``"yes"``/``"no"``
        under ``"violation"``. On any failure
        ``{"success": False, "error": "<message>"}``; this function does not
        raise.
    """
    timeout_seconds = 30
    try:
        payload = {
            "dialog": [
                {
                    "role": "assistant",
                    "content": prompt
                }
            ],
            "policies": policies
        }

        # NOTE(review): certificate and hostname verification are switched
        # off here, so the connection is open to man-in-the-middle attacks.
        # Kept as-is to preserve existing behavior (presumably the endpoint
        # uses a self-signed certificate - confirm and re-enable if possible).
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        async with aiohttp.ClientSession() as session:
            async with session.post(
                CIRCLE_API_URL,
                json=payload,
                # aiohttp expects a ClientTimeout object; a bare number is
                # only accepted for backward compatibility.
                timeout=aiohttp.ClientTimeout(total=timeout_seconds),
                ssl=ssl_context
            ) as response:
                # Report HTTP failures explicitly instead of trying to parse
                # an error page as a verdict payload.
                if response.status >= 400:
                    return {
                        "success": False,
                        "error": f"Error checking violations: HTTP {response.status}"
                    }
                result = await response.json()

        # Convert the raw verdicts into a more readable format.
        verdicts = result.get('policies') if isinstance(result, dict) else None
        if not isinstance(verdicts, dict):
            return {
                "success": False,
                "error": "Invalid response format"
            }

        formatted_result = {
            policy_num: {
                "policy": policies.get(policy_num, "Unknown policy"),
                "violation": "yes" if value == 1 else "no"
            }
            for policy_num, value in verdicts.items()
        }
        return {
            "success": True,
            "results": formatted_result
        }
    except asyncio.TimeoutError:
        # str() of a timeout error is empty, so spell the cause out.
        return {
            "success": False,
            "error": f"Error checking violations: request timed out after {timeout_seconds} seconds"
        }
    except Exception as e:
        # UI/tool boundary: always answer with an error payload, never raise.
        detail = str(e) or type(e).__name__
        return {
            "success": False,
            "error": f"Error checking violations: {detail}"
        }
# Build the Gradio interface.

# Policy id -> policy description; passed to check_violation through gr.State.
DEFAULT_POLICIES = {
    "1": "Presence of SPDX-License-Identifier with an ID not in the approved list, or missing SPDX tag in top-level LICENSE file.",
    "2": "Presence of plaintext credentials (passwords, tokens, keys) in configuration files (YAML, JSON, .env, etc.).",
    "3": "Presence of TODO or FIXME tags in comments inside non-test production code files.",
    "4": "Presence of any string literal starting with http:// not wrapped in a validated secure-client.",
    "5": "Presence of logging statements that output sensitive data (user PII, private keys, passwords, tokens) without masking or hashing.",
    "6": "Presence of calls to deprecated or outdated APIs (functions or methods marked as deprecated).",
    "7": "Presence of subprocess or os.system calls where user input is concatenated directly without proper sanitization or escaping.",
    "8": "Presence of file read/write operations using paths derived directly from user input without normalization or path-traversal checks.",
    "9": "Presence of SQL queries built using string concatenation with user input instead of parameterized queries or ORM methods.",
    "10": "Presence of string literals matching absolute filesystem paths (e.g., \"/home/...\" or \"C:\\\\...\") rather than relative paths or environment variables.",
    "11": "Presence of hostnames or URLs containing \"prod\", \"production\", or \"release\" that reference production databases or services in non-test code.",
    "12": "Presence of dependencies in lock files (Pipfile.lock or requirements.txt) without exact version pins (using version ranges like \">=\" or \"~=\" without a fixed version).",
    "13": "Presence of hashlib.md5(...) or any MD5-based hashing, since MD5 is cryptographically broken (use SHA-256 or better).",
    "14": "Presence of pdb.set_trace() or other pdb imports, as debug statements should not remain in production code.",
    "15": "Presence of logging.debug($SENSITIVE) or similar logging calls that output sensitive information without redaction.",
    "16": "Presence of re.compile($USER_INPUT) where $USER_INPUT is unsanitized, since this can lead to ReDoS attacks.",
    "17": "Presence of xml.etree.ElementTree.parse($USER_INPUT) without secure parsing, leading to XXE vulnerabilities.",
    "18": "Presence of zipfile.ZipFile($USER_INPUT) or similar extraction calls on untrusted zips, which can cause path traversal.",
    "19": "Presence of tarfile.open($USER_INPUT) on untrusted tar files, leading to path traversal vulnerabilities.",
    "20": "Presence of os.chmod($PATH, 0o777) or equivalent setting overly permissive permissions, which is insecure.",
    "21": "Presence of os.environ[$KEY] = $VALUE modifying environment variables at runtime, which can introduce security risks.",
}

# Markdown shown on the "Examples" tab.
EXAMPLES_MARKDOWN = """
## 🚨 Examples of code to check:
### 1. Insecure File Operations
```python
def read_file(filename):
with open(filename, "r") as f:
return f.read()
```
### 2. Hardcoded Credentials
```python
DB_PASSWORD = "secret123"
API_KEY = "sk_live_51H1h2K3L4M5N6O7P8Q9R0S1T2U3V4W5X6Y7Z8"
```
### 3. Insecure Subprocess
```python
import subprocess
subprocess.call(f"ls {user_input}", shell=True)
```
"""

with gr.Blocks(title="Circle Test MCP") as demo:
    gr.Markdown("# 🔍 Circle Test Scanner")
    gr.Markdown("Security policy compliance checker with MCP support")

    with gr.Tab("Policy Check"):
        with gr.Row():
            # Left column: code entry and the trigger button.
            with gr.Column():
                code_input = gr.Textbox(
                    lines=10,
                    placeholder="Enter code to check...",
                    label="Code",
                )
                check_btn = gr.Button("🔍 Check Policies", variant="primary")

            # Right column: JSON view of the check results.
            with gr.Column():
                check_output = gr.JSON(label="Check Results")

        # The policy catalogue travels to the handler as a hidden state input.
        policies_state = gr.State(DEFAULT_POLICIES)
        check_btn.click(
            fn=check_violation,
            inputs=[code_input, policies_state],
            outputs=check_output,
        )

    with gr.Tab("Examples"):
        gr.Markdown(EXAMPLES_MARKDOWN)
if __name__ == "__main__":
    # Server host and port are read from the environment, falling back to
    # 0.0.0.0:7864; the app is launched with MCP support and no share link.
    launch_options = {
        "mcp_server": True,
        "server_name": os.getenv("GRADIO_SERVER_NAME", "0.0.0.0"),
        "server_port": int(os.getenv("GRADIO_SERVER_PORT", "7864")),
        "share": False,
    }
    demo.launch(**launch_options)