Spaces:
Running
Running
"""Module to analyze audio samples. | |
""" | |
import argparse | |
import datetime | |
import json | |
import operator | |
import os | |
import sys | |
from multiprocessing import Pool, freeze_support | |
import numpy as np | |
import audio | |
import config as cfg | |
import model | |
import species | |
import utils | |
def loadCodes(): | |
"""Loads the eBird codes. | |
Returns: | |
A dictionary containing the eBird codes. | |
""" | |
with open(cfg.CODES_FILE, "r") as cfile: | |
codes = json.load(cfile) | |
return codes | |
def saveResultFile(r: dict[str, list], path: str, afile_path: str): | |
"""Saves the results to the hard drive. | |
Args: | |
r: The dictionary with {segment: scores}. | |
path: The path where the result should be saved. | |
afile_path: The path to audio file. | |
""" | |
# Make folder if it doesn't exist | |
if os.path.dirname(path): | |
os.makedirs(os.path.dirname(path), exist_ok=True) | |
# Selection table | |
out_string = "" | |
if cfg.RESULT_TYPE == "table": | |
# Raven selection header | |
header = "Selection\tView\tChannel\tBegin Time (s)\tEnd Time (s)\tLow Freq (Hz)\tHigh Freq (Hz)\tSpecies Code\tCommon Name\tConfidence\n" | |
selection_id = 0 | |
# Write header | |
out_string += header | |
# Read native sample rate | |
high_freq = audio.get_sample_rate(afile_path) / 2 | |
if high_freq > cfg.SIG_FMAX: | |
high_freq = cfg.SIG_FMAX | |
# Extract valid predictions for every timestamp | |
for timestamp in getSortedTimestamps(r): | |
rstring = "" | |
start, end = timestamp.split("-", 1) | |
for c in r[timestamp]: | |
if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): | |
selection_id += 1 | |
label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] | |
rstring += "{}\tSpectrogram 1\t1\t{}\t{}\t{}\t{}\t{}\t{}\t{:.4f}\n".format( | |
selection_id, | |
start, | |
end, | |
cfg.SIG_FMIN, | |
high_freq, | |
cfg.CODES[c[0]] if c[0] in cfg.CODES else c[0], | |
label.split("_", 1)[-1], | |
c[1], | |
) | |
# Write result string to file | |
out_string += rstring | |
elif cfg.RESULT_TYPE == "audacity": | |
# Audacity timeline labels | |
for timestamp in getSortedTimestamps(r): | |
rstring = "" | |
for c in r[timestamp]: | |
if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): | |
label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] | |
rstring += "{}\t{}\t{:.4f}\n".format(timestamp.replace("-", "\t"), label.replace("_", ", "), c[1]) | |
# Write result string to file | |
out_string += rstring | |
elif cfg.RESULT_TYPE == "r": | |
# Output format for R | |
header = "filepath,start,end,scientific_name,common_name,confidence,lat,lon,week,overlap,sensitivity,min_conf,species_list,model" | |
out_string += header | |
for timestamp in getSortedTimestamps(r): | |
rstring = "" | |
start, end = timestamp.split("-", 1) | |
for c in r[timestamp]: | |
if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): | |
label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] | |
rstring += "\n{},{},{},{},{},{:.4f},{:.4f},{:.4f},{},{},{},{},{},{}".format( | |
afile_path, | |
start, | |
end, | |
label.split("_", 1)[0], | |
label.split("_", 1)[-1], | |
c[1], | |
cfg.LATITUDE, | |
cfg.LONGITUDE, | |
cfg.WEEK, | |
cfg.SIG_OVERLAP, | |
(1.0 - cfg.SIGMOID_SENSITIVITY) + 1.0, | |
cfg.MIN_CONFIDENCE, | |
cfg.SPECIES_LIST_FILE, | |
os.path.basename(cfg.MODEL_PATH), | |
) | |
# Write result string to file | |
out_string += rstring | |
elif cfg.RESULT_TYPE == "kaleidoscope": | |
# Output format for kaleidoscope | |
header = "INDIR,FOLDER,IN FILE,OFFSET,DURATION,scientific_name,common_name,confidence,lat,lon,week,overlap,sensitivity" | |
out_string += header | |
folder_path, filename = os.path.split(afile_path) | |
parent_folder, folder_name = os.path.split(folder_path) | |
for timestamp in getSortedTimestamps(r): | |
rstring = "" | |
start, end = timestamp.split("-", 1) | |
for c in r[timestamp]: | |
if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): | |
label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] | |
rstring += "\n{},{},{},{},{},{},{},{:.4f},{:.4f},{:.4f},{},{},{}".format( | |
parent_folder.rstrip("/"), | |
folder_name, | |
filename, | |
start, | |
float(end) - float(start), | |
label.split("_", 1)[0], | |
label.split("_", 1)[-1], | |
c[1], | |
cfg.LATITUDE, | |
cfg.LONGITUDE, | |
cfg.WEEK, | |
cfg.SIG_OVERLAP, | |
(1.0 - cfg.SIGMOID_SENSITIVITY) + 1.0, | |
) | |
# Write result string to file | |
out_string += rstring | |
else: | |
# CSV output file | |
header = "Start (s),End (s),Scientific name,Common name,Confidence\n" | |
# Write header | |
out_string += header | |
for timestamp in getSortedTimestamps(r): | |
rstring = "" | |
for c in r[timestamp]: | |
start, end = timestamp.split("-", 1) | |
if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): | |
label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] | |
rstring += "{},{},{},{},{:.4f}\n".format(start, end, label.split("_", 1)[0], label.split("_", 1)[-1], c[1]) | |
# Write result string to file | |
out_string += rstring | |
# Save as file | |
with open(path, "w", encoding="utf-8") as rfile: | |
rfile.write(out_string) | |
def getSortedTimestamps(results: dict[str, list]): | |
"""Sorts the results based on the segments. | |
Args: | |
results: The dictionary with {segment: scores}. | |
Returns: | |
Returns the sorted list of segments and their scores. | |
""" | |
return sorted(results, key=lambda t: float(t.split("-", 1)[0])) | |
def getRawAudioFromFile(fpath: str): | |
"""Reads an audio file. | |
Reads the file and splits the signal into chunks. | |
Args: | |
fpath: Path to the audio file. | |
Returns: | |
The signal split into a list of chunks. | |
""" | |
# Open file | |
sig, rate = audio.openAudioFile(fpath, cfg.SAMPLE_RATE) | |
# Split into raw audio chunks | |
chunks = audio.splitSignal(sig, rate, cfg.SIG_LENGTH, cfg.SIG_OVERLAP, cfg.SIG_MINLEN) | |
return chunks | |
def predict(samples): | |
"""Predicts the classes for the given samples. | |
Args: | |
samples: Samples to be predicted. | |
Returns: | |
The prediction scores. | |
""" | |
# Prepare sample and pass through model | |
data = np.array(samples, dtype="float32") | |
prediction = model.predict(data) | |
# Logits or sigmoid activations? | |
if cfg.APPLY_SIGMOID: | |
prediction = model.flat_sigmoid(np.array(prediction), sensitivity=-cfg.SIGMOID_SENSITIVITY) | |
return prediction | |
def analyzeFile(item): | |
"""Analyzes a file. | |
Predicts the scores for the file and saves the results. | |
Args: | |
item: Tuple containing (file path, config) | |
Returns: | |
The `True` if the file was analyzed successfully. | |
""" | |
# Get file path and restore cfg | |
fpath: str = item[0] | |
cfg.setConfig(item[1]) | |
# Start time | |
start_time = datetime.datetime.now() | |
# Status | |
print(f"Analyzing {fpath}", flush=True) | |
try: | |
# Open audio file and split into 3-second chunks | |
chunks = getRawAudioFromFile(fpath) | |
# If no chunks, show error and skip | |
except Exception as ex: | |
print(f"Error: Cannot open audio file {fpath}", flush=True) | |
utils.writeErrorLog(ex) | |
return False | |
# Process each chunk | |
try: | |
start, end = 0, cfg.SIG_LENGTH | |
results = {} | |
samples = [] | |
timestamps = [] | |
for chunk_index, chunk in enumerate(chunks): | |
# Add to batch | |
samples.append(chunk) | |
timestamps.append([start, end]) | |
# Advance start and end | |
start += cfg.SIG_LENGTH - cfg.SIG_OVERLAP | |
end = start + cfg.SIG_LENGTH | |
# Check if batch is full or last chunk | |
if len(samples) < cfg.BATCH_SIZE and chunk_index < len(chunks) - 1: | |
continue | |
# Predict | |
p = predict(samples) | |
# Add to results | |
for i in range(len(samples)): | |
# Get timestamp | |
s_start, s_end = timestamps[i] | |
# Get prediction | |
pred = p[i] | |
# Assign scores to labels | |
p_labels = zip(cfg.LABELS, pred) | |
# Sort by score | |
p_sorted = sorted(p_labels, key=operator.itemgetter(1), reverse=True) | |
# Store top 5 results and advance indices | |
results[str(s_start) + "-" + str(s_end)] = p_sorted | |
# Clear batch | |
samples = [] | |
timestamps = [] | |
except Exception as ex: | |
# Write error log | |
print(f"Error: Cannot analyze audio file {fpath}.\n", flush=True) | |
utils.writeErrorLog(ex) | |
return False | |
# Save as selection table | |
try: | |
# We have to check if output path is a file or directory | |
if not cfg.OUTPUT_PATH.rsplit(".", 1)[-1].lower() in ["txt", "csv"]: | |
rpath = fpath.replace(cfg.INPUT_PATH, "") | |
rpath = rpath[1:] if rpath[0] in ["/", "\\"] else rpath | |
# Make target directory if it doesn't exist | |
rdir = os.path.join(cfg.OUTPUT_PATH, os.path.dirname(rpath)) | |
os.makedirs(rdir, exist_ok=True) | |
if cfg.RESULT_TYPE == "table": | |
rtype = ".BirdNET.selection.table.txt" | |
elif cfg.RESULT_TYPE == "audacity": | |
rtype = ".BirdNET.results.txt" | |
else: | |
rtype = ".BirdNET.results.csv" | |
saveResultFile(results, os.path.join(cfg.OUTPUT_PATH, rpath.rsplit(".", 1)[0] + rtype), fpath) | |
else: | |
saveResultFile(results, cfg.OUTPUT_PATH, fpath) | |
except Exception as ex: | |
# Write error log | |
print(f"Error: Cannot save result for {fpath}.\n", flush=True) | |
utils.writeErrorLog(ex) | |
return False | |
delta_time = (datetime.datetime.now() - start_time).total_seconds() | |
print("Finished {} in {:.2f} seconds".format(fpath, delta_time), flush=True) | |
return True | |
if __name__ == "__main__": | |
# Freeze support for executable | |
freeze_support() | |
# Parse arguments | |
parser = argparse.ArgumentParser(description="Analyze audio files with BirdNET") | |
parser.add_argument( | |
"--i", default="example/", help="Path to input file or folder. If this is a file, --o needs to be a file too." | |
) | |
parser.add_argument( | |
"--o", default="example/", help="Path to output file or folder. If this is a file, --i needs to be a file too." | |
) | |
parser.add_argument("--lat", type=float, default=-1, help="Recording location latitude. Set -1 to ignore.") | |
parser.add_argument("--lon", type=float, default=-1, help="Recording location longitude. Set -1 to ignore.") | |
parser.add_argument( | |
"--week", | |
type=int, | |
default=-1, | |
help="Week of the year when the recording was made. Values in [1, 48] (4 weeks per month). Set -1 for year-round species list.", | |
) | |
parser.add_argument( | |
"--slist", | |
default="", | |
help='Path to species list file or folder. If folder is provided, species list needs to be named "species_list.txt". If lat and lon are provided, this list will be ignored.', | |
) | |
parser.add_argument( | |
"--sensitivity", | |
type=float, | |
default=1.0, | |
help="Detection sensitivity; Higher values result in higher sensitivity. Values in [0.5, 1.5]. Defaults to 1.0.", | |
) | |
parser.add_argument( | |
"--min_conf", type=float, default=0.1, help="Minimum confidence threshold. Values in [0.01, 0.99]. Defaults to 0.1." | |
) | |
parser.add_argument( | |
"--overlap", type=float, default=0.0, help="Overlap of prediction segments. Values in [0.0, 2.9]. Defaults to 0.0." | |
) | |
parser.add_argument( | |
"--rtype", | |
default="table", | |
help="Specifies output format. Values in ['table', 'audacity', 'r', 'kaleidoscope', 'csv']. Defaults to 'table' (Raven selection table).", | |
) | |
parser.add_argument("--threads", type=int, default=4, help="Number of CPU threads.") | |
parser.add_argument( | |
"--batchsize", type=int, default=1, help="Number of samples to process at the same time. Defaults to 1." | |
) | |
parser.add_argument( | |
"--locale", | |
default="en", | |
help="Locale for translated species common names. Values in ['af', 'de', 'it', ...] Defaults to 'en'.", | |
) | |
parser.add_argument( | |
"--sf_thresh", | |
type=float, | |
default=0.03, | |
help="Minimum species occurrence frequency threshold for location filter. Values in [0.01, 0.99]. Defaults to 0.03.", | |
) | |
parser.add_argument( | |
"--classifier", | |
default=None, | |
help="Path to custom trained classifier. Defaults to None. If set, --lat, --lon and --locale are ignored.", | |
) | |
args = parser.parse_args() | |
# Set paths relative to script path (requested in #3) | |
script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) | |
cfg.MODEL_PATH = os.path.join(script_dir, cfg.MODEL_PATH) | |
cfg.LABELS_FILE = os.path.join(script_dir, cfg.LABELS_FILE) | |
cfg.TRANSLATED_LABELS_PATH = os.path.join(script_dir, cfg.TRANSLATED_LABELS_PATH) | |
cfg.MDATA_MODEL_PATH = os.path.join(script_dir, cfg.MDATA_MODEL_PATH) | |
cfg.CODES_FILE = os.path.join(script_dir, cfg.CODES_FILE) | |
cfg.ERROR_LOG_FILE = os.path.join(script_dir, cfg.ERROR_LOG_FILE) | |
# Load eBird codes, labels | |
cfg.CODES = loadCodes() | |
cfg.LABELS = utils.readLines(cfg.LABELS_FILE) | |
# Set custom classifier? | |
if args.classifier is not None: | |
cfg.CUSTOM_CLASSIFIER = args.classifier # we treat this as absolute path, so no need to join with dirname | |
cfg.LABELS_FILE = args.classifier.replace(".tflite", "_Labels.txt") # same for labels file | |
cfg.LABELS = utils.readLines(cfg.LABELS_FILE) | |
args.lat = -1 | |
args.lon = -1 | |
args.locale = "en" | |
# Load translated labels | |
lfile = os.path.join( | |
cfg.TRANSLATED_LABELS_PATH, os.path.basename(cfg.LABELS_FILE).replace(".txt", "_{}.txt".format(args.locale)) | |
) | |
if not args.locale in ["en"] and os.path.isfile(lfile): | |
cfg.TRANSLATED_LABELS = utils.readLines(lfile) | |
else: | |
cfg.TRANSLATED_LABELS = cfg.LABELS | |
### Make sure to comment out appropriately if you are not using args. ### | |
# Load species list from location filter or provided list | |
cfg.LATITUDE, cfg.LONGITUDE, cfg.WEEK = args.lat, args.lon, args.week | |
cfg.LOCATION_FILTER_THRESHOLD = max(0.01, min(0.99, float(args.sf_thresh))) | |
if cfg.LATITUDE == -1 and cfg.LONGITUDE == -1: | |
if not args.slist: | |
cfg.SPECIES_LIST_FILE = None | |
else: | |
cfg.SPECIES_LIST_FILE = os.path.join(script_dir, args.slist) | |
if os.path.isdir(cfg.SPECIES_LIST_FILE): | |
cfg.SPECIES_LIST_FILE = os.path.join(cfg.SPECIES_LIST_FILE, "species_list.txt") | |
cfg.SPECIES_LIST = utils.readLines(cfg.SPECIES_LIST_FILE) | |
else: | |
cfg.SPECIES_LIST_FILE = None | |
cfg.SPECIES_LIST = species.getSpeciesList(cfg.LATITUDE, cfg.LONGITUDE, cfg.WEEK, cfg.LOCATION_FILTER_THRESHOLD) | |
if not cfg.SPECIES_LIST: | |
print(f"Species list contains {len(cfg.LABELS)} species") | |
else: | |
print(f"Species list contains {len(cfg.SPECIES_LIST)} species") | |
# Set input and output path | |
cfg.INPUT_PATH = args.i | |
cfg.OUTPUT_PATH = args.o | |
# Parse input files | |
if os.path.isdir(cfg.INPUT_PATH): | |
cfg.FILE_LIST = utils.collect_audio_files(cfg.INPUT_PATH) | |
print(f"Found {len(cfg.FILE_LIST)} files to analyze") | |
else: | |
cfg.FILE_LIST = [cfg.INPUT_PATH] | |
# Set confidence threshold | |
cfg.MIN_CONFIDENCE = max(0.01, min(0.99, float(args.min_conf))) | |
# Set sensitivity | |
cfg.SIGMOID_SENSITIVITY = max(0.5, min(1.0 - (float(args.sensitivity) - 1.0), 1.5)) | |
# Set overlap | |
cfg.SIG_OVERLAP = max(0.0, min(2.9, float(args.overlap))) | |
# Set result type | |
cfg.RESULT_TYPE = args.rtype.lower() | |
if not cfg.RESULT_TYPE in ["table", "audacity", "r", "kaleidoscope", "csv"]: | |
cfg.RESULT_TYPE = "table" | |
# Set number of threads | |
if os.path.isdir(cfg.INPUT_PATH): | |
cfg.CPU_THREADS = max(1, int(args.threads)) | |
cfg.TFLITE_THREADS = 1 | |
else: | |
cfg.CPU_THREADS = 1 | |
cfg.TFLITE_THREADS = max(1, int(args.threads)) | |
# Set batch size | |
cfg.BATCH_SIZE = max(1, int(args.batchsize)) | |
# Add config items to each file list entry. | |
# We have to do this for Windows which does not | |
# support fork() and thus each process has to | |
# have its own config. USE LINUX! | |
flist = [(f, cfg.getConfig()) for f in cfg.FILE_LIST] | |
# Analyze files | |
if cfg.CPU_THREADS < 2: | |
for entry in flist: | |
analyzeFile(entry) | |
else: | |
with Pool(cfg.CPU_THREADS) as p: | |
p.map(analyzeFile, flist) | |
# A few examples to test | |
# python3 analyze.py --i example/ --o example/ --slist example/ --min_conf 0.5 --threads 4 | |
# python3 analyze.py --i example/soundscape.wav --o example/soundscape.BirdNET.selection.table.txt --slist example/species_list.txt --threads 8 | |
# python3 analyze.py --i example/ --o example/ --lat 42.5 --lon -76.45 --week 4 --sensitivity 1.0 --rtype table --locale de | |