# Daniyal Mohammed
# adding more fields to our dashboard
# f79e5bf
import json
import os
import shutil
import subprocess
import tempfile

import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# FastAPI application instance; the route decorators in this file register
# their handlers on it.
app = FastAPI()
# Request body accepted by POST /analyze_smart_contract.
class SmartContractAnalysisRequest(BaseModel):
    # Sent as the Bearer token in the Authorization header of the OpenAI call.
    api_key: str
    # Inserted verbatim into the user message after the "Pre-trained data:" label.
    pre_traineddata_text: str
    # Contract source code: written to contract.sol for static analysis and
    # also embedded in the user message sent to the model.
    prompt: str
# (connect, read) timeout in seconds for the OpenAI HTTP call, so a stalled
# upstream cannot block a worker indefinitely.
_OPENAI_TIMEOUT = (10, 120)


def _run_slither(contract_path: str, work_dir: str) -> dict:
    """Run Slither on ``contract_path`` and return its JSON report.

    The report is written by Slither to ``slither-output.json`` inside
    ``work_dir``. The exit status is deliberately not enforced: Slither may
    exit non-zero when its detectors report findings (depending on version and
    ``--fail-*`` flags), and that is exactly when the report matters. Only
    when no report file was produced is a non-zero exit treated as a failure.

    Returns the parsed report, ``{}`` if Slither succeeded without writing a
    report, or ``{"error": <message>}`` on failure.
    """
    report_path = os.path.join(work_dir, "slither-output.json")
    try:
        proc = subprocess.run(
            ["slither", contract_path, "--json", report_path],
            capture_output=True,
            text=True,
        )
        if os.path.exists(report_path):
            with open(report_path, "r", encoding="utf-8") as report_file:
                return json.load(report_file)
        if proc.returncode != 0:
            error_msg = proc.stderr.strip() if proc.stderr else "Unknown error"
            print("Slither analysis failed:", error_msg)
            return {"error": error_msg}
        return {}
    except Exception as e:
        # Best effort: a missing binary or unreadable report must not abort
        # the request; the failure is reported to the model instead.
        error_msg = str(e)
        print("Unexpected error during Slither analysis:", error_msg)
        return {"error": error_msg}


def _run_semgrep(contract_path: str) -> dict:
    """Run Semgrep with ``solidity_rules.yml`` and return its JSON output.

    Returns the parsed stdout, or ``{"error": <message>}`` if Semgrep exits
    non-zero, is not installed, or prints something that is not JSON.
    """
    semgrep_cmd = [
        "semgrep",
        "--config",
        "solidity_rules.yml",
        "--metrics=off",
        contract_path,
        "--json",
    ]
    try:
        semgrep_proc = subprocess.run(
            semgrep_cmd, check=True, capture_output=True, text=True
        )
        return json.loads(semgrep_proc.stdout)
    except subprocess.CalledProcessError as e:
        error_msg = e.stderr.strip() if e.stderr else "Unknown error"
        print("Semgrep analysis failed:", error_msg)
        return {"error": error_msg}
    except Exception as e:
        error_msg = str(e)
        print("Unexpected error during Semgrep analysis:", error_msg)
        return {"error": error_msg}


def _build_function_schema() -> list:
    """Return the function-calling schema that constrains the model's output."""
    return [
        {
            "name": "analyze_smart_contract",
            "description": (
                "Analyzes a given smart contract text and returns a detailed security report. "
                "The report includes a list of vulnerabilities, along with additional metrics: "
                "complexity_score, number_of_functions, number_of_lines, security_score, "
                "and a brief analysis_text summarizing the key findings from Slither and Semgrep. "
                "You must return only valid JSON."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "vulnerabilities": {
                        "type": "array",
                        "description": "List of vulnerabilities found in the smart contract",
                        "items": {
                            "type": "object",
                            "properties": {
                                "title": {
                                    "type": "string",
                                    "description": "The name or short title of the vulnerability"
                                },
                                "severity": {
                                    "type": "string",
                                    "description": "The severity level (high, medium, or low)"
                                },
                                "description": {
                                    "type": "string",
                                    "description": "A detailed explanation of the vulnerability, including context and recommended practices to mitigate the risk."
                                },
                                "codeSnippet": {
                                    "type": "string",
                                    "description": "A code snippet that shows the original vulnerable code and a corrected version with inline comments explaining the changes."
                                }
                            },
                            "required": ["title", "severity", "description", "codeSnippet"]
                        }
                    },
                    "complexity_score": {
                        "type": "number",
                        "description": "A numerical score out of 10 representing the complexity of the contract."
                    },
                    "number_of_functions": {
                        "type": "number",
                        "description": "The total number of functions in the contract."
                    },
                    "number_of_lines": {
                        "type": "number",
                        "description": "The total number of lines in the contract."
                    },
                    "security_score": {
                        "type": "number",
                        "description": "A numerical score out of 100 representing the overall security posture of the contract."
                    },
                    "analysis_text": {
                        "type": "string",
                        "description": "A brief 1-5 sentence summary analyzing the static analysis outputs from Slither and Semgrep."
                    }
                },
                "required": [
                    "vulnerabilities",
                    "complexity_score",
                    "number_of_functions",
                    "number_of_lines",
                    "security_score",
                    "analysis_text"
                ]
            }
        }
    ]


def _build_system_message() -> str:
    """Return the system prompt, including the mocked example JSON report."""
    return (
        "You are a security analysis assistant specialized in analyzing smart contracts. "
        "You must produce ONLY valid JSON containing the following keys: 'vulnerabilities', "
        "'complexity_score', 'number_of_functions', 'number_of_lines', 'security_score', and 'analysis_text'. "
        "Each vulnerability must include: 'title', 'severity', 'description', and 'codeSnippet'.\n\n"
        "Below is a MOCKED JSON EXAMPLE (for reference only) that includes detailed context, a longer code snippet, "
        "and recommended fixes with inline comments:\n\n"
        "{\n"
        " \"vulnerabilities\": [\n"
        " {\n"
        " \"title\": \"Reentrancy Attack\",\n"
        " \"severity\": \"high\",\n"
        " \"description\": \"A reentrancy vulnerability occurs when a contract makes an external call before updating its state. "
        "In the example below, the 'withdraw' function calls an external contract before reducing the user's balance. "
        "This can allow an attacker to recursively call 'withdraw' and drain funds. The recommended fix is to update the state first or use a reentrancy guard.\",\n"
        " \"codeSnippet\": \"// Vulnerable code:\\nfunction withdraw(uint256 amount) public {\\n require(balances[msg.sender] >= amount, 'Insufficient balance');\\n (bool success, ) = msg.sender.call{value: amount}(\"\"); // External call before state update\\n require(success, 'Transfer failed');\\n balances[msg.sender] -= amount;\\n}\\n\\n// Recommended fix using checks-effects-interactions pattern:\\nfunction withdraw(uint256 amount) public {\\n require(balances[msg.sender] >= amount, 'Insufficient balance');\\n balances[msg.sender] -= amount; // Update state first\\n (bool success, ) = msg.sender.call{value: amount}(\"\"); // External call after state update\\n require(success, 'Transfer failed');\\n // Alternatively, consider using a ReentrancyGuard for added protection\\n}\"\n"
        " }\n"
        " ],\n"
        " \"complexity_score\": 7.5,\n"
        " \"number_of_functions\": 5,\n"
        " \"number_of_lines\": 120,\n"
        " \"security_score\": 85.0,\n"
        " \"analysis_text\": \"Slither identified potential issues with function state handling while Semgrep flagged the use of 'selfdestruct'. The overall contract complexity is moderate with some areas for security improvement.\"\n"
        "}\n\n"
        "Return only a JSON object with these keys. No extra keys or commentary."
    )


def _build_user_message(req, slither_data: dict, semgrep_data: dict) -> str:
    """Return the user prompt: caller context, contract source and tool findings."""
    return (
        f"Pre-trained data: {req.pre_traineddata_text}\n\n"
        f"Smart contract to analyze:\n{req.prompt}\n\n"
        "Static analysis partial findings (Slither):\n"
        f"{slither_data}\n\n"
        "Static analysis partial findings (Semgrep):\n"
        f"{semgrep_data}\n\n"
        "Based on the above static analysis results, provide a brief 1-5 sentence analysis summarizing the key findings from both Slither and Semgrep. "
        "Then, combine these findings with any additional vulnerabilities you discover, and return a JSON object containing the keys "
        "'vulnerabilities', 'complexity_score', 'number_of_functions', 'number_of_lines', 'security_score', and 'analysis_text'."
    )


@app.post("/analyze_smart_contract")
def analyze_smart_contract(req: SmartContractAnalysisRequest):
    """
    Endpoint that:
    1) Receives {api_key, pre_traineddata_text, prompt}.
    2) Runs local static analysis with Slither and Semgrep on the provided contract.
    3) Calls OpenAI's ChatCompletion with function-calling constraints.
    4) Returns a structured security report in JSON containing a list of vulnerabilities.

    Static-analysis failures are non-fatal: the failing tool's result is
    replaced by {"error": ...} and the request continues.

    Raises:
        HTTPException: 502 if the OpenAI request fails or its response does
            not contain a parseable function-call payload.
    """
    ############################################################################
    # 1-3) Write the contract to a temporary directory and run the analyzers.
    ############################################################################
    temp_dir = tempfile.mkdtemp()
    try:
        contract_path = os.path.join(temp_dir, "contract.sol")
        with open(contract_path, "w", encoding="utf-8") as f:
            f.write(req.prompt)
        slither_data = _run_slither(contract_path, temp_dir)
        semgrep_data = _run_semgrep(contract_path)
    finally:
        # Remove the whole tree: the analyzers leave their own files behind
        # (e.g. slither-output.json), so removing only contract.sol followed
        # by rmdir would fail and leak the directory. Cleanup stays best-effort.
        try:
            shutil.rmtree(temp_dir)
        except OSError as cleanup_error:
            print("Cleanup error:", cleanup_error)

    ############################################################################
    # 4) Prepare messages and function schema for OpenAI ChatCompletion call.
    ############################################################################
    messages = [
        {"role": "system", "content": _build_system_message()},
        {"role": "user", "content": _build_user_message(req, slither_data, semgrep_data)}
    ]
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {req.api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "gpt-4o",
        "messages": messages,
        "functions": _build_function_schema(),
        # Force the model to answer through the schema above.
        "function_call": {"name": "analyze_smart_contract"},
        "max_tokens": 3000,
        "temperature": 0.6,
        "top_p": 0.95
    }

    try:
        response = requests.post(
            url, headers=headers, json=payload, timeout=_OPENAI_TIMEOUT
        )
        response.raise_for_status()
        data = response.json()
        arguments_str = data["choices"][0]["message"]["function_call"]["arguments"]
        result_dict = json.loads(arguments_str)
    except requests.RequestException as exc:
        # Network failure, timeout, or non-2xx status from OpenAI.
        raise HTTPException(
            status_code=502, detail=f"OpenAI request failed: {exc}"
        ) from exc
    except (KeyError, IndexError, TypeError, ValueError) as exc:
        # Missing function_call, or arguments that are not valid JSON
        # (for example a completion truncated by max_tokens).
        raise HTTPException(
            status_code=502,
            detail="OpenAI returned a response without a parseable function call.",
        ) from exc

    # Do not manually add slither_output or semgrep_output; the 'analysis_text' field should include a brief summary of their outputs.
    print(result_dict)
    return result_dict
@app.get("/")
def greet_json():
    """Return a fixed greeting payload for the root path."""
    greeting = {"Hello": "World!"}
    return greeting