yangdx commited on
Commit
e9b482a
·
1 Parent(s): 8546ae8

Enhance the robustness of concurrency control and scheduling logic

Browse files
Files changed (1) hide show
  1. lightrag/utils.py +31 -6
lightrag/utils.py CHANGED
@@ -297,6 +297,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
297
 
298
  # Track active future objects for cleanup
299
  active_futures = weakref.WeakSet()
 
300
 
301
  # Worker function to process tasks in the queue
302
  async def worker():
@@ -349,6 +350,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
349
 
350
  async def health_check():
351
  """Periodically check worker health status and recover"""
 
352
  try:
353
  while not shutdown_event.is_set():
354
  await asyncio.sleep(5) # Check every 5 seconds
@@ -378,6 +380,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
378
  logger.error(f"limit_async: Error in health check: {str(e)}")
379
  finally:
380
  logger.warning("limit_async: Health check task exiting")
 
381
 
382
  async def ensure_workers():
383
  """Ensure worker threads and health check system are available
@@ -386,7 +389,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
386
  If not, it performs a one-time initialization of all worker threads
387
  and starts the health check system.
388
  """
389
- nonlocal initialized, worker_health_check_task, tasks
390
 
391
  if initialized:
392
  return
@@ -395,10 +398,30 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
395
  if initialized:
396
  return
397
 
398
- logger.info("limit_async: Initializing worker system")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
399
 
400
- # Create initial worker tasks
401
- for _ in range(max_size):
 
402
  task = asyncio.create_task(worker())
403
  tasks.add(task)
404
  task.add_done_callback(tasks.discard)
@@ -407,7 +430,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
407
  worker_health_check_task = asyncio.create_task(health_check())
408
 
409
  initialized = True
410
- logger.info("limit_async: Worker system initialized")
411
 
412
  async def shutdown():
413
  """Gracefully shut down all workers and the queue"""
@@ -476,7 +499,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
476
 
477
  nonlocal counter
478
  async with initialization_lock:
479
- current_count = counter
480
  counter += 1
481
 
482
  # Try to put the task into the queue, supporting timeout
@@ -485,6 +508,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
485
  # Use timeout to wait for queue space
486
  try:
487
  await asyncio.wait_for(
 
488
  queue.put((_priority, current_count, future, args, kwargs)),
489
  timeout=_queue_timeout,
490
  )
@@ -494,6 +518,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
494
  )
495
  else:
496
  # No timeout, may wait indefinitely
 
497
  await queue.put((_priority, current_count, future, args, kwargs))
498
  except Exception as e:
499
  # Clean up the future
 
297
 
298
  # Track active future objects for cleanup
299
  active_futures = weakref.WeakSet()
300
+ reinit_count = 0 # Reinitialization counter to track system health
301
 
302
  # Worker function to process tasks in the queue
303
  async def worker():
 
350
 
351
  async def health_check():
352
  """Periodically check worker health status and recover"""
353
+ nonlocal initialized
354
  try:
355
  while not shutdown_event.is_set():
356
  await asyncio.sleep(5) # Check every 5 seconds
 
380
  logger.error(f"limit_async: Error in health check: {str(e)}")
381
  finally:
382
  logger.warning("limit_async: Health check task exiting")
383
+ initialized = False
384
 
385
  async def ensure_workers():
386
  """Ensure worker threads and health check system are available
 
389
  If not, it performs a one-time initialization of all worker threads
390
  and starts the health check system.
391
  """
392
+ nonlocal initialized, worker_health_check_task, tasks, reinit_count
393
 
394
  if initialized:
395
  return
 
398
  if initialized:
399
  return
400
 
401
+ # Increment reinitialization counter if this is not the first initialization
402
+ if reinit_count > 0:
403
+ reinit_count += 1
404
+ logger.warning(
405
+ f"limit_async: Reinitializing needed (count: {reinit_count})"
406
+ )
407
+ else:
408
+ reinit_count = 1 # First initialization
409
+
410
+ # Check for completed tasks and remove them from the task set
411
+ current_tasks = set(tasks)
412
+ done_tasks = {t for t in current_tasks if t.done()}
413
+ tasks.difference_update(done_tasks)
414
+
415
+ # Log active tasks count during reinitialization
416
+ active_tasks_count = len(tasks)
417
+ if active_tasks_count > 0 and reinit_count > 1:
418
+ logger.warning(
419
+ f"limit_async: {active_tasks_count} tasks still running during reinitialization"
420
+ )
421
 
422
+ # Create initial worker tasks, only adding the number needed
423
+ workers_needed = max_size - active_tasks_count
424
+ for _ in range(workers_needed):
425
  task = asyncio.create_task(worker())
426
  tasks.add(task)
427
  task.add_done_callback(tasks.discard)
 
430
  worker_health_check_task = asyncio.create_task(health_check())
431
 
432
  initialized = True
433
+ logger.info(f"limit_async: {workers_needed} new workers initialized")
434
 
435
  async def shutdown():
436
  """Gracefully shut down all workers and the queue"""
 
499
 
500
  nonlocal counter
501
  async with initialization_lock:
502
+ current_count = counter # Use local variable to avoid race conditions
503
  counter += 1
504
 
505
  # Try to put the task into the queue, supporting timeout
 
508
  # Use timeout to wait for queue space
509
  try:
510
  await asyncio.wait_for(
511
+ # current_count is used to ensure FIFO order
512
  queue.put((_priority, current_count, future, args, kwargs)),
513
  timeout=_queue_timeout,
514
  )
 
518
  )
519
  else:
520
  # No timeout, may wait indefinitely
521
+ # current_count is used to ensure FIFO order
522
  await queue.put((_priority, current_count, future, args, kwargs))
523
  except Exception as e:
524
  # Clean up the future