Spaces:
Sleeping
Sleeping
File size: 8,155 Bytes
d97f2ec 889f571 c1a37ff 167072a d97f2ec c1a37ff 9c660b1 c1a37ff d97f2ec 167072a 889f571 167072a c1a37ff d97f2ec c1a37ff 36ca81e c1a37ff 36ca81e c1a37ff 36ca81e c1a37ff 36ca81e c1a37ff 167072a 494e7cf c1a37ff 36ca81e c1a37ff a076bea c1a37ff a076bea c1a37ff 167072a c1a37ff 494e7cf c1a37ff 167072a c1a37ff 167072a 494e7cf 167072a 36ca81e 494e7cf b9c6f79 c1a37ff |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
import logging
import os
import shlex
import shutil
import subprocess
import sys
import tempfile
from zipfile import ZipFile

import psutil
from flask import Flask, request, jsonify, render_template, send_file
from mpi4py import MPI
# Logging: INFO-level root configuration plus a module-scoped logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask application object that the route decorators in this file attach to.
app = Flask(__name__)

# Registry of hosts contributing CPUs, keyed by host name. The "localhost"
# entry is seeded at import time with the physical core count and a usage
# sample taken over a 1-second interval.
connected_cpus = {
    "localhost": {
        "cpu_count": psutil.cpu_count(logical=False),
        "usage": psutil.cpu_percent(interval=1),
    },
}
# Define the target function for MPI
def target_function(script_path, folder_path):
    """Run a Python script and return everything it printed.

    The script is executed with the same interpreter that is running this
    module (falling back to ``python`` on PATH if that is unknown), so it
    sees the same installed packages.

    Args:
        script_path: Path of the script to execute.
        folder_path: Directory used as the child's working directory.

    Returns:
        The child's stdout and stderr, interleaved, as one string. If the
        child could not be started at all (for example the working
        directory does not exist), the error message is returned instead.
    """
    interpreter = sys.executable or 'python'
    try:
        completed = subprocess.run(
            [interpreter, script_path],
            cwd=folder_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into the same stream
            universal_newlines=True,
            errors='replace',  # never fail on undecodable script output
        )
    except Exception as e:
        # Best effort: report the launch failure as the log text.
        return str(e)
    return completed.stdout
# Endpoint to handle file uploads and script execution
@app.route('/upload', methods=['POST'])
def handle_upload():
    """Store uploaded files and a script, run the script, and zip the result.

    Expects a multipart POST with one or more ``file`` parts and a
    ``script_content`` form field. The files and the script (saved as
    ``user_script.py``) are placed in a fresh working folder, the script is
    run through ``run_script_with_mpi``, and the folder is zipped.

    Returns:
        JSON ``{"status": "success", "log_output": ..., "download_url": ...}``
        on success. ``{"status": "error", "message": ...}`` with HTTP 400
        when a required field is missing, or HTTP 500 on any other failure.
    """
    work_dir = None
    try:
        if 'file' not in request.files or 'script_content' not in request.form:
            return jsonify({"status": "error", "message": "File or script content not provided"}), 400
        uploads = request.files.getlist('file')
        script_content = request.form['script_content']

        # Fresh working area for this request.
        work_dir = tempfile.mkdtemp()
        folder_path = os.path.join(work_dir, 'uploaded_folder')
        os.makedirs(folder_path, exist_ok=True)

        for upload in uploads:
            # The filename is client-controlled: keep only its final
            # component so it cannot escape the working folder.
            safe_name = os.path.basename((upload.filename or '').replace('\\', '/'))
            if safe_name in ('', '.', '..'):
                continue  # empty file part or unusable name
            upload.save(os.path.join(folder_path, safe_name))

        script_path = os.path.join(folder_path, 'user_script.py')
        with open(script_path, 'w') as script_file:
            script_file.write(script_content)

        # NOTE(security): this executes client-supplied code on the server.
        log_output = run_script_with_mpi(script_path, folder_path)

        # Write the archive directly into the system temp directory under a
        # unique name: that is the directory /download/<filename> serves
        # from, so the URL returned below actually resolves.
        zip_fd, zip_path = tempfile.mkstemp(prefix='output_folder_', suffix='.zip')
        os.close(zip_fd)
        with ZipFile(zip_path, 'w') as zipf:
            for root, _, names in os.walk(folder_path):
                for name in names:
                    full_path = os.path.join(root, name)
                    zipf.write(full_path, os.path.relpath(full_path, folder_path))

        return jsonify({"status": "success", "log_output": log_output, "download_url": f"/download/{os.path.basename(zip_path)}"})
    except Exception as e:
        logger.error(f"Error in handle_upload: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
    finally:
        # The archive lives outside work_dir, so the working area can go.
        if work_dir is not None:
            shutil.rmtree(work_dir, ignore_errors=True)
@app.route('/download/<filename>')
def download_file(filename):
    """Send a file from the system temp directory as an attachment.

    Args:
        filename: Name of a file located directly inside
            ``tempfile.gettempdir()``.

    Returns:
        The file as a download; JSON error with HTTP 404 when the name is
        not a plain file name or no such file exists; JSON error with
        HTTP 500 on any other failure.
    """
    try:
        # Only a bare file name is acceptable; anything with a directory
        # component could point outside the temp directory.
        if filename in ('.', '..') or os.path.basename(filename) != filename:
            return jsonify({"status": "error", "message": "File not found"}), 404
        file_path = os.path.join(tempfile.gettempdir(), filename)
        if not os.path.isfile(file_path):
            # Answer 404 without echoing the server-side path.
            return jsonify({"status": "error", "message": "File not found"}), 404
        # NOTE(review): any file in the temp directory can be fetched by
        # name; consider restricting this to archives created by /upload.
        return send_file(file_path, as_attachment=True)
    except Exception as e:
        logger.error(f"Error in download_file: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
# Endpoint to get connected CPUs information
@app.route('/cpu_info', methods=['GET'])
def get_cpu_info():
    """Report core count and CPU usage for every registered host.

    The "localhost" entry is re-sampled with psutil over a 1-second
    interval on each call; other hosts report the usage value stored for
    them in ``connected_cpus``.

    Returns:
        JSON ``{"status": "success", "cpu_info": <one line per host>}``,
        or a JSON error with HTTP 500 if anything fails.
    """
    def describe(host, data):
        # Live measurement for this machine, stored figure for the rest.
        if host == "localhost":
            usage = psutil.cpu_percent(interval=1)
        else:
            usage = data['usage']
        return f"{host}: {data['cpu_count']} CPUs, {usage}% usage"

    try:
        report = "\n".join(
            describe(host, data) for host, data in connected_cpus.items()
        )
        return jsonify({"status": "success", "cpu_info": report})
    except Exception as e:
        logger.error(f"Error in get_cpu_info: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
# Endpoint to execute commands
@app.route('/execute_command', methods=['POST'])
def execute_command():
    """Run an allow-listed command and return its combined output.

    Expects a ``command`` form field. Only commands whose first words are
    ``pip install`` are accepted. The command is run without a shell.

    Returns:
        JSON ``{"status": "success", "log_output": ...}`` once the command
        has been attempted (the log holds the launch error if it could not
        be started). JSON error with HTTP 400 when the command is missing,
        malformed or not allowed, or HTTP 500 on any other failure.
    """
    try:
        # .get() so that a missing field is a client error, not a 500.
        command = request.form.get('command', '')
        if not command.strip():
            return jsonify({"status": "error", "message": "No command provided"}), 400

        # Shell-style tokenising keeps quoted arguments such as
        # "package>=1.0" intact; unbalanced quotes raise ValueError.
        try:
            argv = shlex.split(command)
        except ValueError as e:
            return jsonify({"status": "error", "message": f"Invalid command: {e}"}), 400

        # Compare whole tokens so that e.g. "pip installer" is not accepted.
        allowed_commands = [['pip', 'install']]
        if not any(argv[:len(prefix)] == prefix for prefix in allowed_commands):
            return jsonify({"status": "error", "message": "Command not allowed"}), 400

        # NOTE(security): installing an arbitrary package can itself run
        # arbitrary code on this machine.
        try:
            completed = subprocess.run(
                argv,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                errors='replace',
            )
            log_output = completed.stdout
        except Exception as e:
            # Best effort: report the launch failure as the log text.
            log_output = str(e)
        return jsonify({"status": "success", "log_output": log_output})
    except Exception as e:
        logger.error(f"Error in execute_command: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
# Endpoint to donate CPU resources
@app.route('/donate_cpu', methods=['POST'])
def donate_cpu():
    """Register a host and its CPU count in ``connected_cpus``.

    Expects a JSON object with ``host`` (non-empty string) and
    ``cpu_count``. The host's usage starts at ``0.0``; an existing entry
    for the same host is replaced.

    Returns:
        JSON success message; JSON error with HTTP 400 when the body is
        not a JSON object or a required field is missing/invalid; JSON
        error with HTTP 500 on any other failure.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead
        # of raising, so malformed requests get a 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'host' not in data or 'cpu_count' not in data:
            return jsonify({"status": "error", "message": "JSON body with 'host' and 'cpu_count' is required"}), 400
        host = data['host']
        if not isinstance(host, str) or not host:
            # The host is used as a dictionary key and in messages.
            return jsonify({"status": "error", "message": "'host' must be a non-empty string"}), 400
        cpu_count = data['cpu_count']
        connected_cpus[host] = {"cpu_count": cpu_count, "usage": 0.0}
        return jsonify({"status": "success", "message": f"CPU resources from {host} donated successfully."})
    except Exception as e:
        logger.error(f"Error in donate_cpu: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
# Endpoint to update CPU usage
@app.route('/update_cpu_usage', methods=['POST'])
def update_cpu_usage():
    """Store a new usage figure for an already-registered host.

    Expects a JSON object with ``host`` (string) and ``usage``.

    Returns:
        JSON success message; JSON error with HTTP 400 when the body is
        not a JSON object or a required field is missing/invalid; HTTP 404
        when the host is not in ``connected_cpus``; HTTP 500 on any other
        failure.
    """
    try:
        # silent=True yields None for a missing/invalid JSON body instead
        # of raising, so malformed requests get a 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'host' not in data or 'usage' not in data:
            return jsonify({"status": "error", "message": "JSON body with 'host' and 'usage' is required"}), 400
        host = data['host']
        if not isinstance(host, str):
            # A non-string (e.g. a list) cannot be looked up as a key.
            return jsonify({"status": "error", "message": "'host' must be a string"}), 400
        usage = data['usage']
        if host not in connected_cpus:
            return jsonify({"status": "error", "message": f"Host {host} not found."}), 404
        connected_cpus[host]['usage'] = usage
        return jsonify({"status": "success", "message": f"CPU usage from {host} updated successfully."})
    except Exception as e:
        logger.error(f"Error in update_cpu_usage: {e}")
        return jsonify({"status": "error", "message": str(e)}), 500
# Main interface
@app.route('/')
def index():
    """Serve the main page by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
# Source of the helper program launched under mpiexec. The script and folder
# paths are passed on its command line rather than pasted into this text, so
# quotes or backslashes in a path cannot corrupt the generated code.
_MPI_DRIVER_SOURCE = """\
import os
import subprocess
import sys

from mpi4py import MPI


def target_function(script_path, folder_path):
    try:
        completed = subprocess.run(
            [sys.executable, script_path],
            cwd=folder_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
            errors='replace',
        )
    except Exception as e:
        return str(e)
    return completed.stdout


def run_script(script_path, folder_path):
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
    if rank == 0:
        if size == 1:
            # No workers were started: run the script here so it still runs.
            log_outputs = [target_function(script_path, folder_path)]
        else:
            # Master process: collect one log from every worker.
            log_outputs = [comm.recv(source=i, tag=11) for i in range(1, size)]
        with open(os.path.join(folder_path, 'mpi_log_output.txt'), 'w') as log_file:
            log_file.write('\\n'.join(log_outputs))
    else:
        # Worker process: run the script and report its output to rank 0.
        comm.send(target_function(script_path, folder_path), dest=0, tag=11)


if __name__ == "__main__":
    run_script(sys.argv[1], sys.argv[2])
"""


def run_script_with_mpi(script_path, folder_path):
    """Run a script under ``mpiexec`` and return the collected log text.

    A small driver program is written to a temporary directory and started
    with one MPI process per physical core. Every rank other than 0 runs
    the script in ``folder_path`` and sends its output to rank 0, which
    writes all outputs to ``mpi_log_output.txt`` inside ``folder_path``.
    When only one process is started, rank 0 runs the script itself.

    Args:
        script_path: Path of the Python script to execute.
        folder_path: Working directory for the script; also receives
            ``mpi_log_output.txt``.

    Returns:
        The contents of ``mpi_log_output.txt``.

    Raises:
        OSError: If ``mpiexec`` cannot be started.
        RuntimeError: If the run finished without producing the log file;
            the message carries mpiexec's exit code and output.
    """
    mpi_temp_dir = tempfile.mkdtemp()
    try:
        mpi_script_path = os.path.join(mpi_temp_dir, 'mpi_script.py')
        with open(mpi_script_path, 'w') as mpi_script_file:
            mpi_script_file.write(_MPI_DRIVER_SOURCE)

        # cpu_count() may return None when the count cannot be determined.
        process_count = psutil.cpu_count(logical=False) or 1
        # Use the interpreter running this server so the driver sees the
        # same installed packages (mpi4py in particular).
        interpreter = sys.executable or 'python'

        result = subprocess.run(
            ['mpiexec', '-n', str(process_count), interpreter,
             mpi_script_path, script_path, folder_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            errors='replace',
        )

        log_output_path = os.path.join(folder_path, 'mpi_log_output.txt')
        if not os.path.exists(log_output_path):
            # Rank 0 never wrote the log: surface what mpiexec reported
            # instead of a bare "file not found".
            raise RuntimeError(
                f"mpiexec exited with code {result.returncode} without "
                f"producing a log: {result.stderr or result.stdout}"
            )
        with open(log_output_path, 'r') as log_file:
            return log_file.read()
    finally:
        # Remove the driver directory on success and on failure alike.
        shutil.rmtree(mpi_temp_dir, ignore_errors=True)
if __name__ == "__main__":
    # Start Flask's built-in server on all interfaces, port 7860, with
    # threaded request handling.
    app.run(host='0.0.0.0', port=7860, threaded=True)