# Page header captured together with the file (not part of the program):
# nmariotto's picture
# Update app.py
# 8e5d255 verified
import streamlit as st
import roboflow
import pandas as pd
import matplotlib.pyplot as plt
import zipfile
import tempfile
from shapely.geometry import Polygon
from PIL import Image
from io import BytesIO
from concurrent.futures import ThreadPoolExecutor
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload
import gspread
import time
# Version string interpolated into the page caption in the UI section.
APP_VERSION = "2.4"
# =========================
# Roboflow init
# =========================
# All credentials and model coordinates are read from Streamlit secrets when
# the script runs; the resulting `model` is the object used by safe_predict().
API_KEY = st.secrets["roboflow_api_key"]
rf = roboflow.Roboflow(api_key=API_KEY)
project = rf.workspace(st.secrets["roboflow_workspace"]).project(st.secrets["roboflow_project"])
model = project.version(st.secrets["roboflow_version"]).model
# Initial value only: the "Model confidence (%)" slider in the UI section
# assigns model.confidence again further down the script.
model.confidence = 80
# NOTE(review): presumably the Roboflow overlap threshold in percent — confirm.
model.overlap = 25
# DPI passed to plt.subplots() for the figures rendered in process_image().
dpi_value = 300
# =========================
# Google Drive + Sheets (OAuth2)
# =========================
# One credential object, scoped for both Drive and Sheets, is shared by the
# Drive API client and the gspread client below.
scope = ["https://www.googleapis.com/auth/drive", "https://www.googleapis.com/auth/spreadsheets"]
# No access token is supplied (token=None); only the refresh token and client
# id/secret are provided.
# NOTE(review): presumably the client library exchanges the refresh token for
# an access token on first use — confirm.
credentials = Credentials(
token=None,
refresh_token=st.secrets["GOOGLE_DRIVE_REFRESH_TOKEN"],
token_uri="https://oauth2.googleapis.com/token",
client_id=st.secrets["GOOGLE_DRIVE_CLIENT_ID"],
client_secret=st.secrets["GOOGLE_DRIVE_CLIENT_SECRET"],
scopes=scope,
)
# Drive v3 client used by upload_to_drive() and find_or_create_folder().
drive_service = build("drive", "v3", credentials=credentials)
sheets_client = gspread.authorize(credentials)
# First worksheet of the feedback spreadsheet; save_feedback() appends rows to it.
sheet = sheets_client.open_by_url(st.secrets["feedback_sheet_url"]).sheet1
# =========================
# Helpers
# =========================
def calculate_polygon_area(points):
    """Return the area enclosed by a polygon given as a list of vertices.

    Args:
        points: Sequence of mappings, each providing numeric ``"x"`` and
            ``"y"`` entries, in contour order.

    Returns:
        The polygon area in squared coordinate units. A contour with fewer
        than three vertices encloses nothing, so ``0.0`` is returned for it
        without building a geometry object.
    """
    if len(points) < 3:
        # Degenerate contour (empty, a point, or a segment): report zero area
        # instead of depending on how the geometry constructor handles it.
        return 0.0
    vertices = [(p["x"], p["y"]) for p in points]
    return Polygon(vertices).area
def safe_predict(image_path, retries=3, delay=1):
    """Run the model on an image file, retrying when the call raises.

    Args:
        image_path: Path of the image file handed to ``model.predict``.
        retries: Maximum number of prediction attempts (default 3).
        delay: Seconds to wait between two consecutive attempts (default 1).

    Returns:
        Whatever ``model.predict`` returns on the first successful attempt,
        or ``None`` when every attempt raised.
    """
    for attempt in range(1, retries + 1):
        try:
            return model.predict(image_path)
        except Exception:
            # Deliberate best-effort: any failure is retried and finally
            # reported to the caller as None. Only pause when another attempt
            # follows, so a total failure is not delayed by a useless sleep.
            if attempt < retries:
                time.sleep(delay)
    return None
def resize_image(image):
    """Return ``image`` resized to a fixed 640x640 target.

    The target size is constant and does not depend on the dimensions of the
    input image.
    """
    target_size = (640, 640)
    resized = image.resize(target_size)
    return resized
def upload_to_drive(image_bytes, filename, folder_id):
    """Upload a PNG stream to Drive as ``filename`` inside ``folder_id``.

    Args:
        image_bytes: Stream holding the PNG data to upload.
        filename: Name given to the created Drive file.
        folder_id: Id of the Drive folder set as the file's parent.
    """
    png_media = MediaIoBaseUpload(image_bytes, mimetype="image/png")
    metadata = {"name": filename, "parents": [folder_id]}
    create_request = drive_service.files().create(
        body=metadata,
        media_body=png_media,
        fields="id",
    )
    create_request.execute()
def _escape_drive_query_value(value):
    """Escape ``value`` for use inside a single-quoted Drive query literal."""
    # Backslashes first, so the backslashes added for quotes are not doubled.
    return str(value).replace("\\", "\\\\").replace("'", "\\'")


def find_or_create_folder(folder_name, parent=None):
    """Return the id of a Drive folder, creating the folder when missing.

    Args:
        folder_name: Name of the folder to look up or create.
        parent: Optional id of the parent folder. When given, the lookup is
            restricted to that parent and a newly created folder is placed
            inside it.

    Returns:
        The id of the first matching non-trashed folder, or of the folder
        that was just created.
    """
    # Folder names derive from uploaded file names, so quotes and backslashes
    # must be escaped or they would break (or alter) the search query.
    escaped_name = _escape_drive_query_value(folder_name)
    query = f"name='{escaped_name}' and mimeType='application/vnd.google-apps.folder' and trashed=false"
    if parent:
        query += f" and '{_escape_drive_query_value(parent)}' in parents"

    results = drive_service.files().list(q=query, spaces="drive", fields="files(id, name)").execute()
    folders = results.get("files", [])
    if folders:
        return folders[0]["id"]

    folder_metadata = {"name": folder_name, "mimeType": "application/vnd.google-apps.folder"}
    if parent:
        folder_metadata["parents"] = [parent]
    created = drive_service.files().create(body=folder_metadata, fields="id").execute()
    return created.get("id")
def get_image_bytes(image):
    """Encode ``image`` as PNG into a new in-memory stream.

    Returns:
        A ``BytesIO`` holding the PNG data, rewound to offset 0 so it can be
        read from the start.
    """
    png_stream = BytesIO()
    image.save(png_stream, format="PNG")
    png_stream.seek(0)
    return png_stream
def _no_segmentation_result(safe_name, image):
    """Build the result record for an image without a usable segmentation."""
    return {
        "Imagem": safe_name,
        "Área Segmentada (px²)": None,
        "Área Segmentada (µm²)": None,
        "SemSegmentacao": True,
        "Exibir": image,
        "Original": get_image_bytes(image),
    }


def _save_figure_png(fig, **savefig_kwargs):
    """Save ``fig`` as PNG into a new in-memory stream rewound to offset 0."""
    buffer = BytesIO()
    fig.savefig(buffer, format="png", **savefig_kwargs)
    buffer.seek(0)
    return buffer


def _render_overlay_png(image, x, y):
    """Render the contour drawn over the image and return it as a PNG stream."""
    fig, ax = plt.subplots(figsize=(6, 6), dpi=dpi_value)
    try:
        ax.imshow(image)
        ax.plot(x, y, color="red", linewidth=2)
        ax.axis("off")
        return _save_figure_png(fig, bbox_inches="tight", pad_inches=0)
    finally:
        # Close this specific figure, even when drawing or saving fails.
        plt.close(fig)


def _render_polygon_png(x, y):
    """Render the bare contour on a grid and return it as a PNG stream."""
    fig, ax = plt.subplots(figsize=(6, 6), dpi=dpi_value)
    try:
        ax.plot(x, y, "r-", linewidth=2)
        ax.scatter(x, y, color="red", s=5)
        ax.set_title("Polygon contour")
        ax.grid(True)
        return _save_figure_png(fig, bbox_inches="tight")
    finally:
        plt.close(fig)


def process_image(uploaded_file, fov_um=None, pixel_size_um=None):
    """Segment one uploaded image and collect its areas and rendered outputs.

    Args:
        uploaded_file: File-like object with a ``name`` attribute that
            ``Image.open`` can read.
        fov_um: Optional physical width of the image in micrometers. Used to
            derive the pixel size when ``pixel_size_um`` is not positive.
        pixel_size_um: Optional pixel size in micrometers per pixel. Takes
            precedence over ``fov_um`` when positive.

    Returns:
        A dict with the keys ``"Imagem"``, ``"Área Segmentada (px²)"``,
        ``"Área Segmentada (µm²)"``, ``"SemSegmentacao"``, ``"Exibir"`` and
        ``"Original"``; segmented results additionally carry ``"Segmentada"``
        and ``"Poligono"`` PNG streams. ``None`` is returned when any step
        raises, which callers use as the "processing failed" signal.
    """
    try:
        safe_name = uploaded_file.name.replace(" ", "_")
        image = Image.open(uploaded_file).convert("RGB")
        width_px, _ = image.size

        # Explicit pixel size wins; otherwise derive it from the field of view.
        effective_pixel_size_um = None
        if pixel_size_um is not None and pixel_size_um > 0:
            effective_pixel_size_um = pixel_size_um
        elif fov_um is not None and fov_um > 0:
            effective_pixel_size_um = fov_um / float(width_px)

        # The model is given a file path, so the image goes through a
        # temporary PNG that is removed when the with-block exits.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=True) as temp_file:
            image.save(temp_file.name)
            prediction = safe_predict(temp_file.name)
            predictions = prediction.json()["predictions"] if prediction else []

        if not predictions:
            return _no_segmentation_result(safe_name, image)

        # Only the first prediction is used.
        points = predictions[0]["points"]
        if not points:
            # A prediction without contour points cannot be measured or drawn.
            return _no_segmentation_result(safe_name, image)

        area_px2 = calculate_polygon_area(points)
        area_um2 = None
        if effective_pixel_size_um is not None:
            area_um2 = area_px2 * (effective_pixel_size_um**2)

        # Repeat the first vertex at the end so the plotted contour is closed.
        x = [p["x"] for p in points] + [points[0]["x"]]
        y = [p["y"] for p in points] + [points[0]["y"]]

        original_buffer = get_image_bytes(image)
        # Each figure is saved and closed through its own handle: this
        # function runs in worker threads, where the implicit "current
        # figure" may belong to another image.
        segmented_buffer = _render_overlay_png(image, x, y)
        polygon_buffer = _render_polygon_png(x, y)

        return {
            "Imagem": safe_name,
            "Área Segmentada (px²)": area_px2,
            "Área Segmentada (µm²)": area_um2,
            "Original": original_buffer,
            "Segmentada": segmented_buffer,
            "Poligono": polygon_buffer,
            "Exibir": image,
            "SemSegmentacao": False,
        }
    except Exception:
        # Boundary for per-image work: one unreadable or failing image must
        # not abort a batch, so the failure is reported as None.
        return None
def save_feedback(result, avaliacao, observacao):
    """Record a user rating in the feedback sheet and curate images on Drive.

    Args:
        result: Result dict of one processed image (as built by
            ``process_image``).
        avaliacao: Selected rating label.
        observacao: Free-text note entered by the user.

    A row is always appended to the sheet. Images are uploaded to Drive only
    for the ratings "Acceptable", "Bad" and "No segmentation"; the overlay and
    polygon images are skipped for "No segmentation" or when the result does
    not carry them.

    NOTE(review): the per-image folder name only has the literal ".png"
    removed, so other extensions remain part of the folder name.
    """
    image_name = result["Imagem"]

    # 1) Sheet
    sheet.append_row([image_name, avaliacao, observacao])

    # 2) Drive curation
    suffix_by_rating = {
        "Acceptable": "aceitavel",
        "Bad": "ruim",
        "No segmentation": "sem_segmentacao",
    }
    sufixo = suffix_by_rating.get(avaliacao)
    if sufixo is None:
        return

    root_folder_id = find_or_create_folder("Feedback Segmentacoes")
    image_folder_id = find_or_create_folder(image_name.replace(".png", ""), root_folder_id)

    original_png = get_image_bytes(resize_image(result["Exibir"]))
    upload_to_drive(original_png, f"original_{sufixo}.png", image_folder_id)

    if avaliacao == "No segmentation":
        return
    if not (result.get("Segmentada") and result.get("Poligono")):
        return

    # Decode and resize both renderings before uploading either of them.
    renderings = [
        (label, resize_image(Image.open(BytesIO(result[key].getvalue()))))
        for label, key in (("segmentada", "Segmentada"), ("poligono", "Poligono"))
    ]
    for label, rendering in renderings:
        upload_to_drive(get_image_bytes(rendering), f"{label}_{sufixo}.png", image_folder_id)
def render_metrics(result):
    """Show the segmented area of ``result`` in each unit that is available.

    A heading is always written; a bullet line follows for the pixel area and
    for the micrometer area whenever the corresponding value is not ``None``.
    """
    measurements = (
        (result["Área Segmentada (px²)"], "px²"),
        (result["Área Segmentada (µm²)"], "µm²"),
    )
    st.markdown("**Segmented area**")
    for value, unit in measurements:
        if value is None:
            continue
        st.markdown(f"- {value:.2f} {unit}")
def render_feedback_block(result, prefix_key=""):
    """Render the rating form for one result and save it when requested.

    Args:
        result: Result dict of the image being rated.
        prefix_key: Prefix prepended to every widget key of this form.
    """
    st.markdown("#### Segmentation quality feedback")
    st.caption("User evaluation used for future model refinement.")

    image_name = result["Imagem"]
    rating_options = ["Great", "Acceptable", "Bad", "No segmentation"]
    avaliacao = st.radio(
        "Segmentation quality assessment:",
        rating_options,
        horizontal=True,
        key=f"{prefix_key}radio_{image_name}",
    )
    observacao = st.text_area(
        "Observations (optional):",
        key=f"{prefix_key}obs_{image_name}",
    )

    save_clicked = st.button("Save feedback", key=f"{prefix_key}btn_{image_name}")
    if not save_clicked:
        return
    save_feedback(result, avaliacao, observacao)
    st.success("Feedback saved successfully.")
# =========================
# Layout / UI
# =========================
# Page chrome: configuration, title, version caption and a divider.
st.set_page_config(page_title="Scratch Assay Segmentation", layout="wide")
st.title("Scratch Assay Segmentation Tool")
st.caption(f"Version {APP_VERSION} · Deep learning–based wound closure segmentation")
st.markdown("---")

# Upload block: the selected mode decides which uploader is shown further down.
st.markdown("### Input")
upload_option = st.radio(
    "Choose upload type:",
    ["Single image", "Image folder"],
    horizontal=True,
)

# Advanced settings (collapsed by default)
with st.expander("⚙️ Advanced Settings", expanded=False):
    # The slider value replaces the model confidence on every script run.
    model.confidence = st.slider("Model confidence (%)", min_value=20, max_value=100, value=80)

    st.markdown(
        "### Physical calibration (optional)\n"
        "Provide the physical scale for conversion from pixel area to physical units (µm²). "
        "If left empty, results will be reported only in pixels²."
    )

    # Both inputs default to 0.0; process_image() only uses values above zero.
    fov_column, pixel_column = st.columns(2)
    with fov_column:
        fov_um = st.number_input(
            "Field of view width (µm)",
            min_value=0.0,
            value=0.0,
            step=1.0,
            help="Physical width of the image field, in micrometers.",
        )
    with pixel_column:
        pixel_size_um = st.number_input(
            "Pixel size (µm / pixel)",
            min_value=0.0,
            value=0.0,
            step=0.01,
            help="If provided, this overrides the FOV-based calibration.",
        )

# Results accumulated by the upload branches and read by the summary section.
results = []

with st.sidebar:
    st.markdown("## Info")
    with st.expander("About / Citation", expanded=False):
        st.markdown(
            """
This tool was developed by the **Medical Physics Laboratory** of the Department of **Biophysics and Pharmacology – IBB, UNESP**.
**FAPESP Process:** 2024/01849-4.
**Coordination:** Prof. Allan Alves.
**Development:** Nycolas Mariotto.
"""
        )
# =========================
# Result rendering helper
# =========================
def _render_result_panels(result):
    """Show the images of one result, plus its metrics when it was segmented."""
    if result["SemSegmentacao"]:
        st.image(result["Exibir"], caption="Original", use_container_width=True)
        st.warning("No segmentation was detected for this image.")
        return

    original_col, overlay_col, polygon_col = st.columns(3)
    with original_col:
        st.image(result["Exibir"], caption="Original", use_container_width=True)
    with overlay_col:
        st.image(result["Segmentada"], caption="Segmentation", use_container_width=True)
    with polygon_col:
        st.image(result["Poligono"], caption="Polygon", use_container_width=True)
    render_metrics(result)


# =========================
# Single image
# =========================
if upload_option == "Single image":
    uploaded_file = st.file_uploader("Upload an image", type=["png", "jpg", "jpeg", "tiff"])
    if uploaded_file:
        st.markdown("---")
        st.markdown("### Result")
        result = process_image(uploaded_file, fov_um=fov_um, pixel_size_um=pixel_size_um)
        if result:
            results.append(result)
            st.markdown(f"#### {result['Imagem']}")
            _render_result_panels(result)
            # The overlay only exists for segmented images.
            if not result["SemSegmentacao"]:
                st.markdown("### Export")
                st.download_button(
                    "Download segmented overlay (PNG)",
                    data=result["Segmentada"],
                    file_name=f"segmented_{result['Imagem']}.png",
                    mime="image/png",
                )
            st.markdown("---")
            render_feedback_block(result, prefix_key="single_")
        else:
            # process_image() reports any failure as None; tell the user
            # instead of leaving the page empty.
            st.error(f"The image '{uploaded_file.name}' could not be processed.")
# =========================
# Folder
# =========================
elif upload_option == "Image folder":
    uploaded_files = st.file_uploader(
        "Upload multiple images",
        type=["png", "jpg", "jpeg", "tiff"],
        accept_multiple_files=True,
    )
    if uploaded_files:
        st.markdown("---")
        st.markdown("### Processing")

        def process_wrapper(f):
            """Process one upload with the calibration chosen in the settings."""
            return process_image(f, fov_um=fov_um, pixel_size_um=pixel_size_um)

        # executor.map keeps the input order, so `processed` lines up with
        # `uploaded_files` in the zip() calls below.
        with ThreadPoolExecutor(max_workers=4) as executor:
            processed = list(executor.map(process_wrapper, uploaded_files))

        # Uploads whose processing failed outright (None) used to be skipped
        # without any message; list them explicitly.
        erros = [f.name for f, r in zip(uploaded_files, processed) if not r]
        if erros:
            st.error(
                f"{len(erros)} image(s) could not be processed:\n\n- " + "\n- ".join(erros)
            )

        falhas = [f.name for f, r in zip(uploaded_files, processed) if r and r.get("SemSegmentacao")]
        if falhas:
            st.warning(
                f"{len(falhas)} image(s) with no segmentation detected:\n\n- " + "\n- ".join(falhas)
            )

        zip_images_buffer = BytesIO()
        with zipfile.ZipFile(zip_images_buffer, "w") as zip_file:
            for idx, result in enumerate(processed, start=1):
                if not result:
                    continue
                results.append(result)
                st.markdown("---")
                st.markdown(f"### Result {idx} · {result['Imagem']}")
                _render_result_panels(result)
                if not result["SemSegmentacao"]:
                    # Build ZIP
                    zip_file.writestr(f"segmentada_{result['Imagem']}.png", result["Segmentada"].getvalue())
                    zip_file.writestr(f"poligono_{result['Imagem']}.png", result["Poligono"].getvalue())
                # The index keeps widget keys unique when two uploads share
                # the same file name.
                render_feedback_block(result, prefix_key=f"folder_{idx}_")
        zip_images_buffer.seek(0)
# Summary table + exports
if results:
    st.markdown("---")
    st.markdown("### Quantitative results")

    def _summary_row(r):
        """Build one table row from a result dict."""
        segmented = not r["SemSegmentacao"]
        area_px2 = r["Área Segmentada (px²)"]
        area_um2 = r["Área Segmentada (µm²)"]
        return {
            "Image": r["Imagem"],
            "Segmented Area (px²)": (
                area_px2 if (segmented and area_px2 is not None) else "No Segmentation"
            ),
            "Segmented Area (µm²)": (
                f"{area_um2:.2f}" if (segmented and area_um2 is not None) else ""
            ),
        }

    df = pd.DataFrame([_summary_row(r) for r in results])
    st.dataframe(df, use_container_width=True)

    excel_buffer = BytesIO()
    df.to_excel(excel_buffer, index=False)
    excel_buffer.seek(0)

    st.markdown("### Export results")
    c1, c2 = st.columns(2)
    with c1:
        st.download_button(
            "Download table (Excel)",
            data=excel_buffer,
            file_name="segmentation_results.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            use_container_width=True,
        )
    # The ZIP archive is only built by the folder branch; in single-image mode
    # `zip_images_buffer` is never assigned, so the button must not touch it.
    if upload_option == "Image folder":
        with c2:
            st.download_button(
                "Download segmented images (ZIP)",
                data=zip_images_buffer,
                file_name="segmented_images.zip",
                mime="application/zip",
                use_container_width=True,
            )