Feat: Check pending equest_pending after document deletion
Browse files- Add double-check for pipeline status to prevent race conditions
- Implement automatic processing of pending indexing requests after deletion
lightrag/api/routers/document_routes.py
CHANGED
@@ -861,8 +861,13 @@ async def background_delete_documents(
|
|
861 |
successful_deletions = []
|
862 |
failed_deletions = []
|
863 |
|
864 |
-
#
|
865 |
async with pipeline_status_lock:
|
|
|
|
|
|
|
|
|
|
|
866 |
pipeline_status.update(
|
867 |
{
|
868 |
"busy": True,
|
@@ -971,12 +976,23 @@ async def background_delete_documents(
|
|
971 |
async with pipeline_status_lock:
|
972 |
pipeline_status["history_messages"].append(error_msg)
|
973 |
finally:
|
974 |
-
# Final summary
|
975 |
async with pipeline_status_lock:
|
976 |
pipeline_status["busy"] = False
|
977 |
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
|
978 |
pipeline_status["latest_message"] = completion_msg
|
979 |
pipeline_status["history_messages"].append(completion_msg)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
980 |
|
981 |
|
982 |
def create_document_routes(
|
|
|
861 |
successful_deletions = []
|
862 |
failed_deletions = []
|
863 |
|
864 |
+
# Double-check pipeline status before proceeding
|
865 |
async with pipeline_status_lock:
|
866 |
+
if pipeline_status.get("busy", False):
|
867 |
+
logger.warning("Error: Unexpected pipeline busy state, aborting deletion.")
|
868 |
+
return # Abort deletion operation
|
869 |
+
|
870 |
+
# Set pipeline status to busy for deletion
|
871 |
pipeline_status.update(
|
872 |
{
|
873 |
"busy": True,
|
|
|
976 |
async with pipeline_status_lock:
|
977 |
pipeline_status["history_messages"].append(error_msg)
|
978 |
finally:
|
979 |
+
# Final summary and check for pending requests
|
980 |
async with pipeline_status_lock:
|
981 |
pipeline_status["busy"] = False
|
982 |
completion_msg = f"Deletion completed: {len(successful_deletions)} successful, {len(failed_deletions)} failed"
|
983 |
pipeline_status["latest_message"] = completion_msg
|
984 |
pipeline_status["history_messages"].append(completion_msg)
|
985 |
+
|
986 |
+
# Check if there are pending document indexing requests
|
987 |
+
has_pending_request = pipeline_status.get("request_pending", False)
|
988 |
+
|
989 |
+
# If there are pending requests, start document processing pipeline
|
990 |
+
if has_pending_request:
|
991 |
+
try:
|
992 |
+
logger.info("Processing pending document indexing requests after deletion")
|
993 |
+
await rag.apipeline_process_enqueue_documents()
|
994 |
+
except Exception as e:
|
995 |
+
logger.error(f"Error processing pending documents after deletion: {e}")
|
996 |
|
997 |
|
998 |
def create_document_routes(
|