# LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis
LightRAG employs a multi-layered strategy for concurrency control when processing multiple documents. This article analyzes the control mechanisms at the document, chunk, and LLM-request levels, so you can understand why the system exhibits the concurrent behavior it does.
## Overview

LightRAG's concurrency control is divided into three layers:

1. **Document-level concurrency**: controls how many documents are processed simultaneously
2. **Chunk-level concurrency**: controls how many chunks are processed simultaneously within a single document
3. **LLM request-level concurrency**: controls the global number of concurrent LLM requests
## 1. Document-Level Concurrency Control

**Control parameter**: `max_parallel_insert`

Document-level concurrency is controlled by the `max_parallel_insert` parameter, which defaults to 2.
```python
# lightrag/lightrag.py
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
```
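One practical consequence of this `field(default=...)` pattern: the environment variable is read when the dataclass body executes, i.e. when the module is first imported. A minimal sketch of the two ways to set the value (the `LightRAG` constructor takes many other arguments, elided here):

```python
import os

# Set the variable before lightrag is imported; the dataclass default
# is evaluated at import time, not at instantiation time.
os.environ["MAX_PARALLEL_INSERT"] = "4"

from lightrag import LightRAG

# Alternatively, override the default explicitly per instance:
rag = LightRAG(max_parallel_insert=4)
```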
### Implementation Mechanism

In the `apipeline_process_enqueue_documents` method, a semaphore limits document concurrency:
```python
# lightrag/lightrag.py - apipeline_process_enqueue_documents method
async def process_document(
    doc_id: str,
    status_doc: DocProcessingStatus,
    split_by_character: str | None,
    split_by_character_only: bool,
    pipeline_status: dict,
    pipeline_status_lock: asyncio.Lock,
    semaphore: asyncio.Semaphore,  # Document-level semaphore
) -> None:
    """Process a single document"""
    async with semaphore:  # 🔥 Document-level concurrency control
        ...  # Process all chunks of this document

# Create the document-level semaphore
semaphore = asyncio.Semaphore(self.max_parallel_insert)  # Default 2

# Create a processing task for each document
doc_tasks = []
for doc_id, status_doc in to_process_docs.items():
    doc_tasks.append(
        process_document(
            doc_id, status_doc, split_by_character, split_by_character_only,
            pipeline_status, pipeline_status_lock, semaphore
        )
    )

# Wait for all documents to finish processing
await asyncio.gather(*doc_tasks)
```
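The gating behavior is easy to see in isolation. Below is a self-contained toy (not LightRAG code) that pushes five fake documents through the same pattern; at most two are ever inside the semaphore at once:

```python
import asyncio

async def process_document(doc_id: int, semaphore: asyncio.Semaphore) -> None:
    async with semaphore:  # same gating pattern as the excerpt above
        print(f"doc {doc_id} started")
        await asyncio.sleep(0.1)  # stand-in for chunking + extraction
        print(f"doc {doc_id} finished")

async def main() -> None:
    semaphore = asyncio.Semaphore(2)  # max_parallel_insert = 2
    await asyncio.gather(*(process_document(i, semaphore) for i in range(5)))

asyncio.run(main())
# Documents start in waves: 0 and 1 first, then 2 and 3, then 4.
```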
## 2. Chunk-Level Concurrency Control

**Control parameter**: `llm_model_max_async`

**Key point**: each document creates its own chunk semaphore!
```python
# lightrag/lightrag.py
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
```
### Implementation Mechanism

In the `extract_entities` function, **each document independently creates** its own chunk semaphore:
```python
# lightrag/operate.py - extract_entities function
async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
    # 🔥 Key: this semaphore is created independently for each document!
    llm_model_max_async = global_config.get("llm_model_max_async", 4)
    semaphore = asyncio.Semaphore(llm_model_max_async)  # Chunk semaphore for this document

    async def _process_with_semaphore(chunk):
        async with semaphore:  # 🔥 Chunk-level concurrency control within the document
            return await _process_single_content(chunk)

    # Create a task for each chunk
    tasks = []
    for c in ordered_chunks:
        task = asyncio.create_task(_process_with_semaphore(c))
        tasks.append(task)

    # Wait for all chunks; returns early only if a task raises
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    # If no task raised, FIRST_EXCEPTION behaves like ALL_COMPLETED,
    # so every task is finished here and reading the results is safe
    chunk_results = [task.result() for task in tasks]
    return chunk_results
```
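A subtlety worth calling out: `return_when=asyncio.FIRST_EXCEPTION` makes `asyncio.wait` return as soon as any task raises, but when nothing raises it simply waits for all tasks. A minimal standalone illustration (toy code, not from LightRAG):

```python
import asyncio

async def ok(i: int) -> int:
    await asyncio.sleep(0.1)
    return i

async def boom() -> int:
    await asyncio.sleep(0.01)
    raise RuntimeError("chunk failed")

async def main() -> None:
    tasks = [asyncio.create_task(ok(i)) for i in range(3)]
    tasks.append(asyncio.create_task(boom()))
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # The failing task makes wait() return before the ok() tasks finish:
    print(len(done), "done,", len(pending), "still pending")  # 1 done, 3 still pending
    for t in pending:
        t.cancel()
    for t in done:
        if t.exception():
            print("failed with:", t.exception())

asyncio.run(main())
```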
### Important Inference: Overall System Chunk Concurrency

Since each document creates its own chunk semaphore, the theoretical chunk concurrency of the system is:

**Theoretical chunk concurrency = max_parallel_insert × llm_model_max_async**

For example:

- `max_parallel_insert = 2` (2 documents processed simultaneously)
- `llm_model_max_async = 4` (at most 4 concurrent chunks per document)
- **Theoretical result**: at most 2 × 4 = 8 chunks in the "processing" state simultaneously
## 3. LLM Request-Level Concurrency Control (The Real Bottleneck)

**Control parameter**: `llm_model_max_async` (globally shared)

**Key**: although up to 8 chunks may be "in processing", all LLM requests share a single global priority queue!
```python
# lightrag/lightrag.py - __post_init__ method
self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
    partial(
        self.llm_model_func,
        hashing_kv=hashing_kv,
        **self.llm_model_kwargs,
    )
)
# 🔥 Global LLM worker pool size = llm_model_max_async = 4
```

Because this wrapping happens once in `__post_init__`, every later consumer of `self.llm_model_func` goes through the same fixed pool of workers, regardless of which document or chunk issued the call.
### Priority Queue Implementation

```python
# lightrag/utils.py - priority_limit_async_func_call function (simplified)
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
    def final_decro(func):
        queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        tasks = set()
        shutdown_event = asyncio.Event()

        async def worker():
            """Worker that processes tasks from the priority queue"""
            while not shutdown_event.is_set():
                try:
                    # Time out periodically so the shutdown flag is re-checked
                    priority, count, future, args, kwargs = await asyncio.wait_for(
                        queue.get(), timeout=1.0
                    )
                except asyncio.TimeoutError:
                    continue
                try:
                    result = await func(*args, **kwargs)  # 🔥 The actual LLM call
                    if not future.done():
                        future.set_result(result)
                except Exception as e:
                    if not future.done():
                        future.set_exception(e)
                finally:
                    queue.task_done()

        # 🔥 A fixed number of workers (max_size) is the real concurrency limit
        for _ in range(max_size):
            task = asyncio.create_task(worker())
            tasks.add(task)
```
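The excerpt above omits the enqueueing side: the wrapper that puts a call onto the queue and awaits its future. A compressed, self-contained sketch of the same pattern (a toy, not LightRAG's exact code; priority ordering dropped for brevity; Python 3.10+ assumed) makes it clear that the worker count, not the queue capacity, bounds concurrency:

```python
import asyncio

def limit_async_calls(max_workers: int):
    """Toy limiter: a fixed pool of workers drains one shared queue."""
    def decorator(func):
        queue: asyncio.Queue = asyncio.Queue()
        started = False

        async def worker():
            while True:
                future, args, kwargs = await queue.get()
                try:
                    future.set_result(await func(*args, **kwargs))
                except Exception as e:
                    future.set_exception(e)
                finally:
                    queue.task_done()

        async def wrapper(*args, **kwargs):
            nonlocal started
            if not started:  # lazily start the worker pool on first call
                for _ in range(max_workers):
                    asyncio.create_task(worker())
                started = True
            future = asyncio.get_running_loop().create_future()
            await queue.put((future, args, kwargs))
            return await future

        return wrapper
    return decorator

@limit_async_calls(max_workers=4)
async def fake_llm(i: int) -> int:
    await asyncio.sleep(0.1)  # stand-in for one LLM round trip
    return i

async def main() -> None:
    results = await asyncio.gather(*(fake_llm(i) for i in range(10)))
    print(results)  # takes ~0.3s total: 10 calls drained by 4 workers

asyncio.run(main())
```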
## 4. Chunk-Internal Processing Mechanism (Serial)

### Why Serial?

Within each chunk, the LLM calls strictly follow a serial order: initial extraction first, then up to `entity_extract_max_gleaning` gleaning rounds, each optionally followed by a "continue?" decision call. Each round trip must complete before the next one starts, so every extra gleaning round adds at least one more serial LLM call to the chunk's latency:
```python
# lightrag/operate.py - _process_single_content function
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
    # Step 1: Initial entity extraction
    hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
    final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)

    # Process the initial extraction results
    maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)

    # Step 2: Gleaning phase
    for now_glean_index in range(entity_extract_max_gleaning):
        # 🔥 Serial wait for the gleaning result
        glean_result = await use_llm_func_with_cache(
            continue_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        # Process the gleaning results
        glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)
        # Merge results...

        # Step 3: Decide whether to continue the loop
        if now_glean_index == entity_extract_max_gleaning - 1:
            break

        # 🔥 Serial wait for the loop-decision result
        if_loop_result = await use_llm_func_with_cache(
            if_loop_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )
        if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
            break

    return maybe_nodes, maybe_edges
```
## 5. Complete Concurrency Hierarchy Diagram



### Chunk-Internal Processing (Serial)

```
Initial Extraction → Gleaning → Loop Decision → Complete
```
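The latency cost of this serialism is easy to quantify with a toy comparison (illustrative only; three calls approximate extraction plus one gleaning round plus one loop decision):

```python
import asyncio
import time

async def llm_call() -> None:
    await asyncio.sleep(0.1)  # stand-in for one LLM round trip

async def serial_chunk(n_calls: int) -> None:
    for _ in range(n_calls):  # each call must finish before the next starts
        await llm_call()

async def main() -> None:
    t0 = time.perf_counter()
    await serial_chunk(3)
    print(f"serial: {time.perf_counter() - t0:.2f}s")      # ~0.30s

    t0 = time.perf_counter()
    await asyncio.gather(*(llm_call() for _ in range(3)))
    print(f"concurrent: {time.perf_counter() - t0:.2f}s")  # ~0.10s

asyncio.run(main())
```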
## 6. Real-World Scenario Analysis

### Scenario 1: Single Document with Multiple Chunks

Assume 1 document with 6 chunks:

- **Document level**: only 1 document, so `max_parallel_insert` is not a constraint
- **Chunk level**: at most 4 chunks processed simultaneously (limited by `llm_model_max_async = 4`)
- **LLM level**: at most 4 concurrent LLM requests globally

**Expected behavior**: 4 chunks are processed concurrently while the remaining 2 wait.
### Scenario 2: Multiple Documents with Multiple Chunks

Assume 3 documents, each with 10 chunks:

- **Document level**: at most 2 documents processed simultaneously
- **Chunk level**: at most 4 chunks per document processed simultaneously
- **Theoretical chunk concurrency**: 2 × 4 = 8 chunks processed simultaneously
- **Actual LLM concurrency**: only 4 LLM requests actually execute

**Possible state distribution**:

```
Document 1: 4 chunks "processing" (2 executing LLM calls, 2 waiting for the LLM)
Document 2: 4 chunks "processing" (2 executing LLM calls, 2 waiting for the LLM)
Document 3: waiting for the document-level semaphore

Total:
- 8 chunks in the "processing" state
- 4 LLM requests actually executing
- 4 chunks waiting for an LLM worker
```
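This split between "in processing" and "actually executing" can be reproduced with a small three-layer toy model (illustrative only; the global LLM worker pool is approximated by a semaphore):

```python
import asyncio

processing = executing = 0
peak_processing = peak_executing = 0

async def chunk(chunk_sem: asyncio.Semaphore, llm_sem: asyncio.Semaphore) -> None:
    global processing, executing, peak_processing, peak_executing
    async with chunk_sem:                 # per-document chunk limit (4)
        processing += 1
        peak_processing = max(peak_processing, processing)
        async with llm_sem:               # global LLM limit (4)
            executing += 1
            peak_executing = max(peak_executing, executing)
            await asyncio.sleep(0.05)     # stand-in for one LLM round trip
            executing -= 1
        processing -= 1

async def document(doc_sem: asyncio.Semaphore, llm_sem: asyncio.Semaphore) -> None:
    async with doc_sem:                   # document-level limit (2)
        chunk_sem = asyncio.Semaphore(4)  # created fresh per document
        await asyncio.gather(*(chunk(chunk_sem, llm_sem) for _ in range(10)))

async def main() -> None:
    doc_sem = asyncio.Semaphore(2)        # max_parallel_insert
    llm_sem = asyncio.Semaphore(4)        # global worker pool, modeled as a semaphore
    await asyncio.gather(*(document(doc_sem, llm_sem) for _ in range(3)))
    print("peak chunks in processing:", peak_processing)   # 8
    print("peak LLM requests executing:", peak_executing)  # 4

asyncio.run(main())
```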
## 7. Performance Optimization Recommendations

### Understanding the Bottleneck

The real bottleneck is the global LLM worker pool that drains the priority queue, not the chunk semaphores!

### Adjustment Strategies

**Strategy 1: Increase LLM concurrency**

```bash
# Environment variable configuration
export MAX_PARALLEL_INSERT=2  # Keep document concurrency as-is
export MAX_ASYNC=8            # 🔥 Increase LLM request concurrency
```
**Strategy 2: Balance document and LLM concurrency**

```python
from lightrag import LightRAG

rag = LightRAG(
    # ... other required settings (e.g. working_dir, llm_model_func) ...
    max_parallel_insert=3,          # Moderately increase document concurrency
    llm_model_max_async=12,         # Significantly increase LLM concurrency
    entity_extract_max_gleaning=0,  # Remove the serial gleaning steps within chunks
)
```
## 8. Summary

Key characteristics of LightRAG's multi-document concurrent processing:

### Concurrency Layers

1. **Inter-document competition**: controlled by `max_parallel_insert`, default 2 concurrent documents
2. **Theoretical chunk concurrency**: each document creates its own semaphore, so the total = max_parallel_insert × llm_model_max_async
3. **Actual LLM concurrency**: all chunks share the global LLM queue, controlled by `llm_model_max_async`
4. **Serial execution within chunks**: the LLM requests for a single chunk execute strictly one after another

### Key Insights

- **Theoretical vs. actual**: the system may have many chunks "in processing", but only a few are actually executing LLM requests
- **Real bottleneck**: the global LLM request queue is the performance bottleneck, not the chunk semaphores
- **Optimization focus**: increasing `llm_model_max_async` is more effective than increasing `max_parallel_insert`