# Hugging Face Space (status shown on the page when captured: Sleeping)
import hmac
import json
import logging
import os
import shutil
import zipfile
from typing import Dict, Any, List, Union  # typing names needed for annotations

import cv2
import fitz
import google.generativeai as genai
import gradio as gr
import numpy as np
import pytesseract
from docx import Document
from docx.enum.section import WD_SECTION
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from docx.shared import Pt, RGBColor, Inches
from pdf2image import convert_from_path
from PIL import Image
from pytesseract import Output

from helpers.rapport_generator import RapportGenerator
from helpers.text_extraction import *
def authenticate(username, password):
    """Check submitted credentials against HF_USERNAME / HF_PASSWORD.

    Args:
        username: User name submitted through the login form.
        password: Password submitted through the login form.

    Returns:
        True only when both environment variables are set and both submitted
        values match them; False in every other case.
    """
    expected_username = os.getenv("HF_USERNAME")
    expected_password = os.getenv("HF_PASSWORD")

    # With unconfigured credentials a plain `==` would compare against None,
    # and a None input would then be accepted. Refuse every login instead.
    if expected_username is None or expected_password is None:
        return False
    if not isinstance(username, str) or not isinstance(password, str):
        return False

    # compare_digest takes time independent of where the inputs differ, so the
    # response time does not reveal how much of a guess was right. Bytes are
    # used because compare_digest rejects non-ASCII str arguments.
    username_ok = hmac.compare_digest(
        username.encode("utf-8"), expected_username.encode("utf-8")
    )
    password_ok = hmac.compare_digest(
        password.encode("utf-8"), expected_password.encode("utf-8")
    )
    # Both comparisons are evaluated before combining, so a wrong user name
    # does not short-circuit the password check.
    return username_ok and password_ok
# Helper Functions | |
def convert_to_rgb(image_path):
    """Load an image from disk and return it as an RGB PIL image.

    Args:
        image_path: Path of the image file to open.

    Returns:
        A new PIL image in "RGB" mode.
    """
    # The context manager closes the source file as soon as the converted
    # image exists; previously the handle stayed open until garbage collection.
    with Image.open(image_path) as source_image:
        return source_image.convert("RGB")
def preprocess_image(image):
    """Binarise, denoise and upscale an image array with OpenCV.

    Args:
        image: Image array passed to cv2.cvtColor with COLOR_BGR2GRAY
            (presumably a BGR array as produced by cv2.imread; confirm
            against callers).

    Returns:
        The processed single-channel image, scaled by a factor of two on
        both axes.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Threshold value 0 is a placeholder: with THRESH_OTSU the threshold is
    # chosen automatically. Only the image half of the result is needed.
    thresholded = cv2.threshold(
        grayscale, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
    )[1]
    cleaned = cv2.fastNlMeansDenoising(thresholded, None, 30, 7, 21)
    return cv2.resize(
        cleaned, None, fx=2, fy=2, interpolation=cv2.INTER_CUBIC
    )
def extract_vertical_blocks(image):
    """Run French OCR on an image and group the words into vertical blocks.

    Words are visited in the order Tesseract reports them. A new block starts
    whenever a word's top edge lies more than one line height below the bottom
    of the previously accepted word.

    Args:
        image: Image convertible with np.array (e.g. a PIL image).

    Returns:
        A list of dicts, one per block, with keys:
            "text": the block's words joined by single spaces.
            "coords": [left, top, right, bottom] bounding box of the block.
    """
    image_np = np.array(image)
    data = pytesseract.image_to_data(image_np, lang='fra', output_type=Output.DICT)

    blocks = []
    words = []
    coords = None
    last_bottom = -1
    line_height = 0

    columns = zip(
        data['text'], data['conf'],
        data['left'], data['top'], data['width'], data['height'],
    )
    for text, conf, x, y, w, h in columns:
        # The confidence may be an int, a float or a numeric string such as
        # '95.000000' depending on the pytesseract/Tesseract versions;
        # float() accepts all of them where int() raises on the string form.
        if float(conf) <= 0:
            continue

        token = str(text)
        # Whitespace-only tokens carry no content and would otherwise produce
        # blocks whose text is empty.
        if not token.strip():
            continue

        # The line height is estimated once, from the first accepted word.
        if line_height == 0:
            line_height = h * 1.2

        # A vertical gap larger than one line closes the current block.
        if y > last_bottom + line_height and words:
            blocks.append({"text": " ".join(words).strip(), "coords": coords})
            words = []
            coords = None

        words.append(token)
        if coords is None:
            coords = [x, y, x + w, y + h]
        else:
            coords[0] = min(coords[0], x)
            coords[1] = min(coords[1], y)
            coords[2] = max(coords[2], x + w)
            coords[3] = max(coords[3], y + h)
        last_bottom = y + h

    if words:
        blocks.append({"text": " ".join(words).strip(), "coords": coords})
    return blocks
def draw_blocks_on_image(image_path, blocks, output_path):
    """Draw a red rectangle around every text block and save the result.

    Args:
        image_path: Path of the image to annotate.
        blocks: Iterable of dicts whose "coords" entry is
            [left, top, right, bottom].
        output_path: Path where the annotated image is written.

    Returns:
        output_path, unchanged.

    Raises:
        FileNotFoundError: If the image at image_path cannot be read.
        OSError: If the annotated image cannot be written.
    """
    image = cv2.imread(image_path)
    # cv2.imread reports failure by returning None instead of raising; without
    # this check the error would surface later as an opaque cv2.rectangle
    # failure.
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")

    for block in blocks:
        # cv2.rectangle needs plain integer pixel coordinates.
        left, top, right, bottom = (int(value) for value in block['coords'])
        # (0, 0, 255) is red in OpenCV's BGR channel order; thickness 2 px.
        cv2.rectangle(image, (left, top), (right, bottom), (0, 0, 255), 2)

    # cv2.imwrite returns False when the file could not be written.
    if not cv2.imwrite(output_path, image):
        raise OSError(f"Could not write annotated image: {output_path}")
    return output_path
def process_image(image, output_folder, page_number):
    """OCR one page image and write both a plain and an annotated copy.

    Args:
        image: Value forwarded to convert_to_rgb (the visible caller passes
            a file path).
        output_folder: Directory receiving 'page_<n>.png' and
            'annotated_page_<n>.png'.
        page_number: Zero-based page index; file names use page_number + 1.

    Returns:
        A (blocks, annotated_image_path) tuple.
    """
    rgb_page = convert_to_rgb(image)
    blocks = extract_vertical_blocks(rgb_page)

    page_filename = f'page_{page_number + 1}.png'
    page_path = os.path.join(output_folder, page_filename)
    rgb_page.save(page_path)

    annotated_target = os.path.join(output_folder, f'annotated_{page_filename}')
    return blocks, draw_blocks_on_image(page_path, blocks, annotated_target)
def save_extracted_text(blocks, page_number, output_folder):
    """Append one page of OCR text to 'extracted_text.txt'.

    The page is framed by a "[PAGE <n>]" header line and a "[FIN DE PAGE]"
    footer followed by a blank line; each block's text gets its own line.

    Args:
        blocks: Iterable of dicts with a "text" entry.
        page_number: Number written into the page header.
        output_folder: Directory containing (or receiving) the text file.

    Returns:
        The path of the text file.
    """
    text_file_path = os.path.join(output_folder, 'extracted_text.txt')
    # Append mode: successive pages accumulate in the same file.
    with open(text_file_path, 'a', encoding='utf-8') as out:
        out.write(f"[PAGE {page_number}]\n")
        out.writelines(block['text'] + "\n" for block in blocks)
        out.write("[FIN DE PAGE]\n\n")
    return text_file_path
# Gemini Functions | |
def initialize_gemini():
    """Configure the Gemini client and build the generative model.

    The API key is read from the GEMINI_API_KEY environment variable.

    Returns:
        A genai.GenerativeModel for "gemini-1.5-pro" with the generation
        settings defined below.

    Raises:
        gr.Error: If configuring the client or creating the model fails.
    """
    try:
        genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
        return genai.GenerativeModel(
            model_name="gemini-1.5-pro",
            generation_config=dict(
                temperature=1,
                top_p=0.95,
                top_k=40,
                max_output_tokens=8192,
                response_mime_type="text/plain",
            ),
        )
    except Exception as exc:
        raise gr.Error(f"Error initializing Gemini: {exc!s}")
def create_prompt(extracted_text: str, path_to_data_to_extract: str) -> str:
    """Build the French prompt asking the model to extract structured data.

    Args:
        extracted_text: OCR text of the document; surrounding whitespace is
            stripped before insertion.
        path_to_data_to_extract: Path of a UTF-8 JSON file describing the
            fields to extract; its content is embedded pretty-printed.

    Returns:
        The complete prompt, stripped of leading and trailing whitespace.
    """
    # Load the description of the fields the model is asked to fill in.
    with open(path_to_data_to_extract, 'r', encoding='utf-8') as schema_file:
        fields_to_extract = json.load(schema_file)

    # ensure_ascii=False keeps accented characters readable in the prompt.
    fields_json = json.dumps(fields_to_extract, indent=2, ensure_ascii=False)
    document_text = extracted_text.strip()

    return f"""Tu es un assistant juridique expert en analyse de documents judiciaires français.
Je vais te fournir le contenu d'un document judiciaire extrait d'un PDF.
Ta tâche est d'analyser ce texte et d'en extraire les informations suivantes de manière précise :
{fields_json}
Voici quelques règles à suivre :
- Si une information n'est pas présente dans le texte, indique "Non spécifié" pour cette catégorie.
- Pour les noms des parties (demandeurs et défendeurs, et leurs avocats), liste tous ceux que tu trouves
- Assure-toi de différencier correctement les demandeurs des défendeurs.
- Si tu n'es pas sûr d'une information, indique-le clairement.
Présente tes résultats sous forme de JSON, en utilisant les catégories mentionnées ci-dessus.
Voici le contenu du document :
{document_text}
Analyse ce texte et fournis-moi les informations demandées au format JSON uniquement.""".strip()
def _parse_json_response(raw_text: str):
    """Decode the JSON document embedded in a model reply.

    The reply is first parsed as-is. If that fails (Markdown code fences,
    introductory prose, ...), the span from the first opening brace/bracket
    to the last matching closing one is parsed instead.

    Raises:
        ValueError: If no span of the reply decodes as JSON.
    """
    candidate = raw_text.strip()
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        pass

    # Try the delimiter pair that opens earliest in the text first.
    delimiter_pairs = sorted(
        (candidate.find(opener), opener, closer)
        for opener, closer in (("{", "}"), ("[", "]"))
    )
    for start, _opener, closer in delimiter_pairs:
        end = candidate.rfind(closer)
        if start == -1 or end < start:
            continue
        try:
            return json.loads(candidate[start:end + 1])
        except json.JSONDecodeError:
            continue
    raise ValueError("no JSON object or array found in model response")


def extract_data_with_gemini(text_file_path: str, path_to_data_to_extract: str) -> dict:
    """Send the extracted text to Gemini and return the parsed JSON answer.

    Args:
        text_file_path: Path of the UTF-8 text file holding the OCR output.
        path_to_data_to_extract: Path of the JSON file describing the fields
            to extract (forwarded to create_prompt).

    Returns:
        The decoded JSON answer. When the answer cannot be decoded, a dict
        with "error" and "raw_response" keys is returned instead.

    Raises:
        gr.Error: If initialisation, file reading or the model call fails.
    """
    try:
        model = initialize_gemini()

        with open(text_file_path, 'r', encoding='utf-8') as f:
            extracted_text = f.read()

        prompt = create_prompt(extracted_text, path_to_data_to_extract)
        response = model.generate_content(prompt)
        # Read the text once: the fallback result below reuses it, so a
        # failing accessor cannot raise a second time inside the handler.
        raw_text = response.text

        try:
            return _parse_json_response(raw_text)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass; only parsing
            # problems are turned into the fallback result.
            return {"error": "Failed to parse JSON response", "raw_response": raw_text}
    except Exception as e:
        raise gr.Error(f"Error in Gemini processing: {str(e)}") from e
# Main Processing Function | |
def process_pdf(pdf_file):
    """Run the whole pipeline on an uploaded PDF.

    Steps: rasterise the PDF, OCR every page, zip the annotated page images,
    ask Gemini for structured data, save it as JSON and build a DOCX report.

    Args:
        pdf_file: Either the path of the uploaded PDF (str) or an object
            exposing that path through a ``name`` attribute.

    Returns:
        A (text_file_path, zip_path, json_path, docx_path) tuple.

    Raises:
        gr.Error: If no file was provided or any processing step fails.
    """
    if pdf_file is None:
        raise gr.Error("Error processing PDF: no file was provided")
    # The input component is declared with type="filepath", so a path string
    # is accepted directly; objects carrying the path in .name still work.
    pdf_path = pdf_file if isinstance(pdf_file, str) else pdf_file.name

    template_dir = os.path.join(os.getcwd(), "templates")
    temp_dir = os.path.join(os.getcwd(), "temp_processing")
    output_dir = os.path.join(temp_dir, 'output_images')

    # NOTE: the working directory is a fixed path that is wiped on every
    # call, so the results of a previous call do not survive the next one.
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(output_dir, exist_ok=True)

    path_to_data_to_extract = os.path.join(template_dir, "data_to_extract.json")
    # Same file that save_extracted_text appends to inside output_dir.
    text_file_path = os.path.join(output_dir, 'extracted_text.txt')

    try:
        # Convert the PDF into one image per page.
        images = convert_from_path(pdf_path)
        annotated_images = []

        # OCR each page and collect its annotated image.
        for i, img in enumerate(images):
            temp_img_path = os.path.join(temp_dir, f'temp_page_{i}.png')
            img.save(temp_img_path)
            blocks, annotated_image_path = process_image(temp_img_path, output_dir, i)
            annotated_images.append(annotated_image_path)
            save_extracted_text(blocks, i + 1, output_dir)

        # Bundle the annotated pages into a ZIP archive.
        zip_path = os.path.join(temp_dir, "annotated_images.zip")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for img_path in annotated_images:
                zipf.write(img_path, os.path.basename(img_path))

        # Extract structured data with Gemini.
        extracted_data = extract_data_with_gemini(text_file_path, path_to_data_to_extract)

        # Persist the structured data as JSON.
        json_path = os.path.join(temp_dir, "extracted_data.json")
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(extracted_data, f, ensure_ascii=False, indent=2)

        # Generate the DOCX report from the JSON file.
        docx_path = os.path.join(temp_dir, "rapport_extraction.docx")
        generator = RapportGenerator(json_path, docx_path)
        generator.generate_report()

        return text_file_path, zip_path, json_path, docx_path
    except gr.Error:
        # Already a user-facing error; re-wrapping would only stack prefixes.
        raise
    except Exception as e:
        raise gr.Error(f"Error processing PDF: {str(e)}") from e
# Gradio Interface | |
# Custom stylesheet handed to the Gradio interface below.
css = """
.gradio-container {
font-family: 'IBM Plex Sans', sans-serif;
}
.gr-button {
color: white;
border-radius: 8px;
background: linear-gradient(45deg, #7928CA, #FF0080);
border: none;
}
"""

# Single-input interface: one PDF in, four downloadable files out, in the
# same order as the tuple returned by process_pdf.
demo = gr.Interface(
    title="Extraction de texte PDF et création d'un rapport DOCX",
    description="""
Téléchargez un document PDF pour :
1. Extraire le contenu textuel
2. Obtenir des images annotées montrant les blocs de texte détectés
3. Extraire des données structurées grâce à une analyse IA
4. Générer un rapport formaté au format DOCX
Prend en charge les documents multi-pages et les documents juridiques français.
""",
    fn=process_pdf,
    inputs=[
        gr.File(
            type="filepath",
            file_types=[".pdf"],
            label="Télécharger un document PDF",
        )
    ],
    outputs=[
        gr.File(label=output_label)
        for output_label in (
            "Texte extrait (TXT)",
            "Images annotées (ZIP)",
            "Données extraites (JSON)",
            "Rapport généré (DOCX)",  # most recently added output
        )
    ],
    theme=gr.themes.Soft(),
    css=css,
    examples=[],
    cache_examples=False,
)
# Launch the app | |
if __name__ == "__main__":
    # launch() starts the server itself and does not return the Interface,
    # so the former chained `.launch()` on its result could only fail with
    # an AttributeError once the first call returned.
    demo.launch(
        debug=False,
        auth=authenticate,
    )