|
import os |
|
import json |
|
import traceback |
|
import re |
|
import time |
|
import random |
|
from pathlib import Path |
|
import tiktoken |
|
import numpy as np |
|
from PIL import Image |
|
import io |
|
|
|
|
|
import jax.numpy as jnp |
|
|
|
|
|
# Sets the XLA client memory fraction to "1.00" for this process.
# NOTE(review): presumably this lets JAX use the whole accelerator memory for
# the Gemma model. It is assigned after `jax.numpy` is imported but before
# `gemma` is; confirm the setting is still picked up at backend initialization.
os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"]="1.00"
|
from gemma import gm |
|
|
|
|
|
from sentence_transformers import SentenceTransformer |
|
|
|
|
|
|
|
|
|
# --- Model configuration -----------------------------------------------------
# Human-readable label of the Gemma checkpoint (only used in log messages).
GEMMA_MULTIMODAL_MODEL = "gemma-3.4b-it"

# Sentence-Transformers model used for text embeddings, and the vector length
# every embedding is validated against.
SENTENCE_TRANSFORMER_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
EMBEDDING_DIMENSION = 384

# --- Text chunking -----------------------------------------------------------
# Maximum chunk size, counted in tokens of the tiktoken encoding named below.
# NOTE(review): this is not the embedding model's own tokenizer; confirm that
# chunks of this size fit the embedding model's input limit.
MAX_TOKENS_NORMAL = 500
ENCODING_NAME = "cl100k_base"

# --- Filesystem layout -------------------------------------------------------
BASE_DIR = Path("/content/")
PDF_DIRECTORY = BASE_DIR.joinpath("docs")
OUTPUT_DIR = BASE_DIR.joinpath("output")
EMBEDDINGS_FILE_PATH = OUTPUT_DIR.joinpath("embeddings_statistiques_multimodal_gemma_st.json")

# Sub-directory names are kept separately because they are also used to build
# the "/static/..." URLs stored alongside each extracted item.
IMAGE_SAVE_SUBDIR = "extracted_graphs"
TABLE_SAVE_SUBDIR = "extracted_tables"
IMAGE_SAVE_DIR = OUTPUT_DIR.joinpath(IMAGE_SAVE_SUBDIR)
TABLE_SAVE_DIR = OUTPUT_DIR.joinpath(TABLE_SAVE_SUBDIR)

# --- Lazily initialised models (populated by initialize_models) --------------
gemma_sampler = None
text_embedding_model = None
|
|
|
def initialize_models():
    """Load the Gemma chat sampler and the Sentence-Transformers encoder.

    The two models are loaded independently. Each result is stored in its
    module-level global (`gemma_sampler`, `text_embedding_model`); when a load
    fails, the error is printed and the corresponding global is set to None so
    callers can detect the missing model.
    """
    global gemma_sampler, text_embedding_model

    print("✓ Initializing Gemma Multimodal Model...")
    try:
        gemma_model = gm.nn.Gemma3_4B()
        gemma_params = gm.ckpts.load_params(gm.ckpts.CheckpointPath.GEMMA3_4B_IT)
        gemma_sampler = gm.text.ChatSampler(model=gemma_model, params=gemma_params)
        print(f"✓ Gemma Multimodal Model '{GEMMA_MULTIMODAL_MODEL}' loaded successfully.")
    except Exception as load_error:
        print(f"❌ ERREUR: Échec du chargement du modèle multimodal Gemma : {str(load_error)}")
        print("⚠️ La génération de descriptions multimodales échouera.")
        gemma_sampler = None

    print(f"✓ Initializing Sentence-Transformers Model '{SENTENCE_TRANSFORMER_MODEL}'...")
    try:
        text_embedding_model = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL)
        print(f"✓ Modèle d'embedding textuel Sentence-Transformers '{SENTENCE_TRANSFORMER_MODEL}' chargé avec succès.")
    except Exception as load_error:
        print(f"❌ ERREUR: Échec du chargement du modèle d'embedding textuel Sentence-Transformers : {str(load_error)}")
        print("⚠️ La génération d'embeddings textuels échouera.")
        text_embedding_model = None
|
|
|
|
|
def clean_text(text):
    """Return `text` with whitespace normalised and paragraph breaks kept.

    Tabs become spaces, runs of spaces collapse to a single space, runs of
    three or more newlines collapse to one blank line, and the result is
    stripped. Falsy input (None or "") yields an empty string.
    """
    if not text:
        return ""

    untabbed = text.replace('\t', ' ')
    single_spaced = re.sub(r' +', ' ', untabbed)
    return re.sub(r'\n{3,}', '\n\n', single_spaced).strip()
|
|
|
|
|
import fitz |
|
import camelot |
|
import pandas as pd |
|
from bs4 import BeautifulSoup |
|
|
|
# Minimum pixel dimensions an embedded image must have to be extracted;
# anything narrower or shorter is skipped by extract_images_from_page.
IMAGE_MIN_WIDTH, IMAGE_MIN_HEIGHT = 100, 100
|
|
|
def _text_outside_regions(page, regions):
    """Return the page text, leaving out text blocks that intersect `regions`."""
    if not regions:
        return page.get_text("text")

    kept_blocks = []
    for block in page.get_text("blocks"):
        # Block tuples end with a block-type flag; anything other than 0 is
        # not a text block, and its "text" field is not document content.
        if len(block) > 6 and block[6] != 0:
            continue
        block_rect = fitz.Rect(block[:4])
        if any(block_rect.intersects(region) for region in regions):
            continue
        kept_blocks.append(block[4])
    return "\n".join(kept_blocks)


def extract_page_data_pymupdf(pdf_path):
    """Extract text, tables and images from every page of a PDF.

    Tables are extracted first so that their regions can be excluded from the
    page text and from the image extraction.

    Args:
        pdf_path: `pathlib.Path` of the PDF to process.

    Returns:
        A list with one dict per page (keys: 'pdf_file', 'page_number',
        'text', 'images', 'tables', 'pdf_title', 'pdf_subject',
        'pdf_keywords'). If an error occurs, it is printed and the pages
        collected so far are returned (possibly an empty list).
    """
    page_data_list = []
    try:
        # The context manager closes the document even when a page fails.
        with fitz.open(pdf_path) as doc:
            metadata = doc.metadata or {}
            # Metadata keys can be present with empty/None values, so a
            # `.get(key, default)` default would never be applied; use `or`.
            pdf_title = metadata.get('title') or pdf_path.name
            pdf_subject = metadata.get('subject') or 'Statistiques'
            pdf_keywords = metadata.get('keywords') or ''

            for page_index, page in enumerate(doc, start=1):
                print(f" Extraction des données de la page {page_index}...")

                table_data = extract_tables_and_images_from_page(pdf_path, page, page_index)

                table_regions = []
                for item in table_data:
                    rect = item.get('rect')
                    if rect and len(rect) == 4:
                        table_regions.append(fitz.Rect(rect))
                    else:
                        print(f" Warning: Invalid rect for table on page {page_index}")

                page_text = clean_text(_text_outside_regions(page, table_regions))

                image_data = extract_images_from_page(
                    pdf_path, page, page_index, excluded_rects=table_regions
                )

                page_data_list.append({
                    'pdf_file': pdf_path.name,
                    'page_number': page_index,
                    'text': page_text,
                    'images': image_data,
                    'tables': [item for item in table_data if item['content_type'] == 'table'],
                    'pdf_title': pdf_title,
                    'pdf_subject': pdf_subject,
                    'pdf_keywords': pdf_keywords,
                })
    except Exception as e:
        print(f"Erreur lors du traitement du PDF {pdf_path.name} avec PyMuPDF : {str(e)}")
        traceback.print_exc()
    return page_data_list
|
|
|
|
|
_TABLE_HTML_CSS = """
.table { border-collapse: collapse; width: 100%; margin-bottom: 1rem;}
.table caption { caption-side: top; padding: 0.5rem; text-align: left; font-weight: bold; }
.table th, .table td { border: 1px solid #ddd; padding: 8px; text-align: left; }
.table th { background-color: #f2f2f2; font-weight: bold; }
.table-striped tbody tr:nth-of-type(odd) { background-color: rgba(0,0,0,.05); }
.table-responsive { overflow-x: auto; margin-bottom: 1rem; }
"""


def _camelot_table_rect(table, page):
    """Return the table's bounding box as a fitz.Rect in page coordinates, or None."""
    cols = getattr(table, 'cols', None)
    rows = getattr(table, 'rows', None)
    if not cols or not rows:
        return None

    xs = [x for col in cols for x in col]
    ys = [y for row in rows for y in row]
    x_left, x_right = min(xs), max(xs)
    y_bottom, y_top = min(ys), max(ys)
    if x_right <= x_left or y_top <= y_bottom:
        return None

    # Camelot reports PDF coordinates (origin at the bottom-left corner) while
    # PyMuPDF uses a top-left origin, so the vertical axis must be flipped.
    # NOTE(review): assumes an unrotated page whose visible area starts at
    # the PDF origin; confirm for rotated or cropped pages.
    page_height = page.rect.height
    return fitz.Rect(x_left, page_height - y_top, x_right, page_height - y_bottom)


def _write_table_html(df, caption_text, save_path):
    """Write `df` as a styled HTML table to `save_path`; return False if no table was rendered."""
    soup = BeautifulSoup(df.to_html(index=False), 'html.parser')
    table_tag = soup.find('table')
    if table_tag is None:
        return False

    table_tag['class'] = 'table table-bordered table-striped'
    table_tag['style'] = 'width:100%; border-collapse:collapse;'

    # The caption must live inside <table>; setting `.string` also escapes
    # any markup characters coming from the PDF file name.
    caption_tag = soup.new_tag('caption')
    caption_tag.string = caption_text
    table_tag.insert(0, caption_tag)

    style_tag = soup.new_tag('style')
    style_tag.string = _TABLE_HTML_CSS
    soup.insert(0, style_tag)

    wrapper = soup.new_tag('div')
    wrapper['class'] = 'table-responsive'
    table_tag.wrap(wrapper)

    with open(save_path, 'w', encoding='utf-8') as f:
        f.write(str(soup))
    return True


def extract_tables_and_images_from_page(pdf_path, page, page_num):
    """Extract the tables of one page with Camelot and snapshot each table area.

    Camelot is run with the 'lattice' flavor first and with 'stream' when no
    table is found. Tables whose accuracy is below 70 are skipped. Each kept
    table is saved as an HTML file in TABLE_SAVE_DIR and, when its bounding box
    can be determined, as a PNG rendering in IMAGE_SAVE_DIR.

    Args:
        pdf_path: `pathlib.Path` of the source PDF.
        page: PyMuPDF page object for `page_num`, used to render table areas.
        page_num: 1-based page number passed to Camelot and used in file names.

    Returns:
        A list of dicts with keys 'content_type' ('table'), 'table_html_url',
        'table_text_representation', 'rect' ([x0, y0, x1, y1] in PyMuPDF
        coordinates, or None), 'accuracy', 'image_bytes' and 'image_url'.
        Returns an empty list if the extraction raises.
    """
    table_and_image_data = []
    try:
        tables = camelot.read_pdf(str(pdf_path), pages=str(page_num), flavor='lattice')
        if len(tables) == 0:
            tables = camelot.read_pdf(str(pdf_path), pages=str(page_num), flavor='stream')

        safe_pdf_name = "".join(c if c.isalnum() else "_" for c in pdf_path.stem)

        for i, table in enumerate(tables):
            if table.accuracy < 70:
                print(f" Skipping low accuracy table ({table.accuracy:.2f}%) on page {page_num}")
                continue

            # The parsing report carries no bounding box, so the box is
            # derived from the table's own row/column coordinates.
            table_rect = _camelot_table_rect(table, page)
            if table_rect is None:
                print(f" Warning: Invalid bounding box for table {i} on page {page_num}. Skipping image capture.")

            table_html_filename = f"{safe_pdf_name}_p{page_num}_table{i}.html"
            table_html_save_path = TABLE_SAVE_DIR / table_html_filename
            relative_html_url_path = f"/static/{TABLE_SAVE_SUBDIR}/{table_html_filename}"

            table_image_filename = f"{safe_pdf_name}_p{page_num}_table{i}.png"
            table_image_save_path = IMAGE_SAVE_DIR / table_image_filename
            relative_image_url_path = f"/static/{IMAGE_SAVE_SUBDIR}/{table_image_filename}"

            df = table.df
            caption_text = f"Table extrait de {pdf_path.name}, page {page_num}"
            if not _write_table_html(df, caption_text, table_html_save_path):
                print(f" Warning: Could not find table tag in HTML for table on page {page_num}. Skipping HTML save.")
                continue

            table_image_bytes = None
            if table_rect is not None:
                try:
                    pix = page.get_pixmap(clip=table_rect)
                    # The output format is the first positional parameter.
                    table_image_bytes = pix.tobytes("png")
                    with open(table_image_save_path, "wb") as img_file:
                        img_file.write(table_image_bytes)
                except Exception as img_capture_e:
                    print(f" Erreur lors de la capture d'image du tableau {i} page {page_num} : {img_capture_e}")
                    traceback.print_exc()
                    table_image_bytes = None

            table_and_image_data.append({
                'content_type': 'table',
                'table_html_url': relative_html_url_path,
                'table_text_representation': df.to_string(index=False),
                'rect': (
                    [table_rect.x0, table_rect.y0, table_rect.x1, table_rect.y1]
                    if table_rect is not None else None
                ),
                'accuracy': table.accuracy,
                'image_bytes': table_image_bytes,
                'image_url': relative_image_url_path if table_image_bytes else None,
            })

        return table_and_image_data

    except Exception as e:
        print(f" Erreur lors de l'extraction des tableaux de la page {page_num} : {str(e)}")
        traceback.print_exc()
        return []
|
|
|
|
|
def extract_images_from_page(pdf_path, page, page_num, excluded_rects=None):
    """Extract and save the embedded images of a page, skipping excluded regions.

    Images smaller than IMAGE_MIN_WIDTH x IMAGE_MIN_HEIGHT, images whose
    position on the page cannot be determined, and images intersecting one of
    `excluded_rects` are skipped. Every kept image is written to
    IMAGE_SAVE_DIR. A failure on one image is printed and does not stop the
    processing of the others.

    Args:
        pdf_path: `pathlib.Path` of the source PDF; its stem prefixes file names.
        page: PyMuPDF page object to read images from.
        page_num: 1-based page number used in file names and log messages.
        excluded_rects: Optional iterable of rectangles (e.g. table areas).
            None (the default) means nothing is excluded.

    Returns:
        A list of dicts with keys 'content_type' ('image'), 'image_url',
        'rect' ([x0, y0, x1, y1]) and 'image_bytes'.
    """
    # A None default avoids sharing one mutable list between calls.
    excluded = list(excluded_rects) if excluded_rects is not None else []
    image_data = []

    for img_index, img_info in enumerate(page.get_images(full=True)):
        xref = img_info[0]
        try:
            base_image = page.parent.extract_image(xref)
            image_bytes = base_image["image"]
            image_ext = base_image["ext"]

            if base_image["width"] < IMAGE_MIN_WIDTH or base_image["height"] < IMAGE_MIN_HEIGHT:
                continue

            img_rects = page.get_image_rects(xref)
            if not img_rects:
                print(f" Warning: Could not find rectangle for image {img_index} on page {page_num}. Skipping.")
                continue
            # Only the first placement of the image on the page is considered.
            img_rect = img_rects[0]

            if any(img_rect.intersects(excluded_rect) for excluded_rect in excluded):
                print(f" Image {img_index} on page {page_num} is within an excluded region (e.g., table). Skipping.")
                continue

            safe_pdf_name = "".join(c if c.isalnum() else "_" for c in pdf_path.stem)
            image_filename = f"{safe_pdf_name}_p{page_num}_img{img_index}.{image_ext}"
            image_save_path = IMAGE_SAVE_DIR / image_filename
            relative_url_path = f"/static/{IMAGE_SAVE_SUBDIR}/{image_filename}"

            with open(image_save_path, "wb") as img_file:
                img_file.write(image_bytes)

            image_data.append({
                'content_type': 'image',
                'image_url': relative_url_path,
                'rect': [img_rect.x0, img_rect.y0, img_rect.x1, img_rect.y1],
                'image_bytes': image_bytes,
            })

        except Exception as img_save_e:
            print(f" Erreur lors du traitement de l'image {img_index} de la page {page_num} : {img_save_e}")
            traceback.print_exc()

    return image_data
|
|
|
|
|
|
|
# Sentence-ending punctuation followed by whitespace or the end of the window.
_SENTENCE_END_RE = re.compile(r'[.!?](?=\s|\Z)')


def _natural_break_offset(window_text):
    """Return the character offset of the last paragraph or sentence break, or 0 if none."""
    paragraph_break = window_text.rfind('\n\n')
    if paragraph_break > 0:
        return paragraph_break

    offset = 0
    for match in _SENTENCE_END_RE.finditer(window_text):
        offset = match.end()
    return offset


def _token_count_covering(window, char_count, encoding):
    """Return the smallest k such that decoding window[:k] yields at least `char_count` characters."""
    low, high = 1, len(window)
    while low < high:
        middle = (low + high) // 2
        if len(encoding.decode(window[:middle])) >= char_count:
            high = middle
        else:
            low = middle + 1
    return low


def _window_cut(window, encoding):
    """Return how many leading tokens of `window` the next chunk takes (1..len(window))."""
    offset = _natural_break_offset(encoding.decode(window))
    if offset > 0:
        return _token_count_covering(window, offset, encoding)

    # No natural break: cut at the window end, but step back a few tokens when
    # the decoded text ends with U+FFFD. NOTE(review): this assumes the
    # encoding decodes an incomplete multi-byte character to U+FFFD; confirm
    # for the encoding in use.
    cut = len(window)
    for _ in range(3):
        if cut <= 1 or not encoding.decode(window[:cut]).endswith('\ufffd'):
            break
        cut -= 1
    return cut


def token_chunking(text, max_tokens, encoding):
    """Split `text` into chunks of at most `max_tokens` tokens at natural boundaries.

    The text is tokenised once. Each chunk is a contiguous slice of that token
    sequence, so no text is lost or duplicated between chunks. Within each
    window of `max_tokens` tokens the cut is placed, in order of preference,
    at the last paragraph break ('\\n\\n'), at the last sentence end
    ('.', '!' or '?' followed by whitespace), or at the window end.

    Args:
        text: Text to split. Falsy input yields an empty list.
        max_tokens: Maximum number of tokens per chunk; must be >= 1.
        encoding: Object exposing `encode(str) -> list` and
            `decode(list) -> str`.

    Returns:
        List of non-empty, stripped chunk strings in document order.

    Raises:
        ValueError: If `max_tokens` is smaller than 1.
    """
    if not text:
        return []
    if max_tokens < 1:
        raise ValueError("max_tokens must be a positive integer")

    tokens = encoding.encode(text)
    total = len(tokens)
    chunks = []
    start = 0

    while start < total:
        end = min(start + max_tokens, total)
        if end < total:
            # Only non-final windows need a boundary search; the cut is always
            # at least one token, so the loop is guaranteed to progress.
            end = start + _window_cut(tokens[start:end], encoding)

        chunk_text = encoding.decode(tokens[start:end]).strip()
        if chunk_text:
            chunks.append(chunk_text)
        start = end

    return chunks
|
|
|
|
|
def generate_multimodal_description(image_bytes, prompt_text, max_retries=5, delay=10):
    """Generate a text description of an image with the Gemma multimodal model.

    The image is decoded once and sent together with `prompt_text` to the
    module-level `gemma_sampler`. Failed or empty generations are retried with
    an exponential back-off; there is no wait before the first attempt because
    the model runs in-process.

    Args:
        image_bytes: Encoded image content (any format PIL can open).
        prompt_text: Instruction sent to the model; the image placeholder is
            appended automatically.
        max_retries: Maximum number of generation attempts.
        delay: Base delay in seconds for the back-off between attempts.

    Returns:
        The stripped description, or None when the sampler is not initialised,
        the image cannot be decoded, or every attempt fails or returns an
        empty answer.
    """
    if gemma_sampler is None:
        print(" Skipping multimodal description generation: Gemma sampler is not initialized.")
        return None

    try:
        pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
        # The chat sampler takes a single image as an (H, W, C) array; no
        # extra batch dimensions are added.
        gemma_image_input = jnp.asarray(np.asarray(pil_image))
    except Exception as e:
        print(f" Erreur lors de la conversion de l'image pour Gemma : {e}")
        return None

    # "<start_of_image>" is the placeholder Gemma 3 replaces with the image.
    full_prompt = f"{prompt_text} <start_of_image>"

    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1
        try:
            out = gemma_sampler.chat(
                full_prompt,
                images=gemma_image_input,
                max_new_tokens=500,
            )
            description = out.strip()
            if description:
                return description
            print(f" Tentative {attempt+1}/{max_retries}: Réponse vide ou inattendue du modèle multimodal Gemma.")
        except Exception as e:
            error_msg = str(e)
            print(f" Tentative {attempt+1}/{max_retries} échouée pour la description (Gemma) : {error_msg}")
            if is_last_attempt:
                print(f" Toutes les {max_retries} tentatives ont échoué pour la description Gemma.")
                return None

        if not is_last_attempt:
            retry_delay = delay * (2 ** attempt) + random.uniform(1, 5)
            print(f" Réessai dans {retry_delay:.2f}s...")
            time.sleep(retry_delay)

    print(f" Toutes les {max_retries} tentatives ont échoué pour la description (fin de boucle).")
    return None
|
|
|
|
|
def generate_text_embedding(text_content, max_retries=5, delay=5):
    """Generate the embedding of a text with the Sentence-Transformers model.

    The module-level `text_embedding_model` is called directly, with no wait
    before an attempt because the model runs in-process. Attempts that raise
    are retried with an exponential back-off.

    Args:
        text_content: Text to embed. Empty or whitespace-only text is skipped.
        max_retries: Maximum number of encoding attempts.
        delay: Base delay in seconds for the back-off between failed attempts.

    Returns:
        The embedding as a list of EMBEDDING_DIMENSION values, or None when
        the model is not initialised, the text is empty, the embedding has an
        unexpected length, or every attempt fails.
    """
    if text_embedding_model is None:
        print(" Skipping text embedding generation: Sentence-Transformers model is not initialized.")
        return None

    if not text_content or not text_content.strip():
        return None

    for attempt in range(max_retries):
        try:
            embedding = text_embedding_model.encode(text_content, convert_to_numpy=True)
            if embedding is not None and len(embedding) == EMBEDDING_DIMENSION:
                return embedding.tolist()

            # A wrong size will not change on retry, so give up immediately.
            print(f" Tentative {attempt+1}/{max_retries}: Format d'embedding Sentence-Transformers inattendu. Réponse : {embedding}")
            return None

        except Exception as e:
            error_msg = str(e)
            print(f" Tentative {attempt+1}/{max_retries} échouée pour l'embedding (Sentence-Transformers) : {error_msg}")
            if attempt < max_retries - 1:
                retry_delay = delay * (2 ** attempt) + random.uniform(0.5, 2)
                print(f" Réessai dans {retry_delay:.2f}s...")
                time.sleep(retry_delay)
                continue
            print(f" Toutes les {max_retries} tentatives ont échoué pour l'embedding (Sentence-Transformers).")
            return None

    print(f" Toutes les {max_retries} tentatives ont échoué pour l'embedding (fin de boucle).")
    return None
|
|
|
|
|
|
|
|
|
def _build_chunk_record(page_data, chunk_id, content_type, text_content, embedding, **extra_fields):
    """Assemble the output dict for one embedded chunk of a page."""
    record = {
        "pdf_file": page_data['pdf_file'],
        "page_number": page_data['page_number'],
        "chunk_id": chunk_id,
        "content_type": content_type,
        "text_content": text_content,
        "embedding": embedding,
    }
    record.update(extra_fields)
    record["pdf_title"] = page_data.get('pdf_title')
    record["pdf_subject"] = page_data.get('pdf_subject')
    record["pdf_keywords"] = page_data.get('pdf_keywords')
    return record


def _describe_table(table, table_idx, page_num):
    """Return a description of a table (from its image, else from its text), or None."""
    table_image_bytes = table.get('image_bytes')
    if table_image_bytes:
        prompt = "Décrivez en français le contenu et la structure de ce tableau. Mettez l'accent sur les données principales et les tendances si visibles."
        print(f" Page {page_num}: Génération de la description multimodale pour le tableau {table_idx}...")
        return generate_multimodal_description(table_image_bytes, prompt)

    table_text_repr = table.get('table_text_representation', '')
    if not table_text_repr:
        return None

    if not gemma_sampler:
        print(" Skipping text description generation for table: Gemma sampler not initialized.")
        return None

    # Only the first 1000 characters of the table text are sent to the model.
    prompt = f"Décrivez en français le contenu et la structure de ce tableau basé sur sa représentation textuelle:\n{table_text_repr[:1000]}..."
    print(f" Page {page_num}: Génération de la description textuelle pour le tableau {table_idx} (fallback via Gemma)...")
    try:
        return gemma_sampler.chat(prompt, max_new_tokens=500).strip()
    except Exception as e:
        print(f" Erreur lors de la génération de description textuelle pour le tableau {table_idx} via Gemma: {e}")
        return None


def process_pdfs_in_directory(directory):
    """Run the extraction, description and embedding pipeline on every PDF of a directory.

    For each page, tables and images are described with Gemma and the
    descriptions are embedded; the remaining page text is split into token
    chunks and each chunk is embedded. Items for which no description or no
    embedding can be produced are skipped with a log message.

    Args:
        directory: `pathlib.Path` of the directory containing the '*.pdf' files.

    Returns:
        A list of chunk dicts (keys: 'pdf_file', 'page_number', 'chunk_id',
        'content_type', 'text_content', 'embedding', the PDF metadata, plus
        'table_html_url' / 'image_url' for tables and images). Empty when the
        directory contains no PDF.
    """
    all_embeddings_data = []
    # Sorted so that processing order (and the output file) is reproducible.
    pdf_files = sorted(directory.glob("*.pdf"))
    total_files = len(pdf_files)

    if total_files == 0:
        print(f"Aucun fichier PDF trouvé dans le répertoire : {directory}")
        return []

    for processed_files, pdf_file_path in enumerate(pdf_files, start=1):
        print(f"\nTraitement de {pdf_file_path.name} ({processed_files}/{total_files})...")

        page_data_list = extract_page_data_pymupdf(pdf_file_path)
        if not page_data_list:
            print(f" Aucune donnée extraite de {pdf_file_path.name}.")
            continue

        for page_data in page_data_list:
            page_num = page_data['page_number']
            page_text = page_data['text']
            images = page_data['images']
            tables = page_data['tables']

            print(f" Génération des descriptions et embeddings pour la page {page_num}...")

            # --- Tables ---
            for table_idx, table in enumerate(tables):
                description = _describe_table(table, table_idx, page_num)
                if not description:
                    print(f" Page {page_num}: Aucune description générée pour le tableau {table_idx}. Chunk ignoré.")
                    continue

                print(f" Page {page_num}: Description générée pour le tableau {table_idx}.")
                embedding_vector = generate_text_embedding(description)
                if embedding_vector is None:
                    print(f" Page {page_num}: Échec de la génération de l'embedding pour la description du tableau {table_idx}. Chunk ignoré.")
                    continue

                all_embeddings_data.append(_build_chunk_record(
                    page_data, f"table_{table_idx}", "table", description, embedding_vector,
                    table_html_url=table.get('table_html_url'),
                    image_url=table.get('image_url'),
                ))
                print(f" Page {page_num}: Embedding généré pour la description du tableau {table_idx}.")

            # --- Images ---
            for img_idx, image in enumerate(images):
                image_bytes = image.get('image_bytes')
                description = None
                if image_bytes:
                    prompt = "Décrivez en français le contenu de cette image. S'il s'agit d'un graphique, décrivez le type de graphique (histogramme, courbe, etc.), les axes, les légendes et les principales informations ou tendances visibles."
                    print(f" Page {page_num}: Génération de la description multimodale pour l'image {img_idx}...")
                    description = generate_multimodal_description(image_bytes, prompt)

                if not description:
                    print(f" Page {page_num}: Aucune description générée pour l'image {img_idx}. Chunk ignoré.")
                    continue

                print(f" Page {page_num}: Description générée pour l'image {img_idx}.")
                embedding_vector = generate_text_embedding(description)
                if embedding_vector is None:
                    print(f" Page {page_num}: Échec de la génération de l'embedding pour la description de l'image {img_idx}. Chunk ignoré.")
                    continue

                all_embeddings_data.append(_build_chunk_record(
                    page_data, f"image_{img_idx}", "image", description, embedding_vector,
                    image_url=image.get('image_url'),
                ))
                print(f" Page {page_num}: Embedding généré pour la description de l'image {img_idx}.")

            # --- Text ---
            # Reset for every page so the summary below never reads an unbound
            # or stale value when the page has no text.
            text_chunks = []
            if page_text:
                try:
                    encoding = tiktoken.get_encoding(ENCODING_NAME)
                    text_chunks = token_chunking(page_text, MAX_TOKENS_NORMAL, encoding)
                except Exception as e:
                    print(f"Erreur lors du chunking du texte de la page {page_num} : {e}. Utilisation du chunking simple.")
                    text_chunks = [page_text]

            for chunk_idx, chunk_content in enumerate(text_chunks):
                print(f" Page {page_num}: Génération de l'embedding pour le chunk de texte {chunk_idx}...")
                embedding_vector = generate_text_embedding(chunk_content)
                if embedding_vector is None:
                    print(f" Page {page_num}: Échec de la génération de l'embedding pour le chunk de texte {chunk_idx}. Chunk ignoré.")
                    continue

                all_embeddings_data.append(_build_chunk_record(
                    page_data, f"text_{chunk_idx}", "text", chunk_content, embedding_vector,
                ))
                print(f" Page {page_num}: Chunk de texte {chunk_idx} traité avec succès.")

            print(f" Page {page_num} terminée. Éléments traités : {len(tables)} tableaux, {len(images)} images, {len(text_chunks)} chunks de texte.")

    return all_embeddings_data
|
|
|
|
|
def main():
    """Run the whole pipeline and return a process exit code.

    Checks that PDF_DIRECTORY exists, creates the output directories, loads
    both models, processes every PDF and writes the resulting chunks to
    EMBEDDINGS_FILE_PATH as JSON.

    Returns:
        0 on success (including when no embedding was generated), 1 when the
        PDF directory is missing, a model could not be initialised, or the
        JSON file could not be written.
    """
    print("Démarrage du traitement PDF multimodal avec génération de descriptions (Gemma) et embeddings textuels multilingues (Sentence-Transformers)...")

    if not PDF_DIRECTORY.is_dir():
        print(f"❌ ERREUR: Répertoire PDF non trouvé ou n'est pas un répertoire : {PDF_DIRECTORY}. Veuillez créer un répertoire 'docs' et y placer vos PDFs.")
        return 1

    for output_directory in (OUTPUT_DIR, IMAGE_SAVE_DIR, TABLE_SAVE_DIR):
        output_directory.mkdir(parents=True, exist_ok=True)
    print(f"Répertoire de sortie : {OUTPUT_DIR}")
    print(f"Répertoire de sauvegarde des images : {IMAGE_SAVE_DIR}")
    print(f"Répertoire de sauvegarde des tableaux (HTML) : {TABLE_SAVE_DIR}")

    initialize_models()

    # Both models are required: Gemma for the descriptions and
    # Sentence-Transformers for the embeddings.
    if gemma_sampler is None or text_embedding_model is None:
        print("Impossible de continuer car un ou plusieurs modèles n'ont pas pu être initialisés.")
        return 1

    final_embeddings = process_pdfs_in_directory(PDF_DIRECTORY)

    if not final_embeddings:
        print("\nAucun embedding n'a été généré.")
        return 0

    print(f"\nTotal d'embeddings générés : {len(final_embeddings)}.")
    try:
        with EMBEDDINGS_FILE_PATH.open('w', encoding='utf-8') as f:
            json.dump(final_embeddings, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"\nErreur lors de la sauvegarde du fichier JSON d'embeddings : {e}")
        traceback.print_exc()
        # A failed save must not look like a successful run.
        return 1

    print(f"Embeddings sauvegardés avec succès dans : {EMBEDDINGS_FILE_PATH}")
    return 0


if __name__ == "__main__":
    # SystemExit is used instead of the site-provided `exit()` helper, which
    # is meant for interactive sessions and is not guaranteed to exist.
    raise SystemExit(main())