import warnings warnings.filterwarnings("ignore") import os import numpy as np import pandas as pd from typing import Iterable from styling import js, seafoam, css, DESCRIPTION import gradio as gr from gradio.themes.base import Base from gradio.themes.utils import colors, fonts, sizes import requests import torch import shutil import librosa import torch.nn.functional as F # Image gallery from fetch_img import download_images, scientific_to_species_code # Import the necessary functions from the voj package from audio_class_predictor import predict_class from bird_ast_model import birdast_preprocess, birdast_inference from bird_ast_seq_model import birdast_seq_preprocess, birdast_seq_inference from utils import plot_wave, plot_mel, download_model, bandpass_filter # Define the default parameters ASSET_DIR = "./assets" DEFUALT_SR = 16_000 DEFUALT_HIGH_CUT = 8_000 DEFUALT_LOW_CUT = 1_000 DEVICE = "cuda" if torch.cuda.is_available() else "cpu" print(f"Use Device: {DEVICE}") if not os.path.exists(ASSET_DIR): os.makedirs(ASSET_DIR) # define the assets for the models birdast_assets = { "model_weights": [ f"https://huggingface.co/shiyi-li/BirdAST/resolve/main/BirdAST_Baseline_GroupKFold_fold_{i}.pth" for i in range(5) ], "label_mapping": "https://huggingface.co/shiyi-li/BirdAST/resolve/main/BirdAST_Baseline_GroupKFold_label_map.csv", "preprocess_fn": birdast_preprocess, "inference_fn": birdast_inference, } birdast_seq_assets = { "model_weights": [ f"https://huggingface.co/shiyi-li/BirdAST_Seq/resolve/main/BirdAST_SeqPool_GroupKFold_fold_{i}.pth" for i in range(5) ], "label_mapping": "https://huggingface.co/shiyi-li/BirdAST_Seq/resolve/main/BirdAST_SeqPool_GroupKFold_label_map.csv", "preprocess_fn": birdast_seq_preprocess, "inference_fn": birdast_seq_inference, } # maintain a dictionary of assets ASSET_DICT = { "BirdAST": birdast_assets, "BirdAST_Seq": birdast_seq_assets, } def run_inference_with_model(audio_clip, sr, model_name): # download the model assets assets = ASSET_DICT[model_name] model_weights_url = assets["model_weights"] label_map_url = assets["label_mapping"] preprocess_fn = assets["preprocess_fn"] inference_fn = assets["inference_fn"] # download the model weights model_weights = [] for model_weight in model_weights_url: weight_file = os.path.join(ASSET_DIR, model_weight.split("/")[-1]) if not os.path.exists(weight_file): download_model(model_weight, weight_file) model_weights.append(weight_file) # download the label mapping label_map_csv = os.path.join(ASSET_DIR, label_map_url.split("/")[-1]) if not os.path.exists(label_map_csv): download_model(label_map_url, label_map_csv) # load the label mapping label_mapping = pd.read_csv(label_map_csv) species_id_to_name = {row["species_id"]: row["scientific_name"] for _, row in label_mapping.iterrows()} # preprocess the audio clip spectrogram = preprocess_fn(audio_clip, sr=sr) # run inference predictions = inference_fn(model_weights, spectrogram, device=DEVICE) # aggregate the results final_predicts = predictions.mean(axis=0) topk_values, topk_indices = torch.topk(torch.from_numpy(final_predicts), 10) results = [] for idx, scores in zip(topk_indices, topk_values): species_name = species_id_to_name[idx.item()] probability = scores.item() * 100 results.append([species_name, probability]) return results def predict(audio, start, end, model_name="BirdAST_Seq"): raw_sr, audio_array = audio if audio_array.ndim > 1: audio_array = audio_array.mean(axis=1) # convert to mono print(f"Audio shape raw: {audio_array.shape}, sr: {raw_sr}") # sainty checks len_audio = audio_array.shape[0] / raw_sr if start >= end: raise gr.Error(f"`start` ({start}) must be smaller than end ({end}s)") if audio_array.shape[0] < start * raw_sr: raise gr.Error(f"`start` ({start}) must be smaller than audio duration ({len_audio:.0f}s)") if audio_array.shape[0] < end * raw_sr: end = audio_array.shape[0] / (1.0*raw_sr) audio_array = np.array(audio_array, dtype=np.float32) / 32768.0 audio_array = audio_array[int(start*raw_sr) : int(end*raw_sr)] if raw_sr != DEFUALT_SR: # run bandpass filter & resample audio_array = bandpass_filter(audio_array, DEFUALT_LOW_CUT, DEFUALT_HIGH_CUT, raw_sr) audio_array = librosa.resample(audio_array, orig_sr=raw_sr, target_sr=DEFUALT_SR) print(f"Resampled Audio shape: {audio_array.shape}") audio_array = audio_array.astype(np.float32) # predict audio class audio_class = predict_class(audio_array) fig_spectrogram = plot_mel(DEFUALT_SR, audio_array) fig_waveform = plot_wave(DEFUALT_SR, audio_array) # run inference with model print(f"Running inference with model: {model_name}") species_class = run_inference_with_model(audio_array, DEFUALT_SR, model_name) print("Species is ", species_class[0][0].strip().replace("_", " ")) images = prepare_images(species_class[0][0].strip().replace("_", " ")) if len(images) == 0: images.append(("noimg.png", "No image")) return audio_class, species_class, fig_waveform, fig_spectrogram, images REFERENCES = """ # Appendix We have applied the AudioMAE model to pre-classify the 23000+ unlabelled audio clips collected from the Greater Manaus region in the Amazon rainforest. The results of the audio type classification can be found in the following [link](https://drive.google.com/file/d/1uOT88LDnBD-Z3YcFz1e9XjvW2ugCo6EI/view?usp=drive_link). We hope that the pre-classification results can help researchers better exploring the vast collection of audio recordings and facilitate the study of biodiversity in the Amazon rainforest. # References [1] Torkington, S. (2023, February 7). 50% of the global economy is under threat from biodiversity loss. World Economic Forum. Retrieved from https://www.weforum.org/agenda/2023/02/biodiversity-nature-loss-cop15/. [2] Huang, P.-Y., Xu, H., Li, J., Baevski, A., Auli, M., Galuba, W., Metze, F., & Feichtenhofer, C. (2022). Masked Autoencoders that Listen. In NeurIPS. [3] https://www.kaggle.com/code/dima806/bird-species-by-sound-detection # Acknowledgements We would like to thank all organizers, mentors and participants of the AI+Environment EcoHackathon 2024 event for their unwavering support and collaboration. We extend our gratitude to ETH BiodivX, GainForest and ETH AI Center for providing data, facilities and resources that enabled us to analyse the rich data in different ways. Our special thanks to David Dao, Sarah Tariq, Alessandro Amodio for always being there to help us! 🙏🙏🙏 """ # Function to handle model selection def handle_model_selection(model_name, download_status): # Inform user that download is starting # gr.Info(f"Downloading model weights for {model_name}...") print(f"Downloading model weights for {model_name}...") if model_name is None: model_name = "BirdAST" assets = ASSET_DICT[model_name] model_weights_url = assets["model_weights"] download_flag = True try: total_files = len(model_weights_url) for idx, model_weight in enumerate(model_weights_url): weight_file = os.path.join(ASSET_DIR, model_weight.split("/")[-1]) print(weight_file) if not os.path.exists(weight_file): download_status = f"Downloading {idx + 1} of {total_files}" download_model(model_weight, weight_file) if not os.path.exists(weight_file): download_flag = False break if download_flag: download_status = f"Model <{model_name}> is ready! 🎉🎉🎉\nUsing Device: {DEVICE.upper()}" else: download_status = f"An error occurred while downloading model weights." except Exception as e: download_status = f"An error occurred while downloading model weights." return download_status # Image generation def prepare_images(scientific_name: str): # Get species code scode = scientific_to_species_code(scientific_name) if not scode: return [] # Clear folder assets' images for filename in os.listdir(ASSET_DIR): # Construct full file path file_path = os.path.join(ASSET_DIR, filename) # Check if the file is a .jpg, .jpeg, or .png if file_path.lower().endswith(('.jpg', '.jpeg', '.png')): # If yes, delete the file os.remove(file_path) print(f"Deleted: {file_path}") # Save images to assets download_images(f"https://ebird.org/species/{scode}") # Return array of local image paths nsplit = scientific_name.split(" ") abbreviate_name = nsplit[0][0] + "." + " " + nsplit[1] images = [] for img_file in os.listdir("./assets"): if img_file.lower().endswith(('.png', '.jpg', '.jpeg')): images.append((os.path.join("./assets", img_file), abbreviate_name)) return images sp_and_cl = """