|
from fastapi import FastAPI |
|
from pydantic import BaseModel |
|
import requests |
|
import json |
|
import tempfile |
|
import os |
|
import subprocess |
|
|
|
# FastAPI application instance; the route decorators in this module
# register their handlers on it.
app = FastAPI()
|
|
|
class SmartContractAnalysisRequest(BaseModel):
    """Request body accepted by the ``/analyze_smart_contract`` endpoint."""

    # OpenAI API key; forwarded as the Bearer token of the upstream request.
    api_key: str

    # Free-form context text placed at the start of the user message.
    pre_traineddata_text: str

    # Source code of the smart contract to analyze.
    prompt: str
|
|
|
# Upper bound (seconds) on the OpenAI HTTP call so a stalled upstream
# cannot block a worker forever.
_OPENAI_TIMEOUT_SECONDS = 120


def _run_slither(contract_path: str, work_dir: str) -> dict:
    """Run Slither on *contract_path* and return its JSON report.

    Returns the parsed report when Slither wrote one, ``{}`` when it exited
    cleanly without writing a report, and ``{"error": <message>}`` on failure.
    """
    report_path = os.path.join(work_dir, "slither-output.json")
    slither_cmd = ["slither", contract_path, "--json", report_path]
    try:
        # check=False on purpose: Slither may exit non-zero merely because it
        # reported findings (see its --fail-* options), and in that case the
        # JSON report is exactly what we want to keep.
        proc = subprocess.run(
            slither_cmd, check=False, capture_output=True, text=True
        )
        if os.path.exists(report_path):
            with open(report_path, "r", encoding="utf-8") as f:
                return json.load(f)
        if proc.returncode != 0:
            error_msg = proc.stderr.strip() if proc.stderr else "Unknown error"
            print("Slither analysis failed:", error_msg)
            return {"error": error_msg}
        return {}
    except Exception as e:
        # Best-effort: a broken or missing analyzer must not fail the request.
        error_msg = str(e)
        print("Unexpected error during Slither analysis:", error_msg)
        return {"error": error_msg}


def _run_semgrep(contract_path: str) -> dict:
    """Run Semgrep with ``solidity_rules.yml`` and return its JSON output.

    Returns ``{"error": <message>}`` when Semgrep fails or its output cannot
    be parsed.
    """
    semgrep_cmd = [
        "semgrep",
        "--config",
        "solidity_rules.yml",
        "--metrics=off",
        contract_path,
        "--json",
    ]
    try:
        semgrep_proc = subprocess.run(
            semgrep_cmd, check=True, capture_output=True, text=True
        )
        return json.loads(semgrep_proc.stdout)
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.strip() if e.stderr else "Unknown error"
        print("Semgrep analysis failed:", error_msg)
        return {"error": error_msg}
    except Exception as e:
        # Best-effort: a broken or missing analyzer must not fail the request.
        error_msg = str(e)
        print("Unexpected error during Semgrep analysis:", error_msg)
        return {"error": error_msg}


def _function_schema() -> list:
    """Return the function-calling schema that constrains the model's reply."""
    return [
        {
            "name": "analyze_smart_contract",
            "description": (
                "Analyzes a given smart contract text and returns a detailed security report. "
                "The report includes a list of vulnerabilities, along with additional metrics: "
                "complexity_score, number_of_functions, number_of_lines, security_score, "
                "and a brief analysis_text summarizing the key findings from Slither and Semgrep. "
                "You must return only valid JSON."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "vulnerabilities": {
                        "type": "array",
                        "description": "List of vulnerabilities found in the smart contract",
                        "items": {
                            "type": "object",
                            "properties": {
                                "title": {
                                    "type": "string",
                                    "description": "The name or short title of the vulnerability"
                                },
                                "severity": {
                                    "type": "string",
                                    "description": "The severity level (high, medium, or low)"
                                },
                                "description": {
                                    "type": "string",
                                    "description": "A detailed explanation of the vulnerability, including context and recommended practices to mitigate the risk."
                                },
                                "codeSnippet": {
                                    "type": "string",
                                    "description": "A code snippet that shows the original vulnerable code and a corrected version with inline comments explaining the changes."
                                }
                            },
                            "required": ["title", "severity", "description", "codeSnippet"]
                        }
                    },
                    "complexity_score": {
                        "type": "number",
                        "description": "A numerical score out of 10 representing the complexity of the contract."
                    },
                    "number_of_functions": {
                        "type": "number",
                        "description": "The total number of functions in the contract."
                    },
                    "number_of_lines": {
                        "type": "number",
                        "description": "The total number of lines in the contract."
                    },
                    "security_score": {
                        "type": "number",
                        "description": "A numerical score out of 100 representing the overall security posture of the contract."
                    },
                    "analysis_text": {
                        "type": "string",
                        "description": "A brief 1-5 sentence summary analyzing the static analysis outputs from Slither and Semgrep."
                    }
                },
                "required": [
                    "vulnerabilities",
                    "complexity_score",
                    "number_of_functions",
                    "number_of_lines",
                    "security_score",
                    "analysis_text"
                ]
            }
        }
    ]


def _system_message() -> str:
    """Return the system prompt, including a valid-JSON example report."""
    return (
        "You are a security analysis assistant specialized in analyzing smart contracts. "
        "You must produce ONLY valid JSON containing the following keys: 'vulnerabilities', "
        "'complexity_score', 'number_of_functions', 'number_of_lines', 'security_score', and 'analysis_text'. "
        "Each vulnerability must include: 'title', 'severity', 'description', and 'codeSnippet'.\n\n"
        "Below is a MOCKED JSON EXAMPLE (for reference only) that includes detailed context, a longer code snippet, "
        "and recommended fixes with inline comments:\n\n"
        "{\n"
        " \"vulnerabilities\": [\n"
        " {\n"
        " \"title\": \"Reentrancy Attack\",\n"
        " \"severity\": \"high\",\n"
        " \"description\": \"A reentrancy vulnerability occurs when a contract makes an external call before updating its state. "
        "In the example below, the 'withdraw' function calls an external contract before reducing the user's balance. "
        "This can allow an attacker to recursively call 'withdraw' and drain funds. The recommended fix is to update the state first or use a reentrancy guard.\",\n"
        # The empty-string argument of call{...}("") is emitted as \"\" so the
        # example stays valid JSON; unescaped quotes would end the JSON string.
        " \"codeSnippet\": \"// Vulnerable code:\\n"
        "function withdraw(uint256 amount) public {\\n"
        " require(balances[msg.sender] >= amount, 'Insufficient balance');\\n"
        " (bool success, ) = msg.sender.call{value: amount}(\\\"\\\"); // External call before state update\\n"
        " require(success, 'Transfer failed');\\n"
        " balances[msg.sender] -= amount;\\n"
        "}\\n\\n"
        "// Recommended fix using checks-effects-interactions pattern:\\n"
        "function withdraw(uint256 amount) public {\\n"
        " require(balances[msg.sender] >= amount, 'Insufficient balance');\\n"
        " balances[msg.sender] -= amount; // Update state first\\n"
        " (bool success, ) = msg.sender.call{value: amount}(\\\"\\\"); // External call after state update\\n"
        " require(success, 'Transfer failed');\\n"
        " // Alternatively, consider using a ReentrancyGuard for added protection\\n"
        "}\"\n"
        " }\n"
        " ],\n"
        " \"complexity_score\": 7.5,\n"
        " \"number_of_functions\": 5,\n"
        " \"number_of_lines\": 120,\n"
        " \"security_score\": 85.0,\n"
        " \"analysis_text\": \"Slither identified potential issues with function state handling while Semgrep flagged the use of 'selfdestruct'. The overall contract complexity is moderate with some areas for security improvement.\"\n"
        "}\n\n"
        "Return only a JSON object with these keys. No extra keys or commentary."
    )


@app.post("/analyze_smart_contract")
def analyze_smart_contract(req: SmartContractAnalysisRequest):
    """Analyze a smart contract and return a structured security report.

    Steps:
      1) Write ``req.prompt`` to a temporary ``contract.sol`` file.
      2) Run Slither and Semgrep on it; a failure of either tool is recorded
         as ``{"error": ...}`` instead of aborting the request.
      3) Call OpenAI's chat completions API, forcing a function call whose
         arguments follow the report schema.
      4) Return the decoded function-call arguments.

    Args:
        req: Request carrying the OpenAI API key, extra context text, and the
            contract source (in ``prompt``).

    Returns:
        The dict decoded from the model's function-call arguments.

    Raises:
        requests.HTTPError: If the OpenAI API responds with an error status.
        requests.Timeout: If the OpenAI API does not answer in time.
        KeyError, json.JSONDecodeError: If the response lacks the expected
            function-call payload or its arguments are not valid JSON.
    """
    # The context manager removes the directory and everything in it
    # (including Slither's JSON report) even if an exception is raised.
    with tempfile.TemporaryDirectory() as temp_dir:
        contract_path = os.path.join(temp_dir, "contract.sol")
        # Explicit UTF-8 so non-ASCII source does not depend on the locale.
        with open(contract_path, "w", encoding="utf-8") as f:
            f.write(req.prompt)

        slither_data = _run_slither(contract_path, temp_dir)
        semgrep_data = _run_semgrep(contract_path)

    user_message = (
        f"Pre-trained data: {req.pre_traineddata_text}\n\n"
        f"Smart contract to analyze:\n{req.prompt}\n\n"
        "Static analysis partial findings (Slither):\n"
        f"{slither_data}\n\n"
        "Static analysis partial findings (Semgrep):\n"
        f"{semgrep_data}\n\n"
        "Based on the above static analysis results, provide a brief 1-5 sentence analysis summarizing the key findings from both Slither and Semgrep. "
        "Then, combine these findings with any additional vulnerabilities you discover, and return a JSON object containing the keys "
        "'vulnerabilities', 'complexity_score', 'number_of_functions', 'number_of_lines', 'security_score', and 'analysis_text'."
    )

    messages = [
        {"role": "system", "content": _system_message()},
        {"role": "user", "content": user_message},
    ]

    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {req.api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "gpt-4o",
        "messages": messages,
        "functions": _function_schema(),
        # Force the model to answer through the schema-constrained function.
        "function_call": {"name": "analyze_smart_contract"},
        "max_tokens": 3000,
        "temperature": 0.6,
        "top_p": 0.95,
    }

    response = requests.post(
        url, headers=headers, json=payload, timeout=_OPENAI_TIMEOUT_SECONDS
    )
    response.raise_for_status()
    data = response.json()

    # The arguments arrive as a JSON-encoded string.
    arguments_str = data["choices"][0]["message"]["function_call"]["arguments"]
    result_dict = json.loads(arguments_str)

    print(result_dict)

    return result_dict
|
|
|
|
|
@app.get("/")
def greet_json():
    """Return a fixed greeting payload for the root path."""
    greeting = {"Hello": "World!"}
    return greeting
|
|