Commit 26c0ff1 by YanSte (1 parent: 37638bf)

cleaned insert by using pipe

Files changed (1):
  1. lightrag/lightrag.py (+106 -232)
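
This commit folds the old monolithic `ainsert` body into three explicit pipeline stages (`apipeline_process_documents`, `apipeline_process_chunks`, `apipeline_process_extract_graph`) that `ainsert` now simply awaits in order. A minimal usage sketch of the resulting call flow follows; the constructor arguments and document contents are illustrative assumptions, not part of this diff:

import asyncio

from lightrag import LightRAG


async def main() -> None:
    # Illustrative setup only: a real instance also needs llm_model_func,
    # embedding_func and storage settings, which are outside this commit.
    rag = LightRAG(working_dir="./rag_storage")

    # One call runs the whole pipeline: register documents, chunk them,
    # then extract entities/relationships into the knowledge graph.
    await rag.ainsert(["First document ...", "Second document ..."])

    # The stages can also be driven individually, e.g. to retry documents
    # left in PENDING or FAILED status without re-submitting their content.
    await rag.apipeline_process_chunks()
    await rag.apipeline_process_extract_graph()


asyncio.run(main())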
lightrag/lightrag.py CHANGED
@@ -4,17 +4,12 @@ from tqdm.asyncio import tqdm as tqdm_async
 from dataclasses import asdict, dataclass, field
 from datetime import datetime
 from functools import partial
-from typing import Type, cast, Dict
-
+from typing import Any, Type, Union
+import traceback
 from .operate import (
     chunking_by_token_size,
-    extract_entities,
-    # local_query,global_query,hybrid_query,
-    kg_query,
-    naive_query,
-    mix_kg_vector_query,
-    extract_keywords_only,
-    kg_query_with_keywords,
+    extract_entities
+    # local_query,global_query,hybrid_query,,
 )

 from .utils import (
@@ -30,8 +25,6 @@ from .base import (
     BaseGraphStorage,
     BaseKVStorage,
     BaseVectorStorage,
-    StorageNameSpace,
-    QueryParam,
     DocStatus,
 )

@@ -176,7 +169,7 @@ class LightRAG:
     enable_llm_cache_for_entity_extract: bool = True

     # extension
-    addon_params: dict = field(default_factory=dict)
+    addon_params: dict[str, Any] = field(default_factory=dict)
     convert_response_to_json_func: callable = convert_response_to_json

     # Add new field for document status storage type
@@ -251,7 +244,7 @@ class LightRAG:
             ),
             embedding_func=self.embedding_func,
         )
-        self.text_chunks: BaseVectorStorage = self.key_string_value_json_storage_cls(
+        self.text_chunks: BaseKVStorage = self.key_string_value_json_storage_cls(
             namespace=make_namespace(
                 self.namespace_prefix, NameSpace.KV_STORE_TEXT_CHUNKS
             ),
@@ -281,7 +274,7 @@ class LightRAG:
             embedding_func=self.embedding_func,
             meta_fields={"src_id", "tgt_id"},
         )
-        self.chunks_vdb = self.vector_db_storage_cls(
+        self.chunks_vdb: BaseVectorStorage = self.vector_db_storage_cls(
             namespace=make_namespace(
                 self.namespace_prefix, NameSpace.VECTOR_STORE_CHUNKS
             ),
@@ -310,7 +303,7 @@ class LightRAG:

         # Initialize document status storage
         self.doc_status_storage_cls = self._get_storage_class(self.doc_status_storage)
-        self.doc_status = self.doc_status_storage_cls(
+        self.doc_status: BaseKVStorage = self.doc_status_storage_cls(
             namespace=make_namespace(self.namespace_prefix, NameSpace.DOC_STATUS),
             global_config=global_config,
             embedding_func=None,
@@ -359,7 +352,9 @@ class LightRAG:
         )

     async def ainsert(
-        self, string_or_strings, split_by_character=None, split_by_character_only=False
+        self, string_or_strings: Union[str, list[str]],
+        split_by_character: str | None = None,
+        split_by_character_only: bool = False
     ):
         """Insert documents with checkpoint support

@@ -370,154 +365,10 @@ class LightRAG:
             split_by_character_only: if split_by_character_only is True, split the string by character only, when
             split_by_character is None, this parameter is ignored.
         """
-        if isinstance(string_or_strings, str):
-            string_or_strings = [string_or_strings]
-
-        # 1. Remove duplicate contents from the list
-        unique_contents = list(set(doc.strip() for doc in string_or_strings))
-
-        # 2. Generate document IDs and initial status
-        new_docs = {
-            compute_mdhash_id(content, prefix="doc-"): {
-                "content": content,
-                "content_summary": self._get_content_summary(content),
-                "content_length": len(content),
-                "status": DocStatus.PENDING,
-                "created_at": datetime.now().isoformat(),
-                "updated_at": datetime.now().isoformat(),
-            }
-            for content in unique_contents
-        }
+        await self.apipeline_process_documents(string_or_strings)
+        await self.apipeline_process_chunks(split_by_character, split_by_character_only)
+        await self.apipeline_process_extract_graph()

-        # 3. Filter out already processed documents
-        # _add_doc_keys = await self.doc_status.filter_keys(list(new_docs.keys()))
-        _add_doc_keys = set()
-        for doc_id in new_docs.keys():
-            current_doc = await self.doc_status.get_by_id(doc_id)
-
-            if current_doc is None:
-                _add_doc_keys.add(doc_id)
-                continue  # skip to the next doc_id
-
-            status = None
-            if isinstance(current_doc, dict):
-                status = current_doc["status"]
-            else:
-                status = current_doc.status
-
-            if status == DocStatus.FAILED:
-                _add_doc_keys.add(doc_id)
-
-        new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
-
-        if not new_docs:
-            logger.info("All documents have been processed or are duplicates")
-            return
-
-        logger.info(f"Processing {len(new_docs)} new unique documents")
-
-        # Process documents in batches
-        batch_size = self.addon_params.get("insert_batch_size", 10)
-        for i in range(0, len(new_docs), batch_size):
-            batch_docs = dict(list(new_docs.items())[i : i + batch_size])
-
-            for doc_id, doc in tqdm_async(
-                batch_docs.items(), desc=f"Processing batch {i // batch_size + 1}"
-            ):
-                try:
-                    # Update status to processing
-                    doc_status = {
-                        "content_summary": doc["content_summary"],
-                        "content_length": doc["content_length"],
-                        "status": DocStatus.PROCESSING,
-                        "created_at": doc["created_at"],
-                        "updated_at": datetime.now().isoformat(),
-                    }
-                    await self.doc_status.upsert({doc_id: doc_status})
-
-                    # Generate chunks from document
-                    chunks = {
-                        compute_mdhash_id(dp["content"], prefix="chunk-"): {
-                            **dp,
-                            "full_doc_id": doc_id,
-                        }
-                        for dp in self.chunking_func(
-                            doc["content"],
-                            split_by_character=split_by_character,
-                            split_by_character_only=split_by_character_only,
-                            overlap_token_size=self.chunk_overlap_token_size,
-                            max_token_size=self.chunk_token_size,
-                            tiktoken_model=self.tiktoken_model_name,
-                            **self.chunking_func_kwargs,
-                        )
-                    }
-
-                    # Update status with chunks information
-                    doc_status.update(
-                        {
-                            "chunks_count": len(chunks),
-                            "updated_at": datetime.now().isoformat(),
-                        }
-                    )
-                    await self.doc_status.upsert({doc_id: doc_status})
-
-                    try:
-                        # Store chunks in vector database
-                        await self.chunks_vdb.upsert(chunks)
-
-                        # Extract and store entities and relationships
-                        maybe_new_kg = await extract_entities(
-                            chunks,
-                            knowledge_graph_inst=self.chunk_entity_relation_graph,
-                            entity_vdb=self.entities_vdb,
-                            relationships_vdb=self.relationships_vdb,
-                            llm_response_cache=self.llm_response_cache,
-                            global_config=asdict(self),
-                        )
-
-                        if maybe_new_kg is None:
-                            raise Exception(
-                                "Failed to extract entities and relationships"
-                            )
-
-                        self.chunk_entity_relation_graph = maybe_new_kg
-
-                        # Store original document and chunks
-                        await self.full_docs.upsert(
-                            {doc_id: {"content": doc["content"]}}
-                        )
-                        await self.text_chunks.upsert(chunks)
-
-                        # Update status to processed
-                        doc_status.update(
-                            {
-                                "status": DocStatus.PROCESSED,
-                                "updated_at": datetime.now().isoformat(),
-                            }
-                        )
-                        await self.doc_status.upsert({doc_id: doc_status})
-
-                    except Exception as e:
-                        # Mark as failed if any step fails
-                        doc_status.update(
-                            {
-                                "status": DocStatus.FAILED,
-                                "error": str(e),
-                                "updated_at": datetime.now().isoformat(),
-                            }
-                        )
-                        await self.doc_status.upsert({doc_id: doc_status})
-                        raise e
-
-                except Exception as e:
-                    import traceback
-
-                    error_msg = f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
-                    logger.error(error_msg)
-                    continue
-                else:
-                    # Only update index when processing succeeds
-                    await self._insert_done()

     def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
         loop = always_get_an_event_loop()
@@ -597,34 +448,32 @@ class LightRAG:
         # 1. Remove duplicate contents from the list
         unique_contents = list(set(doc.strip() for doc in string_or_strings))

-        logger.info(
-            f"Received {len(string_or_strings)} docs, contains {len(unique_contents)} new unique documents"
-        )
-
         # 2. Generate document IDs and initial status
-        new_docs = {
+        new_docs: dict[str, Any] = {
             compute_mdhash_id(content, prefix="doc-"): {
                 "content": content,
                 "content_summary": self._get_content_summary(content),
                 "content_length": len(content),
                 "status": DocStatus.PENDING,
                 "created_at": datetime.now().isoformat(),
-                "updated_at": None,
+                "updated_at": datetime.now().isoformat(),
             }
             for content in unique_contents
         }

         # 3. Filter out already processed documents
-        _not_stored_doc_keys = await self.full_docs.filter_keys(list(new_docs.keys()))
-        if len(_not_stored_doc_keys) < len(new_docs):
-            logger.info(
-                f"Skipping {len(new_docs) - len(_not_stored_doc_keys)} already existing documents"
-            )
-        new_docs = {k: v for k, v in new_docs.items() if k in _not_stored_doc_keys}
+        _add_doc_keys: set[str] = set()
+        for doc_id in new_docs.keys():
+            current_doc = await self.doc_status.get_by_id(doc_id)
+
+            if not current_doc or current_doc["status"] == DocStatus.FAILED:
+                _add_doc_keys.add(doc_id)
+
+        new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}

         if not new_docs:
             logger.info("All documents have been processed or are duplicates")
-            return None
+            return

         # 4. Store original document
         for doc_id, doc in new_docs.items():
@@ -633,96 +482,121 @@ class LightRAG:
             )
         logger.info(f"Stored {len(new_docs)} new unique documents")

-    async def apipeline_process_chunks(self):
+    async def apipeline_process_chunks(
+        self,
+        split_by_character: str | None = None,
+        split_by_character_only: bool = False
+    ) -> None:
         """Get pendding documents, split into chunks,insert chunks"""
         # 1. get all pending and failed documents
-        _todo_doc_keys = []
+        to_process_doc_keys: list[str] = []

-        _failed_doc = await self.full_docs.get_by_status_and_ids(
+        # Process failes
+        to_process_docs = await self.full_docs.get_by_status(
             status=DocStatus.FAILED
         )
-        _pendding_doc = await self.full_docs.get_by_status_and_ids(
+        if to_process_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
+
+        # Process Pending
+        to_process_docs = await self.full_docs.get_by_status(
             status=DocStatus.PENDING
         )
+        if to_process_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])

-        if _failed_doc:
-            _todo_doc_keys.extend([doc["id"] for doc in _failed_doc])
-        if _pendding_doc:
-            _todo_doc_keys.extend([doc["id"] for doc in _pendding_doc])
-        if not _todo_doc_keys:
+        if not to_process_doc_keys:
             logger.info("All documents have been processed or are duplicates")
-            return None
-        else:
-            logger.info(f"Filtered out {len(_todo_doc_keys)} not processed documents")
+            return

-        new_docs = {
-            doc["id"]: doc for doc in await self.full_docs.get_by_ids(_todo_doc_keys)
-        }
+        full_docs_ids = await self.full_docs.get_by_ids(to_process_doc_keys)
+        new_docs = {}
+        if full_docs_ids:
+            new_docs = {doc["id"]: doc for doc in full_docs_ids or []}

+        if not new_docs:
+            logger.info("All documents have been processed or are duplicates")
+            return
+
         # 2. split docs into chunks, insert chunks, update doc status
-        chunk_cnt = 0
         batch_size = self.addon_params.get("insert_batch_size", 10)
         for i in range(0, len(new_docs), batch_size):
             batch_docs = dict(list(new_docs.items())[i : i + batch_size])
+
             for doc_id, doc in tqdm_async(
-                batch_docs.items(),
-                desc=f"Level 1 - Spliting doc in batch {i // batch_size + 1}",
+                batch_docs.items(), desc=f"Processing batch {i // batch_size + 1}"
             ):
+                doc_status: dict[str, Any] = {
+                    "content_summary": doc["content_summary"],
+                    "content_length": doc["content_length"],
+                    "status": DocStatus.PROCESSING,
+                    "created_at": doc["created_at"],
+                    "updated_at": datetime.now().isoformat(),
+                }
                 try:
+                    await self.doc_status.upsert({doc_id: doc_status})
+
                     # Generate chunks from document
-                    chunks = {
+                    chunks: dict[str, Any] = {
                         compute_mdhash_id(dp["content"], prefix="chunk-"): {
                             **dp,
                             "full_doc_id": doc_id,
-                            "status": DocStatus.PROCESSED,
                         }
-                        for dp in chunking_by_token_size(
+                        for dp in self.chunking_func(
                             doc["content"],
+                            split_by_character=split_by_character,
+                            split_by_character_only=split_by_character_only,
                             overlap_token_size=self.chunk_overlap_token_size,
                             max_token_size=self.chunk_token_size,
                             tiktoken_model=self.tiktoken_model_name,
+                            **self.chunking_func_kwargs,
                         )
                     }
-                    chunk_cnt += len(chunks)
-
-                    try:
-                        # Store chunks in vector database
-                        await self.chunks_vdb.upsert(chunks)
-                        # Update doc status
-                        await self.text_chunks.upsert(
-                            {**chunks, "status": DocStatus.PENDING}
-                        )
-                    except Exception as e:
-                        # Mark as failed if any step fails
-                        await self.text_chunks.upsert(
-                            {**chunks, "status": DocStatus.FAILED}
-                        )
-                        raise e
-                except Exception as e:
-                    import traceback

-                    error_msg = f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
-                    logger.error(error_msg)
+                    # Update status with chunks information
+                    doc_status.update(
+                        {
+                            "chunks_count": len(chunks),
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    )
+                    await self.doc_status.upsert({doc_id: doc_status})
+                    await self.chunks_vdb.upsert(chunks)
+
+                except Exception as e:
+                    doc_status.update(
+                        {
+                            "status": DocStatus.FAILED,
+                            "error": str(e),
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    )
+                    await self.doc_status.upsert({doc_id: doc_status})
+                    logger.error(f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}")
                     continue
-        logger.info(f"Stored {chunk_cnt} chunks from {len(new_docs)} documents")

     async def apipeline_process_extract_graph(self):
         """Get pendding or failed chunks, extract entities and relationships from each chunk"""
         # 1. get all pending and failed chunks
-        _todo_chunk_keys = []
-        _failed_chunks = await self.text_chunks.get_by_status_and_ids(
+        to_process_doc_keys: list[str] = []
+
+        # Process failes
+        to_process_docs = await self.full_docs.get_by_status(
             status=DocStatus.FAILED
         )
-        _pendding_chunks = await self.text_chunks.get_by_status_and_ids(
+        if to_process_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
+
+        # Process Pending
+        to_process_docs = await self.full_docs.get_by_status(
             status=DocStatus.PENDING
         )
-        if _failed_chunks:
-            _todo_chunk_keys.extend([doc["id"] for doc in _failed_chunks])
-        if _pendding_chunks:
-            _todo_chunk_keys.extend([doc["id"] for doc in _pendding_chunks])
-        if not _todo_chunk_keys:
-            logger.info("All chunks have been processed or are duplicates")
-            return None
+        if to_process_docs:
+            to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
+
+        if not to_process_doc_keys:
+            logger.info("All documents have been processed or are duplicates")
+            return

         # Process documents in batches
         batch_size = self.addon_params.get("insert_batch_size", 10)
@@ -731,9 +605,9 @@ class LightRAG:
             batch_size
         )  # Control the number of tasks that are processed simultaneously

-        async def process_chunk(chunk_id):
+        async def process_chunk(chunk_id: str):
             async with semaphore:
-                chunks = {
+                chunks: dict[str, Any] = {
                     i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
                 }
                 # Extract and store entities and relationships
@@ -761,13 +635,13 @@ class LightRAG:
                     raise e

         with tqdm_async(
-            total=len(_todo_chunk_keys),
+            total=len(to_process_doc_keys),
            desc="\nLevel 1 - Processing chunks",
            unit="chunk",
            position=0,
        ) as progress:
-            tasks = []
-            for chunk_id in _todo_chunk_keys:
+            tasks: list[asyncio.Task[None]] = []
+            for chunk_id in to_process_doc_keys:
                task = asyncio.create_task(process_chunk(chunk_id))
                tasks.append(task)
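
The refactored stages select their work by status: they call `get_by_status(status=DocStatus.FAILED)` and `get_by_status(status=DocStatus.PENDING)` on the document store, so re-running the pipeline naturally retries anything left unfinished. The storage-side method itself is not shown in this diff; the sketch below is a hypothetical helper, included only to clarify the contract the pipeline assumes (a list of records, each carrying its "id").

from typing import Any

from lightrag.base import DocStatus


async def get_by_status(
    records: dict[str, dict[str, Any]], status: DocStatus
) -> list[dict[str, Any]]:
    # Hypothetical stand-in for the storage method the pipeline calls:
    # return every stored record whose "status" field matches, with its id
    # attached, which is the shape the pipeline consumes via doc["id"].
    return [
        {"id": doc_id, **doc}
        for doc_id, doc in records.items()
        if doc.get("status") == status
    ]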