# (Scrape residue: "Spaces: Sleeping" — appears to be a hosting-page status header, not part of the program.)
from flask import Flask, render_template, Response, flash, redirect, url_for, request, jsonify | |
import cv2 | |
import numpy as np | |
from unstructured.partition.pdf import partition_pdf | |
import json | |
import base64 | |
import io | |
import os | |
from PIL import Image, ImageEnhance, ImageDraw | |
from imutils.perspective import four_point_transform | |
from dotenv import load_dotenv | |
import pytesseract | |
from transformers import AutoProcessor, AutoModelForImageTextToText, AutoModelForVision2Seq | |
from langchain_community.document_loaders.image_captions import ImageCaptionLoader | |
from werkzeug.utils import secure_filename | |
import tempfile | |
import torch | |
from langchain_groq import ChatGroq | |
from langgraph.prebuilt import create_react_agent | |
import logging | |
# Logging setup: every record is written both to app.log and to the console.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
_log_handlers = [logging.FileHandler("app.log"), logging.StreamHandler()]
logging.basicConfig(
    level=logging.DEBUG,  # verbose on purpose; use INFO or ERROR in production
    format=_LOG_FORMAT,
    handlers=_log_handlers,
)
# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)
# Pull variables from a local .env file into the process environment.
load_dotenv()

# The key is not passed to ChatGroq below (the client is expected to read
# GROQ_API_KEY from the environment itself); it is read here only so that a
# missing key is reported at start-up.
groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
    logger.warning("GROQ_API_KEY is not set; Groq LLM requests need it to authenticate.")

llm = ChatGroq(
    model="meta-llama/llama-4-maverick-17b-128e-instruct",
    temperature=0,
    max_tokens=None,
)

app = Flask(__name__)

# External tool locations. The defaults are the original Windows install paths;
# set TESSERACT_CMD / POPPLER_PATH to override them on other machines.
pytesseract.pytesseract.tesseract_cmd = os.getenv(
    "TESSERACT_CMD", r"C:\Program Files\Tesseract-OCR\tesseract.exe")
poppler_path = os.getenv("POPPLER_PATH", r"C:\poppler-23.11.0\Library\bin")

# NOTE(review): `count` and `PDF_GET` are not used by any code visible here;
# kept unchanged in case other code depends on them.
count = 0
PDF_GET = r"E:\Pratham\2025\Harsh Sir\Scratch Vision\images\scratch_crab.pdf"

# Output directory layout (created on import if missing).
OUTPUT_FOLDER = "OUTPUTS"
DETECTED_IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "DETECTED_IMAGE")
IMAGE_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "SCANNED_IMAGE")
JSON_FOLDER_PATH = os.path.join(OUTPUT_FOLDER, "EXTRACTED_JSON")
for path in [OUTPUT_FOLDER, IMAGE_FOLDER_PATH, DETECTED_IMAGE_FOLDER_PATH, JSON_FOLDER_PATH]:
    os.makedirs(path, exist_ok=True)
# Model Initialization
# The SmolVLM processor and model are loaded once at import time and the model
# is placed on the CPU. Any failure aborts start-up with a RuntimeError that
# keeps the original exception attached as its cause.
_SMOLVLM_MODEL_ID = "HuggingFaceTB/SmolVLM-256M-Instruct"
try:
    smolvlm256m_processor = AutoProcessor.from_pretrained(_SMOLVLM_MODEL_ID)
    smolvlm256m_model = AutoModelForVision2Seq.from_pretrained(
        _SMOLVLM_MODEL_ID,
        # Use bfloat16 when this torch build exposes it, float32 otherwise.
        torch_dtype=torch.bfloat16 if hasattr(torch, "bfloat16") else torch.float32,
        _attn_implementation="eager",
    ).to("cpu")
except Exception as e:
    raise RuntimeError(f"❌ Failed to load SmolVLM model: {str(e)}") from e
# SmolVLM image captioning
def get_smolvlm_caption(image: Image.Image, prompt: str = "") -> str:
    """Caption ``image`` with the module-level SmolVLM processor and model.

    The prompt given to the processor must carry exactly one ``<image>``
    placeholder: one is prepended when the prompt has none, and a prompt with
    more than one is rejected.

    Returns:
        The decoded text of the first generated sequence (special tokens
        skipped). Nothing is raised to the caller: any failure, including the
        placeholder check, comes back as a string starting with
        "❌ Error during caption generation:".
    """
    try:
        # Guarantee the placeholder is present before counting it.
        full_prompt = prompt if "<image>" in prompt else f"<image> {prompt.strip()}"
        placeholder_total = full_prompt.count("<image>")
        if placeholder_total != 1:
            raise ValueError(
                f"Prompt must contain exactly 1 <image> token. Found {placeholder_total}")

        model_inputs = smolvlm256m_processor(
            images=[image], text=[full_prompt], return_tensors="pt")
        model_inputs = model_inputs.to("cpu")
        generated = smolvlm256m_model.generate(**model_inputs, max_new_tokens=100)
        first_sequence = generated[0]
        return smolvlm256m_processor.decode(first_sequence, skip_special_tokens=True)
    except Exception as err:
        return f"❌ Error during caption generation: {str(err)}"
# --- FUNCTION: Extract images from saved PDF ---
# System prompt given to the captioning agent (text kept exactly as authored).
_SPRITE_SYSTEM_PROMPT = """
You are an expert in visual scene understanding.
Your Job is to analyze an image and respond acoording if asked for name give simple name by analyzing it and if ask for descrption generate a short description covering its elements.
Guidelines:
- Focus only the images given in Square Shape.
- Don't Consider Blank areas in Image as.
- Don't include generic summary or explanation outside the fields.
Return only string.
"""


def _next_sprite_number(existing_sprites) -> int:
    """Return the first unused N among keys of the form ``"Sprite N"``."""
    numbers = []
    for key in existing_sprites:
        try:
            numbers.append(int(key.replace("Sprite ", "")))
        except ValueError:
            # Keys that do not follow the "Sprite <n>" pattern are ignored
            # instead of aborting the whole extraction.
            continue
    return max(numbers, default=0) + 1


def _ask_agent_about_image(agent, question: str, png_base64: str) -> str:
    """Send one text question plus the image (as a data URL) and return the reply text."""
    content = [
        {"type": "text", "text": question},
        {
            "type": "image_url",
            # The bytes are read back from the saved PNG, so label them as PNG.
            "image_url": {"url": f"data:image/png;base64,{png_base64}"},
        },
    ]
    response = agent.invoke({"messages": [{"role": "user", "content": content}]})
    return response["messages"][-1].content


def extract_images_from_pdf(pdf_path, output_json_path):
    """Extract images from a PDF and generate the structured sprite JSON.

    Steps:
      1. Partition the PDF with ``partition_pdf`` (hi_res strategy, image
         blocks returned as base64 payloads) and dump every element to
         ``<JSON_FOLDER_PATH>/<pdf name>/extracted.json``.
      2. For each element carrying ``metadata.image_base64``: save it as
         ``Sprite_<element index + 1>.png`` under
         ``<DETECTED_IMAGE_FOLDER_PATH>/<pdf name>/`` and ask the LLM agent for
         a description and a name.
      3. Write the result to ``extracted_sprites.json``. Sprites already in
         that file are kept and new ones continue the ``"Sprite N"`` numbering.

    Images are no longer opened in a desktop viewer (``Image.show``): this
    function runs inside the web server, where a GUI viewer is not appropriate.

    Args:
        pdf_path: Path of the PDF to process.
        output_json_path: Ignored; the output location is always derived from
            the PDF file name. Kept for backward compatibility.

    Returns:
        Tuple ``(final_json_path, sprites)`` where ``sprites`` maps
        ``"Sprite N"`` to a dict with keys ``name``, ``base64``, ``file-path``
        and ``description``.

    Raises:
        RuntimeError: On any failure outside the per-image step. A failure
            while handling a single image is logged and that image is skipped.
    """
    try:
        pdf_filename = os.path.splitext(os.path.basename(pdf_path))[0]  # e.g., "scratch_crab"
        # Normalise separators for the current OS; a blanket "/" -> "\"
        # replacement corrupts paths on non-Windows hosts.
        pdf_dir = os.path.dirname(pdf_path)
        pdf_dir_path = os.path.normpath(pdf_dir) if pdf_dir else pdf_dir

        # Create subfolders
        extracted_image_subdir = os.path.join(DETECTED_IMAGE_FOLDER_PATH, pdf_filename)
        json_subdir = os.path.join(JSON_FOLDER_PATH, pdf_filename)
        os.makedirs(extracted_image_subdir, exist_ok=True)
        os.makedirs(json_subdir, exist_ok=True)

        # Output paths
        output_json_path = os.path.join(json_subdir, "extracted.json")
        final_json_path = os.path.join(json_subdir, "extracted_sprites.json")

        try:
            elements = partition_pdf(
                filename=pdf_path,
                strategy="hi_res",
                extract_image_block_types=["Image"],
                extract_image_block_to_payload=True,  # return base64 in the output
            )
        except Exception as e:
            raise RuntimeError(
                f"❌ Failed to extract images from PDF: {str(e)}") from e

        try:
            file_elements = [element.to_dict() for element in elements]
            with open(output_json_path, "w") as f:
                json.dump(file_elements, f, indent=4)
        except Exception as e:
            raise RuntimeError(f"❌ Failed to write extracted.json: {str(e)}") from e

        agent = create_react_agent(
            model=llm,
            tools=[],
            prompt=_SPRITE_SYSTEM_PROMPT,
        )

        # Start from the sprites already on disk so that continuing the
        # numbering does not drop the earlier entries when the file is rewritten.
        manipulated_json = {}
        if os.path.exists(final_json_path):
            with open(final_json_path, "r") as existing_file:
                manipulated_json = json.load(existing_file)
        sprite_count = _next_sprite_number(manipulated_json)

        prompt_description = "Give a brief Captioning."
        prompt_name = "give a short name caption of this Image."

        for i, element in enumerate(file_elements):
            image_b64 = element.get("metadata", {}).get("image_base64")
            if not image_b64:
                continue
            try:
                image_data = base64.b64decode(image_b64)
                image = Image.open(io.BytesIO(image_data)).convert("RGB")
                image_path = os.path.join(
                    extracted_image_subdir, f"Sprite_{i+1}.png")
                image.save(image_path)

                # Send the re-encoded PNG (not the raw payload) to the agent.
                with open(image_path, "rb") as image_file:
                    img_base64 = base64.b64encode(image_file.read()).decode("utf-8")

                description = _ask_agent_about_image(agent, prompt_description, img_base64)
                name = _ask_agent_about_image(agent, prompt_name, img_base64)
                logger.debug("Sprite %s captioned: name=%r description=%r",
                             sprite_count, name, description)

                manipulated_json[f"Sprite {sprite_count}"] = {
                    "name": name,
                    "base64": image_b64,
                    "file-path": pdf_dir_path,
                    "description": description,
                }
                sprite_count += 1
            except Exception as e:
                # Best effort: one bad image must not abort the whole PDF.
                logger.warning("⚠️ Error processing Sprite %s: %s", i + 1, e)

        # Save manipulated JSON
        with open(final_json_path, "w") as sprite_file:
            json.dump(manipulated_json, sprite_file, indent=4)
        logger.info("✅ Manipulated sprite JSON saved: %s", final_json_path)
        return final_json_path, manipulated_json
    except Exception as e:
        raise RuntimeError(f"❌ Error in extract_images_from_pdf: {str(e)}") from e
def _copy_folder_assets(folder: str, destination: str, skip_names: set) -> None:
    """Copy every entry of ``folder`` into ``destination`` except ``skip_names``."""
    import shutil

    for fname in os.listdir(folder):
        if fname not in skip_names:
            shutil.copy2(os.path.join(folder, fname), destination)


def _build_stage_target(backdrop_data: list) -> dict:
    """Merge the collected stage targets into one Stage target.

    Costumes of all backdrops are concatenated; sounds are taken from the
    first backdrop only (when it has a "sounds" key).
    """
    all_costumes = []
    for bd in backdrop_data:
        all_costumes.extend(bd.get("costumes", []))
    first = backdrop_data[0]
    sounds = first["sounds"] if "sounds" in first else []
    return {
        "isStage": True,
        "name": "Stage",
        "variables": {},
        "lists": {},
        "broadcasts": {},
        "blocks": {},
        "comments": {},
        "currentCostume": 1 if len(all_costumes) > 1 else 0,
        "costumes": all_costumes,
        "sounds": sounds,
        "volume": 100,
        "layerOrder": 0,
        "tempo": 60,
        "videoTransparency": 50,
        "videoState": "on",
        "textToSpeechLanguage": None,
    }


def similarity_matching(input_json_path: str) -> str:
    """Match extracted sprites against reference images and build a project.json.

    Each sprite image from ``input_json_path`` is embedded with OpenCLIP and
    compared (dot product) with the precomputed reference embeddings stored in
    ``<OUTPUT_FOLDER>/embeddings.json``. That file is expected to be a list of
    objects with at least the keys ``"file-path"`` and ``"embeddings"``; it is
    not generated here. For the best-matching reference image of every sprite,
    the containing folder is processed once:

    * if it has a ``sprite.json``, that JSON is added as a target and the
      folder's other files are copied into the new project folder;
    * if the image lies under the backdrop root (``BACKDROP_FOLDER_PATH``) and
      the folder has a ``project.json``, its stage targets are collected and
      the folder's other files are copied as well.

    Args:
        input_json_path: Path to the sprite JSON (mapping of sprite id to a
            dict containing a ``"base64"`` image).

    Returns:
        Path of the written ``project.json`` inside a fresh
        ``outputs/project_<uuid>`` folder. With no sprites in the input, a
        project with an empty ``targets`` list is written.
    """
    import tempfile
    import uuid
    from io import BytesIO
    from langchain_experimental.open_clip.open_clip import OpenCLIPEmbeddings

    logger.info("🔍 Running similarity matching...")

    backdrop_images_path = os.getenv("BACKDROP_FOLDER_PATH", "/app/reference/backdrops")

    # Fresh project folder named with a random UUID (hex, no dashes).
    # NOTE(review): this uses lowercase "outputs" while OUTPUT_FOLDER is
    # "OUTPUTS" — on case-sensitive filesystems these are two different
    # directories; confirm which one is intended.
    random_id = uuid.uuid4().hex
    project_folder = os.path.join("outputs", f"project_{random_id}")
    os.makedirs(project_folder, exist_ok=True)
    project_json_path = os.path.join(project_folder, "project.json")

    # Read sprite metadata.
    with open(input_json_path, 'r') as f:
        sprites_data = json.load(f)
    sprite_base64 = [sprite["base64"] for sprite in sprites_data.values()]

    project_data, backdrop_data = [], []
    if not sprite_base64:
        # Nothing to embed: an empty feature array cannot be multiplied with
        # the reference matrix, so skip matching altogether.
        logger.warning("No sprites found in %s; writing an empty project.", input_json_path)
    else:
        clip_embd = OpenCLIPEmbeddings()

        # The embedder takes file paths, so decode each sprite to a temporary
        # PNG; the directory is removed as soon as embedding is done.
        with tempfile.TemporaryDirectory() as temp_dir:
            sprite_image_paths = []
            for idx, b64 in enumerate(sprite_base64):
                # Tolerate a "data:...;base64," prefix by keeping the part after the last comma.
                image_data = base64.b64decode(b64.split(",")[-1])
                img = Image.open(BytesIO(image_data)).convert("RGB")
                temp_path = os.path.join(temp_dir, f"sprite_{idx}.png")
                img.save(temp_path)
                sprite_image_paths.append(temp_path)
            sprite_features = clip_embd.embed_image(sprite_image_paths)

        # Compute similarities against the precomputed reference embeddings.
        with open(f"{OUTPUT_FOLDER}/embeddings.json", "r") as f:
            embedding_json = json.load(f)
        img_matrix = np.array([entry["embeddings"] for entry in embedding_json])
        sprite_matrix = np.array(sprite_features)
        similarity = np.matmul(sprite_matrix, img_matrix.T)
        most_similar_indices = np.argmax(similarity, axis=1)

        # Match and copy: each matched folder is handled only once.
        backdrop_root = os.path.normpath(backdrop_images_path)
        copied_folders = set()
        for matched_idx in most_similar_indices:
            matched_entry = embedding_json[matched_idx]
            matched_image_path = os.path.normpath(matched_entry["file-path"])
            matched_folder = os.path.dirname(matched_image_path)
            if matched_folder in copied_folders:
                continue
            copied_folders.add(matched_folder)
            matched_name = os.path.basename(matched_image_path)

            # Sprite
            sprite_json_path = os.path.join(matched_folder, 'sprite.json')
            if os.path.exists(sprite_json_path):
                with open(sprite_json_path, 'r') as f:
                    project_data.append(json.load(f))
                _copy_folder_assets(
                    matched_folder, project_folder, {matched_name, 'sprite.json'})

            # Backdrop
            if matched_image_path.startswith(backdrop_root):
                backdrop_json_path = os.path.join(matched_folder, 'project.json')
                if os.path.exists(backdrop_json_path):
                    with open(backdrop_json_path, 'r') as f:
                        backdrop_json_data = json.load(f)
                    backdrop_data.extend(
                        target for target in backdrop_json_data.get("targets", [])
                        if target.get("isStage"))
                    _copy_folder_assets(
                        matched_folder, project_folder, {matched_name, 'project.json'})

    # Merge JSON structure
    final_project = {
        "targets": [sprite for sprite in project_data if not sprite.get("isStage", False)],
        "monitors": [],
        "extensions": [],
        "meta": {
            "semver": "3.0.0",
            "vm": "11.3.0",
            "agent": "OpenAI ScratchVision Agent"
        }
    }
    if backdrop_data:
        final_project["targets"].append(_build_stage_target(backdrop_data))

    with open(project_json_path, 'w') as f:
        json.dump(final_project, f, indent=2)
    logger.info(f"🎉 Final project saved: {project_json_path}")
    return project_json_path
def index():
    """Render and return the ``app_index.html`` template."""
    # NOTE(review): no @app.route decorator is visible on this view here —
    # confirm it is registered with the Flask app.
    page = render_template('app_index.html')
    return page
# API endpoint
def process_pdf():
    """Handle a PDF upload: extract sprites, then build the matched project.

    Expects a multipart form upload under the key ``pdf_file``. The file is
    saved into a temporary directory, passed to ``extract_images_from_pdf`` and
    the resulting sprite JSON is passed to ``similarity_matching``. The
    temporary directory is removed before returning, whatever the outcome.

    Returns:
        A JSON response with the keys ``message``, ``output_json``,
        ``sprites`` and ``project_output_json`` on success; a JSON error with
        status 400 when the file is missing or has an empty name; a JSON error
        with status 500 on any other failure.
    """
    # NOTE(review): no @app.route decorator is visible on this view here —
    # confirm it is registered with the Flask app.
    import shutil

    temp_dir = None
    try:
        logger.info("Received request to process PDF.")
        if 'pdf_file' not in request.files:
            logger.warning("No PDF file found in request.")
            return jsonify({"error": "Missing PDF file in form-data with key 'pdf_file'"}), 400
        pdf_file = request.files['pdf_file']
        if pdf_file.filename == '':
            return jsonify({"error": "Empty filename"}), 400

        # secure_filename may return an empty string for some names; fall back
        # to a fixed name so the save target is never the directory itself.
        filename = secure_filename(pdf_file.filename) or "upload.pdf"

        # Save the uploaded PDF temporarily
        temp_dir = tempfile.mkdtemp()
        saved_pdf_path = os.path.join(temp_dir, filename)
        pdf_file.save(saved_pdf_path)
        logger.info(f"Saved uploaded PDF to: {saved_pdf_path}")

        # Extract & process (the second argument is unused by the extractor).
        output_path, result = extract_images_from_pdf(saved_pdf_path, None)
        project_output = similarity_matching(output_path)
        logger.info("PDF processed successfully.")
        return jsonify({
            "message": "✅ PDF processed successfully",
            "output_json": output_path,
            "sprites": result,
            "project_output_json": project_output
        })
    except Exception as e:
        logger.exception("❌ Failed to process PDF")
        return jsonify({"error": f"❌ Failed to process PDF: {str(e)}"}), 500
    finally:
        # The upload is only needed while processing; do not leave it behind.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
if __name__ == '__main__':
    # The server listens on all interfaces, so Flask's debug mode (which turns
    # on the interactive debugger) must not be enabled unconditionally. Opt in
    # explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "0").lower() in {"1", "true", "yes"}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)