earayu committed on
Commit
817c201
·
1 Parent(s): 547e11b

feat: support aget_docs_by_ids

Browse files
Files changed (1) hide show
  1. lightrag/lightrag.py +64 -0
lightrag/lightrag.py CHANGED
@@ -1517,6 +1517,70 @@ class LightRAG:
1517
  """
1518
  return await self.doc_status.get_docs_by_status(status)
1519
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1520
  # TODO: Deprecated (Deleting documents can cause hallucinations in RAG.)
1521
  # Document delete is not working properly for most of the storage implementations.
1522
  async def adelete_by_doc_id(self, doc_id: str) -> None:
 
1517
  """
1518
  return await self.doc_status.get_docs_by_status(status)
1519
 
1520
async def aget_docs_by_ids(
    self, ids: str | list[str]
) -> dict[str, DocProcessingStatus]:
    """Retrieve the processing status for one or more documents by their IDs.

    Args:
        ids: A single document ID (string) or a collection of document IDs.
            Besides lists, any iterable of strings (tuple, set, generator)
            is accepted. Duplicate IDs are looked up only once.

    Returns:
        A dictionary whose keys are the document IDs for which a status was
        found and whose values are the objects returned by
        ``self.doc_status.get_by_id``. IDs with no (truthy) status are
        omitted from the result and reported in a single warning log entry.
        ``None`` or an empty collection yields an empty dictionary.
    """
    if ids is None:
        # Tolerate None even though the type hint only promises str/list.
        logger.warning(
            "aget_docs_by_ids called with None input, returning empty dict."
        )
        return {}

    if isinstance(ids, str):
        # A bare string is one ID, not an iterable of characters.
        id_list = [ids]
    else:
        # Materialise the iterable exactly once so sets and generators work,
        # and drop duplicates (order-preserving) to avoid redundant lookups.
        id_list = list(dict.fromkeys(ids))

    if not id_list:
        logger.debug("aget_docs_by_ids called with an empty list of IDs.")
        return {}

    # Fetch all statuses concurrently; gather() returns results in the same
    # order as the awaitables it was given, so they line up with id_list.
    results = await asyncio.gather(
        *(self.doc_status.get_by_id(doc_id) for doc_id in id_list)
    )

    found_statuses: dict[str, DocProcessingStatus] = {}
    not_found_ids: list[str] = []
    for doc_id, status_obj in zip(id_list, results):
        # Falsy results (e.g. None) are treated as "not found in storage".
        if status_obj:
            found_statuses[doc_id] = status_obj
        else:
            not_found_ids.append(doc_id)

    if not_found_ids:
        logger.warning(
            f"Document statuses not found for the following IDs: {not_found_ids}"
        )

    return found_statuses
1583
+
1584
  # TODO: Deprecated (Deleting documents can cause hallucinations in RAG.)
1585
  # Document delete is not working properly for most of the storage implementations.
1586
  async def adelete_by_doc_id(self, doc_id: str) -> None: