Spaces:
Running
Running
File size: 8,557 Bytes
3726922 1158cce 3726922 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
from flask import Flask, request, jsonify, Response, render_template_string
import subprocess
import sys
import threading
import time
import requests
from functools import wraps
import urllib.parse
import re
import json
import base64
import os
import glob
app = Flask(__name__)
# 从环境变量中读取配置,如果未设置则使用默认值
OUTPUT_BASE64 = os.getenv('OUTPUT_BASE64', 'True').lower() == 'true'
AUTH_TOKEN = os.getenv('AUTH_TOKEN', '')
# HOST = os.getenv('HOST', '127.0.0.1')
HOST = os.getenv('HOST', '0.0.0.0')
PORT = int(os.getenv('PORT', 7860))
TIMEOUT = int(os.getenv('TIMEOUT', 30))
REMOVE_PYTHON_CODE_BLOCK = os.getenv('REMOVE_PYTHON_CODE_BLOCK', 'True').lower() == 'true'
# 配置变量,决定是否启用单元测试,默认启用
ENABLE_UNIT_TESTS = False
def auth_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if AUTH_TOKEN:
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Authorization header is missing or invalid'}), 401
token = auth_header.split('Bearer ')[1]
if token != AUTH_TOKEN:
return jsonify({'error': 'Invalid token'}), 401
return f(*args, **kwargs)
return decorated
def extract_code(code):
if REMOVE_PYTHON_CODE_BLOCK:
match = re.search(r'```python\s*(.*?)\s*```', code, re.DOTALL)
if match:
return match.group(1)
return code
def run_code(language, code, variables, timeout):
try:
# 根据语言类型选择解释器
if language == 'python':
# 提取代码中的有效部分
code = extract_code(code)
# 将变量注入到代码环境中
var_definitions = '\n'.join(f"{key} = {value}" for key, value in variables.items())
code_with_vars = f"{var_definitions}\n{code}"
# 使用subprocess运行代码
result = subprocess.run([sys.executable, '-c', code_with_vars], capture_output=True, text=True, timeout=timeout)
return result.stdout, result.stderr
else:
return None, "Unsupported language"
except subprocess.TimeoutExpired:
return None, "Code execution timed out"
except Exception as e:
return None, str(e)
def convert_image_to_base64(image_path):
with open(image_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
return encoded_string
def unescape_string(s):
return s.encode('utf-8').decode('unicode_escape')
@app.route('/runcode', methods=['POST', 'GET'])
@auth_required
def run_code_endpoint():
if request.method == 'POST':
data = request.get_json()
if not data:
return jsonify({'error': 'Invalid JSON'}), 400
language = data.get('languageType', 'python')
variables = data.get('variables', {})
code = data.get('code', '')
elif request.method == 'GET':
query_string = request.query_string.decode('utf-8')
print("Query String:",query_string)
# 检查query_string中是否有"+"
if '+' in query_string:
# query_string的+替换为空格
query_string = re.sub(r'\+', ' ', query_string)
print("Query String:",query_string)
# 如果没有+,则直接输出query_string
else:
print("Query String:",query_string)
# 使用正则表达式提取参数
language_match = re.search(r'languageType=("?)([^"&]+)\1', query_string)
variables_match = re.search(r'variables=("?)([^"&]+)\1', query_string)
code_match = re.search(r'code=("?)([^"&]+)\1', query_string)
print("code_match:",code_match)
if not language_match or not variables_match or not code_match:
return jsonify({'error': 'Invalid parameters'}), 400
language = urllib.parse.unquote(language_match.group(2))
variables = urllib.parse.unquote(variables_match.group(2))
code = urllib.parse.unquote(code_match.group(2))
print("code:",code)
# 处理转义字符
language = unescape_string(language)
variables = unescape_string(variables)
code = unescape_string(code)
print("code:",code)
try:
variables = json.loads(variables)
except json.JSONDecodeError:
return jsonify({'error': 'Invalid variables format'}), 400
# 打印解析后的JSON参数
print("Parsed JSON Parameters:")
print(json.dumps({
'languageType': language,
'variables': variables,
'code': code
}, indent=4))
else:
return jsonify({'error': 'Unsupported HTTP method'}), 405
# 运行代码
stdout, stderr = run_code(language, code, variables, TIMEOUT)
if stderr:
return jsonify({'error': stderr}), 400
else:
# 查找当前目录中的所有图片文件
image_files = glob.glob('*.png') + glob.glob('*.jpg') + glob.glob('*.jpeg') + glob.glob('*.gif')
if image_files:
image_data = {}
for image_path in image_files:
encoded_image = convert_image_to_base64(image_path)
image_data[image_path] = encoded_image
os.remove(image_path) # 删除生成的图片文件
if not OUTPUT_BASE64:
# 如果 OUTPUT_BASE64 不是 True,则输出原始图片
html_content = ''.join([f'<img src="{image_path}" alt="{image_path}" />' for image_path in image_data.keys()])
return Response(html_content, mimetype='text/html')
else:
return jsonify({'output': stdout, 'images': image_data}), 200
else:
return jsonify({'output': stdout}), 200
@app.route('/', methods=['GET'])
def welcome_page():
return render_template_string('<h1>欢迎使用lee\'s code-interpreter</h1>')
def run_test_case(test_case):
url = f'http://{HOST}:{PORT}/runcode'
if test_case['method'] == 'GET':
params = {
'languageType': test_case['language'],
'variables': test_case['variables'],
'code': test_case['code']
}
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'} if AUTH_TOKEN else {}
print("Request Params:")
print(params)
response = requests.get(url, params=params, headers=headers)
elif test_case['method'] == 'POST':
data = {
'languageType': test_case['language'],
'variables': test_case['variables'],
'code': test_case['code']
}
headers = {'Authorization': f'Bearer {AUTH_TOKEN}'} if AUTH_TOKEN else {}
print("Request Data:")
print(data)
response = requests.post(url, json=data, headers=headers)
else:
raise ValueError("Unsupported HTTP method in test case")
# 打印返回JSON
print("Response JSON:")
print(response.json())
def test_run_code_endpoint():
test_cases = [
{
'method': 'GET',
'language': 'python',
'variables': str({}),
'code': "print('Hello, World!')"
},
{
'method': 'GET',
'language': '"python"',
'variables': str({}),
'code': '"print(\'Hello, World!\')"'
},
{
'method': 'GET',
'language': 'python',
'variables': str({}),
'code': "print('Hello%20World')"
},
{
'method': 'POST',
'language': 'python',
'variables': {},
'code': "print(5 ** 11)"
},
{
'method': 'POST',
'language': 'python',
'variables': {'m': 7, 'n': 4},
'code': "print(m * n)"
},
{
'method': 'POST',
'language': 'python',
'variables': {},
'code': "```python\nprint('Hello from code block!')\n```"
}
]
for test_case in test_cases:
run_test_case(test_case)
if __name__ == '__main__':
# 启动Flask应用
threading.Thread(target=app.run, kwargs={'host': HOST, 'port': PORT, 'debug': False}).start()
# 等待Flask应用启动
time.sleep(1)
# 运行单元测试
if ENABLE_UNIT_TESTS:
test_run_code_endpoint()
print("All tests executed!") |