Spaces:
Running
Running
import os, sys, inspect, json, shutil, cv2, time, glob #imagesize | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
from matplotlib.backends.backend_pdf import PdfPages | |
from PIL import Image | |
from tqdm import tqdm | |
from time import perf_counter | |
import concurrent.futures | |
from threading import Lock | |
from collections import defaultdict | |
import multiprocessing | |
import torch | |
currentdir = os.path.dirname(inspect.getfile(inspect.currentframe())) | |
parentdir = os.path.dirname(currentdir) | |
sys.path.append(currentdir) | |
from detect import run | |
sys.path.append(parentdir) | |
from landmark_processing import LeafSkeleton | |
from armature_processing import ArmatureSkeleton | |
def detect_plant_components(cfg, logger, dir_home, Project, Dirs): | |
t1_start = perf_counter() | |
logger.name = 'Locating Plant Components' | |
logger.info(f"Detecting plant components in {len(os.listdir(Project.dir_images))} images") | |
try: | |
dir_exisiting_labels = cfg['leafmachine']['project']['use_existing_plant_component_detections'] | |
except: | |
dir_exisiting_labels = None | |
if cfg['leafmachine']['project']['num_workers'] is None: | |
num_workers = 1 | |
else: | |
num_workers = int(cfg['leafmachine']['project']['num_workers']) | |
# Weights folder base | |
dir_weights = os.path.join(dir_home, 'leafmachine2', 'component_detector','runs','train') | |
# Detection threshold | |
threshold = cfg['leafmachine']['plant_component_detector']['minimum_confidence_threshold'] | |
detector_version = cfg['leafmachine']['plant_component_detector']['detector_version'] | |
detector_iteration = cfg['leafmachine']['plant_component_detector']['detector_iteration'] | |
detector_weights = cfg['leafmachine']['plant_component_detector']['detector_weights'] | |
weights = os.path.join(dir_weights,'Plant_Detector',detector_version,detector_iteration,'weights',detector_weights) | |
do_save_prediction_overlay_images = not cfg['leafmachine']['plant_component_detector']['do_save_prediction_overlay_images'] | |
ignore_objects = cfg['leafmachine']['plant_component_detector']['ignore_objects_for_overlay'] | |
ignore_objects = ignore_objects or [] | |
if dir_exisiting_labels != None: | |
logger.info("Loading existing plant labels") | |
fetch_labels(dir_exisiting_labels, os.path.join(Dirs.path_plant_components, 'labels')) | |
if len(Project.dir_images) <= 4000: | |
logger.debug("Single-threaded create_dictionary_from_txt() len(Project.dir_images) <= 4000") | |
A = create_dictionary_from_txt(logger, dir_exisiting_labels, 'Detections_Plant_Components', Project) | |
else: | |
logger.debug(f"Multi-threaded with ({str(cfg['leafmachine']['project']['num_workers'])}) threads create_dictionary_from_txt() len(Project.dir_images) > 4000") | |
A = create_dictionary_from_txt_parallel(logger, cfg, dir_exisiting_labels, 'Detections_Plant_Components', Project) | |
else: | |
logger.info("Running YOLOv5 to generate plant labels") | |
# run(weights = weights, | |
# source = Project.dir_images, | |
# project = Dirs.path_plant_components, | |
# name = Dirs.run_name, | |
# imgsz = (1280, 1280), | |
# nosave = do_save_prediction_overlay_images, | |
# anno_type = 'Plant_Detector', | |
# conf_thres = threshold, | |
# ignore_objects_for_overlay = ignore_objects, | |
# mode = 'LM2', | |
# LOGGER=logger,) | |
source = Project.dir_images | |
project = Dirs.path_plant_components | |
name = Dirs.run_name | |
imgsz = (1280, 1280) | |
nosave = do_save_prediction_overlay_images | |
anno_type = 'Plant_Detector' | |
conf_thres = threshold | |
ignore_objects_for_overlay = ignore_objects | |
mode = 'LM2' | |
LOGGER = logger | |
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: | |
futures = [executor.submit(run_in_parallel, weights, source, project, name, imgsz, nosave, anno_type, | |
conf_thres, 10, ignore_objects_for_overlay, mode, LOGGER, i, num_workers) for i in | |
range(num_workers)] | |
for future in concurrent.futures.as_completed(futures): | |
try: | |
_ = future.result() | |
except Exception as e: | |
logger.error(f'Error in thread: {e}') | |
continue | |
t2_stop = perf_counter() | |
logger.info(f"[Plant components detection elapsed time] {round(t2_stop - t1_start)} seconds") | |
logger.info(f"Threads [{num_workers}]") | |
if len(Project.dir_images) <= 4000: | |
logger.debug("Single-threaded create_dictionary_from_txt() len(Project.dir_images) <= 4000") | |
A = create_dictionary_from_txt(logger, os.path.join(Dirs.path_plant_components, 'labels'), 'Detections_Plant_Components', Project) | |
else: | |
logger.debug(f"Multi-threaded with ({str(cfg['leafmachine']['project']['num_workers'])}) threads create_dictionary_from_txt() len(Project.dir_images) > 4000") | |
A = create_dictionary_from_txt_parallel(logger, cfg, os.path.join(Dirs.path_plant_components, 'labels'), 'Detections_Plant_Components', Project) | |
dict_to_json(Project.project_data, Dirs.path_plant_components, 'Detections_Plant_Components.json') | |
t1_stop = perf_counter() | |
logger.info(f"[Processing plant components elapsed time] {round(t1_stop - t1_start)} seconds") | |
torch.cuda.empty_cache() | |
return Project | |
def detect_archival_components(cfg, logger, dir_home, Project, Dirs): | |
if not cfg['leafmachine']['use_RGB_label_images']: | |
logger.name = 'Skipping LeafMachine2 Label Detection' | |
logger.info(f"Full image will be used instead of the label collage") | |
else: | |
t1_start = perf_counter() | |
logger.name = 'Locating Archival Components' | |
logger.info(f"Detecting archival components in {len(os.listdir(Project.dir_images))} images") | |
try: | |
dir_exisiting_labels = cfg['leafmachine']['project']['use_existing_archival_component_detections'] | |
except: | |
dir_exisiting_labels = None | |
if cfg['leafmachine']['project']['num_workers'] is None: | |
num_workers = 1 | |
else: | |
num_workers = int(cfg['leafmachine']['project']['num_workers']) | |
# Weights folder base | |
dir_weights = os.path.join(dir_home, 'leafmachine2', 'component_detector','runs','train') | |
# Detection threshold | |
threshold = cfg['leafmachine']['archival_component_detector']['minimum_confidence_threshold'] | |
detector_version = cfg['leafmachine']['archival_component_detector']['detector_version'] | |
detector_iteration = cfg['leafmachine']['archival_component_detector']['detector_iteration'] | |
detector_weights = cfg['leafmachine']['archival_component_detector']['detector_weights'] | |
weights = os.path.join(dir_weights,'Archival_Detector',detector_version,detector_iteration,'weights',detector_weights) | |
do_save_prediction_overlay_images = not cfg['leafmachine']['archival_component_detector']['do_save_prediction_overlay_images'] | |
ignore_objects = cfg['leafmachine']['archival_component_detector']['ignore_objects_for_overlay'] | |
ignore_objects = ignore_objects or [] | |
if dir_exisiting_labels != None: | |
logger.info("Loading existing archival labels") | |
fetch_labels(dir_exisiting_labels, os.path.join(Dirs.path_archival_components, 'labels')) | |
if len(Project.dir_images) <= 4000: | |
logger.debug("Single-threaded create_dictionary_from_txt() len(Project.dir_images) <= 4000") | |
A = create_dictionary_from_txt(logger, dir_exisiting_labels, 'Detections_Archival_Components', Project) | |
else: | |
logger.debug(f"Multi-threaded with ({str(cfg['leafmachine']['project']['num_workers'])}) threads create_dictionary_from_txt() len(Project.dir_images) > 4000") | |
A = create_dictionary_from_txt_parallel(logger, cfg, dir_exisiting_labels, 'Detections_Archival_Components', Project) | |
else: | |
logger.info("Running YOLOv5 to generate archival labels") | |
# run(weights = weights, | |
# source = Project.dir_images, | |
# project = Dirs.path_archival_components, | |
# name = Dirs.run_name, | |
# imgsz = (1280, 1280), | |
# nosave = do_save_prediction_overlay_images, | |
# anno_type = 'Archival_Detector', | |
# conf_thres = threshold, | |
# ignore_objects_for_overlay = ignore_objects, | |
# mode = 'LM2', | |
# LOGGER=logger) | |
# split the image paths into 4 chunks | |
source = Project.dir_images | |
project = Dirs.path_archival_components | |
name = Dirs.run_name | |
imgsz = (1280, 1280) | |
nosave = do_save_prediction_overlay_images | |
anno_type = 'Archival_Detector' | |
conf_thres = threshold | |
ignore_objects_for_overlay = ignore_objects | |
mode = 'LM2' | |
LOGGER = logger | |
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: | |
futures = [executor.submit(run_in_parallel, weights, source, project, name, imgsz, nosave, anno_type, | |
conf_thres, 10, ignore_objects_for_overlay, mode, LOGGER, i, num_workers) for i in | |
range(num_workers)] | |
for future in concurrent.futures.as_completed(futures): | |
try: | |
_ = future.result() | |
except Exception as e: | |
logger.error(f'Error in thread: {e}') | |
continue | |
t2_stop = perf_counter() | |
logger.info(f"[Archival components detection elapsed time] {round(t2_stop - t1_start)} seconds") | |
logger.info(f"Threads [{num_workers}]") | |
if len(Project.dir_images) <= 4000: | |
logger.debug("Single-threaded create_dictionary_from_txt() len(Project.dir_images) <= 4000") | |
A = create_dictionary_from_txt(logger, os.path.join(Dirs.path_archival_components, 'labels'), 'Detections_Archival_Components', Project) | |
else: | |
logger.debug(f"Multi-threaded with ({str(cfg['leafmachine']['project']['num_workers'])}) threads create_dictionary_from_txt() len(Project.dir_images) > 4000") | |
A = create_dictionary_from_txt_parallel(logger, cfg, os.path.join(Dirs.path_archival_components, 'labels'), 'Detections_Archival_Components', Project) | |
dict_to_json(Project.project_data, Dirs.path_archival_components, 'Detections_Archival_Components.json') | |
t1_stop = perf_counter() | |
logger.info(f"[Processing archival components elapsed time] {round(t1_stop - t1_start)} seconds") | |
torch.cuda.empty_cache() | |
return Project | |
def detect_armature_components(cfg, logger, dir_home, Project, Dirs): | |
t1_start = perf_counter() | |
logger.name = 'Locating Armature Components' | |
logger.info(f"Detecting armature components in {len(os.listdir(Project.dir_images))} images") | |
if cfg['leafmachine']['project']['num_workers'] is None: | |
num_workers = 1 | |
else: | |
num_workers = int(cfg['leafmachine']['project']['num_workers']) | |
# Weights folder base | |
dir_weights = os.path.join(dir_home, 'leafmachine2', 'component_detector','runs','train') | |
# Detection threshold | |
threshold = cfg['leafmachine']['armature_component_detector']['minimum_confidence_threshold'] | |
detector_version = cfg['leafmachine']['armature_component_detector']['detector_version'] | |
detector_iteration = cfg['leafmachine']['armature_component_detector']['detector_iteration'] | |
detector_weights = cfg['leafmachine']['armature_component_detector']['detector_weights'] | |
weights = os.path.join(dir_weights,'Armature_Detector',detector_version,detector_iteration,'weights',detector_weights) | |
do_save_prediction_overlay_images = not cfg['leafmachine']['armature_component_detector']['do_save_prediction_overlay_images'] | |
ignore_objects = cfg['leafmachine']['armature_component_detector']['ignore_objects_for_overlay'] | |
ignore_objects = ignore_objects or [] | |
logger.info("Running YOLOv5 to generate armature labels") | |
source = Project.dir_images | |
project = Dirs.path_armature_components | |
name = Dirs.run_name | |
imgsz = (1280, 1280) | |
nosave = do_save_prediction_overlay_images | |
anno_type = 'Armature_Detector' | |
conf_thres = threshold | |
ignore_objects_for_overlay = ignore_objects | |
mode = 'LM2' | |
LOGGER = logger | |
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: | |
futures = [executor.submit(run_in_parallel, weights, source, project, name, imgsz, nosave, anno_type, | |
conf_thres, 10, ignore_objects_for_overlay, mode, LOGGER, i, num_workers) for i in | |
range(num_workers)] | |
for future in concurrent.futures.as_completed(futures): | |
try: | |
_ = future.result() | |
except Exception as e: | |
logger.error(f'Error in thread: {e}') | |
continue | |
t2_stop = perf_counter() | |
logger.info(f"[Plant components detection elapsed time] {round(t2_stop - t1_start)} seconds") | |
logger.info(f"Threads [{num_workers}]") | |
if len(Project.dir_images) <= 4000: | |
logger.debug("Single-threaded create_dictionary_from_txt() len(Project.dir_images) <= 4000") | |
A = create_dictionary_from_txt(logger, os.path.join(Dirs.path_armature_components, 'labels'), 'Detections_Armature_Components', Project) | |
else: | |
logger.debug(f"Multi-threaded with ({str(cfg['leafmachine']['project']['num_workers'])}) threads create_dictionary_from_txt() len(Project.dir_images) > 4000") | |
A = create_dictionary_from_txt_parallel(logger, cfg, os.path.join(Dirs.path_armature_components, 'labels'), 'Detections_Armature_Components', Project) | |
dict_to_json(Project.project_data, Dirs.path_armature_components, 'Detections_Armature_Components.json') | |
t1_stop = perf_counter() | |
logger.info(f"[Processing armature components elapsed time] {round(t1_stop - t1_start)} seconds") | |
torch.cuda.empty_cache() | |
return Project | |
''' RUN IN PARALLEL''' | |
def run_in_parallel(weights, source, project, name, imgsz, nosave, anno_type, conf_thres, line_thickness, ignore_objects_for_overlay, mode, LOGGER, chunk, n_workers): | |
num_files = len(os.listdir(source)) | |
LOGGER.info(f"The number of worker threads: ({n_workers}), number of files ({num_files}).") | |
chunk_size = len(os.listdir(source)) // n_workers | |
start = chunk * chunk_size | |
end = start + chunk_size if chunk < (n_workers-1) else len(os.listdir(source)) | |
sub_source = [os.path.join(source, f) for f in os.listdir(source)[start:end] if f.lower().endswith('.jpg')] | |
run(weights=weights, | |
source=sub_source, | |
project=project, | |
name=name, | |
imgsz=imgsz, | |
nosave=nosave, | |
anno_type=anno_type, | |
conf_thres=conf_thres, | |
ignore_objects_for_overlay=ignore_objects_for_overlay, | |
mode=mode, | |
LOGGER=LOGGER) | |
''' RUN IN PARALLEL''' | |
###### Multi-thread NOTE this works, but unless there are several thousand images, it will be slower | |
def process_file(logger, file, dir_components, component, Project, lock): | |
file_name = str(file.split('.')[0]) | |
with open(os.path.join(dir_components, file), "r") as f: | |
with lock: | |
Project.project_data[file_name][component] = [[int(line.split()[0])] + list(map(float, line.split()[1:])) for line in f] | |
try: | |
image_path = glob.glob(os.path.join(Project.dir_images, file_name + '.*'))[0] | |
name_ext = os.path.basename(image_path) | |
with Image.open(image_path) as im: | |
_, ext = os.path.splitext(name_ext) | |
if ext not in ['.jpg']: | |
im = im.convert('RGB') | |
im.save(os.path.join(Project.dir_images, file_name) + '.jpg', quality=100) | |
# file_name += '.jpg' | |
width, height = im.size | |
except Exception as e: | |
print(f"Unable to get image dimensions. Error: {e}") | |
logger.info(f"Unable to get image dimensions. Error: {e}") | |
width, height = None, None | |
if width and height: | |
Project.project_data[file_name]['height'] = int(height) | |
Project.project_data[file_name]['width'] = int(width) | |
def create_dictionary_from_txt_parallel(logger, cfg, dir_components, component, Project): | |
if cfg['leafmachine']['project']['num_workers'] is None: | |
num_workers = 4 | |
else: | |
num_workers = int(cfg['leafmachine']['project']['num_workers']) | |
files = [file for file in os.listdir(dir_components) if file.endswith(".txt")] | |
lock = Lock() | |
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: | |
futures = [] | |
for file in files: | |
futures.append(executor.submit(process_file, logger, file, dir_components, component, Project, lock)) | |
for future in concurrent.futures.as_completed(futures): | |
pass | |
return Project.project_data | |
###### | |
# Single threaded | |
def create_dictionary_from_txt(logger, dir_components, component, Project): | |
# dict_labels = {} | |
for file in tqdm(os.listdir(dir_components), desc="Loading Annotations", colour='green'): | |
if file.endswith(".txt"): | |
file_name = str(file.split('.')[0]) | |
with open(os.path.join(dir_components, file), "r") as f: | |
# dict_labels[file] = [[int(line.split()[0])] + list(map(float, line.split()[1:])) for line in f] | |
Project.project_data[file_name][component] = [[int(line.split()[0])] + list(map(float, line.split()[1:])) for line in f] | |
try: | |
image_path = glob.glob(os.path.join(Project.dir_images, file_name + '.*'))[0] | |
name_ext = os.path.basename(image_path) | |
with Image.open(image_path) as im: | |
_, ext = os.path.splitext(name_ext) | |
if ext not in ['.jpg']: | |
im = im.convert('RGB') | |
im.save(os.path.join(Project.dir_images, file_name) + '.jpg', quality=100) | |
# file_name += '.jpg' | |
width, height = im.size | |
except Exception as e: | |
# print(f"Unable to get image dimensions. Error: {e}") | |
logger.info(f"Unable to get image dimensions. Error: {e}") | |
width, height = None, None | |
if width and height: | |
Project.project_data[file_name]['height'] = int(height) | |
Project.project_data[file_name]['width'] = int(width) | |
# for key, value in dict_labels.items(): | |
# print(f'{key} --> {value}') | |
return Project.project_data | |
# old below | |
'''def create_dictionary_from_txt(dir_components, component, Project): | |
# dict_labels = {} | |
for file in os.listdir(dir_components): | |
if file.endswith(".txt"): | |
file_name = str(file.split('.')[0]) | |
with open(os.path.join(dir_components, file), "r") as f: | |
# dict_labels[file] = [[int(line.split()[0])] + list(map(float, line.split()[1:])) for line in f] | |
Project.project_data[file_name][component] = [[int(line.split()[0])] + list(map(float, line.split()[1:])) for line in f] | |
try: | |
width, height = imagesize.get(os.path.join(Project.dir_images, '.'.join([file_name,'jpg']))) | |
except Exception as e: | |
print(f"Image not in 'jpg' format. Trying 'jpeg'. Note that other formats are not supported.{e}") | |
width, height = imagesize.get(os.path.join(Project.dir_images, '.'.join([file_name,'jpeg']))) | |
Project.project_data[file_name]['height'] = int(height) | |
Project.project_data[file_name]['width'] = int(width) | |
# for key, value in dict_labels.items(): | |
# print(f'{key} --> {value}') | |
return Project.project_data''' | |
def dict_to_json(dict_labels, dir_components, name_json): | |
dir_components = os.path.join(dir_components, 'JSON') | |
with open(os.path.join(dir_components, name_json), "w") as outfile: | |
json.dump(dict_labels, outfile) | |
def fetch_labels(dir_exisiting_labels, new_dir): | |
shutil.copytree(dir_exisiting_labels, new_dir) | |
'''Landmarks - uses YOLO, but works differently than above. A hybrid between segmentation and component detector''' | |
def detect_landmarks(cfg, logger, dir_home, Project, batch, n_batches, Dirs, segmentation_complete): | |
start_t = perf_counter() | |
logger.name = f'[BATCH {batch+1} Detect Landmarks]' | |
logger.info(f'Detecting landmarks for batch {batch+1} of {n_batches}') | |
landmark_whole_leaves = cfg['leafmachine']['landmark_detector']['landmark_whole_leaves'] | |
landmark_partial_leaves = cfg['leafmachine']['landmark_detector']['landmark_partial_leaves'] | |
landmarks_whole_leaves_props = {} | |
landmarks_whole_leaves_overlay = {} | |
landmarks_partial_leaves_props = {} | |
landmarks_partial_leaves_overlay = {} | |
if landmark_whole_leaves: | |
run_landmarks(cfg, logger, dir_home, Project, batch, n_batches, Dirs, 'Landmarks_Whole_Leaves', segmentation_complete) | |
if landmark_partial_leaves: | |
run_landmarks(cfg, logger, dir_home, Project, batch, n_batches, Dirs, 'Landmarks_Partial_Leaves', segmentation_complete) | |
# if cfg['leafmachine']['leaf_segmentation']['segment_whole_leaves']: | |
# landmarks_whole_leaves_props_batch, landmarks_whole_leaves_overlay_batch = run_landmarks(Instance_Detector_Whole, Project.project_data_list[batch], 0, | |
# "Segmentation_Whole_Leaf", "Whole_Leaf_Cropped", cfg, Project, Dirs, batch, n_batches)#, start+1, end) | |
# landmarks_whole_leaves_props.update(landmarks_whole_leaves_props_batch) | |
# landmarks_whole_leaves_overlay.update(landmarks_whole_leaves_overlay_batch) | |
# if cfg['leafmachine']['leaf_segmentation']['segment_partial_leaves']: | |
# landmarks_partial_leaves_props_batch, landmarks_partial_leaves_overlay_batch = run_landmarks(Instance_Detector_Partial, Project.project_data_list[batch], 1, | |
# "Segmentation_Partial_Leaf", "Partial_Leaf_Cropped", cfg, Project, Dirs, batch, n_batches)#, start+1, end) | |
# landmarks_partial_leaves_props.update(landmarks_partial_leaves_props_batch) | |
# landmarks_partial_leaves_overlay.update(landmarks_partial_leaves_overlay_batch) | |
end_t = perf_counter() | |
logger.info(f'Batch {batch+1}/{n_batches}: Landmark Detection Duration --> {round((end_t - start_t)/60)} minutes') | |
return Project | |
def detect_armature(cfg, logger, dir_home, Project, batch, n_batches, Dirs, segmentation_complete): | |
start_t = perf_counter() | |
logger.name = f'[BATCH {batch+1} Detect Armature]' | |
logger.info(f'Detecting armature for batch {batch+1} of {n_batches}') | |
landmark_armature = cfg['leafmachine']['modules']['armature'] | |
landmarks_armature_props = {} | |
landmarks_armature_overlay = {} | |
if landmark_armature: | |
run_armature(cfg, logger, dir_home, Project, batch, n_batches, Dirs, 'Landmarks_Armature', segmentation_complete) | |
end_t = perf_counter() | |
logger.info(f'Batch {batch+1}/{n_batches}: Armature Detection Duration --> {round((end_t - start_t)/60)} minutes') | |
return Project | |
def run_armature(cfg, logger, dir_home, Project, batch, n_batches, Dirs, leaf_type, segmentation_complete): | |
logger.info('Detecting armature landmarks from scratch') | |
if leaf_type == 'Landmarks_Armature': | |
dir_overlay = os.path.join(Dirs.landmarks_armature_overlay, ''.join(['batch_',str(batch+1)])) | |
# if not segmentation_complete: # If segmentation was run, then don't redo the unpack, just do the crop into the temp folder | |
if leaf_type == 'Landmarks_Armature': # TODO THE 0 is for prickles. For spines I'll need to add a 1 like with partial_leaves or just do it for all | |
Project.project_data_list[batch] = unpack_class_from_components_armature(Project.project_data_list[batch], 0, 'Armature_YOLO', 'Armature_BBoxes', Project) | |
Project.project_data_list[batch], dir_temp = crop_images_to_bbox_armature(Project.project_data_list[batch], 0, 'Armature_Cropped', "Armature_BBoxes", Project, Dirs, True, cfg) | |
# Weights folder base | |
dir_weights = os.path.join(dir_home, 'leafmachine2', 'component_detector','runs','train') | |
# Detection threshold | |
threshold = cfg['leafmachine']['landmark_detector_armature']['minimum_confidence_threshold'] | |
detector_version = cfg['leafmachine']['landmark_detector_armature']['detector_version'] | |
detector_iteration = cfg['leafmachine']['landmark_detector_armature']['detector_iteration'] | |
detector_weights = cfg['leafmachine']['landmark_detector_armature']['detector_weights'] | |
weights = os.path.join(dir_weights,'Landmark_Detector_YOLO',detector_version,detector_iteration,'weights',detector_weights) | |
do_save_prediction_overlay_images = not cfg['leafmachine']['landmark_detector_armature']['do_save_prediction_overlay_images'] | |
ignore_objects = cfg['leafmachine']['landmark_detector_armature']['ignore_objects_for_overlay'] | |
ignore_objects = ignore_objects or [] | |
if cfg['leafmachine']['project']['num_workers'] is None: | |
num_workers = 1 | |
else: | |
num_workers = int(cfg['leafmachine']['project']['num_workers']) | |
has_images = False | |
if len(os.listdir(dir_temp)) > 0: | |
has_images = True | |
source = dir_temp | |
project = dir_overlay | |
name = Dirs.run_name | |
imgsz = (1280, 1280) | |
nosave = do_save_prediction_overlay_images | |
anno_type = 'Armature_Detector' | |
conf_thres = threshold | |
line_thickness = 2 | |
ignore_objects_for_overlay = ignore_objects | |
mode = 'Landmark' | |
LOGGER = logger | |
# Initialize a Lock object to ensure thread safety | |
lock = Lock() | |
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: | |
futures = [executor.submit(run_in_parallel, weights, source, project, name, imgsz, nosave, anno_type, | |
conf_thres, line_thickness, ignore_objects_for_overlay, mode, LOGGER, i, num_workers) for i in | |
range(num_workers)] | |
for future in concurrent.futures.as_completed(futures): | |
try: | |
_ = future.result() | |
except Exception as e: | |
logger.error(f'Error in thread: {e}') | |
continue | |
with lock: | |
if has_images: | |
dimensions_dict = get_cropped_dimensions(dir_temp) | |
A = add_to_dictionary_from_txt_armature(cfg, logger, Dirs, leaf_type, os.path.join(dir_overlay, 'labels'), leaf_type, Project, dimensions_dict, dir_temp, batch, n_batches) | |
else: | |
# TODO add empty placeholder to the image data | |
pass | |
# delete the temp dir | |
try: | |
shutil.rmtree(dir_temp) | |
except: | |
try: | |
time.sleep(5) | |
shutil.rmtree(dir_temp) | |
except: | |
try: | |
time.sleep(5) | |
shutil.rmtree(dir_temp) | |
except: | |
pass | |
torch.cuda.empty_cache() | |
return Project | |
def run_landmarks(cfg, logger, dir_home, Project, batch, n_batches, Dirs, leaf_type, segmentation_complete): | |
use_existing_landmark_detections = cfg['leafmachine']['landmark_detector']['use_existing_landmark_detections'] | |
if use_existing_landmark_detections is None: | |
logger.info('Detecting landmarks from scratch') | |
if leaf_type == 'Landmarks_Whole_Leaves': | |
dir_overlay = os.path.join(Dirs.landmarks_whole_leaves_overlay, ''.join(['batch_',str(batch+1)])) | |
elif leaf_type == 'Landmarks_Partial_Leaves': | |
dir_overlay = os.path.join(Dirs.landmarks_partial_leaves_overlay, ''.join(['batch_',str(batch+1)])) | |
# if not segmentation_complete: # If segmentation was run, then don't redo the unpack, just do the crop into the temp folder | |
if leaf_type == 'Landmarks_Whole_Leaves': | |
Project.project_data_list[batch] = unpack_class_from_components(Project.project_data_list[batch], 0, 'Whole_Leaf_BBoxes_YOLO', 'Whole_Leaf_BBoxes', Project) | |
Project.project_data_list[batch], dir_temp = crop_images_to_bbox(Project.project_data_list[batch], 0, 'Whole_Leaf_Cropped', "Whole_Leaf_BBoxes", Project, Dirs) | |
elif leaf_type == 'Landmarks_Partial_Leaves': | |
Project.project_data_list[batch] = unpack_class_from_components(Project.project_data_list[batch], 1, 'Partial_Leaf_BBoxes_YOLO', 'Partial_Leaf_BBoxes', Project) | |
Project.project_data_list[batch], dir_temp = crop_images_to_bbox(Project.project_data_list[batch], 1, 'Partial_Leaf_Cropped', "Partial_Leaf_BBoxes", Project, Dirs) | |
# else: | |
# if leaf_type == 'Landmarks_Whole_Leaves': | |
# Project.project_data_list[batch], dir_temp = crop_images_to_bbox(Project.project_data_list[batch], 0, 'Whole_Leaf_Cropped', "Whole_Leaf_BBoxes", Project, Dirs) | |
# elif leaf_type == 'Landmarks_Partial_Leaves': | |
# Project.project_data_list[batch], dir_temp = crop_images_to_bbox(Project.project_data_list[batch], 1, 'Partial_Leaf_Cropped', "Partial_Leaf_BBoxes", Project, Dirs) | |
# Weights folder base | |
dir_weights = os.path.join(dir_home, 'leafmachine2', 'component_detector','runs','train') | |
# Detection threshold | |
threshold = cfg['leafmachine']['landmark_detector']['minimum_confidence_threshold'] | |
detector_version = cfg['leafmachine']['landmark_detector']['detector_version'] | |
detector_iteration = cfg['leafmachine']['landmark_detector']['detector_iteration'] | |
detector_weights = cfg['leafmachine']['landmark_detector']['detector_weights'] | |
weights = os.path.join(dir_weights,'Landmark_Detector_YOLO',detector_version,detector_iteration,'weights',detector_weights) | |
do_save_prediction_overlay_images = not cfg['leafmachine']['landmark_detector']['do_save_prediction_overlay_images'] | |
ignore_objects = cfg['leafmachine']['landmark_detector']['ignore_objects_for_overlay'] | |
ignore_objects = ignore_objects or [] | |
if cfg['leafmachine']['project']['num_workers'] is None: | |
num_workers = 1 | |
else: | |
num_workers = int(cfg['leafmachine']['project']['num_workers']) | |
has_images = False | |
if len(os.listdir(dir_temp)) > 0: | |
has_images = True | |
# run(weights = weights, | |
# source = dir_temp, | |
# project = dir_overlay, | |
# name = Dirs.run_name, | |
# imgsz = (1280, 1280), | |
# nosave = do_save_prediction_overlay_images, | |
# anno_type = 'Landmark_Detector_YOLO', | |
# conf_thres = threshold, | |
# line_thickness = 2, | |
# ignore_objects_for_overlay = ignore_objects, | |
# mode = 'Landmark') | |
source = dir_temp | |
project = dir_overlay | |
name = Dirs.run_name | |
imgsz = (1280, 1280) | |
nosave = do_save_prediction_overlay_images | |
anno_type = 'Landmark_Detector' | |
conf_thres = threshold | |
line_thickness = 2 | |
ignore_objects_for_overlay = ignore_objects | |
mode = 'Landmark' | |
LOGGER = logger | |
# Initialize a Lock object to ensure thread safety | |
lock = Lock() | |
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor: | |
futures = [executor.submit(run_in_parallel, weights, source, project, name, imgsz, nosave, anno_type, | |
conf_thres, line_thickness, ignore_objects_for_overlay, mode, LOGGER, i, num_workers) for i in | |
range(num_workers)] | |
for future in concurrent.futures.as_completed(futures): | |
try: | |
_ = future.result() | |
except Exception as e: | |
logger.error(f'Error in thread: {e}') | |
continue | |
with lock: | |
if has_images: | |
dimensions_dict = get_cropped_dimensions(dir_temp) | |
A = add_to_dictionary_from_txt(cfg, logger, Dirs, leaf_type, os.path.join(dir_overlay, 'labels'), leaf_type, Project, dimensions_dict, dir_temp, batch, n_batches) | |
else: | |
# TODO add empty placeholder to the image data | |
pass | |
else: | |
logger.info('Loading existing landmark annotations') | |
dir_temp = os.path.join(use_existing_landmark_detections, f'batch_{str(batch+1)}', 'labels') | |
dimensions_dict = get_cropped_dimensions(dir_temp) | |
A = add_to_dictionary_from_txt(cfg, logger, Dirs, leaf_type, use_existing_landmark_detections, leaf_type, Project, dimensions_dict, dir_temp, batch, n_batches) | |
# delete the temp dir | |
try: | |
shutil.rmtree(dir_temp) | |
except: | |
try: | |
time.sleep(5) | |
shutil.rmtree(dir_temp) | |
except: | |
try: | |
time.sleep(5) | |
shutil.rmtree(dir_temp) | |
except: | |
pass | |
torch.cuda.empty_cache() | |
return Project | |
'''def add_to_dictionary_from_txt(cfg, Dirs, leaf_type, dir_components, component, Project, dimensions_dict, dir_temp): | |
# dict_labels = {} | |
for file in os.listdir(dir_components): | |
file_name = str(file.split('.')[0]) | |
file_name_parent = file_name.split('__')[0] | |
Project.project_data[file_name_parent][component] = {} | |
if file.endswith(".txt"): | |
with open(os.path.join(dir_components, file), "r") as f: | |
all_points = [[int(line.split()[0])] + list(map(float, line.split()[1:])) for line in f] | |
Project.project_data[file_name_parent][component][file_name] = all_points | |
height = dimensions_dict[file_name][0] | |
width = dimensions_dict[file_name][1] | |
Leaf_Skeleton = LeafSkeleton(cfg, Dirs, leaf_type, all_points, height, width, dir_temp, file_name) | |
QC_add = Leaf_Skeleton.get_QC()''' | |
return Project.project_data | |
def add_to_dictionary_from_txt_armature(cfg, logger, Dirs, leaf_type, dir_components, component, Project, dimensions_dict, dir_temp, batch, n_batches): | |
dpi = cfg['leafmachine']['overlay']['overlay_dpi'] | |
if leaf_type == 'Landmarks_Armature': | |
logger.info(f'Detecting landmarks armature') | |
pdf_path = os.path.join(Dirs.landmarks_armature_overlay_QC, ''.join(['landmarks_armature_overlay_QC__',str(batch+1), 'of', str(n_batches), '.pdf'])) | |
pdf_path_final = os.path.join(Dirs.landmarks_armature_overlay_final, ''.join(['landmarks_armature_overlay_final__',str(batch+1), 'of', str(n_batches), '.pdf'])) | |
### FINAL | |
# dict_labels = {} | |
fig = plt.figure(figsize=(8.27, 11.69), dpi=dpi) # A4 size, 300 dpi | |
row, col = 0, 0 | |
with PdfPages(pdf_path_final) as pdf: | |
for file in os.listdir(dir_components): | |
file_name = str(file.split('.')[0]) | |
file_name_parent = file_name.split('__')[0] | |
# Project.project_data_list[batch][file_name_parent][component] = [] | |
if file_name_parent in Project.project_data_list[batch]: | |
if file.endswith(".txt"): | |
with open(os.path.join(dir_components, file), "r") as f: | |
all_points = [[int(line.split()[0])] + list(map(float, line.split()[1:])) for line in f] | |
# Project.project_data_list[batch][file_name_parent][component][file_name] = all_points | |
height = dimensions_dict[file_name][0] | |
width = dimensions_dict[file_name][1] | |
Armature_Skeleton = ArmatureSkeleton(cfg, logger, Dirs, leaf_type, all_points, height, width, dir_temp, file_name) | |
Project = add_armature_skeleton_to_project(cfg, logger, Project, batch, file_name_parent, component, Dirs, leaf_type, all_points, height, width, dir_temp, file_name, Armature_Skeleton) | |
final_add = cv2.cvtColor(Armature_Skeleton.get_final(), cv2.COLOR_BGR2RGB) | |
# Add image to the current subplot | |
ax = fig.add_subplot(5, 3, row * 3 + col + 1) | |
ax.imshow(final_add) | |
ax.axis('off') | |
col += 1 | |
if col == 3: | |
col = 0 | |
row += 1 | |
if row == 5: | |
row = 0 | |
pdf.savefig(fig) # Save the current page | |
fig = plt.figure(figsize=(8.27, 11.69), dpi=300) # Create a new page | |
else: | |
pass | |
if row != 0 or col != 0: | |
pdf.savefig(fig) # Save the remaining images on the last page | |
def add_to_dictionary_from_txt(cfg, logger, Dirs, leaf_type, dir_components, component, Project, dimensions_dict, dir_temp, batch, n_batches): | |
dpi = cfg['leafmachine']['overlay']['overlay_dpi'] | |
if leaf_type == 'Landmarks_Whole_Leaves': | |
logger.info(f'Detecting landmarks whole leaves') | |
pdf_path = os.path.join(Dirs.landmarks_whole_leaves_overlay_QC, ''.join(['landmarks_whole_leaves_overlay_QC__',str(batch+1), 'of', str(n_batches), '.pdf'])) | |
pdf_path_final = os.path.join(Dirs.landmarks_whole_leaves_overlay_final, ''.join(['landmarks_whole_leaves_overlay_final__',str(batch+1), 'of', str(n_batches), '.pdf'])) | |
elif leaf_type == 'Landmarks_Partial_Leaves': | |
logger.info(f'Detecting landmarks partial leaves') | |
pdf_path = os.path.join(Dirs.landmarks_partial_leaves_overlay_QC, ''.join(['landmarks_partial_leaves_overlay_QC__',str(batch+1), 'of', str(n_batches), '.pdf'])) | |
pdf_path_final = os.path.join(Dirs.landmarks_partial_leaves_overlay_final, ''.join(['landmarks_partial_leaves_overlay_final__',str(batch+1), 'of', str(n_batches), '.pdf'])) | |
elif leaf_type == 'Landmarks_Armature': | |
logger.info(f'Detecting landmarks armature') | |
pdf_path = os.path.join(Dirs.landmarks_armature_overlay_QC, ''.join(['landmarks_armature_overlay_QC__',str(batch+1), 'of', str(n_batches), '.pdf'])) | |
pdf_path_final = os.path.join(Dirs.landmarks_armature_overlay_final, ''.join(['landmarks_armature_overlay_final__',str(batch+1), 'of', str(n_batches), '.pdf'])) | |
### FINAL | |
# dict_labels = {} | |
fig = plt.figure(figsize=(8.27, 11.69), dpi=dpi) # A4 size, 300 dpi | |
row, col = 0, 0 | |
with PdfPages(pdf_path_final) as pdf: | |
for file in os.listdir(dir_components): | |
file_name = str(file.split('.')[0]) | |
file_name_parent = file_name.split('__')[0] | |
# Project.project_data_list[batch][file_name_parent][component] = [] | |
if file_name_parent in Project.project_data_list[batch]: | |
if file.endswith(".txt"): | |
with open(os.path.join(dir_components, file), "r") as f: | |
all_points = [[int(line.split()[0])] + list(map(float, line.split()[1:])) for line in f] | |
# Project.project_data_list[batch][file_name_parent][component][file_name] = all_points | |
height = dimensions_dict[file_name][0] | |
width = dimensions_dict[file_name][1] | |
Leaf_Skeleton = LeafSkeleton(cfg, logger, Dirs, leaf_type, all_points, height, width, dir_temp, file_name) | |
Project = add_leaf_skeleton_to_project(cfg, logger, Project, batch, file_name_parent, component, Dirs, leaf_type, all_points, height, width, dir_temp, file_name, Leaf_Skeleton) | |
final_add = cv2.cvtColor(Leaf_Skeleton.get_final(), cv2.COLOR_BGR2RGB) | |
# Add image to the current subplot | |
ax = fig.add_subplot(5, 3, row * 3 + col + 1) | |
ax.imshow(final_add) | |
ax.axis('off') | |
col += 1 | |
if col == 3: | |
col = 0 | |
row += 1 | |
if row == 5: | |
row = 0 | |
pdf.savefig(fig) # Save the current page | |
fig = plt.figure(figsize=(8.27, 11.69), dpi=300) # Create a new page | |
else: | |
pass | |
if row != 0 or col != 0: | |
pdf.savefig(fig) # Save the remaining images on the last page | |
### QC | |
'''do_save_QC_pdf = False # TODO refine this | |
if do_save_QC_pdf: | |
# dict_labels = {} | |
fig = plt.figure(figsize=(8.27, 11.69), dpi=dpi) # A4 size, 300 dpi | |
row, col = 0, 0 | |
with PdfPages(pdf_path) as pdf: | |
for file in os.listdir(dir_components): | |
file_name = str(file.split('.')[0]) | |
file_name_parent = file_name.split('__')[0] | |
if file_name_parent in Project.project_data_list[batch]: | |
if file.endswith(".txt"): | |
with open(os.path.join(dir_components, file), "r") as f: | |
all_points = [[int(line.split()[0])] + list(map(float, line.split()[1:])) for line in f] | |
Project.project_data_list[batch][file_name_parent][component][file_name] = all_points | |
height = dimensions_dict[file_name][0] | |
width = dimensions_dict[file_name][1] | |
Leaf_Skeleton = LeafSkeleton(cfg, logger, Dirs, leaf_type, all_points, height, width, dir_temp, file_name) | |
QC_add = cv2.cvtColor(Leaf_Skeleton.get_QC(), cv2.COLOR_BGR2RGB) | |
# Add image to the current subplot | |
ax = fig.add_subplot(5, 3, row * 3 + col + 1) | |
ax.imshow(QC_add) | |
ax.axis('off') | |
col += 1 | |
if col == 3: | |
col = 0 | |
row += 1 | |
if row == 5: | |
row = 0 | |
pdf.savefig(fig) # Save the current page | |
fig = plt.figure(figsize=(8.27, 11.69), dpi=300) # Create a new page | |
else: | |
pass | |
if row != 0 or col != 0: | |
pdf.savefig(fig) # Save the remaining images on the last page''' | |
def add_armature_skeleton_to_project(cfg, logger, Project, batch, file_name_parent, component, Dirs, leaf_type, all_points, height, width, dir_temp, file_name, ARM): | |
if ARM.is_complete: | |
try: | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'armature_status': 'complete'}, {'armature': ARM}]}) | |
except: | |
Project.project_data_list[batch][file_name_parent][component] = [] | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'armature_status': 'complete'}, {'armature': ARM}]}) | |
else: | |
try: | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'armature_status': 'incomplete'}, {'armature': ARM}]}) | |
except: | |
Project.project_data_list[batch][file_name_parent][component] = [] | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'armature_status': 'incomplete'}, {'armature': ARM}]}) | |
return Project | |
def add_leaf_skeleton_to_project(cfg, logger, Project, batch, file_name_parent, component, Dirs, leaf_type, all_points, height, width, dir_temp, file_name, LS): | |
if LS.is_complete_leaf: | |
try: | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'landmark_status': 'complete_leaf'}, {'landmarks': LS}]}) | |
except: | |
Project.project_data_list[batch][file_name_parent][component] = [] | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'landmark_status': 'complete_leaf'}, {'landmarks': LS}]}) | |
# Project.project_data_list[batch][file_name_parent][component][file_name].update({'landmark_status': 'complete_leaf'}) | |
# Project.project_data_list[batch][file_name_parent][component][file_name].update({'landmarks': LS}) | |
elif LS.is_leaf_no_width: | |
try: | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'landmark_status': 'leaf_no_width'}, {'landmarks': LS}]}) | |
except: | |
Project.project_data_list[batch][file_name_parent][component] = [] | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'landmark_status': 'leaf_no_width'}, {'landmarks': LS}]}) | |
# Project.project_data_list[batch][file_name_parent][component][file_name].update({'landmark_status': 'leaf_no_width'}) | |
# Project.project_data_list[batch][file_name_parent][component][file_name].update({'landmarks': LS}) | |
else: | |
try: | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'landmark_status': 'incomplete'}, {'landmarks': LS}]}) | |
except: | |
Project.project_data_list[batch][file_name_parent][component] = [] | |
Project.project_data_list[batch][file_name_parent][component].append({file_name: [{'landmark_status': 'incomplete'}, {'landmarks': LS}]}) | |
# Project.project_data_list[batch][file_name_parent][component][file_name].update({'landmark_status': 'incomplete'}) | |
# Project.project_data_list[batch][file_name_parent][component][file_name].update({'landmarks': LS}) | |
return Project | |
''' | |
self.determine_lamina_length('final') | |
# Lamina tip and base | |
if self.has_lamina_tip: | |
cv2.circle(self.image_final, self.lamina_tip, radius=4, color=(0, 255, 0), thickness=2) | |
cv2.circle(self.image_final, self.lamina_tip, radius=2, color=(255, 255, 255), thickness=-1) | |
if self.has_lamina_base: | |
cv2.circle(self.image_final, self.lamina_base, radius=4, color=(255, 0, 0), thickness=2) | |
cv2.circle(self.image_final, self.lamina_base, radius=2, color=(255, 255, 255), thickness=-1) | |
# Apex angle | |
# if self.apex_center != []: | |
# cv2.circle(self.image_final, self.apex_center, radius=3, color=(0, 255, 0), thickness=-1) | |
if self.apex_left != []: | |
cv2.circle(self.image_final, self.apex_left, radius=3, color=(255, 0, 0), thickness=-1) | |
if self.apex_right != []: | |
cv2.circle(self.image_final, self.apex_right, radius=3, color=(0, 0, 255), thickness=-1) | |
# Base angle | |
# if self.base_center: | |
# cv2.circle(self.image_final, self.base_center, radius=3, color=(0, 255, 0), thickness=-1) | |
if self.base_left: | |
cv2.circle(self.image_final, self.base_left, radius=3, color=(255, 0, 0), thickness=-1) | |
if self.base_right: | |
cv2.circle(self.image_final, self.base_right, radius=3, color=(0, 0, 255), thickness=-1) | |
# Draw line of fit | |
for point in self.width_infer: | |
''' | |
def get_cropped_dimensions(dir_temp): | |
dimensions_dict = {} | |
for file_name in os.listdir(dir_temp): | |
if file_name.endswith(".jpg"): | |
img = cv2.imread(os.path.join(dir_temp, file_name)) | |
height, width, channels = img.shape | |
stem = os.path.splitext(file_name)[0] | |
dimensions_dict[stem] = (height, width) | |
return dimensions_dict | |
def unpack_class_from_components_armature(dict_big, cls, dict_name_yolo, dict_name_location, Project): | |
# Get the dict that contains plant parts, find the whole leaves | |
for filename, value in dict_big.items(): | |
if "Detections_Armature_Components" in value: | |
filtered_components = [val for val in value["Detections_Armature_Components"] if val[0] == cls] | |
value[dict_name_yolo] = filtered_components | |
for filename, value in dict_big.items(): | |
if "Detections_Armature_Components" in value: | |
filtered_components = [val for val in value["Detections_Armature_Components"] if val[0] == cls] | |
height = value['height'] | |
width = value['width'] | |
converted_list = [[convert_index_to_class_armature(val[0]), int((val[1] * width) - ((val[3] * width) / 2)), | |
int((val[2] * height) - ((val[4] * height) / 2)), | |
int(val[3] * width) + int((val[1] * width) - ((val[3] * width) / 2)), | |
int(val[4] * height) + int((val[2] * height) - ((val[4] * height) / 2))] for val in filtered_components] | |
# Verify that the crops are correct | |
# img = Image.open(os.path.join(Project., '.'.join([filename,'jpg']))) | |
# for d in converted_list: | |
# img_crop = img.crop((d[1], d[2], d[3], d[4])) | |
# img_crop.show() | |
value[dict_name_location] = converted_list | |
# print(dict) | |
return dict_big | |
def unpack_class_from_components(dict_big, cls, dict_name_yolo, dict_name_location, Project): | |
# Get the dict that contains plant parts, find the whole leaves | |
for filename, value in dict_big.items(): | |
if "Detections_Plant_Components" in value: | |
filtered_components = [val for val in value["Detections_Plant_Components"] if val[0] == cls] | |
value[dict_name_yolo] = filtered_components | |
for filename, value in dict_big.items(): | |
if "Detections_Plant_Components" in value: | |
filtered_components = [val for val in value["Detections_Plant_Components"] if val[0] == cls] | |
height = value['height'] | |
width = value['width'] | |
converted_list = [[convert_index_to_class(val[0]), int((val[1] * width) - ((val[3] * width) / 2)), | |
int((val[2] * height) - ((val[4] * height) / 2)), | |
int(val[3] * width) + int((val[1] * width) - ((val[3] * width) / 2)), | |
int(val[4] * height) + int((val[2] * height) - ((val[4] * height) / 2))] for val in filtered_components] | |
# Verify that the crops are correct | |
# img = Image.open(os.path.join(Project., '.'.join([filename,'jpg']))) | |
# for d in converted_list: | |
# img_crop = img.crop((d[1], d[2], d[3], d[4])) | |
# img_crop.show() | |
value[dict_name_location] = converted_list | |
# print(dict) | |
return dict_big | |
def crop_images_to_bbox_armature(dict_big, cls, dict_name_cropped, dict_from, Project, Dirs, do_upscale=False, cfg=None): | |
dir_temp = os.path.join(Dirs.landmarks, 'TEMP_landmarks') | |
os.makedirs(dir_temp, exist_ok=True) | |
# For each image, iterate through the whole leaves, segment, report data back to dict_plant_components | |
for filename, value in dict_big.items(): | |
value[dict_name_cropped] = [] | |
if dict_from in value: | |
bboxes_whole_leaves = [val for val in value[dict_from] if val[0] == convert_index_to_class_armature(cls)] | |
if len(bboxes_whole_leaves) == 0: | |
m = str(''.join(['No objects for class ', convert_index_to_class_armature(0), ' were found'])) | |
# Print_Verbose(cfg, 3, m).plain() | |
else: | |
try: | |
img = cv2.imread(os.path.join(Project.dir_images, '.'.join([filename,'jpg']))) | |
# img = cv2.imread(os.path.join(Project, '.'.join([filename,'jpg']))) # Testing | |
except: | |
img = cv2.imread(os.path.join(Project.dir_images, '.'.join([filename,'jpeg']))) | |
# img = cv2.imread(os.path.join(Project, '.'.join([filename,'jpeg']))) # Testing | |
for d in bboxes_whole_leaves: | |
# img_crop = img.crop((d[1], d[2], d[3], d[4])) # PIL | |
img_crop = img[d[2]:d[4], d[1]:d[3]] | |
loc = '-'.join([str(d[1]), str(d[2]), str(d[3]), str(d[4])]) | |
# value[dict_name_cropped].append({crop_name: img_crop}) | |
if do_upscale: | |
upscale_factor = int(cfg['leafmachine']['landmark_detector_armature']['upscale_factor']) | |
if cls == 0: | |
crop_name = '__'.join([filename,f"PRICKLE-{upscale_factor}x",loc]) | |
height, width, _ = img_crop.shape | |
img_crop = cv2.resize(img_crop, ((width * upscale_factor), (height * upscale_factor)), interpolation=cv2.INTER_LANCZOS4) | |
else: | |
if cls == 0: | |
crop_name = '__'.join([filename,'PRICKLE',loc]) | |
cv2.imwrite(os.path.join(dir_temp, '.'.join([crop_name,'jpg'])), img_crop) | |
# cv2.imshow('img_crop', img_crop) | |
# cv2.waitKey(0) | |
# img_crop.show() # PIL | |
return dict_big, dir_temp | |
def crop_images_to_bbox(dict_big, cls, dict_name_cropped, dict_from, Project, Dirs): | |
dir_temp = os.path.join(Dirs.landmarks, 'TEMP_landmarks') | |
os.makedirs(dir_temp, exist_ok=True) | |
# For each image, iterate through the whole leaves, segment, report data back to dict_plant_components | |
for filename, value in dict_big.items(): | |
value[dict_name_cropped] = [] | |
if dict_from in value: | |
bboxes_whole_leaves = [val for val in value[dict_from] if val[0] == convert_index_to_class(cls)] | |
if len(bboxes_whole_leaves) == 0: | |
m = str(''.join(['No objects for class ', convert_index_to_class(0), ' were found'])) | |
# Print_Verbose(cfg, 3, m).plain() | |
else: | |
try: | |
img = cv2.imread(os.path.join(Project.dir_images, '.'.join([filename,'jpg']))) | |
# img = cv2.imread(os.path.join(Project, '.'.join([filename,'jpg']))) # Testing | |
except: | |
img = cv2.imread(os.path.join(Project.dir_images, '.'.join([filename,'jpeg']))) | |
# img = cv2.imread(os.path.join(Project, '.'.join([filename,'jpeg']))) # Testing | |
for d in bboxes_whole_leaves: | |
# img_crop = img.crop((d[1], d[2], d[3], d[4])) # PIL | |
img_crop = img[d[2]:d[4], d[1]:d[3]] | |
loc = '-'.join([str(d[1]), str(d[2]), str(d[3]), str(d[4])]) | |
if cls == 0: | |
crop_name = '__'.join([filename,'L',loc]) | |
elif cls == 1: | |
crop_name = '__'.join([filename,'PL',loc]) | |
elif cls == 2: | |
crop_name = '__'.join([filename,'ARM',loc]) | |
# value[dict_name_cropped].append({crop_name: img_crop}) | |
cv2.imwrite(os.path.join(dir_temp, '.'.join([crop_name,'jpg'])), img_crop) | |
# cv2.imshow('img_crop', img_crop) | |
# cv2.waitKey(0) | |
# img_crop.show() # PIL | |
return dict_big, dir_temp | |
def convert_index_to_class(ind): | |
mapping = { | |
0: 'apex_angle', | |
1: 'base_angle', | |
2: 'lamina_base', | |
3: 'lamina_tip', | |
4: 'lamina_width', | |
5: 'lobe_tip', | |
6: 'midvein_trace', | |
7: 'petiole_tip', | |
8: 'petiole_trace', | |
} | |
return mapping.get(ind, 'Invalid class').lower() | |
def convert_index_to_class_armature(ind): | |
mapping = { | |
0: 'tip', | |
1: 'middle', | |
2: 'outer', | |
} | |
return mapping.get(ind, 'Invalid class').lower() | |