yeq6x commited on
Commit
c6e4f56
·
1 Parent(s): c465a32

max_worker=8にして処理中フラグを導入.

Browse files
Files changed (1) hide show
  1. app.py +17 -4
app.py CHANGED
@@ -40,7 +40,7 @@ active_tasks = {}
40
  task_futures = {}
41
 
42
  # ThreadPoolExecutorの作成
43
- executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
44
 
45
  class Task:
46
  def __init__(self, task_id, mode, weight1, weight2, file_data, client_ip):
@@ -51,12 +51,14 @@ class Task:
51
  self.file_data = file_data
52
  self.cancel_flag = False
53
  self.client_ip = client_ip
 
54
 
55
  def update_queue_status(message=None):
56
  socketio.emit('queue_update', {'active_tasks': len(active_tasks), 'message': message})
57
 
58
  def process_task(task):
59
  try:
 
60
  # ファイルデータをPIL Imageに変換
61
  image = Image.open(io.BytesIO(task.file_data))
62
  image = ensure_rgb(image)
@@ -81,6 +83,7 @@ def process_task(task):
81
  if not task.cancel_flag:
82
  socketio.emit('task_error', {'task_id': task.task_id, 'error': str(e)})
83
  finally:
 
84
  if task.task_id in active_tasks:
85
  del active_tasks[task.task_id]
86
  if task.task_id in task_futures:
@@ -128,13 +131,15 @@ def handle_disconnect():
128
  # キャンセル処理:接続が切断された場合、そのクライアントに関連するタスクをキャンセル。ただし、1番目で処理中のタスクはキャンセルしない
129
  client_ip = get_remote_address()
130
  for task_id, task in list(active_tasks.items()):
131
- if task.client_ip == client_ip and get_active_task_order(task_id) > 0:
132
  task.cancel_flag = True
133
  if task_id in task_futures:
134
  task_futures[task_id].cancel()
135
  del task_futures[task_id]
136
  del active_tasks[task_id]
137
  tasks_per_client[client_ip] = tasks_per_client.get(client_ip, 0) - 1
 
 
138
 
139
  @app.route('/submit_task', methods=['POST'])
140
  @limiter.limit("10 per minute") # 1分間に10回までのリクエストに制限
@@ -203,14 +208,22 @@ def cancel_task(task_id):
203
  return jsonify({'message': 'Task cancellation requested for queued task'})
204
  return jsonify({'error': 'Task not found'}), 404
205
 
 
 
 
 
 
 
 
206
  def get_active_task_order(task_id):
207
  return list(active_tasks.keys()).index(task_id) if task_id in active_tasks else None
208
 
209
  # get_task_orderイベントハンドラー
210
  @app.route('/get_task_order/<task_id>', methods=['GET'])
211
  def handle_get_task_order(task_id):
212
- task_order = get_active_task_order(task_id)
213
- return jsonify({'task_order': task_order})
 
214
 
215
  # Flaskルート
216
  # ルートパスのGETリクエストに対するハンドラ
 
40
  task_futures = {}
41
 
42
  # ThreadPoolExecutorの作成
43
+ executor = concurrent.futures.ThreadPoolExecutor(max_workers=8) # 8vCPUのインスタンスを使用
44
 
45
  class Task:
46
  def __init__(self, task_id, mode, weight1, weight2, file_data, client_ip):
 
51
  self.file_data = file_data
52
  self.cancel_flag = False
53
  self.client_ip = client_ip
54
+ self.is_processing = False
55
 
56
  def update_queue_status(message=None):
57
  socketio.emit('queue_update', {'active_tasks': len(active_tasks), 'message': message})
58
 
59
  def process_task(task):
60
  try:
61
+ task.is_processing = True
62
  # ファイルデータをPIL Imageに変換
63
  image = Image.open(io.BytesIO(task.file_data))
64
  image = ensure_rgb(image)
 
83
  if not task.cancel_flag:
84
  socketio.emit('task_error', {'task_id': task.task_id, 'error': str(e)})
85
  finally:
86
+ task.is_processing = False
87
  if task.task_id in active_tasks:
88
  del active_tasks[task.task_id]
89
  if task.task_id in task_futures:
 
131
  # キャンセル処理:接続が切断された場合、そのクライアントに関連するタスクをキャンセル。ただし、1番目で処理中のタスクはキャンセルしない
132
  client_ip = get_remote_address()
133
  for task_id, task in list(active_tasks.items()):
134
+ if task.client_ip == client_ip and not task.is_processing:
135
  task.cancel_flag = True
136
  if task_id in task_futures:
137
  task_futures[task_id].cancel()
138
  del task_futures[task_id]
139
  del active_tasks[task_id]
140
  tasks_per_client[client_ip] = tasks_per_client.get(client_ip, 0) - 1
141
+
142
+ emit('queue_update', {'active_tasks': len(active_tasks)})
143
 
144
  @app.route('/submit_task', methods=['POST'])
145
  @limiter.limit("10 per minute") # 1分間に10回までのリクエストに制限
 
208
  return jsonify({'message': 'Task cancellation requested for queued task'})
209
  return jsonify({'error': 'Task not found'}), 404
210
 
211
+ @app.route('/task_status/<task_id>', methods=['GET'])
212
+ def task_status(task_id):
213
+ task = active_tasks.get(task_id, None)
214
+ if task:
215
+ return jsonify({'task_id': task_id, 'is_processing': task.is_processing})
216
+ return jsonify({'error': 'Task not found'}), 404
217
+
218
  def get_active_task_order(task_id):
219
  return list(active_tasks.keys()).index(task_id) if task_id in active_tasks else None
220
 
221
  # get_task_orderイベントハンドラー
222
  @app.route('/get_task_order/<task_id>', methods=['GET'])
223
  def handle_get_task_order(task_id):
224
+ if task_id not in active_tasks:
225
+ return jsonify({'error': 'Task not found'}), 404
226
+ return jsonify({'task_order': get_active_task_order(task_id)})
227
 
228
  # Flaskルート
229
  # ルートパスのGETリクエストに対するハンドラ