Spaces:
Sleeping
Sleeping
import os | |
from fastapi import FastAPI, UploadFile | |
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse | |
import gradio as gr | |
from PIL import Image | |
import PIL | |
import numpy as np | |
import pypdfium2 as pdfium | |
from ultralytics import YOLO | |
from ultralytics.engine.results import Results, Masks | |
import uvicorn | |
import cv2 | |
import uuid | |
from functools import partial | |
from openai import OpenAI | |
# Preamble placed in front of the question list that parse_pdf_text sends to
# the chat model.
PROMPT = (
    "You are analyzing the spec sheet of a solar panel. "
    "Plese answer the following questions, format them with bullets. \n"
)
# from solareyes.sam import SAM

# OpenAI client. Reading OPENAI_API_KEY from the environment is also the
# library default, so the argument could be omitted.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# FastAPI application; the Gradio UI is mounted onto it near the end of the file.
app = FastAPI()

# Load the model
# model: YOLO = YOLO('model/autodistill_best.pt')  # Path to trained model
# seg_model: YOLO = YOLO('model/autodistill_best_seg.pt')  # Path to trained model

# Directories
# image_dir and pdf_dir feed the Gradio example galleries; cropped_dir receives
# the JPEG files written by extract_image.
image_dir = './pdf_images/'
cropped_dir = './output/'
pdf_dir = './pdf_downloads/'

# Create every directory up front so the os.listdir() calls and image saves
# further down never hit a missing path.
for _directory in (image_dir, cropped_dir, pdf_dir):
    os.makedirs(_directory, exist_ok=True)

# Landing page used only by the commented-out "/" route below.
HTML = """
<!DOCTYPE html>
<html>
<h1>Gradio Request Demo</h1>
<p>Click the button to be redirected to the gradio app!</p>
<button onclick="window.location.pathname='/gradio'">Redirect</button>
</html>
"""

# sam = SAM()

# @app.get("/")
# def read_main():
#     return HTMLResponse(HTML)

# @app.get("/foo")
# def redirect():
#     return RedirectResponse("/gradio")

# def detect_solar_panel(image) -> Results:
#     # Perform inference
#     results: Results = model(image)
#     return results
def segment_solar_panel(image) -> Results:
    """Run the YOLO segmentation model on ``image`` and return its predictions.

    Args:
        image: Input handed straight to ``YOLO.predict``; callers in this file
            pass a PIL image.

    Returns:
        Whatever ``YOLO.predict`` returns. Callers in this file iterate over
        it, treating each item as a ``Results`` object.
    """
    # The weights are loaded on every call. That is slow, but it also means no
    # model instance is shared between concurrent requests.
    # NOTE(review): Ultralytics advises against sharing one model across
    # threads, so add per-thread handling before caching this.
    weights_path = 'model/autodistill_best_seg.pt'
    predictor: YOLO = YOLO(weights_path)
    return predictor.predict(image, imgsz=(841, 595), retina_masks=True)
def resize_and_pad(subject_image: Image.Image):
    """Centre ``subject_image`` on a 1200x1200 canvas.

    The subject is scaled, keeping its aspect ratio, so that its height is 80%
    of the canvas (960 px). If that would make it wider than the canvas, it is
    scaled to the canvas width instead so that nothing is cropped.

    Args:
        subject_image: Image to place. Its alpha channel (added via an RGBA
            conversion when absent) is used as the paste mask.

    Returns:
        A ``(png_image, jpg_image)`` tuple: the subject on a transparent RGBA
        canvas, and the same composition flattened onto a white RGB canvas.

    Raises:
        ValueError: If the subject has zero width or height.
    """
    canvas_size = 1200
    target_height = int(canvas_size * 0.8)

    if subject_image.width <= 0 or subject_image.height <= 0:
        raise ValueError("subject_image must have a non-zero width and height")

    aspect_ratio = subject_image.width / subject_image.height
    new_width = int(target_height * aspect_ratio)
    new_height = target_height
    if new_width > canvas_size:
        # Wide subject: fit to the canvas width rather than letting the paste
        # below clip the left and right edges.
        new_width = canvas_size
        new_height = int(canvas_size / aspect_ratio)
    # Extremely elongated subjects could round down to 0, which resize() rejects.
    new_width = max(1, new_width)
    new_height = max(1, new_height)

    resized_subject = subject_image.resize((new_width, new_height), Image.LANCZOS)
    if resized_subject.mode != "RGBA":
        # paste() needs an alpha-capable mask; an RGB image cannot mask itself.
        resized_subject = resized_subject.convert("RGBA")

    # Transparent canvas with the subject centred on it.
    png_image = Image.new("RGBA", (canvas_size, canvas_size), (0, 0, 0, 0))
    x = (canvas_size - new_width) // 2
    y = (canvas_size - new_height) // 2
    png_image.paste(resized_subject, (x, y), resized_subject)

    # Same composition flattened onto white for formats without alpha.
    jpg_image = Image.new("RGB", (canvas_size, canvas_size), (255, 255, 255))
    jpg_image.paste(png_image, (0, 0), png_image)

    return png_image, jpg_image
def segment_image_core(img: np.ndarray | Image.Image) -> tuple[tuple[Image.Image, list], Image.Image | None]:
    """Detect solar panels in ``img`` and build annotations plus a padded crop.

    Args:
        img: Source picture as a NumPy array or a PIL image. Arrays are
            converted to PIL first.

    Returns:
        ``((img, sections), jpg_img)`` where ``sections`` is a list of
        ``(bounding_box_or_mask, label)`` pairs, the shape passed to
        ``gr.AnnotatedImage`` in this file. ``jpg_img`` is the white-background
        crop produced by ``resize_and_pad`` for the last successfully processed
        mask, or ``None`` when no mask could be processed. The original raised
        ``UnboundLocalError`` in that case.
    """
    # `import PIL` alone does not load the ImageFilter submodule, so import it
    # explicitly instead of relying on another package having done so.
    from PIL import ImageFilter

    if isinstance(img, np.ndarray):
        img = Image.fromarray(img)

    results = segment_solar_panel(img)
    sections = []
    jpg_img = None

    # 11x11 elliptical kernel. The original passed cv2.MORPH_OPEN as the shape,
    # which has the same integer value as cv2.MORPH_ELLIPSE, so the kernel is
    # unchanged.
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (11, 11))

    for result_index, result in enumerate(results):
        print(f"Result {result_index}")
        result: Results
        try:
            h2, w2 = result.orig_img.shape[:2]
            masks: Masks = result.masks

            # `section_labels` is a module-level name assigned inside the
            # gr.Blocks section of this file.
            for i, box in enumerate(result.boxes):
                # Deal with boxes
                x1, y1, x2, y2 = box.xyxy[0].tolist()
                sections.append((
                    (int(x1), int(y1), int(x2), int(y2)),
                    f"{section_labels[0]} Bounding Box - index {i} - conf {box.conf}",
                ))

                # Now the masks
                try:
                    # masks is None when the model produced no masks; indexing
                    # it then raises the TypeError handled below.
                    mask = masks[i]
                    mask_array = mask.cpu().data.numpy().transpose(1, 2, 0)
                    # Morphological opening removes small specks from the mask.
                    opened_mask = cv2.morphologyEx(mask_array, cv2.MORPH_OPEN, kernel, iterations=3)
                    cv_mask = cv2.resize(opened_mask, (w2, h2))
                    # A light blur softens the cut-out edge of the alpha mask.
                    image_mask = Image.fromarray((cv_mask * 255).astype(np.uint8)).filter(ImageFilter.GaussianBlur(1))

                    img_out = img.copy()
                    img_out.putalpha(image_mask)
                    img_out = img_out.crop((x1, y1, x2, y2))
                    _, jpg_img = resize_and_pad(img_out)
                    sections.append((cv_mask, f"{section_labels[0]} Mask - Index: {i}"))
                except TypeError as e:
                    print(f"Error processing image: {e}, probably no masks.")
        except IndexError as e:
            print(f"Error processing image: {e}, probably no boxes.")

    return (img, sections), jpg_img
def process_pdf_core(pdf) -> Image.Image:
    """Render the first page of a PDF as a PIL image.

    Args:
        pdf: Anything ``pdfium.PdfDocument`` accepts. Callers in this file pass
            either the value of a Gradio file input or raw uploaded bytes.

    Returns:
        The first page rendered at ``scale=4`` and converted with ``to_pil()``.
    """
    document = pdfium.PdfDocument(pdf)
    # NOTE(review): `img_input` is the gr.Image component created in the Blocks
    # layout further down. Confirm this call has the intended effect when run
    # from a request handler, since this function is also used by extract_image.
    img_input.clear()
    # Get just the first page
    first_page = document[0]
    return first_page.render(scale=4).to_pil()
# Gradio UI: one row for image upload and annotated output, one for PDF upload.
with gr.Blocks() as demo:

    # Label prefix for annotations. This is a module-level name (a `with` block
    # does not open a new scope) and segment_image_core reads it.
    section_labels = ['Solar Panel']

    def segment_image(img):
        """Return the ``(image, sections)`` pair that gr.AnnotatedImage expects."""
        img_sections, jpg_img = segment_image_core(img)
        return img_sections

    # def process_image(img):
    #     results = detect_solar_panel(img)
    #     sections = []
    #     for result in results:
    #         result: Results
    #         # print(result)
    #         try:
    #             boxes = result.boxes.xyxy[0].tolist()
    #             # Unpack boxes
    #             x1, y1, x2, y2 = boxes
    #             sections.append(((int(x1), int(y1), int(x2), int(y2)), f"{section_labels[0]} Bounding Box"))
    #             #Create 4 centroids around the true centroid shifted by a delta value
    #             delta = 0.3
    #             delta_x = (x2 - x1) * delta
    #             delta_y = (y2 - y1) * delta
    #             x_centroid = (x1 + x2) / 2
    #             y_centroid = (y1 + y2) / 2
    #             xtop_centroid = x_centroid
    #             ytop_centroid = y_centroid + delta_y
    #             xright_centroid = x_centroid + delta_x
    #             yright_centroid = y_centroid
    #             xbottom_centroid = x_centroid
    #             ybottom_centroid = y_centroid - delta_y
    #             xleft_centroid = x_centroid - delta_x
    #             yleft_centroid = y_centroid
    #             sam_mask, sam_scores = sam.segment(img, [[
    #                 [xtop_centroid, ytop_centroid],
    #                 [xright_centroid, yright_centroid],
    #                 [xbottom_centroid, ybottom_centroid],
    #                 [xleft_centroid, yleft_centroid]
    #             ]])
    #             squeezed_sam_mask_tensor = sam_mask[0].squeeze()
    #             squeezed_sam_scores_tensor = sam_scores[0].squeeze()
    #             print(f"sqeezed sam mask shape {squeezed_sam_mask_tensor.shape}")
    #             print(f"sqeezed sam scores shape {squeezed_sam_scores_tensor.shape}")
    #             for i in range(0, squeezed_sam_mask_tensor.shape[0]):
    #                 flat_mask = squeezed_sam_mask_tensor[i].numpy()
    #                 sections.append((flat_mask, f"{section_labels[0]} Mask {i} - Score: {squeezed_sam_scores_tensor[i]}"))
    #                 i += 1
    #         except IndexError as e:
    #             print(f"Error processing image: {e}, probably no boxes.")
    #     return (img, sections)

    def process_pdf(pdf):
        """Render the first page of ``pdf`` and run the segmentation on it."""
        image = process_pdf_core(pdf)
        return segment_image(image)

    with gr.Row():
        img_input = gr.Image(label="Upload Image", height=400)
        img_output = gr.AnnotatedImage(height=400)

    section_btn = gr.Button("Identify Solar Panel From Image")

    # Every file currently in image_dir is offered as an example; the listing
    # is taken once, when this module is imported.
    gr.Examples(
        inputs=img_input,
        # examples = [os.path.join(image_dir, file) for file in random.sample(os.listdir(image_dir), 15)]
        examples=[os.path.join(image_dir, file) for file in os.listdir(image_dir)],
    )

    with gr.Row():
        # Gradio file_types entries are either a category ("image", "file", ...)
        # or an extension with a leading dot. The bare 'pdf' used before
        # matches neither form.
        pdf_input = gr.File(label="Upload PDF", file_types=['.pdf'], height=200)

    pdf_btn = gr.Button("Identify Solar Panel from PDF")

    gr.Examples(
        inputs=pdf_input,
        examples=[os.path.join(pdf_dir, file) for file in os.listdir(pdf_dir)],
    )

    # Both buttons render their result into the same annotated-image output.
    section_btn.click(segment_image, [img_input], img_output)
    pdf_btn.click(process_pdf, [pdf_input], img_output)
#Accept a PDF file, return a jpeg image
def extract_image(uploadFile: UploadFile) -> FileResponse:
    """Render the uploaded PDF's first page, crop the panel and return it as a JPEG.

    Args:
        uploadFile: Uploaded PDF; its whole content is read into memory.

    Returns:
        A ``FileResponse`` pointing at the JPEG saved under ``cropped_dir``.
    """
    pdf_bytes = uploadFile.file.read()
    page_image = process_pdf_core(pdf_bytes)
    _, cropped_jpeg = segment_image_core(page_image)

    # A random UUID keeps concurrent requests from overwriting each other's file.
    unique_id = str(uuid.uuid4())
    output_path = f"{cropped_dir}/cropped_{unique_id}.jpg"
    cropped_jpeg.save(output_path)
    return FileResponse(output_path)
def parse_pdf_text(file):
    """Extract all text from a PDF and ask the chat model a fixed set of questions.

    Args:
        file: Anything ``pdfium.PdfDocument`` accepts; ``parse_info`` passes
            the raw bytes of the upload.

    Returns:
        The ``content`` of the first choice in the chat completion.
    """
    document = pdfium.PdfDocument(file)
    page_texts = [page.get_textpage().get_text_range() for page in document]
    all_text = "PDF Extract Text Contents Below: \n\n" + "".join(page_texts)

    #use openai to ask questions about text
    questions = (
        "What are module dimensions in L x W x H?",
        "What is the module weight in kilograms?",
        "What are the cable lengthes in millimeters?",
        "What brand, name, or model are the connectors?",
        "How many pieces per container? Prefer 40' HQ or HC, if not available try 53'",
        "What is the model number?",
    )
    # Prompt layout: preamble, one question per line, then the extracted text.
    question = PROMPT + "\n".join(questions) + "\n" + all_text

    response = client.chat.completions.create(
        messages=[{"role": "user", "content": question}],
        model="gpt-3.5-turbo",
    )
    return response.choices[0].message.content
#Accept a PDF file, return a text summary
def parse_info(uploadFile: UploadFile):
    """Read the uploaded PDF and return the model's answers.

    Args:
        uploadFile: Uploaded PDF; its whole content is read into memory.

    Returns:
        A dict with a single ``"answer"`` key holding the text returned by
        ``parse_pdf_text``.
    """
    pdf_bytes = uploadFile.file.read()
    return {"answer": parse_pdf_text(pdf_bytes)}
# Serve the Gradio UI from the root path of the FastAPI application.
app = gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    # app = gr.mount_gradio_app(app, demo, path="/gradio")
    # Only the port is set here; the host is left at uvicorn's default.
    uvicorn.run(app, port=7860)
    # demo.launch(share=True)
    # demo.launch(share=True, auth=(os.environ.get("GRADIO_USERNAME"), os.environ.get("GRADIO_PASSWORD")))