yangdx commited on
Commit
5d737ce
·
1 Parent(s): 822a4ff

Add PipelineStatusResponse model for API endpoint

Browse files
lightrag/api/routers/document_routes.py CHANGED
@@ -99,6 +99,36 @@ class DocsStatusesResponse(BaseModel):
99
  statuses: Dict[DocStatus, List[DocStatusResponse]] = {}
100
 
101
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
102
  class DocumentManager:
103
  def __init__(
104
  self,
@@ -247,7 +277,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
247
  if global_args["main_args"].document_loading_engine == "DOCLING":
248
  if not pm.is_installed("docling"): # type: ignore
249
  pm.install("docling")
250
- from docling.document_converter import DocumentConverter
251
 
252
  converter = DocumentConverter()
253
  result = converter.convert(file_path)
@@ -266,7 +296,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
266
  if global_args["main_args"].document_loading_engine == "DOCLING":
267
  if not pm.is_installed("docling"): # type: ignore
268
  pm.install("docling")
269
- from docling.document_converter import DocumentConverter
270
 
271
  converter = DocumentConverter()
272
  result = converter.convert(file_path)
@@ -286,7 +316,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
286
  if global_args["main_args"].document_loading_engine == "DOCLING":
287
  if not pm.is_installed("docling"): # type: ignore
288
  pm.install("docling")
289
- from docling.document_converter import DocumentConverter
290
 
291
  converter = DocumentConverter()
292
  result = converter.convert(file_path)
@@ -307,7 +337,7 @@ async def pipeline_enqueue_file(rag: LightRAG, file_path: Path) -> bool:
307
  if global_args["main_args"].document_loading_engine == "DOCLING":
308
  if not pm.is_installed("docling"): # type: ignore
309
  pm.install("docling")
310
- from docling.document_converter import DocumentConverter
311
 
312
  converter = DocumentConverter()
313
  result = converter.convert(file_path)
@@ -718,17 +748,29 @@ def create_document_routes(
718
  logger.error(traceback.format_exc())
719
  raise HTTPException(status_code=500, detail=str(e))
720
 
721
- @router.get("/pipeline_status", dependencies=[Depends(optional_api_key)])
722
- async def get_pipeline_status():
723
  """
724
  Get the current status of the document indexing pipeline.
725
 
726
  This endpoint returns information about the current state of the document processing pipeline,
727
- including whether it's busy, the current job name, when it started, how many documents
728
- are being processed, how many batches there are, and which batch is currently being processed.
729
 
730
  Returns:
731
- dict: A dictionary containing the pipeline status information
 
 
 
 
 
 
 
 
 
 
 
 
 
732
  """
733
  try:
734
  from lightrag.kg.shared_storage import get_namespace_data
@@ -746,7 +788,7 @@ def create_document_routes(
746
  if status_dict.get("job_start"):
747
  status_dict["job_start"] = str(status_dict["job_start"])
748
 
749
- return status_dict
750
  except Exception as e:
751
  logger.error(f"Error getting pipeline status: {str(e)}")
752
  logger.error(traceback.format_exc())
 
99
  statuses: Dict[DocStatus, List[DocStatusResponse]] = {}
100
 
101
 
102
+ class PipelineStatusResponse(BaseModel):
103
+ """Response model for pipeline status
104
+
105
+ Attributes:
106
+ autoscanned: Whether auto-scan has started
107
+ busy: Whether the pipeline is currently busy
108
+ job_name: Current job name (e.g., indexing files/indexing texts)
109
+ job_start: Job start time as ISO format string (optional)
110
+ docs: Total number of documents to be indexed
111
+ batchs: Number of batches for processing documents
112
+ cur_batch: Current processing batch
113
+ request_pending: Flag for pending request for processing
114
+ latest_message: Latest message from pipeline processing
115
+ history_messages: List of history messages
116
+ """
117
+ autoscanned: bool = False
118
+ busy: bool = False
119
+ job_name: str = "Default Job"
120
+ job_start: Optional[str] = None
121
+ docs: int = 0
122
+ batchs: int = 0
123
+ cur_batch: int = 0
124
+ request_pending: bool = False
125
+ latest_message: str = ""
126
+ history_messages: Optional[List[str]] = None
127
+
128
+ class Config:
129
+ extra = "allow" # Allow additional fields from the pipeline status
130
+
131
+
132
  class DocumentManager:
133
  def __init__(
134
  self,
 
277
  if global_args["main_args"].document_loading_engine == "DOCLING":
278
  if not pm.is_installed("docling"): # type: ignore
279
  pm.install("docling")
280
+ from docling.document_converter import DocumentConverter # type: ignore
281
 
282
  converter = DocumentConverter()
283
  result = converter.convert(file_path)
 
296
  if global_args["main_args"].document_loading_engine == "DOCLING":
297
  if not pm.is_installed("docling"): # type: ignore
298
  pm.install("docling")
299
+ from docling.document_converter import DocumentConverter # type: ignore
300
 
301
  converter = DocumentConverter()
302
  result = converter.convert(file_path)
 
316
  if global_args["main_args"].document_loading_engine == "DOCLING":
317
  if not pm.is_installed("docling"): # type: ignore
318
  pm.install("docling")
319
+ from docling.document_converter import DocumentConverter # type: ignore
320
 
321
  converter = DocumentConverter()
322
  result = converter.convert(file_path)
 
337
  if global_args["main_args"].document_loading_engine == "DOCLING":
338
  if not pm.is_installed("docling"): # type: ignore
339
  pm.install("docling")
340
+ from docling.document_converter import DocumentConverter # type: ignore
341
 
342
  converter = DocumentConverter()
343
  result = converter.convert(file_path)
 
748
  logger.error(traceback.format_exc())
749
  raise HTTPException(status_code=500, detail=str(e))
750
 
751
+ @router.get("/pipeline_status", dependencies=[Depends(optional_api_key)], response_model=PipelineStatusResponse)
752
+ async def get_pipeline_status() -> PipelineStatusResponse:
753
  """
754
  Get the current status of the document indexing pipeline.
755
 
756
  This endpoint returns information about the current state of the document processing pipeline,
757
+ including the processing status, progress information, and history messages.
 
758
 
759
  Returns:
760
+ PipelineStatusResponse: A response object containing:
761
+ - autoscanned (bool): Whether auto-scan has started
762
+ - busy (bool): Whether the pipeline is currently busy
763
+ - job_name (str): Current job name (e.g., indexing files/indexing texts)
764
+ - job_start (str, optional): Job start time as ISO format string
765
+ - docs (int): Total number of documents to be indexed
766
+ - batchs (int): Number of batches for processing documents
767
+ - cur_batch (int): Current processing batch
768
+ - request_pending (bool): Flag for pending request for processing
769
+ - latest_message (str): Latest message from pipeline processing
770
+ - history_messages (List[str], optional): List of history messages
771
+
772
+ Raises:
773
+ HTTPException: If an error occurs while retrieving pipeline status (500)
774
  """
775
  try:
776
  from lightrag.kg.shared_storage import get_namespace_data
 
788
  if status_dict.get("job_start"):
789
  status_dict["job_start"] = str(status_dict["job_start"])
790
 
791
+ return PipelineStatusResponse(**status_dict)
792
  except Exception as e:
793
  logger.error(f"Error getting pipeline status: {str(e)}")
794
  logger.error(traceback.format_exc())