# app.py — Hugging Face Space (commit 2988e09)
import subprocess
import sys

# Bootstrap: make sure PyMuPDF is importable, installing it on the fly if missing.
try:
    import fitz  # PyMuPDF
except ModuleNotFoundError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "PyMuPDF"])
    import fitz

# ---------------------------------------------------------------------
# 0. Hot-patch: ensure Gradio-compatible Pydantic (<2.11)
# ---------------------------------------------------------------------
import os, sys, subprocess  # NOTE(review): sys/subprocess already imported above — harmless duplicates
from importlib import metadata

# `packaging` provides PEP 440 version comparison; install it if absent.
try:
    from packaging import version
except ModuleNotFoundError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "packaging"])
    from packaging import version
def _ensure_compatible_pydantic():
    """Re-exec the interpreter under a Gradio-compatible pydantic (<2.11).

    If pydantic is missing or at/above 2.11, pin both ``pydantic`` and
    ``pydantic-core`` below 2.11 via pip, then replace the current process
    with a fresh interpreter so the pinned version is the one imported.
    """
    try:
        installed = version.parse(metadata.version("pydantic"))
    except metadata.PackageNotFoundError:
        installed = None

    # Nothing to do when an acceptable version is already present.
    if installed is not None and installed < version.parse("2.11"):
        return

    cur = installed
    print(f"[patch] Installing pydantic<2.11 (current: {cur}) …", flush=True)
    pip_cmd = [
        sys.executable,
        "-m",
        "pip",
        "install",
        "--no-cache-dir",
        "pydantic<2.11",
        "pydantic-core<2.11",
    ]
    subprocess.check_call(pip_cmd)
    # Restart this process so the newly installed package takes effect.
    os.execv(sys.executable, [sys.executable] + sys.argv)

_ensure_compatible_pydantic()
import re
import random
import io
import os
import tempfile
import logging
from datetime import datetime

import gradio as gr
import shutil  # NOTE(review): shutil appears unused in this file — confirm before removing

# Basic logging configuration: timestamp - level - message.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- update_pdf_bytes: redact and re-insert timestamps / job description in a PDF ---
def update_pdf_bytes(pdf_bytes, base_date, date_offset=11, job_offset=11, job_option="Sostituisci con C-Stag"):
    """Redact and re-insert timestamps (and optionally the job description) in a PDF.

    Every text span matching 'YYYY-MM-DD HH:MM:SS[.fraction]' is whited out and
    replaced with `base_date`, with randomized seconds and a random 9-digit
    fractional part. When `job_option` is "Sostituisci con C-Stag", the B2020
    job-description line is replaced with the C-Stag wording, preserving an
    optional "DF25 " prefix.

    Parameters:
        pdf_bytes (bytes): Raw PDF content.
        base_date (str): Base timestamp in 'YYYY-MM-DD HH:MM:SS' format.
        date_offset (int): Vertical offset (pt) for the re-inserted date text.
        job_offset (int): Vertical offset (pt) for the re-inserted job text.
        job_option (str): "Sostituisci con C-Stag" to replace the job line;
            any other value keeps the original wording.

    Returns:
        bytes: The modified PDF.

    Raises:
        ValueError: If `base_date` is not in the expected format.
    """
    doc = None
    try:
        date_pattern = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})(\.\d+)?')
        job_pattern = re.compile(r"((DF25\s+)?- B2020 - Nulla osta/Comunicazione al lavoro subordinato non stagionale nei settori elencati nel DPCM Flussi)")
        try:
            base = datetime.strptime(base_date, "%Y-%m-%d %H:%M:%S")
        except ValueError as e:
            raise ValueError("Il formato della data base deve essere 'YYYY-MM-DD HH:MM:SS'.") from e

        # Randomize seconds and fraction so repeated runs produce distinct timestamps.
        new_base = base.replace(second=random.randint(0, 59))
        new_date = new_base.strftime("%Y-%m-%d %H:%M:%S") + f".{random.randint(0, 999999999):09d}"

        try:
            doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        except Exception as e:
            logging.error("Errore nell'apertura del PDF: %s", e)
            raise

        # First pass: only COLLECT edits. Applying redactions while iterating the
        # extracted text layout would invalidate the structure being walked.
        redactions_to_apply = {}   # page_num -> [fitz.Rect, ...]
        insertions_to_make = {}    # page_num -> [insertion dict, ...]
        for page_num, page in enumerate(doc):
            text_dict = page.get_text("dict")
            if not text_dict:
                continue
            page_redactions = []
            page_insertions = []
            for block in text_dict.get("blocks", []):
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        text = span.get("text", "")
                        bbox = span.get("bbox")
                        if not bbox:
                            continue
                        rect = fitz.Rect(bbox)
                        if date_pattern.search(text):
                            logging.info("Trovata data da aggiornare nella pagina %d", page_num + 1)
                            page_redactions.append(rect)
                            page_insertions.append({
                                "point": fitz.Point(rect.x0, rect.y0 + date_offset),
                                "text": new_date,
                                "size": span.get("size", 12),
                                "font": span.get("font", "helv"),
                                "color": (0, 0, 0),
                            })
                            continue
                        if job_option != "Sostituisci con C-Stag":
                            continue
                        match = job_pattern.search(text)  # single search (was run twice)
                        if not match:
                            continue
                        prefix = match.group(2) or ""  # optional "DF25 " prefix
                        new_job_text = f"{prefix}- C-Stag - Richiesta di nulla osta/comunicazione al lavoro subordinato stagionale"
                        logging.info("Trovato job description da aggiornare nella pagina %d", page_num + 1)
                        page_redactions.append(rect)
                        page_insertions.append({
                            "point": fitz.Point(rect.x0, rect.y0 + job_offset),
                            "text": new_job_text,
                            "size": span.get("size", 12),
                            "font": span.get("font", "helv"),
                            "color": (0, 0, 0),
                        })
            if page_redactions:
                redactions_to_apply[page_num] = page_redactions
            if page_insertions:
                insertions_to_make[page_num] = page_insertions

        # Second pass: apply the collected redactions, then insert replacement text.
        for page_num, page in enumerate(doc):
            if page_num in redactions_to_apply:
                logging.info("Applicazione redazioni alla pagina %d", page_num + 1)
                for rect in redactions_to_apply[page_num]:
                    page.add_redact_annot(rect, text=' ', fill=(1, 1, 1))  # white out
                page.apply_redactions(images=fitz.PDF_REDACT_IMAGE_NONE)
            if page_num in insertions_to_make:
                logging.info("Inserimento testo aggiornato nella pagina %d", page_num + 1)
                for item in insertions_to_make[page_num]:
                    page.insert_text(item["point"], item["text"],
                                     fontsize=item["size"],
                                     fontname=item["font"],
                                     color=item["color"])

        output_stream = io.BytesIO()
        try:
            # Basic garbage collection + stream compression on save.
            doc.save(output_stream, garbage=1, deflate=True)
        except Exception as e:
            logging.error("Errore nel salvataggio del PDF aggiornato: %s", e)
            raise
        return output_stream.getvalue()
    except Exception:
        logging.exception("Errore durante l'aggiornamento del PDF:")
        raise
    finally:
        # Single cleanup point (replaces the previous duplicated close paths).
        if doc is not None:
            try:
                doc.close()
            except Exception:
                pass
# --- NEW ITERATIVE SIZE ADJUSTMENT FUNCTION ---
def adjust_pdf_size_iterative(pdf_bytes: bytes, target_kb: float, tolerance_kb: float = 0.2, max_iterations: int = 10) -> bytes:
    """Iteratively nudge a PDF's size towards ``target_kb``.

    Grows the file by appending whitespace to a custom metadata key, and
    shrinks it by removing that padding and/or running an aggressive
    garbage-collecting save. Stops as soon as the size is within
    ``tolerance_kb`` of the target.

    Parameters:
        pdf_bytes (bytes): The initial PDF content.
        target_kb (float): Target size in kilobytes (<= 0 disables adjustment).
        tolerance_kb (float): Allowable deviation from the target, in KB.
        max_iterations (int): Maximum attempts to reach the target size.

    Returns:
        bytes: The size-adjusted PDF, or the last attempt (or last good state
        on error) if the target is not reached within ``max_iterations``.
    """
    if target_kb <= 0:
        logging.info("Target KB non valido, saltando l'aggiustamento iterativo.")
        return pdf_bytes

    target_bytes = int(target_kb * 1024)
    tolerance_bytes = int(tolerance_kb * 1024)
    current_pdf_bytes = pdf_bytes
    padding_key = "X_IterativePaddingData"  # consistent key for our padding
    logging.info(f"--- Inizio Aggiustamento Iterativo --- Target: {target_kb:.2f} KB ({target_bytes} bytes), Tolleranza: {tolerance_kb:.2f} KB ({tolerance_bytes} bytes)")

    for i in range(max_iterations):
        current_size = len(current_pdf_bytes)
        diff = target_bytes - current_size
        logging.info(f"Iterazione {i+1}/{max_iterations}: Dimensione attuale={current_size / 1024:.2f} KB ({current_size} bytes), Diff dal target={diff} bytes")

        # Done when within tolerance.
        if abs(diff) <= tolerance_bytes:
            logging.info(f"Dimensione raggiunta entro la tolleranza. ({current_size / 1024:.2f} KB)")
            return current_pdf_bytes

        doc = None  # reset each iteration so the error path knows what to close
        previous_pdf_bytes = current_pdf_bytes  # last known good state
        try:
            if diff > 0:
                # --- Grow: append whitespace padding to our metadata key ---
                logging.debug("Azione: Aggiungere padding.")
                doc = fitz.open(stream=current_pdf_bytes, filetype="pdf")
                # NOTE(review): custom (non-standard) metadata keys — verify the
                # installed PyMuPDF version persists them via set_metadata.
                meta = doc.metadata or {}
                chars_to_add = max(1, diff + 50)  # overshoot slightly (diff + buffer)
                meta[padding_key] = meta.get(padding_key, "") + (" " * chars_to_add)
                logging.debug(f"Aggiunta di {chars_to_add} caratteri di padding a '{padding_key}'.")
                doc.set_metadata(meta)
                output_stream = io.BytesIO()
                # No compression while padding, so the added bytes survive the save.
                doc.save(output_stream, garbage=1, deflate=False)
                current_pdf_bytes = output_stream.getvalue()
                doc.close()
                doc = None
            else:
                # --- Shrink: first strip our own padding, then clean aggressively ---
                logging.debug("Azione: Pulizia aggressiva.")
                doc = fitz.open(stream=current_pdf_bytes, filetype="pdf")
                meta = doc.metadata or {}
                if meta.get(padding_key):
                    padding_len = len(meta[padding_key])
                    # Drop roughly the excess plus a small buffer, never below 0.
                    keep_chars = max(0, padding_len - abs(diff) - 50)
                    meta[padding_key] = meta[padding_key][:keep_chars]
                    logging.debug(f"Riduzione padding in '{padding_key}' a {keep_chars} caratteri.")
                    doc.set_metadata(meta)
                    output_stream = io.BytesIO()
                    doc.save(output_stream, garbage=1, deflate=False)
                    current_pdf_bytes = output_stream.getvalue()
                    # Re-check before resorting to aggressive cleanup.
                    if abs(target_bytes - len(current_pdf_bytes)) <= tolerance_bytes:
                        logging.info("Dimensione raggiunta dopo rimozione padding.")
                        doc.close()
                        doc = None
                        continue
                logging.debug("Esecuzione pulizia aggressiva (garbage=4, deflate=True)")
                if doc:
                    doc.close()  # reopen on the (possibly) padding-stripped bytes
                doc = fitz.open(stream=current_pdf_bytes, filetype="pdf")
                output_stream = io.BytesIO()
                doc.save(output_stream, garbage=4, deflate=True, linearize=False)
                current_pdf_bytes = output_stream.getvalue()
                doc.close()
                doc = None
        except Exception:
            logging.exception(f"Errore durante l'aggiustamento nella iterazione {i+1}:")
            if doc:
                try:
                    doc.close()
                except Exception:  # narrowed from a bare `except:`
                    pass
            logging.warning("Ripristino dei bytes dalla iterazione precedente.")
            return previous_pdf_bytes

        # Safety net: a save gone wrong could produce a near-empty file.
        if len(current_pdf_bytes) < 100:
            logging.error(f"Dimensione del PDF diventata inaspettatamente piccola ({len(current_pdf_bytes)} bytes) dopo l'iterazione {i+1}. Interruzione.")
            return previous_pdf_bytes

    logging.warning(f"Raggiunto limite massimo di {max_iterations} iterazioni. Dimensione finale: {len(current_pdf_bytes) / 1024:.2f} KB")
    return current_pdf_bytes
# --- Updated process_batch Function ---
def process_batch(pdf_files, base_date, date_offset, job_offset, job_option, adjust_size, target_kb, tolerance_kb, max_iterations):
    """Process a batch of PDFs: update date/job, optionally adjust size iteratively.

    Parameters:
        pdf_files: List of file paths (str) or file-like objects from Gradio;
            may be None when nothing was uploaded.
        base_date (str): Base timestamp 'YYYY-MM-DD HH:MM:SS'.
        date_offset / job_offset (int): Vertical text offsets in points.
        job_option (str): Job-description handling option.
        adjust_size (bool): Whether to run the iterative size adjustment.
        target_kb / tolerance_kb (float): Size target and tolerance in KB.
        max_iterations (int): Max size-adjustment iterations per file.

    Returns:
        list[str]: Paths of the updated PDFs (empty on failure / no input).
    """
    # Gradio passes None when no file was uploaded — nothing to do.
    if not pdf_files:
        return []
    results = []
    try:
        output_dir = tempfile.mkdtemp(prefix="updated_pdfs_")
        logging.info(f"Creato directory temporanea: {output_dir}")
        for file_obj in pdf_files:
            original_name = "unknown_file.pdf"
            try:
                # Accept either a path string or a file-like object with .read()/.name.
                if isinstance(file_obj, str):
                    if not os.path.exists(file_obj):
                        logging.error(f"File non trovato: {file_obj}")
                        continue
                    with open(file_obj, "rb") as f:
                        pdf_bytes = f.read()
                    original_name = os.path.basename(file_obj)
                elif hasattr(file_obj, 'read') and hasattr(file_obj, 'name'):
                    pdf_bytes = file_obj.read()
                    original_name = os.path.basename(getattr(file_obj, "name", "uploaded_file.pdf"))
                else:
                    logging.error(f"Input file non riconosciuto: {type(file_obj)}")
                    continue

                logging.info(f"--- Elaborazione file: {original_name} (Dimensione iniziale: {len(pdf_bytes)/1024:.2f} KB) ---")

                # 1. Update date and job description.
                updated_bytes = update_pdf_bytes(pdf_bytes, base_date, date_offset, job_offset, job_option)
                logging.info(f"Dimensione dopo aggiornamenti: {len(updated_bytes)/1024:.2f} KB")

                # 2. Adjust size iteratively if requested.
                if adjust_size and target_kb > 0:
                    logging.info(f"Richiesto aggiustamento dimensione per {original_name} a {target_kb} KB (Tolleranza: {tolerance_kb} KB, Max Iter: {max_iterations})")
                    final_bytes = adjust_pdf_size_iterative(
                        updated_bytes,
                        target_kb,
                        tolerance_kb=tolerance_kb,
                        max_iterations=max_iterations
                    )
                else:
                    final_bytes = updated_bytes

                # 3. Save the final PDF; never silently overwrite a file from an
                #    earlier upload that happens to share the same basename.
                new_file_path = os.path.join(output_dir, original_name)
                stem, ext = os.path.splitext(new_file_path)
                counter = 1
                while os.path.exists(new_file_path):
                    new_file_path = f"{stem}_{counter}{ext}"
                    counter += 1
                with open(new_file_path, "wb") as f:
                    f.write(final_bytes)
                results.append(new_file_path)
                logging.info(f"File aggiornato salvato in: {new_file_path} (Dimensione finale: {len(final_bytes)/1024:.2f} KB)")
            except Exception:
                # One bad file must not abort the whole batch.
                logging.exception(f"Errore nel processamento del file '{original_name}':")
                continue
        return results
    except Exception:
        logging.exception("Errore generale durante l'elaborazione batch:")
        return []
    # NOTE: the temp directory is deliberately NOT deleted here — Gradio reads
    # the output files after this function returns (deleting them caused
    # FileNotFoundError). Clean old folders with a scheduled task if desired.
# --- Updated Gradio Interface ---
# Builds the Blocks UI and wires the widgets to process_batch.
with gr.Blocks() as demo:
    # User guide (Italian) shown at the top of the page.
    gr.Markdown(
        """
## Aggiornamento PDF: Data, Job Description e Dimensione (Iterativo)
**Guida:**
1. Carica PDF.
2. Imposta Data Base (YYYY-MM-DD HH:MM:SS).
3. Regola Spostamenti Verticali (offset).
4. Scegli opzione Job Description.
5. **(Opzionale) Aggiusta Dimensione:**
- Seleziona la casella.
- Imposta la **Dimensione Target (KB)**.
- Imposta la **Tolleranza (KB)** (quanto può discostarsi dal target, es. 0.2).
- Imposta le **Max Iterazioni** (quanti tentativi fare, es. 10).
- Il sistema tenterà di aggiungere/rimuovere dati invisibili (metadata/pulizia) per avvicinarsi al target.
6. Clicca **Elabora PDF**.
7. Scarica i file aggiornati.
"""
    )
    with gr.Row():
        pdf_input = gr.File(label="Carica file PDF", file_count="multiple")
    with gr.Row():
        base_date_input = gr.Textbox(label="Data base (YYYY-MM-DD HH:MM:SS)", value="2025-04-01 10:00:00")
    with gr.Row():
        date_offset_input = gr.Slider(label="Spostamento Data (pt)", minimum=-10, maximum=30, value=11, step=1)
        job_offset_input = gr.Slider(label="Spostamento Job (pt)", minimum=-10, maximum=30, value=11, step=1)
    with gr.Row():
        job_option_input = gr.Dropdown(label="Aggiornamento Job",
                                       choices=["Sostituisci con C-Stag", "Mantieni B2020"],
                                       value="Sostituisci con C-Stag")
    # Size-adjustment controls are collapsed by default inside an Accordion.
    with gr.Accordion("Opzioni Aggiustamento Dimensione", open=False):
        adjust_size_checkbox = gr.Checkbox(label="Aggiusta dimensione file?", value=False)
        with gr.Row():
            target_kb_input = gr.Number(label="Dimensione Target (KB)", value=33.0, minimum=1.0, step=0.1)
            tolerance_kb_input = gr.Number(label="Tolleranza (KB)", value=0.1, minimum=0.05, step=0.05)
            max_iterations_input = gr.Slider(label="Max Iterazioni", minimum=1, maximum=20, value=10, step=1)
    output_files = gr.File(label="Scarica i PDF aggiornati", file_count="multiple")
    btn = gr.Button("Elabora PDF")
    # Input order must match process_batch's parameter order exactly.
    btn.click(fn=process_batch,
              inputs=[
                  pdf_input,
                  base_date_input,
                  date_offset_input,
                  job_offset_input,
                  job_option_input,
                  adjust_size_checkbox,
                  target_kb_input,
                  tolerance_kb_input,   # Pass tolerance
                  max_iterations_input  # Pass max iterations
              ],
              outputs=output_files)

# Launch the Gradio app
demo.launch()