from flask import Flask, render_template, Response, flash, redirect, url_for, request, jsonify
import cv2
import numpy as np
from unstructured.partition.pdf import partition_pdf
import json
import base64
import io
import os
from PIL import Image, ImageEnhance, ImageDraw
from imutils.perspective import four_point_transform
from dotenv import load_dotenv
import pytesseract
from transformers import AutoProcessor, AutoModelForImageTextToText, AutoModelForVision2Seq
from langchain_community.document_loaders.image_captions import ImageCaptionLoader
from werkzeug.utils import secure_filename
import tempfile
import torch
from langchain_groq import ChatGroq
from langgraph.prebuilt import create_react_agent
import logging
# Configure logging
logging.basicConfig(
level=logging.DEBUG, # Use INFO or ERROR in production
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("app.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
load_dotenv()
# ChatGroq reads GROQ_API_KEY from the environment; passing it explicitly keeps the dependency visible.
groq_api_key = os.getenv("GROQ_API_KEY")
llm = ChatGroq(
    api_key=groq_api_key,
    model="meta-llama/llama-4-maverick-17b-128e-instruct",
    temperature=0,
    max_tokens=None,
)
app = Flask(__name__)
# Tesseract/Poppler locations; override via environment variables when not on Windows.
pytesseract.pytesseract.tesseract_cmd = os.getenv(
    "TESSERACT_CMD", r"C:\Program Files\Tesseract-OCR\tesseract.exe")
poppler_path = os.getenv("POPPLER_PATH", r"C:\poppler-23.11.0\Library\bin")
count = 0
PDF_GET = r"E:\Pratham\2025\Harsh Sir\Scratch Vision\images\scratch_crab.pdf"
OUTPUT_FOLDER = "OUTPUTS"
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")
for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, JSON_FOLDER_PATH]:
os.makedirs(path, exist_ok=True)
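# Output layout created above:
#   OUTPUTS/DETECTED_IMAGE/<pdf-name>/   sprite images extracted from the PDF
#   OUTPUTS/SCANNED_IMAGE/               reserved for scanned page images
#   OUTPUTS/EXTRACTED_JSON/<pdf-name>/   extracted.json and extracted_sprites.json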
# Model Initialization
try:
smolvlm256m_processor = AutoProcessor.from_pretrained(
"HuggingFaceTB/SmolVLM-256M-Instruct")
# smolvlm256m_model = AutoModelForImageTextToText.from_pretrained("HuggingFaceTB/SmolVLM-256M-Instruct").to("cpu")
smolvlm256m_model = AutoModelForVision2Seq.from_pretrained(
"HuggingFaceTB/SmolVLM-256M-Instruct",
torch_dtype=torch.bfloat16 if hasattr(
torch, "bfloat16") else torch.float32,
_attn_implementation="eager"
).to("cpu")
except Exception as e:
raise RuntimeError(f"❌ Failed to load SmolVLM model: {str(e)}")
# SmolVLM image captioning helper (its calls below are currently commented out in favour of the Groq agent)
def get_smolvlm_caption(image: Image.Image, prompt: str = "") -> str:
try:
# Ensure exactly one <image> token
if "<image>" not in prompt:
prompt = f"<image> {prompt.strip()}"
num_image_tokens = prompt.count("<image>")
if num_image_tokens != 1:
raise ValueError(
f"Prompt must contain exactly 1 <image> token. Found {num_image_tokens}")
inputs = smolvlm256m_processor(
images=[image], text=[prompt], return_tensors="pt").to("cpu")
output_ids = smolvlm256m_model.generate(**inputs, max_new_tokens=100)
return smolvlm256m_processor.decode(output_ids[0], skip_special_tokens=True)
except Exception as e:
return f"❌ Error during caption generation: {str(e)}"
# --- FUNCTION: Extract images from saved PDF ---
def extract_images_from_pdf(pdf_path, output_json_path):
''' Extract images from PDF and generate structured sprite JSON '''
try:
pdf_filename = os.path.splitext(os.path.basename(pdf_path))[
0] # e.g., "scratch_crab"
pdf_dir_path = os.path.dirname(pdf_path).replace("/", "\\")
# Create subfolders
extracted_image_subdir = os.path.join(
DETECTED_IMAGE_FOLDER_PATH, pdf_filename)
json_subdir = os.path.join(JSON_FOLDER_PATH, pdf_filename)
os.makedirs(extracted_image_subdir, exist_ok=True)
os.makedirs(json_subdir, exist_ok=True)
# Output paths
output_json_path = os.path.join(json_subdir, "extracted.json")
final_json_path = os.path.join(json_subdir, "extracted_sprites.json")
try:
elements = partition_pdf(
filename=pdf_path,
strategy="hi_res",
extract_image_block_types=["Image"],
extract_image_block_to_payload=True, # Set to True to get base64 in output
)
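            # Note: the "hi_res" strategy renders pages (via Poppler) and runs a
            # layout-detection model, so extraction can be slow on CPU.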
except Exception as e:
raise RuntimeError(
f"❌ Failed to extract images from PDF: {str(e)}")
try:
with open(output_json_path, "w") as f:
json.dump([element.to_dict()
for element in elements], f, indent=4)
except Exception as e:
raise RuntimeError(f"❌ Failed to write extracted.json: {str(e)}")
try:
            # Reload the extracted elements from the JSON written above
with open(output_json_path, 'r') as file:
file_elements = json.load(file)
except Exception as e:
raise RuntimeError(f"❌ Failed to read extracted.json: {str(e)}")
# Prepare manipulated sprite JSON structure
manipulated_json = {}
        # Set the system prompt for the captioning agent
        system_prompt = """
        You are an expert in visual scene understanding.
        Your job is to analyze an image and respond accordingly: if asked for a name, give a simple name by analyzing the image; if asked for a description, generate a short description covering its elements.
        Guidelines:
        - Focus only on the images given in square shape.
        - Don't consider blank areas in the image.
        - Don't include a generic summary or explanation outside the requested fields.
        Return only a string.
        """
agent = create_react_agent(
model=llm,
tools=[],
prompt=system_prompt
)
        # If the JSON already exists, load it, keep its sprites, and find the next available Sprite number
        if os.path.exists(final_json_path):
            with open(final_json_path, "r") as existing_file:
                manipulated_json = json.load(existing_file)
            # Determine the next available index (e.g., Sprite 4 if 1–3 already exist)
            existing_keys = [int(k.replace("Sprite ", ""))
                             for k in manipulated_json.keys()]
            start_count = max(existing_keys, default=0) + 1
        else:
            start_count = 1
sprite_count = start_count
for i, element in enumerate(file_elements):
if "image_base64" in element["metadata"]:
try:
image_data = base64.b64decode(
element["metadata"]["image_base64"])
image = Image.open(io.BytesIO(image_data)).convert("RGB")
                    # image.show() would open a desktop image viewer for every sprite,
                    # which is not useful when running as a web service.
                    # image.show(title=f"Extracted Image {i+1}")
image_path = os.path.join(
extracted_image_subdir, f"Sprite_{i+1}.png")
image.save(image_path)
with open(image_path, "rb") as image_file:
image_bytes = image_file.read()
img_base64 = base64.b64encode(image_bytes).decode("utf-8")
# description = get_smolvlm_caption(image, prompt="Give a brief Description")
# name = get_smolvlm_caption(image, prompt="give a short name/title of this Image.")
def clean_caption_output(raw_output: str, prompt: str) -> str:
answer = raw_output.replace(prompt, '').replace(
"<image>", '').strip(" :-\n")
return answer
                    prompt_description = "Give a brief description of this image."
                    prompt_name = "Give a short name/title for this image."
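                    # OpenAI-style multimodal content blocks: one text part plus an
                    # image_url part carrying the sprite as a base64 data URI.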
content1 = [
{
"type": "text",
"text": f"{prompt_description}"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_base64}"
}
}
]
response1 = agent.invoke(
{"messages": [{"role": "user", "content": content1}]})
                    logger.debug(response1)
description = response1["messages"][-1].content
content2 = [
{
"type": "text",
"text": f"{prompt_name}"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{img_base64}"
}
}
]
response2 = agent.invoke(
{"messages": [{"role": "user", "content": content2}]})
                    logger.debug(response2)
name = response2["messages"][-1].content
# raw_description = get_smolvlm_caption(image, prompt=prompt_description)
# raw_name = get_smolvlm_caption(image, prompt=prompt_name)
# description = clean_caption_output(raw_description, prompt_description)
# name = clean_caption_output(raw_name, prompt_name)
manipulated_json[f"Sprite {sprite_count}"] = {
"name": name,
"base64": element["metadata"]["image_base64"],
"file-path": pdf_dir_path,
"description": description
}
sprite_count += 1
except Exception as e:
print(f"⚠️ Error processing Sprite {i+1}: {str(e)}")
# Save manipulated JSON
with open(final_json_path, "w") as sprite_file:
json.dump(manipulated_json, sprite_file, indent=4)
print(f"✅ Manipulated sprite JSON saved: {final_json_path}")
return final_json_path, manipulated_json
except Exception as e:
raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}")
def similarity_matching(input_json_path: str) -> str:
import uuid
import shutil
import tempfile
from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
from io import BytesIO
logger.info("🔍 Running similarity matching...")
# ============================== #
# DEFINE PATHS #
# ============================== #
backdrop_images_path = os.getenv("BACKDROP_FOLDER_PATH", "/app/reference/backdrops")
sprite_images_path = os.getenv("SPRITE_FOLDER_PATH", "/app/reference/sprites")
image_dirs = [backdrop_images_path, sprite_images_path]
# ================================================= #
# Generate Random UUID for project folder name #
# ================================================= #
random_id = str(uuid.uuid4()).replace('-', '')
project_folder = os.path.join("outputs", f"project_{random_id}")
# =========================================================================== #
# Create empty json in project_{random_id} folder #
# =========================================================================== #
os.makedirs(project_folder, exist_ok=True)
project_json_path = os.path.join(project_folder, "project.json")
# ============================== #
# READ SPRITE METADATA #
# ============================== #
with open(input_json_path, 'r') as f:
sprites_data = json.load(f)
sprite_ids, texts, sprite_base64 = [], [], []
for sid, sprite in sprites_data.items():
sprite_ids.append(sid)
texts.append(
"This is " + sprite.get("description", sprite.get("name", "")))
sprite_base64.append(sprite["base64"])
# ============================== #
# INITIALIZE CLIP EMBEDDER #
# ============================== #
clip_embd = OpenCLIPEmbeddings()
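    # OpenCLIPEmbeddings falls back to its default OpenCLIP checkpoint unless a
    # model_name/checkpoint pair is supplied; the sprite embeddings below must use
    # the same model as the precomputed reference embeddings for scores to be comparable.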
# # ========================================= #
# # Walk folders to collect all image paths #
# # ========================================= #
# folder_image_paths = []
# for image_dir in image_dirs:
# for root, _, files in os.walk(image_dir):
# for fname in files:
# if fname.lower().endswith((".png", ".jpg", ".jpeg")):
# folder_image_paths.append(os.path.join(root, fname))
# # ============================== #
# # EMBED FOLDER IMAGES (REF) #
# # ============================== #
# img_features = clip_embd.embed_image(folder_image_paths)
# # ============================== #
# # Store image embeddings #
# # ============================== #
# embedding_json = []
# for i, path in enumerate(folder_image_paths):
# embedding_json.append({
# "name":os.path.basename(path),
# "file-path": path,
# "embeddings": list(img_features[i])
# })
# # Save to embeddings.json
# with open(f"{OUTPUT_FOLDER}/embeddings.json", "w") as f:
# json.dump(embedding_json, f, indent=2)
# ============================== #
# DECODE SPRITE IMAGES #
# ============================== #
temp_dir = tempfile.mkdtemp()
sprite_image_paths = []
for idx, b64 in enumerate(sprite_base64):
image_data = base64.b64decode(b64.split(",")[-1])
img = Image.open(BytesIO(image_data)).convert("RGB")
temp_path = os.path.join(temp_dir, f"sprite_{idx}.png")
img.save(temp_path)
sprite_image_paths.append(temp_path)
# ============================== #
# EMBED SPRITE IMAGES #
# ============================== #
sprite_features = clip_embd.embed_image(sprite_image_paths)
# ============================== #
# COMPUTE SIMILARITIES #
# ============================== #
with open(f"{OUTPUT_FOLDER}/embeddings.json", "r") as f:
embedding_json = json.load(f)
img_matrix = np.array([img["embeddings"] for img in embedding_json])
sprite_matrix = np.array(sprite_features)
similarity = np.matmul(sprite_matrix, img_matrix.T)
most_similar_indices = np.argmax(similarity, axis=1)
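    # With L2-normalized CLIP embeddings (OpenCLIP's typically are), the dot product
    # above is the cosine similarity; argmax picks the closest reference image per sprite.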
# ============= Match and copy ================
project_data, backdrop_data = [], []
copied_folders = set()
for sprite_idx, matched_idx in enumerate(most_similar_indices):
matched_entry = embedding_json[matched_idx]
# matched_image_path = os.path.normpath(folder_image_paths[matched_idx])
matched_image_path = os.path.normpath(matched_entry["file-path"])
matched_folder = os.path.dirname(matched_image_path)
if matched_folder in copied_folders:
continue
copied_folders.add(matched_folder)
# Sprite
sprite_json_path = os.path.join(matched_folder, 'sprite.json')
if os.path.exists(sprite_json_path):
with open(sprite_json_path, 'r') as f:
sprite_data = json.load(f)
project_data.append(sprite_data)
for fname in os.listdir(matched_folder):
if fname not in {os.path.basename(matched_image_path), 'sprite.json'}:
shutil.copy2(os.path.join(
matched_folder, fname), project_folder)
# Backdrop
if matched_image_path.startswith(os.path.normpath(backdrop_images_path)):
backdrop_json_path = os.path.join(matched_folder, 'project.json')
if os.path.exists(backdrop_json_path):
with open(backdrop_json_path, 'r') as f:
backdrop_json_data = json.load(f)
for target in backdrop_json_data.get("targets", []):
if target.get("isStage"):
backdrop_data.append(target)
for fname in os.listdir(matched_folder):
if fname not in {os.path.basename(matched_image_path), 'project.json'}:
shutil.copy2(os.path.join(
matched_folder, fname), project_folder)
# Merge JSON structure
final_project = {
"targets": [],
"monitors": [],
"extensions": [],
"meta": {
"semver": "3.0.0",
"vm": "11.3.0",
"agent": "OpenAI ScratchVision Agent"
}
}
for sprite in project_data:
if not sprite.get("isStage", False):
final_project["targets"].append(sprite)
if backdrop_data:
all_costumes, sounds = [], []
for idx, bd in enumerate(backdrop_data):
all_costumes.extend(bd.get("costumes", []))
if idx == 0 and "sounds" in bd:
sounds = bd["sounds"]
final_project["targets"].append({
"isStage": True,
"name": "Stage",
"variables": {},
"lists": {},
"broadcasts": {},
"blocks": {},
"comments": {},
"currentCostume": 1 if len(all_costumes) > 1 else 0,
"costumes": all_costumes,
"sounds": sounds,
"volume": 100,
"layerOrder": 0,
"tempo": 60,
"videoTransparency": 50,
"videoState": "on",
"textToSpeechLanguage": None
})
with open(project_json_path, 'w') as f:
json.dump(final_project, f, indent=2)
logger.info(f"🎉 Final project saved: {project_json_path}")
return project_json_path
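# The assets copied into project_folder together with project.json follow Scratch's
# .sb3 layout, so zipping that folder should produce a loadable .sb3 archive.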
@app.route('/')
def index():
return render_template('app_index.html')
# API endpoint
@app.route('/process_pdf', methods=['POST'])
def process_pdf():
try:
logger.info("Received request to process PDF.")
if 'pdf_file' not in request.files:
logger.warning("No PDF file found in request.")
return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400
pdf_file = request.files['pdf_file']
if pdf_file.filename == '':
return jsonify({"error": "Empty filename"}), 400
# Save the uploaded PDF temporarily
filename = secure_filename(pdf_file.filename)
temp_dir = tempfile.mkdtemp()
saved_pdf_path = os.path.join(temp_dir, filename)
pdf_file.save(saved_pdf_path)
logger.info(f"Saved uploaded PDF to: {saved_pdf_path}")
# Extract & process
json_path = None
output_path, result = extract_images_from_pdf(
saved_pdf_path, json_path)
project_output = similarity_matching(output_path)
logger.info("Received request to process PDF.")
return jsonify({
"message": "✅ PDF processed successfully",
"output_json": output_path,
"sprites": result,
"project_output_json": project_output
})
except Exception as e:
logger.exception("❌ Failed to process PDF")
return jsonify({"error": f"❌ Failed to process PDF: {str(e)}"}), 500
if __name__ == '__main__':
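    # debug=True is convenient for local development; disable it in production.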
app.run(host='0.0.0.0', port=7860, debug=True)
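# Example request (illustrative; adjust host, port, and PDF path as needed):
#   curl -X POST -F "pdf_file=@scratch_crab.pdf" http://localhost:7860/process_pdf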