# Spaces: Sleeping (hosting-page status banner captured together with the source)
import base64
import functools
import os
import re
from io import BytesIO

import cv2
import folium
import numpy as np
import pandas as pd
import streamlit as st
from PIL import Image
from skimage.feature import hog
from streamlit_folium import st_folium, folium_static
#import spacy
#from spacy import displacy
#from spacy import tokenizer

from collected_models.DICAM.UIEB.DICAM import dicam_enhance
from Quality.uiqm import getUCIQE, getUIQM
#nlp = spacy.load('en_core_web_lg')
# function | |
# Module-level list; wrappers built by make_recording_widget append each
# widget value they see to it.
enhanced_images = []
def adjust_gamma(image, gamma=1.0):
    """Apply gamma correction to ``image`` through a 256-entry lookup table.

    A ``gamma`` of 0 is rejected: an error is shown in the Streamlit app and
    the input image is returned untouched.  Otherwise every value ``v`` in
    [0, 255] is mapped to ``((v / 255) ** (1 / gamma)) * 255`` cast to
    ``uint8``, and the mapping is applied with ``cv2.LUT``.
    """
    if gamma == 0:
        st.error('gamma is not allowed to be 0 !')
        return image

    exponent = 1.0 / gamma
    lut = np.array(
        [((level / 255.0) ** exponent) * 255 for level in np.arange(0, 256)]
    ).astype("uint8")
    # Replace each pixel by its entry in the lookup table.
    return cv2.LUT(image, lut)
# Page configuration, title, short description and sidebar heading.
st.set_page_config(page_title="DICAM Demo", layout="wide")
st.write("# Demo of the DICAM underwater image enhancement model!")
st.write("Upload images and enhance their quality using various underwater image enhancement methods")
st.sidebar.write("## Upload and download :gear:")
st.markdown("---")
# Download the fixed image | |
def convert_image(img):
    """Encode an image array as PNG and return the encoded bytes.

    The array is handed to ``Image.fromarray`` as-is; no channel reordering
    is performed here.
    """
    png_buffer = BytesIO()
    Image.fromarray(img).save(png_buffer, format="PNG")
    return png_buffer.getvalue()
def pil2cv(item):
    """Open ``item`` with PIL, convert it to RGB and return it as a NumPy array."""
    return np.array(Image.open(item).convert('RGB'))
def make_recording_widget(f):
    """Wrap a Streamlit widget function so every value it returns is recorded.

    The returned wrapper forwards ``label`` and any extra arguments to ``f``,
    appends the widget's value to the module-level ``enhanced_images`` list
    and returns that value unchanged.

    Args:
        f: Widget callable that takes ``label`` as its first argument.

    Returns:
        A callable with the same call signature as ``f``.
    """
    @functools.wraps(f)  # keep the wrapped widget's name and docstring
    def wrapper(label, *args, **kwargs):
        widget_value = f(label, *args, **kwargs)
        enhanced_images.append(widget_value)
        return widget_value

    return wrapper
def update(org, enhanced, alg):
    """Render the original/enhanced pair and offer the result for download.

    Args:
        org: Original image, passed straight to ``col2.image``.
        enhanced: Enhanced image array; shown in ``col3`` and PNG-encoded for
            the sidebar download button.
        alg: Name of the enhancement algorithm, used in the caption, the
            button label and the download file name.

    When the module-level ``IQA_box`` is truthy, UIQM and UCIQE are computed
    for the enhanced image and displayed together with their difference from
    the scores of the uploaded image (module-level ``my_upload``).
    """
    col2.write("Original Image :camera:")
    col2.empty()
    col2.image(org)

    col3.empty()
    col3.write(f"Enhanced Image using {alg} :wrench:")
    col3.image(enhanced, clamp=True)

    st.sidebar.empty()
    st.sidebar.markdown("\n")
    st.sidebar.download_button(
        f"Download the {alg} enhanced image",
        convert_image(enhanced),
        file_name=f"enhanced_{alg}.png",
        mime="image/png",  # PNG payload; "output/png" is not a valid MIME type
    )

    if IQA_box:
        st.markdown("---")
        st.write("### Quality measurements: ")
        st.write("##### Enhanced image: ")
        with st.spinner("Measuring enhanced image quality..."):
            e_uiqm = getUIQM(enhanced)
            e_uciqe = getUCIQE(enhanced)
            # Decode the upload once; each metric gets its own copy, as before.
            raw = pil2cv(my_upload)
            raw_uiqm = getUIQM(raw.copy())
            raw_uciqe = getUCIQE(raw.copy())
        st.metric(label="UIQM", value=f"{e_uiqm:.2f}", delta=f"{e_uiqm - raw_uiqm:.2f}")
        st.metric(label="UCIQE", value=f"{e_uciqe:.2f}", delta=f"{e_uciqe - raw_uciqe:.2f}")
def quality_evaluator():
    """Checkbox callback: show UIQM and UCIQE of the raw uploaded image.

    Nothing is rendered unless the module-level ``IQA_box`` is truthy and the
    module-level ``enhanced`` is ``None``.
    """
    if not IQA_box or enhanced is not None:
        return

    st.markdown("---")
    st.write("### Quality measurements: ")
    st.write("##### Raw image: ")
    with st.spinner("Measuring raw image quality..."):
        raw_uiqm = getUIQM(pil2cv(my_upload).copy())
        raw_uciqe = getUCIQE(pil2cv(my_upload).copy())
        st.text(f"UIQM: {raw_uiqm!s} , UCIQE: {raw_uciqe!s}")
def reset_to_default():
    """Redraw the uploaded image in the second column and confirm the reset."""
    # Only the original-image column is redrawn; the enhanced column is not
    # written to in this run.
    col2.write("Original Image :camera:")
    col2.empty()
    col2.image(my_upload)

    st.info('Panel cleared!')
# Three-column layout: col1 holds the controls, col2 the original image and
# col3 the enhanced image.
col1, col2, col3 = st.columns(3)
imageLocation = col3.empty()

# Uploader lives in the sidebar; image and video extensions are both accepted.
my_upload = st.sidebar.file_uploader(
    "Upload an image",
    type=["png", "jpg", "jpeg", "bmp", "mp4", "mpg", "asf"],
)

image_types = ["Optical"]
img_type_selection = col1.radio("Select image type", image_types)

# No enhanced result yet in this script run.
enhanced = None
#Enhancement Tools | |
def extract_one_frame_each_second(video_path, output_path):
    """Run text detection on the uploaded image and show an inpainted version.

    NOTE(review): despite its name, this function never reads ``video_path``
    or ``output_path``; it works on the module-level ``my_upload``.  It also
    calls ``detect_text``, which is not defined or imported in the visible
    part of this file - confirm it exists before calling this function.
    """
    text_detection_result, _words, text_mask = detect_text(pil2cv(my_upload).copy())
    update(my_upload, text_detection_result, "Text detection result")

    # Dilate the detected-text mask before inpainting (presumably so the
    # inpainted region also covers the text edges - TODO confirm).
    dilation_kernel = np.ones((4, 4), np.uint8)
    text_mask = cv2.dilate(text_mask.astype("uint8"), dilation_kernel, iterations=4)
    inpainted = cv2.inpaint(pil2cv(my_upload).copy(), text_mask, 6, cv2.INPAINT_NS)

    col3.markdown("---")
    col3.write("### Inpainted image: ")
    col3.image(inpainted, clamp=True)
# Main flow: a video upload is enhanced frame by frame and written to disk;
# an image upload gets the interactive enhancement panel.
if my_upload is not None:
    print(my_upload.type)
    if my_upload.type in ("video/mp4", "video/mpeg", "application/vnd.ms-asf"):
        # Keep only the final path component so the upload's name cannot make
        # us write outside the working directory.
        video_path = os.path.basename(my_upload.name)
        with open(video_path, mode='wb') as f:
            f.write(my_upload.read())

        vidcap = cv2.VideoCapture(video_path)  # load video from disk
        fps = int(vidcap.get(cv2.CAP_PROP_FPS))
        frame_count = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        # The container may report 0 fps; avoid a ZeroDivisionError.
        duration = frame_count / fps if fps else 0
        width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        cur_frame = 0

        # Make sure the output directory exists before opening the writer.
        os.makedirs("processed video", exist_ok=True)
        fourcc = cv2.VideoWriter_fourcc('D', 'I', 'V', 'X')
        out = cv2.VideoWriter("processed video/enhanced.avi", fourcc, fps, (width, height))
        my_bar = st.progress(0, text="Operation in progress. Please wait...")
        try:
            while True:
                success, frame = vidcap.read()  # get next frame from video
                if not success:
                    # No more frames (or a read error): stop instead of
                    # retrying forever.
                    break
                # NOTE(review): cv2 returns BGR frames, while the image branch
                # feeds dicam_enhance an RGB array from pil2cv - confirm which
                # channel order the model expects.
                enhanced = dicam_enhance(frame)
                # NOTE(review): assumes the enhanced frame is a uint8 array of
                # size (width, height) as required by the writer - confirm.
                out.write(enhanced.copy())
                cur_frame += 1
                # The reported frame count can be 0 or inaccurate; keep the
                # progress value inside [0.0, 1.0].
                percent = min(cur_frame / frame_count, 1.0) if frame_count > 0 else 0.0
                my_bar.progress(percent, text="Operation in progress. Please wait...")
        finally:
            # Release both handles even if enhancement fails midway.
            vidcap.release()
            out.release()
        print('done')
    else:
        if 'enhanced' not in st.session_state:
            st.session_state.enhanced = None
        IQA_box = col1.checkbox("Quality assessment?", on_change=quality_evaluator, value=False)
        col0_1, col0_2 = col1.columns(2)
        st.sidebar.image(my_upload)
        col1.write("### Enhancement Tools")
        col1_1, col1_2, col1_3 = col1.columns(3)
        if img_type_selection == "Optical":
            if col1_1.button('Clear panel', use_container_width=True):
                reset_to_default()
            if col1_2.button('DICAM', use_container_width=True):
                enhanced = dicam_enhance(pil2cv(my_upload).copy())
                update(my_upload, enhanced, "DICAM")
                # Keep a copy of the result in the session state.
                st.session_state.enhanced = enhanced.copy()
else:
    st.info("Please upload an image to start!")