|
import os |
|
import rasterio |
|
import geopandas as gpd |
|
from shapely.geometry import box |
|
from rasterio.mask import mask |
|
from PIL import Image |
|
import numpy as np |
|
import warnings |
|
from rasterio.errors import NodataShadowWarning |
|
import sys |
|
|
|
warnings.filterwarnings("ignore", category=NodataShadowWarning) |
|
|
|
def cut_trees(output_dir, geojson_path, tif_path): |
|
|
|
if not os.path.exists(output_dir): |
|
os.makedirs(output_dir) |
|
|
|
|
|
gdf = gpd.read_file(geojson_path) |
|
|
|
|
|
os.system('cls' if os.name == 'nt' else 'clear') |
|
|
|
|
|
with rasterio.open(tif_path) as src: |
|
|
|
tif_bounds = box(*src.bounds) |
|
|
|
|
|
tif_crs = src.crs |
|
|
|
|
|
gdf = gdf.to_crs(tif_crs) |
|
|
|
|
|
N = len(gdf) |
|
n = int(N/10) |
|
image_counter = 0 |
|
for idx, row in gdf.iterrows(): |
|
if idx % n == 0: |
|
progress = f"{round(idx/N*100)} % complete --> {idx}/{N}" |
|
sys.stdout.write('\r' + progress) |
|
sys.stdout.flush() |
|
|
|
|
|
geom = row['geometry'] |
|
name = row['id'] |
|
|
|
|
|
if geom.intersects(tif_bounds): |
|
|
|
out_image, out_transform = mask(src, [geom], crop=True) |
|
|
|
|
|
out_image = out_image.transpose(1, 2, 0) |
|
|
|
|
|
if out_image.size == 0: |
|
message = f"{round(idx/N*100)} % complete --> {idx}/{N} | Polygon {idx} resulted in an empty image and will be skipped." |
|
sys.stdout.write('\r' + message) |
|
sys.stdout.flush() |
|
continue |
|
|
|
|
|
mask_array = (out_image[:, :, 0] != src.nodata) |
|
non_zero_rows = np.any(mask_array, axis=1) |
|
non_zero_cols = np.any(mask_array, axis=0) |
|
|
|
|
|
if not np.any(non_zero_rows) or not np.any(non_zero_cols): |
|
message = f"{round(idx/N*100)} % complete --> {idx}/{N} | Polygon {idx} resulted in an invalid image area and will be skipped." |
|
sys.stdout.write('\r' + message) |
|
sys.stdout.flush() |
|
continue |
|
|
|
out_image = out_image[non_zero_rows][:, non_zero_cols] |
|
|
|
|
|
out_image = Image.fromarray(out_image.astype(np.uint8)) |
|
output_path = os.path.join(output_dir, f'tree_{name}.png') |
|
out_image.save(output_path) |
|
image_counter += 1 |
|
else: |
|
message = f"{round(idx/N*100)} % complete --> {idx}/{N} | Polygon {idx} is outside the image bounds and will be skipped." |
|
sys.stdout.write('\r' + message) |
|
sys.stdout.flush() |
|
|
|
print(f'\n {image_counter}/{N} Tree images have been successfully saved in the "detected_trees" folder.') |
|
|
|
|
|
def resize_images(input_folder, output_folder, target_size): |
|
|
|
if not os.path.exists(output_folder): |
|
os.makedirs(output_folder) |
|
|
|
counter = 0 |
|
|
|
for filename in os.listdir(input_folder): |
|
if filename.endswith('.png'): |
|
|
|
with Image.open(os.path.join(input_folder, filename)) as img: |
|
|
|
img.thumbnail(target_size, Image.LANCZOS) |
|
|
|
paste_pos = ((target_size[0] - img.size[0]) // 2, (target_size[1] - img.size[1]) // 2) |
|
|
|
new_img = Image.new("RGBA", target_size, (0, 0, 0, 255)) |
|
|
|
new_img.paste(img, paste_pos, img) |
|
|
|
new_img = new_img.convert("RGB") |
|
|
|
new_img.save(os.path.join(output_folder, filename)) |
|
|
|
counter += 1 |
|
|
|
if counter % 50 == 0: |
|
message = f"Processed {counter} images" |
|
print(message, end='\r') |
|
|
|
|
|
print(f"Processed a total of {counter} images.") |
|
|
|
|
|
|
|
def generate_tree_images(geojson_path, tif_path, target_size = (224, 224)): |
|
""" |
|
INPUT: geojson path, tif_path that contain the trees, optional target_size of the resulting images |
|
|
|
RETURNS: nothing |
|
|
|
Action: It creates two folders: + "detected trees" --> the cut tree images |
|
+ "tree_images" --> the processed cut tree images, ready to use for species recognition |
|
""" |
|
|
|
|
|
|
|
folder_cut_trees = "detected_trees" |
|
folder_finished_images = "tree_images" |
|
|
|
cut_trees(geojson_path = geojson_path, tif_path = tif_path, output_dir = folder_cut_trees) |
|
resize_images(input_folder = folder_cut_trees, output_folder = folder_finished_images, target_size = target_size) |
|
|
|
|