# BirdNet / model.py
# (uploaded by BilalSardar, "Upload 4 files", commit bab1cc1)
"""Contains functions to use the BirdNET models.
"""
import os
import warnings
import numpy as np
import config as cfg
import utils
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["CUDA_VISIBLE_DEVICES"] = ""
warnings.filterwarnings("ignore")
# Import TFLite from runtime or Tensorflow;
# import Keras if protobuf model;
# NOTE: we have to use TFLite if we want to use
# the metadata model or want to extract embeddings
try:
import tflite_runtime.interpreter as tflite
except ModuleNotFoundError:
from tensorflow import lite as tflite
if not cfg.MODEL_PATH.endswith(".tflite"):
from tensorflow import keras
INTERPRETER: tflite.Interpreter = None
C_INTERPRETER: tflite.Interpreter = None
M_INTERPRETER: tflite.Interpreter = None
PBMODEL = None
def loadModel(class_output=True):
    """Initializes the BirdNET Model.

    Args:
        class_output: Omits the last layer when False.
    """
    global PBMODEL
    global INTERPRETER
    global INPUT_LAYER_INDEX
    global OUTPUT_LAYER_INDEX

    # Protobuf model?
    if not cfg.MODEL_PATH.endswith(".tflite"):
        # Note: loading emits a bunch of custom-gradient warnings
        # which we ignore until TF lets us block them.
        PBMODEL = keras.models.load_model(cfg.MODEL_PATH, compile=False)
        return

    # TFLite model: build the interpreter and allocate tensors.
    interpreter = tflite.Interpreter(model_path=cfg.MODEL_PATH, num_threads=cfg.TFLITE_THREADS)
    interpreter.allocate_tensors()
    INTERPRETER = interpreter

    # Resolve the tensor indices used to feed input and fetch output.
    INPUT_LAYER_INDEX = interpreter.get_input_details()[0]["index"]
    out_index = interpreter.get_output_details()[0]["index"]

    # When class_output is False, read the tensor just before the
    # classification head, which holds the feature embeddings.
    OUTPUT_LAYER_INDEX = out_index if class_output else out_index - 1
def loadCustomClassifier():
    """Loads the custom classifier."""
    global C_INTERPRETER
    global C_INPUT_LAYER_INDEX
    global C_OUTPUT_LAYER_INDEX
    global C_INPUT_SIZE

    # Build the TFLite interpreter for the user-trained classifier.
    interpreter = tflite.Interpreter(model_path=cfg.CUSTOM_CLASSIFIER, num_threads=cfg.TFLITE_THREADS)
    interpreter.allocate_tensors()

    # Input tensor index and its expected feature length (last shape dim).
    in_details = interpreter.get_input_details()[0]
    C_INPUT_LAYER_INDEX = in_details["index"]
    C_INPUT_SIZE = in_details["shape"][-1]

    # Classification output tensor index.
    C_OUTPUT_LAYER_INDEX = interpreter.get_output_details()[0]["index"]

    C_INTERPRETER = interpreter
def loadMetaModel():
    """Loads the model for species prediction.

    Initializes the model used to predict species list, based on coordinates and week of year.
    """
    global M_INTERPRETER
    global M_INPUT_LAYER_INDEX
    global M_OUTPUT_LAYER_INDEX

    # Build the TFLite interpreter for the metadata (location/week) model.
    interpreter = tflite.Interpreter(model_path=cfg.MDATA_MODEL_PATH, num_threads=cfg.TFLITE_THREADS)
    interpreter.allocate_tensors()

    # Resolve input/output tensor indices.
    M_INPUT_LAYER_INDEX = interpreter.get_input_details()[0]["index"]
    M_OUTPUT_LAYER_INDEX = interpreter.get_output_details()[0]["index"]

    M_INTERPRETER = interpreter
def buildLinearClassifier(num_labels, input_size, hidden_units=0, dropout=0.0):
    """Builds a classifier.

    Args:
        num_labels: Output size.
        input_size: Size of the input.
        hidden_units: If > 0, creates another hidden layer with the given number of units.
        dropout: If > 0, adds a dropout layer before each dense layer.

    Returns:
        A new classifier.
    """
    # Local import keeps TF out of module import time.
    from tensorflow import keras

    # Assemble the layer stack: optional (dropout +) hidden layer,
    # then optional dropout, classification layer and sigmoid activation.
    stack = []
    if hidden_units > 0:
        if dropout > 0:
            stack.append(keras.layers.Dropout(dropout))
        stack.append(keras.layers.Dense(hidden_units, activation="relu"))
    if dropout > 0:
        stack.append(keras.layers.Dropout(dropout))
    stack.append(keras.layers.Dense(num_labels))
    stack.append(keras.layers.Activation("sigmoid"))

    # Simple one- or two-layer linear classifier.
    model = keras.Sequential()
    model.add(keras.layers.InputLayer(input_shape=(input_size,)))
    for layer in stack:
        model.add(layer)

    return model
def trainLinearClassifier(classifier,
                          x_train,
                          y_train,
                          epochs,
                          batch_size,
                          learning_rate,
                          val_split,
                          upsampling_ratio,
                          upsampling_mode,
                          train_with_mixup,
                          train_with_label_smoothing,
                          on_epoch_end=None):
    """Trains a custom classifier.

    Trains a new classifier for BirdNET based on the given data.

    Args:
        classifier: The classifier to be trained.
        x_train: Samples.
        y_train: Labels.
        epochs: Number of epochs to train.
        batch_size: Batch size.
        learning_rate: The learning rate during training.
        val_split: Fraction of the data held out for validation.
        upsampling_ratio: If > 0, training data is upsampled with this ratio.
        upsampling_mode: Upsampling strategy forwarded to utils.upsampling
            (see utils for the supported modes).
        train_with_mixup: If True, applies mixup augmentation to the training data.
        train_with_label_smoothing: If True, applies label smoothing to the labels.
        on_epoch_end: Optional callback `function(epoch, logs)`.

    Returns:
        (classifier, history)
    """
    # import keras
    from tensorflow import keras

    # Adapts the plain on_epoch_end function to the Keras callback interface.
    class FunctionCallback(keras.callbacks.Callback):
        def __init__(self, on_epoch_end=None) -> None:
            super().__init__()
            self.on_epoch_end_fn = on_epoch_end

        def on_epoch_end(self, epoch, logs=None):
            if self.on_epoch_end_fn:
                self.on_epoch_end_fn(epoch, logs)

    # Set random seed so shuffling and splitting are reproducible
    np.random.seed(cfg.RANDOM_SEED)

    # Shuffle data before splitting so the validation set is a random sample
    idx = np.arange(x_train.shape[0])
    np.random.shuffle(idx)
    x_train = x_train[idx]
    y_train = y_train[idx]

    # Random val split
    x_train, y_train, x_val, y_val = utils.random_split(x_train, y_train, val_split)
    print(f"Training on {x_train.shape[0]} samples, validating on {x_val.shape[0]} samples.", flush=True)

    # Upsample training data — done after the split so validation data is untouched
    if upsampling_ratio > 0:
        x_train, y_train = utils.upsampling(x_train, y_train, upsampling_ratio, upsampling_mode)
        print(f"Upsampled training data to {x_train.shape[0]} samples.", flush=True)

    # Apply mixup to training data
    if train_with_mixup:
        x_train, y_train = utils.mixup(x_train, y_train)

    # Apply label smoothing
    if train_with_label_smoothing:
        y_train = utils.label_smoothing(y_train)

    # Early stopping on stalled val_loss, restoring the best weights;
    # stopping is disabled for the first quarter of the epochs
    callbacks = [
        keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=5, verbose=1, start_from_epoch=epochs // 4, restore_best_weights=True
        ),
        FunctionCallback(on_epoch_end=on_epoch_end),
    ]

    # Cosine annealing lr schedule over the total number of training steps
    lr_schedule = keras.experimental.CosineDecay(learning_rate, epochs * x_train.shape[0] / batch_size)

    # Compile model — multi-label setup, hence binary cross-entropy on sigmoid outputs
    classifier.compile(
        optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
        loss="binary_crossentropy",
        metrics=[keras.metrics.AUC(curve="PR", multi_label=False, name="AUPRC")],
    )

    # Train model
    history = classifier.fit(
        x_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(x_val, y_val), callbacks=callbacks
    )

    return classifier, history
def saveLinearClassifier(classifier, model_path, labels):
    """Saves a custom classifier on the hard drive.

    Saves the classifier as a tflite model, as well as the used labels in a .txt.

    Args:
        classifier: The custom classifier.
        model_path: Path the model will be saved at.
        labels: List of labels used for the classifier.
    """
    import tensorflow as tf

    # Reuse the already-loaded protobuf model when available
    saved_model = PBMODEL if PBMODEL is not None else tf.keras.models.load_model(cfg.PB_MODEL, compile=False)

    # Remove activation layer so the exported model outputs raw scores
    classifier.pop()
    combined_model = tf.keras.Sequential([saved_model.embeddings_model, classifier], "basic")

    # Append .tflite if necessary
    if not model_path.endswith(".tflite"):
        model_path += ".tflite"

    # Make folders
    os.makedirs(os.path.dirname(model_path), exist_ok=True)

    # Save model as tflite.
    # FIX: use tf.lite here, not the module-level `tflite` alias — that alias
    # may be tflite_runtime.interpreter, which has no TFLiteConverter.
    converter = tf.lite.TFLiteConverter.from_keras_model(combined_model)
    tflite_model = converter.convert()

    # FIX: use a context manager so the file handle is closed deterministically
    with open(model_path, "wb") as model_file:
        model_file.write(tflite_model)

    # Save labels
    with open(model_path.replace(".tflite", "_Labels.txt"), "w") as f:
        for label in labels:
            f.write(label + "\n")
def save_raven_model(classifier, model_path, labels):
    """Exports the combined model as a SavedModel for use in Raven.

    Writes a TF SavedModel exposing a "basic" serving signature, plus the
    label, class and model-config sidecar files next to it.

    Args:
        classifier: The trained custom classifier head.
        model_path: Target directory (a trailing ".tflite" is stripped).
        labels: List of label strings used by the classifier.
    """
    import tensorflow as tf
    import csv
    import json

    # Stack the BirdNET embeddings backbone with the custom classifier head
    saved_model = PBMODEL if PBMODEL else tf.keras.models.load_model(cfg.PB_MODEL, compile=False)
    combined_model = tf.keras.Sequential([saved_model.embeddings_model, classifier], "basic")

    # Make signatures — the model is served through a named tf.function
    class SignatureModule(tf.Module):
        def __init__(self, keras_model):
            super().__init__()
            self.model = keras_model

        # 144000 samples matches the 48 kHz sample rate declared in the
        # model config below (3-second windows)
        @tf.function(input_signature=[tf.TensorSpec(shape=[None, 144000], dtype=tf.float32)])
        def basic(self, inputs):
            return {"scores": self.model(inputs)}

    smodel = SignatureModule(combined_model)
    signatures = {
        "basic": smodel.basic,
    }

    # Save signature model (strip a ".tflite" suffix — SavedModel is a directory)
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    model_path = model_path[:-7] if model_path.endswith(".tflite") else model_path
    tf.saved_model.save(smodel, model_path, signatures=signatures)

    # Save label file: short ids from the first 4 non-space chars + 1-based index
    labelIds = [label[:4].replace(" ", "") + str(i) for i, label in enumerate(labels, 1)]
    labels_dir = os.path.join(model_path, "labels")
    os.makedirs(labels_dir, exist_ok=True)

    with open(os.path.join(labels_dir, "label_names.csv"), "w", newline="") as labelsfile:
        labelwriter = csv.writer(labelsfile)
        labelwriter.writerows(zip(labelIds, labels))

    # Save class names file with per-class defaults (threshold, freq band, flag)
    classes_dir = os.path.join(model_path, "classes")
    os.makedirs(classes_dir, exist_ok=True)

    with open(os.path.join(classes_dir, "classes.csv"), "w", newline="") as classesfile:
        classeswriter = csv.writer(classesfile)
        for labelId in labelIds:
            classeswriter.writerow((labelId, 0.25, cfg.SIG_FMIN, cfg.SIG_FMAX, False))

    # Save model config describing the serving signature
    model_config = os.path.join(model_path, "model_config.json")

    with open(model_config, "w") as modelconfigfile:
        # NOTE(review): "MODEL_VESION" looks misspelled — it must match the
        # attribute name actually defined in config.py; verify before renaming.
        modelconfig = {
            "specVersion": 1,
            "modelDescription": "Custom classifier trained with BirdNET "
            + cfg.MODEL_VESION
            + " embeddings.\nBirdNET was developed by the K. Lisa Yang Center for Conservation Bioacoustics at the Cornell Lab of Ornithology in collaboration with Chemnitz University of Technology.\n\nhttps://birdnet.cornell.edu",
            "modelTypeConfig": {"modelType": "RECOGNITION"},
            "signatures": [
                {
                    "signatureName": "basic",
                    "modelInputs": [{"inputName": "inputs", "sampleRate": 48000.0, "inputConfig": ["batch", "samples"]}],
                    "modelOutputs": [{"outputName": "scores", "outputType": "SCORES"}],
                }
            ],
            "globalSemanticKeys": labelIds,
        }
        json.dump(modelconfig, modelconfigfile, indent=2)
def predictFilter(lat, lon, week):
    """Predicts the probability for each species.

    Args:
        lat: The latitude.
        lon: The longitude.
        week: The week of the year [1-48]. Use -1 for yearlong.

    Returns:
        A list of probabilities for all species.
    """
    global M_INTERPRETER

    # Lazily load the meta model on first use.
    # FIX: identity check instead of equality comparison to None (PEP 8).
    if M_INTERPRETER is None:
        loadMetaModel()

    # Prepare mdata as a single-sample batch: [lat, lon, week]
    sample = np.expand_dims(np.array([lat, lon, week], dtype="float32"), 0)

    # Run inference
    M_INTERPRETER.set_tensor(M_INPUT_LAYER_INDEX, sample)
    M_INTERPRETER.invoke()

    return M_INTERPRETER.get_tensor(M_OUTPUT_LAYER_INDEX)[0]
def explore(lat: float, lon: float, week: int):
    """Predicts the species list.

    Predicts the species list based on the coordinates and week of year.

    Args:
        lat: The latitude.
        lon: The longitude.
        week: The week of the year [1-48]. Use -1 for yearlong.

    Returns:
        A sorted list of tuples with the score and the species.
    """
    # Run the location/week filter model
    scores = predictFilter(lat, lon, week)

    # Zero out species below the configured threshold
    scores = np.where(scores >= cfg.LOCATION_FILTER_THRESHOLD, scores, 0)

    # Pair each score with its label and sort best-first
    return sorted(zip(scores, cfg.LABELS), key=lambda pair: pair[0], reverse=True)
def flat_sigmoid(x, sensitivity=-1):
    """Applies a (flattened) sigmoid to x.

    Inputs are clipped to [-15, 15] before exponentiation to avoid overflow;
    with the default sensitivity of -1 this is the standard logistic sigmoid.
    """
    clipped = np.clip(x, -15, 15)
    return 1.0 / (1.0 + np.exp(sensitivity * clipped))
def predict(sample):
    """Uses the main net to predict a sample.

    Args:
        sample: Audio sample.

    Returns:
        The prediction scores for the sample.
    """
    # Route through the custom classifier when one is configured
    if cfg.CUSTOM_CLASSIFIER is not None:
        return predictWithCustomClassifier(sample)

    global INTERPRETER

    # Lazily load a model on first use
    if INTERPRETER is None and PBMODEL is None:
        loadModel()

    if PBMODEL is not None:
        # Protobuf path — make a prediction (Audio only for now)
        return PBMODEL.embeddings_model.predict(sample)

    # TFLite path: resize the input tensor to the batch, then run inference
    batch = np.array(sample, dtype="float32")
    INTERPRETER.resize_tensor_input(INPUT_LAYER_INDEX, [len(sample), *sample[0].shape])
    INTERPRETER.allocate_tensors()
    INTERPRETER.set_tensor(INPUT_LAYER_INDEX, batch)
    INTERPRETER.invoke()

    return INTERPRETER.get_tensor(OUTPUT_LAYER_INDEX)
def predictWithCustomClassifier(sample):
    """Uses the custom classifier to make a prediction.

    Args:
        sample: Audio sample.

    Returns:
        The prediction scores for the sample.
    """
    global C_INTERPRETER
    global C_INPUT_SIZE

    # Lazily load the custom classifier on first use
    if C_INTERPRETER is None:
        loadCustomClassifier()

    # A classifier expecting 144000 inputs consumes raw audio directly;
    # any other input size gets BirdNET feature embeddings instead.
    if C_INPUT_SIZE == 144000:
        vector = sample
    else:
        vector = embeddings(sample)

    # Resize the input tensor to the batch and run inference
    C_INTERPRETER.resize_tensor_input(C_INPUT_LAYER_INDEX, [len(vector), *vector[0].shape])
    C_INTERPRETER.allocate_tensors()
    C_INTERPRETER.set_tensor(C_INPUT_LAYER_INDEX, np.array(vector, dtype="float32"))
    C_INTERPRETER.invoke()

    return C_INTERPRETER.get_tensor(C_OUTPUT_LAYER_INDEX)
def embeddings(sample):
    """Extracts the embeddings for a sample.

    Args:
        sample: Audio samples.

    Returns:
        The embeddings.
    """
    global INTERPRETER

    # Lazily load the model in embeddings mode (class_output=False)
    if INTERPRETER is None:
        loadModel(False)

    # Resize the input tensor to the batch, then run the net
    batch = np.array(sample, dtype="float32")
    INTERPRETER.resize_tensor_input(INPUT_LAYER_INDEX, [len(sample), *sample[0].shape])
    INTERPRETER.allocate_tensors()
    INTERPRETER.set_tensor(INPUT_LAYER_INDEX, batch)
    INTERPRETER.invoke()

    # Read the feature embeddings from the pre-classification tensor
    return INTERPRETER.get_tensor(OUTPUT_LAYER_INDEX)