# NOTE: web-viewer residue (file-size banner, blame commit hashes, line-number gutter) removed; it was not part of the program.
import logging
import os
import shlex
import shutil
import subprocess
import sys
import tempfile
from zipfile import ZipFile

import psutil
from flask import Flask, request, jsonify, render_template, send_file
from mpi4py import MPI

# Logging: INFO level on the root logger, plus a module-scoped logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The Flask application object that every route below registers on
app = Flask(__name__)

# Registry of hosts contributing CPUs, keyed by host name. The local machine
# is registered at import time; sampling cpu_percent with interval=1 blocks
# for about one second while the module loads.
connected_cpus = {
    "localhost": {
        "cpu_count": psutil.cpu_count(logical=False),
        "usage": psutil.cpu_percent(interval=1),
    },
}

# Define the target function for MPI
def target_function(script_path, folder_path):
    """Run a Python script inside ``folder_path`` and return what it printed.

    Args:
        script_path: Path of the script to execute.
        folder_path: Working directory for the child process.

    Returns:
        The child's stdout and stderr merged into a single string. If the
        child cannot be started (for example the working directory does not
        exist), the text of the raised exception is returned instead, so the
        caller always receives a string.
    """
    try:
        # sys.executable guarantees the script runs under the same interpreter
        # (and therefore the same installed packages) as this process; a bare
        # 'python' may be absent from PATH or point at another installation.
        completed = subprocess.run(
            [sys.executable, script_path],
            cwd=folder_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            errors='replace',  # never fail on output that is not valid text
        )
    except Exception as e:
        # Deliberately broad: failures are reported as log text, not raised.
        return str(e)
    return completed.stdout

# Endpoint to handle file uploads and script execution
@app.route('/upload', methods=['POST'])
def handle_upload():
    """Store uploaded files, run the submitted script over them, zip the result.

    Expects a multipart POST with one or more ``file`` parts and a
    ``script_content`` form field. The files and the script (saved as
    ``user_script.py``) are placed in a fresh working folder, the script is run
    through ``run_script_with_mpi``, and the whole folder is zipped.

    Returns:
        A JSON response with ``status``, ``log_output`` and ``download_url`` on
        success; a 400 JSON error when the file or script is missing; a 500
        JSON error for any other failure.

    NOTE(review): this endpoint executes caller-supplied Python by design; it
    must only be reachable by trusted users.
    """
    temp_dir = None
    try:
        if 'file' not in request.files or 'script_content' not in request.form:
            return jsonify({"status": "error", "message": "File or script content not provided"}), 400

        uploads = request.files.getlist('file')
        script_content = request.form['script_content']

        # Create a temporary directory to store uploaded files
        temp_dir = tempfile.mkdtemp()

        # Save the uploaded files to the temporary directory
        folder_path = os.path.join(temp_dir, 'uploaded_folder')
        os.makedirs(folder_path, exist_ok=True)
        for file_obj in uploads:
            # The filename is client-controlled: keep only its last path
            # component so names like "../../x" cannot escape folder_path.
            safe_name = os.path.basename((file_obj.filename or '').replace('\\', '/'))
            if safe_name in ('', '.', '..'):
                continue
            file_obj.save(os.path.join(folder_path, safe_name))

        # Save the script content to a file
        script_path = os.path.join(folder_path, 'user_script.py')
        with open(script_path, 'w') as script_file:
            script_file.write(script_content)

        # Run the script using MPI
        log_output = run_script_with_mpi(script_path, folder_path)

        # Create a zip file of the entire folder. It is written directly into
        # tempfile.gettempdir() under a unique name because the download route
        # resolves the URL's filename against that directory.
        zip_fd, zip_path = tempfile.mkstemp(
            prefix='output_folder_', suffix='.zip', dir=tempfile.gettempdir()
        )
        os.close(zip_fd)
        with ZipFile(zip_path, 'w') as zipf:
            for root, _, names in os.walk(folder_path):
                for name in names:
                    full_path = os.path.join(root, name)
                    zipf.write(full_path, os.path.relpath(full_path, folder_path))

        return jsonify({"status": "success", "log_output": log_output, "download_url": f"/download/{os.path.basename(zip_path)}"})

    except Exception as e:
        logger.exception("Error in handle_upload: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500
    finally:
        # The working folder is no longer needed once the zip exists (or the
        # request failed); only the zip archive is kept for download.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)

@app.route('/download/<filename>')
def download_file(filename):
    """Send a file from the system temporary directory as an attachment.

    Args:
        filename: Plain file name (no directory part) to look up in
            ``tempfile.gettempdir()``.

    Returns:
        The file as a download; a 404 JSON error when the name is not a plain
        file name or no such file exists; a 500 JSON error on other failures.

    NOTE(review): any regular file in the temp directory can be fetched by
    name; consider restricting this to archives produced by this service.
    """
    try:
        # Reject anything that is not a bare file name so the lookup cannot
        # leave the temp directory.
        if filename in ('', '.', '..') or os.path.basename(filename) != filename:
            return jsonify({"status": "error", "message": "File not found"}), 404

        file_path = os.path.join(tempfile.gettempdir(), filename)
        if not os.path.isfile(file_path):
            return jsonify({"status": "error", "message": "File not found"}), 404

        return send_file(file_path, as_attachment=True)
    except Exception as e:
        logger.exception("Error in download_file: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500

# Endpoint to get connected CPUs information
@app.route('/cpu_info', methods=['GET'])
def get_cpu_info():
    """Report CPU count and usage for every registered host.

    The local machine's usage is sampled live (a blocking one-second
    measurement); other hosts report the last value stored for them.

    Returns:
        A JSON response whose ``cpu_info`` is one line per host, or a 500 JSON
        error if anything fails.
    """
    try:
        info = []
        # Iterate over a snapshot: the live sample below blocks for a second,
        # and another request thread may register a host in the meantime,
        # which would otherwise raise "dictionary changed size during
        # iteration".
        for host, data in list(connected_cpus.items()):
            usage = psutil.cpu_percent(interval=1) if host == "localhost" else data['usage']
            info.append(f"{host}: {data['cpu_count']} CPUs, {usage}% usage")
        return jsonify({"status": "success", "cpu_info": "\n".join(info)})
    except Exception as e:
        logger.exception("Error in get_cpu_info: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500

# Endpoint to execute commands
@app.route('/execute_command', methods=['POST'])
def execute_command():
    """Run an allow-listed command and return its combined output.

    Reads the ``command`` form field, splits it into arguments with shell-like
    quoting rules (no shell is involved), and runs it only if its leading
    arguments match an allowed prefix (currently ``pip install``).

    Returns:
        A JSON response with ``log_output`` on success (the command's merged
        stdout/stderr, or the exception text if it could not be started); a
        400 JSON error when the command is missing, unparsable or not allowed;
        a 500 JSON error on unexpected failures.

    NOTE(review): ``pip install`` of an arbitrary package or path still runs
    third-party code; restrict access to trusted users.
    """
    try:
        # .get() so a missing field is reported as a client error, not a 500
        command = request.form.get('command', '')
        if not command:
            return jsonify({"status": "error", "message": "No command provided"}), 400

        try:
            argv = shlex.split(command)
        except ValueError:
            # e.g. unbalanced quotes
            return jsonify({"status": "error", "message": "Command not allowed"}), 400

        # Ensure commands are executed in a safe environment: compare whole
        # arguments rather than a raw string prefix.
        allowed_commands = [['pip', 'install']]
        if not any(argv[:len(prefix)] == prefix for prefix in allowed_commands):
            return jsonify({"status": "error", "message": "Command not allowed"}), 400

        try:
            completed = subprocess.run(
                argv,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                errors='replace',
            )
            log_output = completed.stdout
        except Exception as e:
            # Best effort: a command that cannot be started is reported as text
            log_output = str(e)
        return jsonify({"status": "success", "log_output": log_output})

    except Exception as e:
        logger.exception("Error in execute_command: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500

# Endpoint to donate CPU resources
@app.route('/donate_cpu', methods=['POST'])
def donate_cpu():
    """Register (or re-register) a host and the number of CPUs it offers.

    Expects a JSON object with ``host`` and ``cpu_count``. The host is stored
    in ``connected_cpus`` with its usage reset to ``0.0``.

    Returns:
        A JSON success message; a 400 JSON error when the body is not a JSON
        object or lacks a required key; a 500 JSON error on other failures.
    """
    try:
        # silent=True yields None for a missing or invalid JSON body instead
        # of raising, so bad input is reported as a client error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'host' not in data or 'cpu_count' not in data:
            return jsonify({"status": "error", "message": "JSON body with 'host' and 'cpu_count' is required"}), 400

        host = data['host']
        cpu_count = data['cpu_count']
        connected_cpus[host] = {"cpu_count": cpu_count, "usage": 0.0}
        return jsonify({"status": "success", "message": f"CPU resources from {host} donated successfully."})
    except Exception as e:
        logger.exception("Error in donate_cpu: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500

# Endpoint to update CPU usage
@app.route('/update_cpu_usage', methods=['POST'])
def update_cpu_usage():
    """Store the latest CPU usage reported by an already-registered host.

    Expects a JSON object with ``host`` and ``usage``.

    Returns:
        A JSON success message; a 400 JSON error when the body is not a JSON
        object or lacks a required key; a 404 JSON error when the host is not
        registered; a 500 JSON error on other failures.
    """
    try:
        # silent=True yields None for a missing or invalid JSON body instead
        # of raising, so bad input is reported as a client error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'host' not in data or 'usage' not in data:
            return jsonify({"status": "error", "message": "JSON body with 'host' and 'usage' is required"}), 400

        host = data['host']
        usage = data['usage']
        if host not in connected_cpus:
            return jsonify({"status": "error", "message": f"Host {host} not found."}), 404

        connected_cpus[host]['usage'] = usage
        return jsonify({"status": "success", "message": f"CPU usage from {host} updated successfully."})
    except Exception as e:
        logger.exception("Error in update_cpu_usage: %s", e)
        return jsonify({"status": "error", "message": str(e)}), 500

# Main interface
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page

def run_script_with_mpi(script_path, folder_path):
    """Run a script on every MPI worker rank and return the collected output.

    A small driver program is generated in a private temporary directory and
    launched with ``mpiexec``. In the driver, rank 0 only gathers results;
    every other rank runs ``script_path`` with ``folder_path`` as its working
    directory and sends its output to rank 0, which writes all outputs to
    ``mpi_log_output.txt`` inside ``folder_path``.

    Args:
        script_path: Path of the script each worker rank executes.
        folder_path: Working directory for the script; also receives the log.

    Returns:
        The contents of ``mpi_log_output.txt``.

    Raises:
        FileNotFoundError: If ``mpiexec`` is not installed, or if the run
            finished without producing the log file (the message then carries
            the exit code and the launcher's own output).
    """
    # Create a temporary directory for MPI processes
    mpi_temp_dir = tempfile.mkdtemp()
    try:
        mpi_script_path = os.path.join(mpi_temp_dir, 'mpi_script.py')

        # Write the MPI script to the temporary directory. The two paths are
        # embedded with !r so quotes or backslashes in them still produce a
        # valid Python string literal in the generated source.
        with open(mpi_script_path, 'w') as mpi_script_file:
            mpi_script_file.write(f"""
import os
import subprocess
import sys
import tempfile
from mpi4py import MPI

def target_function(script_path, folder_path):
    output_log = tempfile.TemporaryFile(mode='w+t')
    try:
        subprocess.run([sys.executable, script_path], cwd=folder_path, stdout=output_log, stderr=subprocess.STDOUT)
        output_log.seek(0)
        log_output = output_log.read()
    except Exception as e:
        log_output = str(e)
    finally:
        output_log.close()
    return log_output

def run_script(script_path, folder_path):
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    if rank == 0:
        # Master process
        log_outputs = []
        for i in range(1, size):
            log_output = comm.recv(source=i, tag=11)
            log_outputs.append(log_output)
        with open(os.path.join(folder_path, 'mpi_log_output.txt'), 'w') as log_file:
            log_file.write('\\n'.join(log_outputs))
    else:
        # Worker process
        log_output = target_function(script_path, folder_path)
        comm.send(log_output, dest=0, tag=11)

if __name__ == "__main__":
    run_script({script_path!r}, {folder_path!r})
""")

        # psutil may be unable to determine the physical core count and
        # return None; fall back to a single process rather than passing the
        # string "None" to mpiexec.
        process_count = psutil.cpu_count(logical=False) or 1

        # Run the MPI script using subprocess, under the same interpreter as
        # this process so mpi4py is importable.
        result = subprocess.run(
            ['mpiexec', '-n', str(process_count), sys.executable, mpi_script_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        # Read the log output from the file
        log_output_path = os.path.join(folder_path, 'mpi_log_output.txt')
        try:
            with open(log_output_path, 'r') as log_file:
                return log_file.read()
        except FileNotFoundError as e:
            # Surface what mpiexec itself reported instead of a bare
            # "no such file" for the log.
            details = (result.stderr or result.stdout or b'').decode(errors='replace').strip()
            raise FileNotFoundError(
                f"MPI run exited with code {result.returncode} without writing {log_output_path}: {details}"
            ) from e
    finally:
        # Clean up the temporary directory, even when the run failed
        shutil.rmtree(mpi_temp_dir, ignore_errors=True)

if __name__ == "__main__":
    # Development entry point: listen on every interface, port 7860, and
    # serve each request in its own thread.
    app.run(
        host='0.0.0.0',
        port=7860,
        threaded=True,
    )