File size: 11,357 Bytes
bf7feec 82b211f 86e093e 82b211f fc5db35 bf7feec 82b211f e80df0e fc5db35 82b211f fc5db35 d394950 fc5db35 0e28738 d394950 0e28738 fc5db35 e80df0e 19ae8e3 e80df0e fc5db35 e80df0e 82b211f f79e5bf 25ebfd2 82b211f 25ebfd2 badd684 25ebfd2 badd684 25ebfd2 f79e5bf 82b211f f79e5bf 82b211f fc5db35 f79e5bf fc5db35 badd684 fc5db35 badd684 f79e5bf fc5db35 f79e5bf fc5db35 f79e5bf fc5db35 e80df0e 4ef96de e80df0e f79e5bf fc5db35 82b211f fc5db35 f79e5bf 82b211f 86e093e e80df0e 86e093e fc5db35 86e093e 82b211f 86e093e 82b211f fc5db35 e80df0e fc5db35 f79e5bf fc5db35 82b211f f79e5bf bf7feec 86e093e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
from fastapi import FastAPI
from pydantic import BaseModel
import requests
import json
import tempfile
import os
import subprocess
app = FastAPI()
class SmartContractAnalysisRequest(BaseModel):
api_key: str
pre_traineddata_text: str
prompt: str
@app.post("/analyze_smart_contract")
def analyze_smart_contract(req: SmartContractAnalysisRequest):
"""
Endpoint that:
1) Receives {api_key, pre_traineddata_text, prompt}.
2) Runs local static analysis with Slither and Semgrep on the provided contract.
3) Calls OpenAI's ChatCompletion with function-calling constraints.
4) Returns a structured security report in JSON containing a list of vulnerabilities.
"""
############################################################################
# 1) Write the contract code to a temporary file for static analysis.
############################################################################
contract_code = req.prompt
temp_dir = tempfile.mkdtemp()
contract_path = os.path.join(temp_dir, "contract.sol")
with open(contract_path, "w") as f:
f.write(contract_code)
############################################################################
# 2) Run Slither analysis (if installed)
############################################################################
slither_data = {}
try:
slither_json_path = os.path.join(temp_dir, "slither-output.json")
slither_cmd = ["slither", contract_path, "--json", slither_json_path]
subprocess.run(slither_cmd, check=True, capture_output=True, text=True)
if os.path.exists(slither_json_path):
with open(slither_json_path, "r") as f:
slither_data = json.load(f)
except subprocess.CalledProcessError as e:
error_msg = e.stderr.strip() if e.stderr else "Unknown error"
print("Slither analysis failed:", error_msg)
slither_data = {"error": error_msg}
except Exception as e:
error_msg = str(e)
print("Unexpected error during Slither analysis:", error_msg)
slither_data = {"error": error_msg}
############################################################################
# 3) Run Semgrep analysis (if installed)
############################################################################
semgrep_data = {}
try:
semgrep_cmd = ["semgrep", "--config", "solidity_rules.yml", "--metrics=off", contract_path, "--json"]
semgrep_proc = subprocess.run(semgrep_cmd, check=True, capture_output=True, text=True)
semgrep_data = json.loads(semgrep_proc.stdout)
except subprocess.CalledProcessError as e:
error_msg = e.stderr.strip() if e.stderr else "Unknown error"
print("Semgrep analysis failed:", error_msg)
semgrep_data = {"error": error_msg}
except Exception as e:
error_msg = str(e)
print("Unexpected error during Semgrep analysis:", error_msg)
semgrep_data = {"error": error_msg}
############################################################################
# 4) Prepare messages and function schema for OpenAI ChatCompletion call.
############################################################################
functions = [
{
"name": "analyze_smart_contract",
"description": (
"Analyzes a given smart contract text and returns a detailed security report. "
"The report includes a list of vulnerabilities, along with additional metrics: "
"complexity_score, number_of_functions, number_of_lines, security_score, "
"and a brief analysis_text summarizing the key findings from Slither and Semgrep. "
"You must return only valid JSON."
),
"parameters": {
"type": "object",
"properties": {
"vulnerabilities": {
"type": "array",
"description": "List of vulnerabilities found in the smart contract",
"items": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "The name or short title of the vulnerability"
},
"severity": {
"type": "string",
"description": "The severity level (high, medium, or low)"
},
"description": {
"type": "string",
"description": "A detailed explanation of the vulnerability, including context and recommended practices to mitigate the risk."
},
"codeSnippet": {
"type": "string",
"description": "A code snippet that shows the original vulnerable code and a corrected version with inline comments explaining the changes."
}
},
"required": ["title", "severity", "description", "codeSnippet"]
}
},
"complexity_score": {
"type": "number",
"description": "A numerical score out of 10 representing the complexity of the contract."
},
"number_of_functions": {
"type": "number",
"description": "The total number of functions in the contract."
},
"number_of_lines": {
"type": "number",
"description": "The total number of lines in the contract."
},
"security_score": {
"type": "number",
"description": "A numerical score out of 100 representing the overall security posture of the contract."
},
"analysis_text": {
"type": "string",
"description": "A brief 1-5 sentence summary analyzing the static analysis outputs from Slither and Semgrep."
}
},
"required": [
"vulnerabilities",
"complexity_score",
"number_of_functions",
"number_of_lines",
"security_score",
"analysis_text"
]
}
}
]
system_message = (
"You are a security analysis assistant specialized in analyzing smart contracts. "
"You must produce ONLY valid JSON containing the following keys: 'vulnerabilities', "
"'complexity_score', 'number_of_functions', 'number_of_lines', 'security_score', and 'analysis_text'. "
"Each vulnerability must include: 'title', 'severity', 'description', and 'codeSnippet'.\n\n"
"Below is a MOCKED JSON EXAMPLE (for reference only) that includes detailed context, a longer code snippet, "
"and recommended fixes with inline comments:\n\n"
"{\n"
" \"vulnerabilities\": [\n"
" {\n"
" \"title\": \"Reentrancy Attack\",\n"
" \"severity\": \"high\",\n"
" \"description\": \"A reentrancy vulnerability occurs when a contract makes an external call before updating its state. "
"In the example below, the 'withdraw' function calls an external contract before reducing the user's balance. "
"This can allow an attacker to recursively call 'withdraw' and drain funds. The recommended fix is to update the state first or use a reentrancy guard.\",\n"
" \"codeSnippet\": \"// Vulnerable code:\\nfunction withdraw(uint256 amount) public {\\n require(balances[msg.sender] >= amount, 'Insufficient balance');\\n (bool success, ) = msg.sender.call{value: amount}(\"\"); // External call before state update\\n require(success, 'Transfer failed');\\n balances[msg.sender] -= amount;\\n}\\n\\n// Recommended fix using checks-effects-interactions pattern:\\nfunction withdraw(uint256 amount) public {\\n require(balances[msg.sender] >= amount, 'Insufficient balance');\\n balances[msg.sender] -= amount; // Update state first\\n (bool success, ) = msg.sender.call{value: amount}(\"\"); // External call after state update\\n require(success, 'Transfer failed');\\n // Alternatively, consider using a ReentrancyGuard for added protection\\n}\"\n"
" }\n"
" ],\n"
" \"complexity_score\": 7.5,\n"
" \"number_of_functions\": 5,\n"
" \"number_of_lines\": 120,\n"
" \"security_score\": 85.0,\n"
" \"analysis_text\": \"Slither identified potential issues with function state handling while Semgrep flagged the use of 'selfdestruct'. The overall contract complexity is moderate with some areas for security improvement.\"\n"
"}\n\n"
"Return only a JSON object with these keys. No extra keys or commentary."
)
user_message = (
f"Pre-trained data: {req.pre_traineddata_text}\n\n"
f"Smart contract to analyze:\n{req.prompt}\n\n"
"Static analysis partial findings (Slither):\n"
f"{slither_data}\n\n"
"Static analysis partial findings (Semgrep):\n"
f"{semgrep_data}\n\n"
"Based on the above static analysis results, provide a brief 1-5 sentence analysis summarizing the key findings from both Slither and Semgrep. "
"Then, combine these findings with any additional vulnerabilities you discover, and return a JSON object containing the keys "
"'vulnerabilities', 'complexity_score', 'number_of_functions', 'number_of_lines', 'security_score', and 'analysis_text'."
)
messages = [
{"role": "system", "content": system_message},
{"role": "user", "content": user_message}
]
url = "https://api.openai.com/v1/chat/completions"
headers = {
"Authorization": f"Bearer {req.api_key}",
"Content-Type": "application/json"
}
payload = {
"model": "gpt-4o",
"messages": messages,
"functions": functions,
"function_call": {"name": "analyze_smart_contract"},
"max_tokens": 3000,
"temperature": 0.6,
"top_p": 0.95
}
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
arguments_str = data["choices"][0]["message"]["function_call"]["arguments"]
result_dict = json.loads(arguments_str)
try:
os.remove(contract_path)
os.rmdir(temp_dir)
except Exception as cleanup_error:
print("Cleanup error:", cleanup_error)
# Do not manually add slither_output or semgrep_output; the 'analysis_text' field should include a brief summary of their outputs.
print(result_dict)
return result_dict
@app.get("/")
def greet_json():
return {"Hello": "World!"}
|