Spaces:
Sleeping
Sleeping
from huggingface_hub import from_pretrained_fastai | |
import gradio as gr | |
from fastai.vision.all import * | |
import PIL | |
import torchvision.transforms as transforms | |
## Extras, in case the full image can also be reconstructed on HF
import numpy as np | |
import os | |
import cv2 | |
def extract_subimages(image : np.ndarray, wwidth, wheight, overlap_fraction):
    """
    Tile an image with a sliding window and return the tiles plus their positions.

    The window of size (wwidth, wheight) advances by the window size minus the
    overlap. When the regular grid does not reach the bottom or right border, an
    extra border-aligned row/column of windows is added so the whole image is
    covered. Each position is emitted exactly once.

    Args:
        image: Array with three axes (height, width, channels). Only the first
            three channels are kept when more are present.
        wwidth: Window width in pixels.
        wheight: Window height in pixels.
        overlap_fraction: Fraction of the smaller window side shared by
            consecutive windows; clamped to [0, 1].

    Returns:
        A tuple (subimages, coords). subimages is a list of slices of the image,
        coords is a list of (x, y) top-left corners in the same order: the regular
        grid row by row, then the bottom border row, then the right border column.
        Both lists are empty when the image is smaller than the window.
    """
    height, width, channels = image.shape
    if channels > 3:
        image = image[:, :, 0:3]

    # A window larger than the image cannot be placed; negative offsets would
    # silently produce truncated, misplaced slices.
    if width < wwidth or height < wheight:
        return [], []

    overlap = int(max(0, min(overlap_fraction, 1)) * min(wwidth, wheight))
    # Keep the step strictly positive: a full overlap would otherwise never advance.
    step_x = max(1, wwidth - overlap)
    step_y = max(1, wheight - overlap)

    xs = list(range(0, width - wwidth + 1, step_x))
    ys = list(range(0, height - wheight + 1, step_y))

    # A border-aligned offset is only needed when the last regular window stops
    # short of the border; otherwise it would duplicate an existing window.
    edge_x = width - wwidth if xs[-1] + wwidth < width else None
    edge_y = height - wheight if ys[-1] + wheight < height else None

    coords = [(x, y) for y in ys for x in xs]
    if edge_y is not None:
        coords.extend((x, edge_y) for x in xs)
        if edge_x is not None:
            coords.append((edge_x, edge_y))
    if edge_x is not None:
        coords.extend((edge_x, y) for y in ys)

    subimages = [image[y:y + wheight, x:x + wwidth, :] for x, y in coords]
    return subimages, coords
# .tif files are treated as label images; they are ignored when no label
# output directory is given.
def generate_and_save_subimages(path, output_dir_images, output_dir_labels = None):
    """
    Tile every .png and .tif file found directly in `path` into 400x400 windows.

    Windows are extracted with an overlap fraction of 0.66 and written as
    "<stem>_<x>_<y>.<ext>", where (x, y) is the window's top-left corner.

    Args:
        path: Directory containing the source files.
        output_dir_images: Directory that receives the windows of .png files
            (created if missing).
        output_dir_labels: Directory that receives the windows of .tif files
            (created if missing). When falsy, .tif files are skipped.
    """
    os.makedirs(output_dir_images, exist_ok=True)
    if output_dir_labels:
        os.makedirs(output_dir_labels, exist_ok=True)

    for filename in os.listdir(path):
        if filename.endswith(".png"):
            target_dir, extension = output_dir_images, "png"
        elif filename.endswith(".tif") and output_dir_labels:
            target_dir, extension = output_dir_labels, "tif"
        else:
            # Not an image of interest, or a label with nowhere to go:
            # avoid reading and tiling it for nothing.
            continue

        filepath = os.path.join(path, filename)
        image = cv2.imread(filepath)
        if image is None:
            # cv2.imread returns None instead of raising when it cannot decode a file.
            print(f"Skipping unreadable image: {filepath}")
            continue

        stem = filename.rsplit('.', 1)[0]
        subimages, coords = extract_subimages(image, 400, 400, 0.66)
        for subimage, (x, y) in zip(subimages, coords):
            output_filename = os.path.join(target_dir, f"{stem}_{x}_{y}.{extension}")
            cv2.imwrite(output_filename, subimage)
def generate_and_save_subimages_nolabel(path, output_dir_images, olverlap=0.0, imagesformat="png", split_in_dirs=True):
    """
    Run gss_single on every regular file in `path` whose lower-cased name ends
    with `imagesformat`.

    The remaining arguments are forwarded unchanged to gss_single.
    """
    for candidate in os.scandir(path):
        if not candidate.is_file():
            continue
        if not candidate.name.lower().endswith(imagesformat):
            continue
        gss_single(candidate.path, output_dir_images, olverlap, imagesformat, split_in_dirs)
def gss_single(filepath, output_dir_images, olverlap=0.0, imagesformat="png", split_in_dirs=True):
    """
    Split one image file into 400x400 windows and write them as PNG files.

    Each window is saved as "<stem>_<x>_<y>.png", where <stem> is the input file
    name without directories or extension and (x, y) is the window's top-left
    corner.

    Args:
        filepath: Path of the image to split.
        output_dir_images: Base output directory (created if missing).
        olverlap: Overlap fraction forwarded to extract_subimages.
        imagesformat: Not used by this function; windows are always written as
            .png. Kept so existing callers keep working.
        split_in_dirs: When true, windows go into a sub-directory of
            output_dir_images named after the image stem; otherwise they are
            written directly into output_dir_images.

    Raises:
        ValueError: If the image cannot be read.
    """
    image = cv2.imread(filepath)
    if image is None:
        # cv2.imread returns None instead of raising when it cannot decode a file.
        raise ValueError(f"Could not read image: {filepath}")

    # Name outputs after the file name only. Using the whole path would nest the
    # input's directories inside the output folder (or escape it entirely for
    # absolute paths), and cv2.imwrite would then target missing directories.
    stem = os.path.basename(filepath).rsplit('.', 1)[0]

    if split_in_dirs:
        target_dir = os.path.join(output_dir_images, stem)
    else:
        target_dir = output_dir_images
    os.makedirs(target_dir, exist_ok=True)

    subimages, coords = extract_subimages(image, 400, 400, olverlap)
    for subimage, (x, y) in zip(subimages, coords):
        output_filename = os.path.join(target_dir, f"{stem}_{x}_{y}.png")
        cv2.imwrite(output_filename, subimage)
def split_windows_in_folders(input_images_folder, output_images_folder):
    """
    Split every .png in a folder into non-overlapping 400x400 windows, one
    output sub-folder per image.

    For "<stem>.png" the windows are written to
    "<output_images_folder>/<stem>/<stem>_<x>_<y>.png", where (x, y) is the
    window's top-left corner. Each output sub-folder path is printed as it is
    processed.

    Args:
        input_images_folder: Folder containing the source .png images.
        output_images_folder: Folder that receives one sub-folder per image.
    """
    for filename in os.listdir(input_images_folder):
        # Only create folders for files that are actually processed, so no empty
        # folders are left behind for unrelated entries.
        if not filename.endswith(".png"):
            continue

        stem = filename.rsplit('.', 1)[0]
        dir_this_image = Path(output_images_folder)/stem
        os.makedirs(dir_this_image, exist_ok=True)
        print(str(dir_this_image))

        # The source folder is the function's own argument.
        filepath = os.path.join(input_images_folder, filename)
        image = cv2.imread(filepath)
        if image is None:
            # cv2.imread returns None instead of raising when it cannot decode a file.
            print(f"Skipping unreadable image: {filepath}")
            continue

        subimages, coords = extract_subimages(image, 400, 400, 0)
        for subimage, (x, y) in zip(subimages, coords):
            output_filename = os.path.join(dir_this_image, f"{stem}_{x}_{y}.png")
            cv2.imwrite(output_filename, subimage)
def subimages_from_directory(directorio):
    """
    Load every window image in a directory together with its coordinates.

    Files are recognised by names of the form "<name>_<x>_<y>.<png|jpg|tif>",
    where x and y are the top-left corner of the window in the original image.

    Args:
        directorio: Directory to scan (a path string or path-like object).

    Returns:
        A tuple (windowlist, coords, wh, ww, chan, origsize):
        windowlist is a tuple of images sorted by (x, y); coords is the matching
        tuple of (x, y) pairs; wh, ww and chan are the height, width and channel
        count of the first window; origsize is the (width, height) needed to
        hold all windows.

    Raises:
        ValueError: If the directory has no window images or one cannot be read.
    """
    # Pattern extracting the X and Y numbers from the file name
    patron = re.compile(r"(.*)_(\d+)_(\d+)\.(png|jpg|tif)")
    entries = []
    for filename in os.listdir(directorio):
        match = patron.search(filename)
        if match:
            x = int(match.group(2))
            y = int(match.group(3))
            window_path = os.path.join(directorio, filename)
            img = cv2.imread(window_path)
            if img is None:
                # cv2.imread returns None instead of raising on unreadable files.
                raise ValueError(f"Could not read window image: {window_path}")
            entries.append(((x, y), img))

    if not entries:
        raise ValueError(f"No window images found in {os.fspath(directorio)}")

    # Sort by X, then Y
    entries.sort(key=lambda entry: entry[0])
    coords = tuple(position for position, _ in entries)
    windowlist = tuple(window for _, window in entries)

    wh, ww, chan = windowlist[0].shape
    # coords are (x, y): the width grows with x + ww and the height with y + wh.
    # Take the maximum over all windows rather than trusting the last one sorted.
    origsize = (max(x for x, _ in coords) + ww, max(y for _, y in coords) + wh)
    return windowlist, coords, wh, ww, chan, origsize
def subimages_onlypath(directorio):
    """
    Return the paths of the window images found directly in `directorio`.

    A file counts as a window image when its name contains the pattern
    "_<digits>_<digits>.<png|jpg|tif>". Paths are `directorio` joined with the
    file name, in the order os.listdir yields them.
    """
    window_name = re.compile(r"(.*)_(\d+)_(\d+)\.(png|jpg|tif)")
    return [
        os.path.join(directorio, entry)
        for entry in os.listdir(directorio)
        if window_name.search(entry)
    ]
def ReconstructFromMW(windowlist, coords, wh, ww, chan, origsize):
    """
    Paste windows onto a zero-filled uint8 canvas at their coordinates.

    Args:
        windowlist: Sequence of window arrays.
        coords: Sequence of (x, y) top-left corners, one per window.
        wh: Window height.
        ww: Window width.
        chan: Number of channels of the canvas.
        origsize: (width, height) of the canvas.

    Returns:
        The canvas of shape (height, width, chan). Where windows overlap, the
        one that comes later in windowlist wins.
    """
    full_width, full_height = origsize[0], origsize[1]
    mosaic = np.zeros((full_height, full_width, chan), dtype=np.uint8)
    for position, tile in enumerate(windowlist):
        left, top = coords[position][0], coords[position][1]
        mosaic[top:top + wh, left:left + ww, :] = tile
    return mosaic
def get_list_tp(path):
    """
    Rebuild one full image per sub-directory of `path`.

    Each sub-directory is loaded with subimages_from_directory and its windows
    are stitched with ReconstructFromMW.

    Returns:
        A tuple (images, names): the reconstructed arrays and the matching
        sub-directory names, in os.scandir order.
    """
    rebuilt_images = []
    folder_names = []
    for entry in os.scandir(path):
        # Only sub-directories hold window sets; plain files are ignored
        if not entry.is_dir():
            continue
        pieces = subimages_from_directory(entry)
        rebuilt_images.append(ReconstructFromMW(*pieces))
        folder_names.append(entry.name)
    return rebuilt_images, folder_names
def get_paths_tp(path):
    """
    Collect the window-image paths of every sub-directory of `path`.

    Returns:
        A list with one entry per sub-directory (in os.scandir order); each
        entry is the list returned by subimages_onlypath for that sub-directory.
        Plain files in `path` are ignored.
    """
    return [
        subimages_onlypath(entry)
        for entry in os.scandir(path)
        if entry.is_dir()
    ]
def process_multifolder(process_folders, result_folder):
    """
    Run the segmentation model on every window image and save the masks.

    For each list of window paths, the masks are written to
    "<result_folder>/<name of the windows' folder>/<window file name>".
    Pixels predicted as class 1 are stored as 255.

    Args:
        process_folders: List of lists of window-image paths, one inner list per
            source folder. Empty inner lists are skipped.
        result_folder: Base directory for the masks (sub-directories are created
            as needed).
    """
    for folder in process_folders:
        if not folder:
            # Nothing to process, and folder[0] below would raise IndexError
            continue
        folname = os.path.basename(os.path.dirname(folder[0]))
        destname = Path(result_folder)/folname
        os.makedirs(destname, exist_ok=True)
        for subimagepath in folder:
            # Close the file handle promptly; convert to RGB because the
            # normalisation in transform_image uses three channel statistics.
            with PIL.Image.open(subimagepath) as img:
                image = transforms.Resize((400,400))(img.convert("RGB"))
            tensor = transform_image(image=image)
            with torch.no_grad():
                outputs = model(tensor)
            outputs = torch.argmax(outputs,1)
            mask = np.array(outputs.cpu())
            mask[mask==1]=255
            mask=np.reshape(mask,(400,400))
            mask_img = Image.fromarray(mask.astype('uint8'))
            filename = os.path.basename(subimagepath)
            new_image_path = os.path.join(result_folder, folname, filename)
            mask_img.save(new_image_path)
def recombine_windows(results_folder_w, result_f_rec):
    """
    Stitch the windows of every sub-folder back into one image and save it as
    an LZW-compressed TIFF.

    Args:
        results_folder_w: Directory whose sub-folders each hold the windows of
            one image.
        result_f_rec: Output directory (created if missing); each image is saved
            as "<sub-folder name>.tif".

    Returns:
        The path of the last image written, or None when there was nothing to
        recombine.
    """
    imgs, nombres = get_list_tp(results_folder_w)
    os.makedirs(result_f_rec, exist_ok=True)
    # Bound up front so an empty input returns None instead of raising
    # UnboundLocalError.
    new_image_path = None
    for image, nombre in zip(imgs, nombres):
        new_image_path = os.path.join(result_f_rec, nombre + '.tif')
        Image.fromarray(image).save(new_image_path, compression='tiff_lzw')
    return new_image_path
def process_single_image(single_image_path, base_f, pro_f, rsw_f, rsd_f):
    """
    Segment one large image: split it into windows, predict each window, and
    stitch the masks back together.

    Args:
        single_image_path: Path of the image to process.
        base_f: Not used by this function; kept so existing callers keep working.
        pro_f: Temporary directory for the input windows (removed afterwards).
        rsw_f: Temporary directory for the per-window masks (removed afterwards).
        rsd_f: Directory that receives the recombined mask.

    Returns:
        The path returned by recombine_windows for the recombined mask.
    """
    try:
        gss_single(single_image_path, pro_f, 0, "tif", True)
        process_multifolder(get_paths_tp(pro_f),rsw_f)
        pt = recombine_windows(rsw_f,rsd_f)
    finally:
        # Always clear the temporary folders, even when a step fails, so stale
        # windows cannot leak into the next run. ignore_errors keeps a missing
        # folder from masking the original exception.
        shutil.rmtree(pro_f, ignore_errors=True)
        shutil.rmtree(rsw_f, ignore_errors=True)
    # Copying the georeferencing info is currently disabled:
    #copiar_info_georref(single_image_path, pt)
    return pt
# from osgeo import gdal, osr
# def copiar_info_georref(entrada, salida):
#     try:
#         # Open the original GeoTIFF file
#         original_dataset = gdal.Open(entrada)
#         # Get the georeferencing information from the original file
#         original_projection = original_dataset.GetProjection()
#         original_geotransform = original_dataset.GetGeoTransform()
#         # Open the result image
#         result_dataset = gdal.Open(salida, gdal.GA_Update)
#         # Copy the georeferencing information from the original file to the result image
#         result_dataset.SetProjection(original_projection)
#         result_dataset.SetGeoTransform(original_geotransform)
#         # Close the files
#         original_dataset = None
#         result_dataset = None
#     except Exception as e:
#         print("Error: ", e)
### End of extras
# Alternative loading path through the Hugging Face Hub (disabled):
#repo_id = "Ignaciobfp/segmentacion-dron-marras"
#learner = from_pretrained_fastai(repo_id)
device = torch.device("cpu")
#model = learner.model
# map_location makes the load independent of the device the model was saved
# from; without it torch.jit.load tries to restore the original device.
model = torch.jit.load("modelo_marras.pth", map_location=device)
model = model.cpu()
# The model is only used for inference below.
model.eval()
def transform_image(image):
    """
    Convert an image into a normalised batch of one on the module-level device.

    The image is turned into a tensor with ToTensor, normalised per channel with
    means [0.485, 0.456, 0.406] and standard deviations [0.229, 0.224, 0.225],
    and given a leading batch dimension.
    """
    to_tensor = transforms.ToTensor()
    normalize = transforms.Normalize(
        [0.485, 0.456, 0.406],
        [0.229, 0.224, 0.225],
    )
    batch = normalize(to_tensor(image)).unsqueeze(0)
    return batch.to(device)
# Function that carries out the predictions
def predict(img):
    """
    Segment a single image and return the mask as a PIL image.

    The input array is interpreted as RGB, resized to 400x400 and passed through
    the model. The per-pixel argmax over dimension 1 of the model output is the
    mask; pixels equal to 1 are stored as 255.
    """
    resized = transforms.Resize((400,400))(PIL.Image.fromarray(img, 'RGB'))
    batch = transform_image(image=resized)
    model.to(device)
    with torch.no_grad():
        logits = model(batch)
    labels = torch.argmax(logits, 1)
    mask = np.array(labels.cpu())
    mask[mask == 1] = 255
    mask = np.reshape(mask, (400,400))
    return Image.fromarray(mask.astype('uint8'))
def predict_full(img):
    """
    Segment a full-size image by tiling it, predicting each tile and stitching
    the masks back together.

    Args:
        img: The input image, either a numpy array or a PIL image.

    Returns:
        The recombined mask opened as a PIL image.
    """
    single_image_path = "tmp.tif"
    base_f = "."
    pro_f = "processing"
    rsw_f = "results_windows"
    rsd_f = "results_together"
    # The pipeline below reads its input from disk, so the received image has to
    # be written to the temporary path first.
    if isinstance(img, np.ndarray):
        img = PIL.Image.fromarray(img)
    img.save(single_image_path)
    destpath = process_single_image(single_image_path, base_f, pro_f, rsw_f, rsd_f)
    im = Image.open(destpath)
    return im
# Build the Gradio interface around the single-window predictor and launch it.
gr.Interface(
    fn=predict,
    inputs=gr.inputs.Image(shape=(400, 400)),
    outputs=gr.outputs.Image(type="pil"),
    examples=[
        'examples/1CA SUR_1200_800.png',
        'examples/1CA SUR_4000_1200.png',
        'examples/1CA SUR_4800_2000.png',
    ],
).launch(share=False)
# Alternative interface using the full-image pipeline (disabled):
# gr.Interface(fn=predict_full, inputs=gr.inputs.Image(), outputs=gr.outputs.Image(type="pil")).launch(share=False)