Ron Au commited on
Commit
568065a
1 Parent(s): 06bb75f

feat(tasks): Skip if client has ceased polling

Browse files
Files changed (1) hide show
  1. app.py +17 -8
app.py CHANGED
@@ -43,15 +43,29 @@ def calculate_eta(task_id):
43
  if len(total_durations):
44
  eta = initial_place_in_queue * mean(total_durations)
45
  else:
46
- eta = initial_place_in_queue * 40
47
 
48
  return round(eta, 1)
49
 
50
 
 
 
 
 
 
 
 
 
 
 
51
  def process_task(task_id):
52
  if 'processing' in list(task['status'] for task in tasks.values()):
53
  return
54
 
 
 
 
 
55
  tasks[task_id]["status"] = "processing"
56
  tasks[task_id]["started_at"] = time()
57
 
@@ -63,13 +77,7 @@ def process_task(task_id):
63
  else:
64
  tasks[task_id]["status"] = "completed"
65
  finally:
66
- tasks[task_id]["completed_at"] = time()
67
-
68
- queued_tasks = list(task for task in tasks.values() if task["status"] == "queued")
69
-
70
- if queued_tasks:
71
- print(f"Tasks remaining: {len(queued_tasks)}")
72
- process_task(queued_tasks[0]["task_id"])
73
 
74
 
75
  @app.head('/')
@@ -115,6 +123,7 @@ def create_task(background_tasks: BackgroundTasks, new_task: NewTask):
115
  def poll_task(task_id: str):
116
  tasks[task_id]["place_in_queue"] = get_place_in_queue(task_id)
117
  tasks[task_id]["eta"] = calculate_eta(task_id)
 
118
  tasks[task_id]["poll_count"] += 1
119
 
120
  return tasks[task_id]
 
43
  if len(total_durations):
44
  eta = initial_place_in_queue * mean(total_durations)
45
  else:
46
+ eta = initial_place_in_queue * 35
47
 
48
  return round(eta, 1)
49
 
50
 
51
+ def next_task(task_id):
52
+ tasks[task_id]["completed_at"] = time()
53
+
54
+ queued_tasks = list(task for task in tasks.values() if task["status"] == "queued")
55
+
56
+ if queued_tasks:
57
+ print(f"{task_id} {tasks[task_id]['status']}. Task/s remaining: {len(queued_tasks)}")
58
+ process_task(queued_tasks[0]["task_id"])
59
+
60
+
61
  def process_task(task_id):
62
  if 'processing' in list(task['status'] for task in tasks.values()):
63
  return
64
 
65
+ if tasks[task_id]["last_poll"] and time() - tasks[task_id]["last_poll"] > 30:
66
+ tasks[task_id]["status"] = "abandoned"
67
+ next_task(task_id)
68
+
69
  tasks[task_id]["status"] = "processing"
70
  tasks[task_id]["started_at"] = time()
71
 
 
77
  else:
78
  tasks[task_id]["status"] = "completed"
79
  finally:
80
+ next_task(task_id)
 
 
 
 
 
 
81
 
82
 
83
  @app.head('/')
 
123
  def poll_task(task_id: str):
124
  tasks[task_id]["place_in_queue"] = get_place_in_queue(task_id)
125
  tasks[task_id]["eta"] = calculate_eta(task_id)
126
+ tasks[task_id]["last_poll"] = time()
127
  tasks[task_id]["poll_count"] += 1
128
 
129
  return tasks[task_id]