import os, time, requests, yaml, re, csv, sys, inspect
import pandas as pd
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from urllib.parse import urlparse
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from threading import Lock
from random import shuffle
from collections import defaultdict

currentdir = os.path.dirname(os.path.dirname(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.append(parentdir)
sys.path.append(currentdir)

from concurrent.futures import ThreadPoolExecutor as th

from vouchervision.general_utils import bcolors, validate_dir

'''
For download parallelization, I followed this guide: https://rednafi.github.io/digressions/python/2020/04/21/python-concurrent-futures.html
'''
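# A minimal sketch of the submit/collect pattern from that guide, which
# process_image_batch() below follows; worker_fn and items are placeholders,
# not part of this module:
#
#     with ThreadPoolExecutor(max_workers=13) as executor:
#         futures = [executor.submit(worker_fn, item) for item in items]
#         for future in futures:
#             try:
#                 results.append(future.result(timeout=60))
#             except Exception:
#                 results.append(None)  # one failed download should not kill the batch
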
'''
####################################################################################################
Read config files
####################################################################################################
'''
def get_cfg_from_full_path(path_cfg):
    with open(path_cfg, "r") as ymlfile:
        cfg = yaml.full_load(ymlfile)
    return cfg
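
# A hypothetical minimal config illustrating the keys this module reads; all values
# below are placeholders, and the authoritative schema is the project's .yml templates.
#
#     dir_home: /path/to/DWC/files            # folder holding the occurrence/images files
#     filename_occ: occurrence.txt            # Darwin Core occurrences (.txt or .csv)
#     filename_img: multimedia.txt            # Darwin Core images (.txt or .csv)
#     dir_destination_images: /path/to/imgs   # where downloaded .jpg files land
#     dir_destination_csv: /path/to/csv       # where the combined CSV is written
#     filename_combined: combined.csv
#     MP_low: 1                # skip images smaller than this (megapixels)
#     MP_high: 25              # resize images larger than this (if do_resize)
#     do_resize: True
#     is_custom_file: False
#     col_url: identifier      # custom files only
#     col_name: filename       # custom files only
#     n_imgs_per_species: 10
#     n_already_downloaded: 0
#     n_max_to_download: 100000
#     do_shuffle_occurrences: True
#     ignore_banned_herb: True
#     banned_url_stems: []
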
'''
Classes
'''
class ImageCandidate:
    cfg: dict = None
    herb_code: str = ''
    specimen_id: str = ''
    family: str = ''
    genus: str = ''
    species: str = ''
    fullname: str = ''
    filename_image: str = ''
    filename_image_jpg: str = ''
    url: str = ''
    headers_occ: list = None
    headers_img: list = None
    occ_row: pd.Series = None
    image_row: pd.Series = None

    def __init__(self, cfg, image_row, occ_row, url, lock):
        self.headers_occ = occ_row
        self.headers_img = image_row
        self.occ_row = occ_row
        self.image_row = image_row
        self.url = url
        self.cfg = cfg

        self.filename_image, self.filename_image_jpg, self.herb_code, self.specimen_id, self.family, self.genus, self.species, self.fullname = generate_image_filename(occ_row)

        self.download_image(lock)
    def download_image(self, lock) -> None:
        dir_destination = self.cfg['dir_destination_images']
        MP_low = self.cfg['MP_low']
        MP_high = self.cfg['MP_high']

        # Set up a session that retries failed connections once
        session = requests.Session()
        retry = Retry(connect=1)  # e.g. Retry(connect=2, backoff_factor=0.5) for more patience
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        print(f"{bcolors.BOLD} {self.fullname}{bcolors.ENDC}")
        print(f"{bcolors.BOLD} URL: {self.url}{bcolors.ENDC}")

        response = None
        try:
            response = session.get(self.url, stream=True, timeout=1.0)
            img = Image.open(response.raw)
            self._save_matching_image(img, MP_low, MP_high, dir_destination, lock)
            print(f"{bcolors.OKGREEN} SUCCESS{bcolors.ENDC}")
        except Exception as e:
            print(f"{bcolors.FAIL} SKIP No Connection or ERROR --> {e}{bcolors.ENDC}")
            if response is not None:
                print(f"{bcolors.WARNING} Status Code --> {response.status_code}{bcolors.ENDC}")
                print(f"{bcolors.WARNING} Reason --> {response.reason}{bcolors.ENDC}")
    def _save_matching_image(self, img, MP_low, MP_high, dir_destination, lock) -> None:
        img_mp, img_w, img_h = check_image_size(img)
        image_path = os.path.join(dir_destination, self.filename_image_jpg)

        if img_mp < MP_low:
            print(f"{bcolors.WARNING} SKIP < {MP_low}MP: {img_mp}{bcolors.ENDC}")
        elif MP_low <= img_mp <= MP_high:
            img.save(image_path)
            self._add_occ_and_img_data(lock)
            print(f"{bcolors.OKGREEN} Regular MP: {img_mp}{bcolors.ENDC}")
            print(f"{bcolors.OKGREEN} Image Saved: {image_path}{bcolors.ENDC}")
        else:  # img_mp > MP_high
            if self.cfg['do_resize']:
                img_w, img_h = calc_resize(img_w, img_h)
                img = img.resize((img_w, img_h))
                img.save(image_path)
                self._add_occ_and_img_data(lock)
                print(f"{bcolors.OKGREEN} {MP_high}MP+ Resize: {img_mp}{bcolors.ENDC}")
                print(f"{bcolors.OKGREEN} Image Saved: {image_path}{bcolors.ENDC}")
            else:
                print(f"{bcolors.OKCYAN} {MP_high}MP+ Resize: {img_mp}{bcolors.ENDC}")
                print(f"{bcolors.OKCYAN} SKIP: {image_path}{bcolors.ENDC}")
    def _add_occ_and_img_data(self, lock) -> None:
        self.image_row = self.image_row.to_frame().transpose().rename(columns={"identifier": "url"})
        self.image_row = self.image_row.rename(columns={"gbifID": "gbifID_images"})

        new_data = {'fullname': [self.fullname], 'filename_image': [self.filename_image], 'filename_image_jpg': [self.filename_image_jpg]}
        new_data = pd.DataFrame(data=new_data)

        all_data = [new_data.reset_index(), self.image_row.reset_index(), self.occ_row.reset_index()]
        combined = pd.concat(all_data, ignore_index=False, axis=1)

        # Drop the three 'index' columns introduced by reset_index(); they sit at
        # position 0 and at the start of the image and occurrence blocks
        w_1 = new_data.shape[1] + 1
        w_2 = self.image_row.shape[1] + 1
        combined.drop([combined.columns[0], combined.columns[w_1], combined.columns[w_1 + w_2]], axis=1, inplace=True)

        headers = np.hstack((new_data.columns.values, self.image_row.columns.values, self.occ_row.columns.values))
        combined.columns = headers

        self._append_combined_occ_image(self.cfg, combined, lock)
    def _append_combined_occ_image(self, cfg, combined, lock) -> None:
        path_csv_combined = os.path.join(cfg['dir_destination_csv'], cfg['filename_combined'])
        with lock:
            try:
                # Append the row if the file already exists; the read is only an existence check
                pd.read_csv(path_csv_combined, dtype=str, nrows=1)
                combined.to_csv(path_csv_combined, mode='a', header=False, index=False)
                print(f'{bcolors.OKGREEN} Added 1 row to combined CSV: {path_csv_combined}{bcolors.ENDC}')
            except Exception:
                print(f"{bcolors.WARNING} Initializing new combined .csv file: [occ,images]: {path_csv_combined}{bcolors.ENDC}")
                combined.to_csv(path_csv_combined, mode='w', header=True, index=False)

class ImageCandidateMulti:
    cfg: dict = None
    herb_code: str = ''
    specimen_id: str = ''
    family: str = ''
    genus: str = ''
    species: str = ''
    fullname: str = ''
    filename_image: str = ''
    filename_image_jpg: str = ''
    url: str = ''
    headers_occ: list = None
    headers_img: list = None
    occ_row: pd.Series = None
    image_row: pd.Series = None
    download_success: bool = False

    def __init__(self, cfg, image_row, occ_row, url, dir_destination, lock):
        self.headers_occ = occ_row
        self.headers_img = image_row
        self.occ_row = occ_row
        self.image_row = image_row
        self.url = url
        self.cfg = cfg

        self.filename_image, self.filename_image_jpg, self.herb_code, self.specimen_id, self.family, self.genus, self.species, self.fullname = generate_image_filename(occ_row)

        self.download_success = self.download_image(dir_destination, lock)
    def download_image(self, dir_destination, lock) -> bool:
        MP_low = self.cfg['MP_low']
        MP_high = self.cfg['MP_high']

        # Set up a session that retries failed connections once
        session = requests.Session()
        retry = Retry(connect=1)  # e.g. Retry(connect=2, backoff_factor=0.5) for more patience
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        print(f"{bcolors.BOLD} {self.fullname}{bcolors.ENDC}")
        print(f"{bcolors.BOLD} URL: {self.url}{bcolors.ENDC}")

        response = None
        try:
            response = session.get(self.url, stream=True, timeout=1.0)
            img = Image.open(response.raw)
            self._save_matching_image(img, MP_low, MP_high, dir_destination, lock)
            print(f"{bcolors.OKGREEN} SUCCESS{bcolors.ENDC}")
            return True
        except Exception as e:
            print(f"{bcolors.FAIL} SKIP No Connection or ERROR --> {e}{bcolors.ENDC}")
            if response is not None:
                print(f"{bcolors.WARNING} Status Code --> {response.status_code}{bcolors.ENDC}")
                print(f"{bcolors.WARNING} Reason --> {response.reason}{bcolors.ENDC}")
            return False
    def _save_matching_image(self, img, MP_low, MP_high, dir_destination, lock) -> None:
        img_mp, img_w, img_h = check_image_size(img)
        image_path = os.path.join(dir_destination, self.filename_image_jpg)

        if img_mp < MP_low:
            print(f"{bcolors.WARNING} SKIP < {MP_low}MP: {img_mp}{bcolors.ENDC}")
        elif MP_low <= img_mp <= MP_high:
            img.save(image_path)
            self._add_occ_and_img_data(lock)
            print(f"{bcolors.OKGREEN} Regular MP: {img_mp}{bcolors.ENDC}")
            print(f"{bcolors.OKGREEN} Image Saved: {image_path}{bcolors.ENDC}")
        else:  # img_mp > MP_high
            if self.cfg['do_resize']:
                img_w, img_h = calc_resize(img_w, img_h)
                img = img.resize((img_w, img_h))
                img.save(image_path)
                self._add_occ_and_img_data(lock)
                print(f"{bcolors.OKGREEN} {MP_high}MP+ Resize: {img_mp}{bcolors.ENDC}")
                print(f"{bcolors.OKGREEN} Image Saved: {image_path}{bcolors.ENDC}")
            else:
                print(f"{bcolors.OKCYAN} {MP_high}MP+ Resize: {img_mp}{bcolors.ENDC}")
                print(f"{bcolors.OKCYAN} SKIP: {image_path}{bcolors.ENDC}")
    def _add_occ_and_img_data(self, lock) -> None:
        self.image_row = self.image_row.to_frame().transpose().rename(columns={"identifier": "url"})
        self.image_row = self.image_row.rename(columns={"gbifID": "gbifID_images"})

        new_data = {'fullname': [self.fullname], 'filename_image': [self.filename_image], 'filename_image_jpg': [self.filename_image_jpg]}
        new_data = pd.DataFrame(data=new_data)

        all_data = [new_data.reset_index(), self.image_row.reset_index(), self.occ_row.reset_index()]
        combined = pd.concat(all_data, ignore_index=False, axis=1)

        # Drop the three 'index' columns introduced by reset_index()
        w_1 = new_data.shape[1] + 1
        w_2 = self.image_row.shape[1] + 1
        combined.drop([combined.columns[0], combined.columns[w_1], combined.columns[w_1 + w_2]], axis=1, inplace=True)

        headers = np.hstack((new_data.columns.values, self.image_row.columns.values, self.occ_row.columns.values))
        combined.columns = headers

        self._append_combined_occ_image(self.cfg, combined, lock)
    def _append_combined_occ_image(self, cfg, combined, lock) -> None:
        path_csv_combined = os.path.join(cfg['dir_destination_csv'], cfg['filename_combined'])
        with lock:
            try:
                # Append the row if the file already exists; the read is only an existence check
                pd.read_csv(path_csv_combined, dtype=str, nrows=1)
                combined.to_csv(path_csv_combined, mode='a', header=False, index=False)
                print(f'{bcolors.OKGREEN} Added 1 row to combined CSV: {path_csv_combined}{bcolors.ENDC}')
            except Exception:
                print(f"{bcolors.WARNING} Initializing new combined .csv file: [occ,images]: {path_csv_combined}{bcolors.ENDC}")
                combined.to_csv(path_csv_combined, mode='w', header=True, index=False)

class SharedCounter:
    def __init__(self):
        self.img_count_dict = {}
        self.lock = Lock()

    def increment(self, key, value=1):
        with self.lock:
            self.img_count_dict[key] = self.img_count_dict.get(key, 0) + value

    def get_count(self, key):
        with self.lock:
            return self.img_count_dict.get(key, 0)
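
# A minimal illustration of SharedCounter; the key is a hypothetical fullname
# as produced by generate_image_filename() below:
#
#     counter = SharedCounter()
#     counter.increment('Rosaceae_Rosa_canina')      # first image for this species
#     counter.increment('Rosaceae_Rosa_canina', 2)   # two more
#     counter.get_count('Rosaceae_Rosa_canina')      # -> 3
#     counter.get_count('never_seen')                # -> 0 (unseen keys default to 0)
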
class ImageCandidateCustom:
    cfg: dict = None
    fullname: str = ''
    filename_image: str = ''
    filename_image_jpg: str = ''
    url: str = ''
    headers_img: list = None
    image_row: pd.Series = None

    def __init__(self, cfg, image_row, url, col_name, lock):
        self.image_row = image_row
        self.url = url
        self.cfg = cfg
        self.col_name = col_name

        self.fullname = image_row[col_name]
        self.filename_image = image_row[col_name]
        self.filename_image_jpg = ''.join([image_row[col_name], '.jpg'])

        self.download_image(lock)
    def download_image(self, lock) -> None:
        dir_destination = self.cfg['dir_destination_images']
        MP_low = self.cfg['MP_low']
        MP_high = self.cfg['MP_high']

        # Set up a session that retries failed connections once
        session = requests.Session()
        retry = Retry(connect=1)  # e.g. Retry(connect=2, backoff_factor=0.5) for more patience
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        print(f"{bcolors.BOLD} {self.fullname}{bcolors.ENDC}")
        print(f"{bcolors.BOLD} URL: {self.url}{bcolors.ENDC}")

        response = None
        try:
            response = session.get(self.url, stream=True, timeout=1.0)
            img = Image.open(response.raw)
            self._save_matching_image(img, MP_low, MP_high, dir_destination, lock)
            print(f"{bcolors.OKGREEN} SUCCESS{bcolors.ENDC}")
        except Exception as e:
            print(f"{bcolors.FAIL} SKIP No Connection or ERROR --> {e}{bcolors.ENDC}")
            if response is not None:
                print(f"{bcolors.WARNING} Status Code --> {response.status_code}{bcolors.ENDC}")
                print(f"{bcolors.WARNING} Reason --> {response.reason}{bcolors.ENDC}")
    def _save_matching_image(self, img, MP_low, MP_high, dir_destination, lock) -> None:
        img_mp, img_w, img_h = check_image_size(img)
        image_path = os.path.join(dir_destination, self.filename_image_jpg)

        if img_mp < MP_low:
            print(f"{bcolors.WARNING} SKIP < {MP_low}MP: {img_mp}{bcolors.ENDC}")
        elif MP_low <= img_mp <= MP_high:
            img.save(image_path)
            print(f"{bcolors.OKGREEN} Regular MP: {img_mp}{bcolors.ENDC}")
            print(f"{bcolors.OKGREEN} Image Saved: {image_path}{bcolors.ENDC}")
        else:  # img_mp > MP_high
            if self.cfg['do_resize']:
                img_w, img_h = calc_resize(img_w, img_h)
                img = img.resize((img_w, img_h))
                img.save(image_path)
                print(f"{bcolors.OKGREEN} {MP_high}MP+ Resize: {img_mp}{bcolors.ENDC}")
                print(f"{bcolors.OKGREEN} Image Saved: {image_path}{bcolors.ENDC}")
            else:
                print(f"{bcolors.OKCYAN} {MP_high}MP+ Resize: {img_mp}{bcolors.ENDC}")
                print(f"{bcolors.OKCYAN} SKIP: {image_path}{bcolors.ENDC}")

'''
####################################################################################################
General Functions
####################################################################################################
'''
# If an image is larger than the MP cap, downsample so the long side = 5000 px
def calc_resize(w, h):
    if h > w:
        ratio = h / w
        new_h = 5000
        new_w = round(5000 / ratio)
    else:  # w >= h
        ratio = w / h
        new_w = 5000
        new_h = round(5000 / ratio)
    return new_w, new_h
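
# For example, a hypothetical 8000 x 6000 px scan (48 MP) keeps its 4:3 aspect
# ratio and comes back as 5000 x 3750 px (18.75 MP):
#
#     new_w, new_h = calc_resize(8000, 6000)   # -> (5000, 3750)
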
def check_image_size(img):
    img_w, img_h = img.size
    img_mp = round(img_w * img_h / 1000000, 1)  # megapixels
    return img_mp, img_w, img_h

def check_n_images_in_group(detailedOcc, N):
    fullnames = detailedOcc['fullname'].unique()
    for name in fullnames:
        ct = len(detailedOcc[detailedOcc['fullname'].str.match(name)])
        if ct == N:
            print(f"{bcolors.OKGREEN}{name}: {ct}{bcolors.ENDC}")
        else:
            print(f"{bcolors.FAIL}{name}: {ct}{bcolors.ENDC}")

'''
####################################################################################################
Functions for --> download_GBIF_from_user_file.py
####################################################################################################
'''
# def download_subset_images_user_file(dir_home, dir_destination, n_already_downloaded, MP_low, MP_high, wishlist, filename_occ, filename_img):
#     sep = '_'
#     aggOcc = pd.DataFrame()
#     aggImg = pd.DataFrame()
#     # Define URL get parameters
#     session = requests.Session()
#     retry = Retry(connect=1)  # e.g. Retry(connect=2, backoff_factor=0.5)
#     adapter = HTTPAdapter(max_retries=retry)
#     session.mount('http://', adapter)
#     session.mount('https://', adapter)
#     listMax = wishlist.shape[0]
#     for index, spp in wishlist.iterrows():
#         imageFound = False
#         currentFamily = spp['family']
#         currentFullname = spp['fullname']
#         currentURL = spp['url']
#         currentBarcode = spp['barcode']
#         currentHerb = spp['herbCode']
#         print(f"{bcolors.BOLD}Family: {currentFamily}{bcolors.ENDC}")
#         print(f"{bcolors.BOLD} {currentFullname}{bcolors.ENDC}")
#         print(f"{bcolors.BOLD} In Download List: {index} / {listMax}{bcolors.ENDC}")
#         imgFilename = sep.join([currentHerb, currentBarcode, currentFullname])
#         imgFilenameJPG = imgFilename + ".jpg"
#         print(f"{bcolors.BOLD} URL: {currentURL}{bcolors.ENDC}")
#         try:
#             img = Image.open(session.get(currentURL, stream=True, timeout=1.0).raw)
#             imageFound, alreadyDownloaded, aggOcc, aggImg = save_matching_image_user_file(alreadyDownloaded, img, MP_Low, MP_High, dirNewImg, imgFilenameJPG)
#             print(f"{bcolors.OKGREEN} SUCCESS{bcolors.ENDC}")
#         except Exception as e:
#             print(f"{bcolors.WARNING} SKIP No Connection or ERROR{bcolors.ENDC}")
#     aggOcc.to_csv(os.path.join(dir_home, aggOcc_filename), index=False)
#     aggImg.to_csv(os.path.join(dir_home, aggImg_filename), index=False)
#     return alreadyDownloaded, aggOcc, aggImg

# Return the entire row of file_to_search that matches the gbif_id, else return None
def find_gbifID(gbif_id, file_to_search):
    row_found = file_to_search.loc[file_to_search['gbifID'].astype(str).str.match(str(gbif_id)), :]
    if row_found.empty:
        print(f"{bcolors.WARNING} gbif_id: {gbif_id} not found in occurrences file{bcolors.ENDC}")
        row_found = None
    else:
        print(f"{bcolors.OKGREEN} gbif_id: {gbif_id} successfully found in occurrences file{bcolors.ENDC}")
    return row_found

def validate_herb_code(occ_row):
    # Herbarium codes are not always in the correct column, so we need to find the right one
    try:
        opts = [occ_row['institutionCode'],
                occ_row['institutionID'],
                occ_row['ownerInstitutionCode'],
                occ_row['collectionCode'],
                occ_row['publisher'],
                occ_row['occurrenceID']]
        opts = [item for item in opts if not pd.isnull(item.values).any()]
    except Exception:
        opts = [str(occ_row['institutionCode']),
                str(occ_row['institutionID']),
                str(occ_row['ownerInstitutionCode']),
                str(occ_row['collectionCode']),
                str(occ_row['publisher']),
                str(occ_row['occurrenceID'])]

    opts = pd.DataFrame(opts)
    opts = opts.dropna()
    opts = opts.apply(lambda x: x[0]).tolist()

    opts_short = []
    for word in opts:
        if word is not None and len(word) <= 8:
            opts_short = opts_short + [word]

    if len(opts_short) == 0:
        try:
            herb_code = occ_row['publisher'].values[0].replace(" ", "-")
        except Exception:
            try:
                herb_code = occ_row['publisher'].replace(" ", "-")
            except Exception:
                herb_code = "ERROR"

    try:
        inst_ID = occ_row['institutionID'].values[0]
        occ_ID = occ_row['occurrenceID'].values[0]
    except Exception:
        inst_ID = occ_row['institutionID']
        occ_ID = occ_row['occurrenceID']

    if inst_ID == "UBC Herbarium":
        herb_code = "UBC"
    elif inst_ID == "Naturalis Biodiversity Center":
        herb_code = "L"
    elif inst_ID == "Forest Herbarium Ibadan (FHI)":
        herb_code = "FHI"
    elif 'id.luomus.fi' in occ_ID:
        herb_code = "FinBIF"
    else:
        if len(opts_short) > 0:
            herb_code = opts_short[0]

    if hasattr(herb_code, 'values'):  # unwrap a one-element Series
        herb_code = herb_code.values[0]

    # Specific cases that require manual overrides
    # If you see an herbarium DWC file with a similar error, add it here
    if herb_code == "Qarshi-Botanical-Garden,-Qarshi-Industries-Pvt.-Ltd,-Pakistan":
        herb_code = "Qarshi-Botanical-Garden"
    elif herb_code == "12650":
        herb_code = "SDSU"
    elif herb_code == "322":
        herb_code = "SDSU"
    elif herb_code == "GC-University,-Lahore":
        herb_code = "GC-University-Lahore"
    elif herb_code == "Institute-of-Biology-of-Komi-Scientific-Centre-of-the-Ural-Branch-of-the-Russian-Academy-of-Sciences":
        herb_code = "Komi-Scientific-Centre"
    return herb_code
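
# For example, a hypothetical row whose institutionCode is 'MICH' (8 characters or
# fewer, so it lands in opts_short) resolves to 'MICH', while a row whose
# institutionID is 'UBC Herbarium' resolves to 'UBC' via the overrides above.
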
def remove_illegal_chars(text):
    cleaned = re.sub(r"[^a-zA-Z0-9_-]", "", text)
    return cleaned

def keep_first_word(text):
    if ' ' in text:
        cleaned = text.split(' ')[0]
    else:
        cleaned = text
    return cleaned

# Create a filename for the downloaded image
# In the case-sensitive format:
#        HERBARIUM_barcode_Family_Genus_species.jpg
def generate_image_filename(occ_row):
    herb_code = remove_illegal_chars(validate_herb_code(occ_row))
    try:
        specimen_id = str(occ_row['gbifID'].values[0])
        family = remove_illegal_chars(occ_row['family'].values[0])
        genus = remove_illegal_chars(occ_row['genus'].values[0])
        species = remove_illegal_chars(keep_first_word(occ_row['specificEpithet'].values[0]))
    except Exception:  # occ_row is a plain Series, not a one-row DataFrame
        specimen_id = str(occ_row['gbifID'])
        family = remove_illegal_chars(occ_row['family'])
        genus = remove_illegal_chars(occ_row['genus'])
        species = remove_illegal_chars(keep_first_word(occ_row['specificEpithet']))
    fullname = '_'.join([family, genus, species])
    filename_image = '_'.join([herb_code, specimen_id, fullname])
    filename_image_jpg = '.'.join([filename_image, 'jpg'])
    return filename_image, filename_image_jpg, herb_code, specimen_id, family, genus, species, fullname
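
# For a hypothetical occurrence row with gbifID '1234567890', family 'Rosaceae',
# genus 'Rosa', specificEpithet 'canina', and a herbarium code resolving to 'MICH',
# this yields:
#
#     filename_image     -> 'MICH_1234567890_Rosaceae_Rosa_canina'
#     filename_image_jpg -> 'MICH_1234567890_Rosaceae_Rosa_canina.jpg'
#     fullname           -> 'Rosaceae_Rosa_canina'
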
def read_DWC_file(cfg):
    dir_home = cfg['dir_home']
    filename_occ = cfg['filename_occ']
    filename_img = cfg['filename_img']
    # Read the images and occurrences files; either can be .txt or .csv
    occ_df = ingest_DWC(filename_occ, dir_home)
    images_df = ingest_DWC(filename_img, dir_home)
    return occ_df, images_df

def read_DWC_file_multiDirs(cfg, dir_sub):
    filename_occ = cfg['filename_occ']
    filename_img = cfg['filename_img']
    # Read the images and occurrences files; either can be .txt or .csv
    occ_df = ingest_DWC(filename_occ, dir_sub)
    images_df = ingest_DWC(filename_img, dir_sub)
    return occ_df, images_df

def ingest_DWC(DWC_csv_or_txt_file, dir_home):
    file_path = os.path.join(dir_home, DWC_csv_or_txt_file)
    ext = os.path.splitext(DWC_csv_or_txt_file)[1].lower()
    if ext == '.txt':
        df = pd.read_csv(file_path, sep="\t", header=0, low_memory=False, dtype=str)
    elif ext == '.csv':
        df = pd.read_csv(file_path, sep=",", header=0, low_memory=False, dtype=str)
    else:
        print(f"{bcolors.FAIL}DWC file {DWC_csv_or_txt_file} is not '.txt' or '.csv' and was not opened{bcolors.ENDC}")
        df = None
    return df

'''
#######################################################################
Main function for the config_download_from_GBIF_all_images_in_file.yml
See the yml for details
#######################################################################
'''
def download_all_images_in_images_csv_multiDirs(cfg):
    dir_destination_parent = cfg['dir_destination_images']
    dir_destination_csv = cfg['dir_destination_csv']
    n_already_downloaded = cfg['n_already_downloaded']  # currently unused here
    n_max_to_download = cfg['n_max_to_download']        # currently unused here
    n_imgs_per_species = cfg['n_imgs_per_species']
    MP_low = cfg['MP_low']
    MP_high = cfg['MP_high']
    do_shuffle_occurrences = cfg['do_shuffle_occurrences']

    shared_counter = SharedCounter()

    # Walk the parent DWC directory; each subdirectory holds its own occurrence/images pair
    for dir_DWC, dirs_sub, __ in os.walk(cfg['dir_home']):
        for dir_sub in dirs_sub:
            dir_home = os.path.join(dir_DWC, dir_sub)
            dir_destination = os.path.join(dir_destination_parent, dir_sub)

            validate_dir(dir_destination)
            validate_dir(dir_destination_csv)

            occ_df, images_df = read_DWC_file_multiDirs(cfg, dir_home)

            # Shuffle the order of the occurrences DataFrame if the flag is set
            if do_shuffle_occurrences:
                occ_df = occ_df.sample(frac=1).reset_index(drop=True)

            # Report summary
            print(f"{bcolors.BOLD}Beginning of images file:{bcolors.ENDC}")
            print(images_df.head())
            print(f"{bcolors.BOLD}Beginning of occurrence file:{bcolors.ENDC}")
            print(occ_df.head())

            # Ignore problematic herbaria
            if cfg['ignore_banned_herb']:
                for banned_url in cfg['banned_url_stems']:
                    images_df = images_df[~images_df['identifier'].str.contains(banned_url, na=False)]

            # Report summary
            n_imgs = images_df.shape[0]
            n_occ = occ_df.shape[0]
            print(f"{bcolors.BOLD}Number of images in images file: {n_imgs}{bcolors.ENDC}")
            print(f"{bcolors.BOLD}Number of occurrences to search through: {n_occ}{bcolors.ENDC}")

            results = process_image_batch_multiDirs(cfg, images_df, occ_df, dir_destination, shared_counter, n_imgs_per_species, do_shuffle_occurrences)

def download_all_images_in_images_csv(cfg):
    dir_destination = cfg['dir_destination_images']
    dir_destination_csv = cfg['dir_destination_csv']

    validate_dir(dir_destination)
    validate_dir(dir_destination_csv)

    if cfg['is_custom_file']:
        download_from_custom_file(cfg)
    else:
        # Get DWC files
        occ_df, images_df = read_DWC_file(cfg)

        # Report summary
        print(f"{bcolors.BOLD}Beginning of images file:{bcolors.ENDC}")
        print(images_df.head())
        print(f"{bcolors.BOLD}Beginning of occurrence file:{bcolors.ENDC}")
        print(occ_df.head())

        # Ignore problematic herbaria
        if cfg['ignore_banned_herb']:
            for banned_url in cfg['banned_url_stems']:
                images_df = images_df[~images_df['identifier'].str.contains(banned_url, na=False)]

        # Report summary
        n_imgs = images_df.shape[0]
        n_occ = occ_df.shape[0]
        print(f"{bcolors.BOLD}Number of images in images file: {n_imgs}{bcolors.ENDC}")
        print(f"{bcolors.BOLD}Number of occurrences to search through: {n_occ}{bcolors.ENDC}")

        results = process_image_batch(cfg, images_df, occ_df)

def process_image_batch(cfg, images_df, occ_df):
    futures_list = []
    results = []

    # Single-threaded version, useful for debugging:
    # for index, image_row in images_df.iterrows():
    #     process_each_image_row(cfg, image_row, occ_df, lock)

    lock = Lock()

    with th(max_workers=13) as executor:
        for index, image_row in images_df.iterrows():
            futures = executor.submit(process_each_image_row, cfg, image_row, occ_df, lock)
            futures_list.append(futures)

        for future in futures_list:
            try:
                result = future.result(timeout=60)
                results.append(result)
            except Exception:
                results.append(None)
    return results

def process_image_batch_multiDirs(cfg, images_df, occ_df, dir_destination, shared_counter, n_imgs_per_species, do_shuffle_occurrences):
    futures_list = []
    results = []
    lock = Lock()

    if do_shuffle_occurrences:
        images_df = images_df.sample(frac=1).reset_index(drop=True)

    # Partition occ_df on the 'specificEpithet' column so each worker handles one species
    partition_dict = defaultdict(list)
    for index, row in occ_df.iterrows():
        epithet = row['specificEpithet']  # keep_first_word() could be applied here for stricter grouping
        partition_dict[epithet].append(row)

    # Convert lists to DataFrames
    for key in partition_dict.keys():
        partition_dict[key] = pd.DataFrame(partition_dict[key])

    num_workers = 13
    with th(max_workers=num_workers) as executor:
        for specific_epithet, partition in partition_dict.items():
            future = executor.submit(process_occ_chunk_multiDirs, cfg, images_df, partition, dir_destination, shared_counter, n_imgs_per_species, do_shuffle_occurrences, lock)
            futures_list.append(future)

        for future in futures_list:
            try:
                result = future.result(timeout=60)
                results.append(result)
            except Exception:
                results.append(None)
    return results

def process_occ_chunk_multiDirs(cfg, images_df, occ_chunk, dir_destination, shared_counter, n_imgs_per_species, do_shuffle_occurrences, lock):
    results = []
    for index, occ_row in occ_chunk.iterrows():
        result = process_each_occ_row_multiDirs(cfg, images_df, occ_row, dir_destination, shared_counter, n_imgs_per_species, do_shuffle_occurrences, lock)
        results.append(result)
    return results

def process_each_occ_row_multiDirs(cfg, images_df, occ_row, dir_destination, shared_counter, n_imgs_per_species, do_shuffle_occurrences, lock):
    print(f"{bcolors.BOLD}Working on occurrence: {occ_row['gbifID']}{bcolors.ENDC}")
    gbif_id = occ_row['gbifID']

    image_row = find_gbifID_in_images(gbif_id, images_df)

    if image_row is not None:
        filename_image, filename_image_jpg, herb_code, specimen_id, family, genus, species, fullname = generate_image_filename(occ_row)

        # Seed the counter the first time a fullname is seen
        current_count = shared_counter.get_count(fullname)
        if current_count == 0:
            shared_counter.increment(fullname)

        print(shared_counter.get_count(fullname))  # debug: current tally for this species

        if shared_counter.get_count(fullname) > n_imgs_per_species:
            print(f"Reached image limit for {fullname}. Skipping.")
            return

        gbif_url = image_row['identifier']
        image_candidate = ImageCandidateMulti(cfg, image_row, occ_row, gbif_url, dir_destination, lock)
        if image_candidate.download_success:
            shared_counter.increment(fullname)

def find_gbifID_in_images(gbif_id, images_df):
    image_row = images_df[images_df['gbifID'] == gbif_id]
    if image_row.empty:
        return None
    return image_row.iloc[0]

def process_each_image_row_multiDirs(cfg, image_row, occ_df, dir_destination, shared_counter, n_imgs_per_species, do_shuffle_occurrences, lock):
    print(f"{bcolors.BOLD}Working on image: {image_row['gbifID']}{bcolors.ENDC}")
    gbif_id = image_row['gbifID']
    gbif_url = image_row['identifier']

    occ_row = find_gbifID(gbif_id, occ_df)

    if occ_row is not None:
        filename_image, filename_image_jpg, herb_code, specimen_id, family, genus, species, fullname = generate_image_filename(occ_row)

        # Seed the counter the first time a fullname is seen
        current_count = shared_counter.get_count(fullname)
        if current_count == 0:
            shared_counter.increment(fullname)

        print(shared_counter.get_count(fullname))  # debug: current tally for this species

        if shared_counter.get_count(fullname) > n_imgs_per_species:
            print(f"Reached image limit for {fullname}. Skipping.")
            return

        image_candidate = ImageCandidateMulti(cfg, image_row, occ_row, gbif_url, dir_destination, lock)
        if image_candidate.download_success:
            shared_counter.increment(fullname)

def process_each_image_row(cfg, image_row, occ_df, lock):
    print(f"{bcolors.BOLD}Working on image: {image_row['gbifID']}{bcolors.ENDC}")
    gbif_id = image_row['gbifID']
    gbif_url = image_row['identifier']

    occ_row = find_gbifID(gbif_id, occ_df)

    if occ_row is not None:
        ImageCandidate(cfg, image_row, occ_row, gbif_url, lock)  # downloads as a side effect of __init__

def download_from_custom_file(cfg):
    # Get the images file
    images_df = read_custom_file(cfg)

    col_url = cfg['col_url']
    if col_url is None:
        col_url = 'identifier'

    # Report summary
    print(f"{bcolors.BOLD}Beginning of images file:{bcolors.ENDC}")
    print(images_df.head())

    # Ignore problematic herbaria
    if cfg['ignore_banned_herb']:
        for banned_url in cfg['banned_url_stems']:
            images_df = images_df[~images_df[col_url].str.contains(banned_url, na=False)]

    # Report summary
    n_imgs = images_df.shape[0]
    print(f"{bcolors.BOLD}Number of images in images file: {n_imgs}{bcolors.ENDC}")

    results = process_custom_image_batch(cfg, images_df)

def read_custom_file(cfg):
    dir_home = cfg['dir_home']
    filename_img = cfg['filename_img']
    # Read the images file; can be .txt or .csv
    images_df = ingest_DWC(filename_img, dir_home)
    return images_df

def process_custom_image_batch(cfg, images_df):
    futures_list = []
    results = []
    lock = Lock()

    with th(max_workers=13) as executor:
        for index, image_row in images_df.iterrows():
            futures = executor.submit(process_each_custom_image_row, cfg, image_row, lock)
            futures_list.append(futures)

        for future in futures_list:
            try:
                result = future.result(timeout=60)
                results.append(result)
            except Exception:
                results.append(None)
    return results

def process_each_custom_image_row(cfg, image_row, lock):
    col_url = cfg['col_url']
    col_name = cfg['col_name']
    if col_url is None:
        col_url = 'identifier'

    gbif_url = image_row[col_url]

    print(f"{bcolors.BOLD}Working on image: {image_row[col_name]}{bcolors.ENDC}")
    if image_row is not None:
        ImageCandidateCustom(cfg, image_row, gbif_url, col_name, lock)  # downloads as a side effect of __init__
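
# A hypothetical entry point for manual runs; the .yml filename is taken from the
# docstring above, and real driver scripts may pass their own config path instead.
if __name__ == '__main__':
    cfg = get_cfg_from_full_path('config_download_from_GBIF_all_images_in_file.yml')  # placeholder path
    download_all_images_in_images_csv(cfg)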