GraphRag / processingPdf /indexer.py
ValerioBotto's picture
Initial clean commit without secrets
aabd32c
#Questo file è responsabile di due compiti principali: caricare il modello di embedding
#e orchestrare l'indicizzazione completa su Neo4j
import logging
import torch
from typing import List
from langchain_core.documents import Document as LangchainDocument
from sentence_transformers import SentenceTransformer
from processingPdf.extractor import EntityExtractor
from dotenv import load_dotenv
import os
from db.graph_db import GraphDB
logger = logging.getLogger(__name__)
load_dotenv()
#Gestisce il caricamento del modello di emedding e l'indicizzazione dei chunk in Neo4j
class Indexer:
def __init__(self):
try:
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Caricamento del modello di embedding sul dispositivo: {device}")
#sentence-transformers gestisce l'ottimizzazione del caricamento
self.embedding_model = SentenceTransformer(os.getenv("EMBEDDING_MODEL_NAME"), device=device)
self.embedding_dimensions = self.embedding_model.get_sentence_embedding_dimension()
logger.info(f"Modello di embedding '{os.getenv('EMBEDDING_MODEL_NAME')}' caricato con {self.embedding_dimensions} dimensioni.")
except Exception as e:
logger.error(f"Errore durante il caricamento del modello di embedding: {e}")
raise e
#Genera l'embedding vettoriale per un dato testo. Aggiungo un cast a List[float] per compatibilità con Neo4j
def generate_embeddings(self, text:str) -> List[float]:
return self.embedding_model.encode(text).tolist()
#Orchestra l'indicizzazione dei chunk in Neo4j, gestendo la creazione del documento, dell'utente, del link e dell'inidice vettoriale
def index_chunks_to_neo4j(self, filename: str, chunks: list, user_id: str, lang: str = "it"):
if not chunks:
logger.warning("Nessun chunk fornito per l'indicizzazione.")
return
graph_db = None
try:
# 1. Inizializzo la connessione a Neo4j
graph_db = GraphDB()
# 2. Creazione/Aggiornamento nodi user e document
graph_db.create_user_node(user_id)
graph_db.create_document_node(filename)
graph_db.link_user_to_document(user_id, filename)
# 3. Creazione indice vettoriale (se non esiste)
graph_db.create_vector_index(
index_name="chunk_embeddings_index",
node_label="Chunk",
property_name="embedding",
vector_dimensions=self.embedding_dimensions
)
# 4. Inserimento chunk ed embedding
for i, chunk in enumerate(chunks):
content = chunk.page_content
metadata = chunk.metadata
# Ho deciso di assicurarmi che esista sempre un chunk_id valido
chunk_id = metadata.get("chunk_id") or f"{filename}_{i}"
# Genero l'embedding per il contenuto corrente
embedding = self.generate_embeddings(content)
# Ho deciso di implementare un controllo di sicurezza bloccante:
# se l'embedding è vuoto o ha dimensioni errate, salto l'inserimento per evitare nodi "sporchi"
if not embedding or len(embedding) != self.embedding_dimensions:
logger.error(f"FALLIMENTO CRITICO: Ho rilevato un embedding non valido per il chunk {chunk_id}. Dimensione: {len(embedding) if embedding else 0}")
continue
# Salvo il chunk, l'embedding e i metadati in Neo4j
graph_db.add_chunk_to_document(filename, chunk_id, content, embedding, metadata)
logger.debug(f"Ho indicizzato con successo il chunk '{chunk_id}' per il file '{filename}'.")
# Estrazione e collegamento delle entità tramite GLiNER
try:
entities = EntityExtractor.extract_ne(content)
for ent in entities:
graph_db.add_entity_to_chunk(ent["text"], ent["label"], chunk_id)
except Exception as ne_e:
# Ho deciso di loggare l'errore delle entità come warning per non bloccare l'intera pipeline
logger.warning(f"Non sono riuscito a estrarre entità per il chunk {chunk_id}: {ne_e}")
logger.info(f"Ho completato l'indicizzazione di {len(chunks)} chunk per il file '{filename}'.")
except Exception as e:
logger.error(f"Ho riscontrato un errore fatale durante l'indicizzazione in Neo4j per '{filename}': {e}")
raise
finally:
if graph_db:
graph_db.close()
# Metodo coordinatore per processare il file fisico
def index_pdf(self, file_path: str, user_id: str):
# Import locali per gestire la pipeline
from processingPdf.extractor import PDFExtractor
from processingPdf.chunker import Chunker
filename = os.path.basename(file_path)
# 1. Estrazione del testo strutturato dal PDF
extractor = PDFExtractor()
sections = extractor.extract_sections(file_path)
# 2. Suddivisione delle sezioni in chunk
chunker = Chunker()
chunks = chunker.create_chunks(sections, filename)
# 3. Indicizzazione finale su Neo4j
self.index_chunks_to_neo4j(filename, chunks, user_id)