"""Module to analyze audio samples. """ import argparse import datetime import json import operator import os import sys from multiprocessing import Pool, freeze_support import numpy as np import audio import config as cfg import model import species import utils def loadCodes(): """Loads the eBird codes. Returns: A dictionary containing the eBird codes. """ with open(cfg.CODES_FILE, "r") as cfile: codes = json.load(cfile) return codes def saveResultFile(r: dict[str, list], path: str, afile_path: str): """Saves the results to the hard drive. Args: r: The dictionary with {segment: scores}. path: The path where the result should be saved. afile_path: The path to audio file. """ # Make folder if it doesn't exist if os.path.dirname(path): os.makedirs(os.path.dirname(path), exist_ok=True) # Selection table out_string = "" if cfg.RESULT_TYPE == "table": # Raven selection header header = "Selection\tView\tChannel\tBegin Time (s)\tEnd Time (s)\tLow Freq (Hz)\tHigh Freq (Hz)\tSpecies Code\tCommon Name\tConfidence\n" selection_id = 0 # Write header out_string += header # Read native sample rate high_freq = audio.get_sample_rate(afile_path) / 2 if high_freq > cfg.SIG_FMAX: high_freq = cfg.SIG_FMAX # Extract valid predictions for every timestamp for timestamp in getSortedTimestamps(r): rstring = "" start, end = timestamp.split("-", 1) for c in r[timestamp]: if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): selection_id += 1 label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] rstring += "{}\tSpectrogram 1\t1\t{}\t{}\t{}\t{}\t{}\t{}\t{:.4f}\n".format( selection_id, start, end, cfg.SIG_FMIN, high_freq, cfg.CODES[c[0]] if c[0] in cfg.CODES else c[0], label.split("_", 1)[-1], c[1], ) # Write result string to file out_string += rstring elif cfg.RESULT_TYPE == "audacity": # Audacity timeline labels for timestamp in getSortedTimestamps(r): rstring = "" for c in r[timestamp]: if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] rstring += "{}\t{}\t{:.4f}\n".format(timestamp.replace("-", "\t"), label.replace("_", ", "), c[1]) # Write result string to file out_string += rstring elif cfg.RESULT_TYPE == "r": # Output format for R header = "filepath,start,end,scientific_name,common_name,confidence,lat,lon,week,overlap,sensitivity,min_conf,species_list,model" out_string += header for timestamp in getSortedTimestamps(r): rstring = "" start, end = timestamp.split("-", 1) for c in r[timestamp]: if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] rstring += "\n{},{},{},{},{},{:.4f},{:.4f},{:.4f},{},{},{},{},{},{}".format( afile_path, start, end, label.split("_", 1)[0], label.split("_", 1)[-1], c[1], cfg.LATITUDE, cfg.LONGITUDE, cfg.WEEK, cfg.SIG_OVERLAP, (1.0 - cfg.SIGMOID_SENSITIVITY) + 1.0, cfg.MIN_CONFIDENCE, cfg.SPECIES_LIST_FILE, os.path.basename(cfg.MODEL_PATH), ) # Write result string to file out_string += rstring elif cfg.RESULT_TYPE == "kaleidoscope": # Output format for kaleidoscope header = "INDIR,FOLDER,IN FILE,OFFSET,DURATION,scientific_name,common_name,confidence,lat,lon,week,overlap,sensitivity" out_string += header folder_path, filename = os.path.split(afile_path) parent_folder, folder_name = os.path.split(folder_path) for timestamp in getSortedTimestamps(r): rstring = "" start, end = timestamp.split("-", 1) for c in r[timestamp]: if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] rstring += "\n{},{},{},{},{},{},{},{:.4f},{:.4f},{:.4f},{},{},{}".format( parent_folder.rstrip("/"), folder_name, filename, start, float(end) - float(start), label.split("_", 1)[0], label.split("_", 1)[-1], c[1], cfg.LATITUDE, cfg.LONGITUDE, cfg.WEEK, cfg.SIG_OVERLAP, (1.0 - cfg.SIGMOID_SENSITIVITY) + 1.0, ) # Write result string to file out_string += rstring else: # CSV output file header = "Start (s),End (s),Scientific name,Common name,Confidence\n" # Write header out_string += header for timestamp in getSortedTimestamps(r): rstring = "" for c in r[timestamp]: start, end = timestamp.split("-", 1) if c[1] > cfg.MIN_CONFIDENCE and (not cfg.SPECIES_LIST or c[0] in cfg.SPECIES_LIST): label = cfg.TRANSLATED_LABELS[cfg.LABELS.index(c[0])] rstring += "{},{},{},{},{:.4f}\n".format(start, end, label.split("_", 1)[0], label.split("_", 1)[-1], c[1]) # Write result string to file out_string += rstring # Save as file with open(path, "w", encoding="utf-8") as rfile: rfile.write(out_string) def getSortedTimestamps(results: dict[str, list]): """Sorts the results based on the segments. Args: results: The dictionary with {segment: scores}. Returns: Returns the sorted list of segments and their scores. """ return sorted(results, key=lambda t: float(t.split("-", 1)[0])) def getRawAudioFromFile(fpath: str): """Reads an audio file. Reads the file and splits the signal into chunks. Args: fpath: Path to the audio file. Returns: The signal split into a list of chunks. """ # Open file sig, rate = audio.openAudioFile(fpath, cfg.SAMPLE_RATE) # Split into raw audio chunks chunks = audio.splitSignal(sig, rate, cfg.SIG_LENGTH, cfg.SIG_OVERLAP, cfg.SIG_MINLEN) return chunks def predict(samples): """Predicts the classes for the given samples. Args: samples: Samples to be predicted. Returns: The prediction scores. """ # Prepare sample and pass through model data = np.array(samples, dtype="float32") prediction = model.predict(data) # Logits or sigmoid activations? if cfg.APPLY_SIGMOID: prediction = model.flat_sigmoid(np.array(prediction), sensitivity=-cfg.SIGMOID_SENSITIVITY) return prediction def analyzeFile(item): """Analyzes a file. Predicts the scores for the file and saves the results. Args: item: Tuple containing (file path, config) Returns: The `True` if the file was analyzed successfully. """ # Get file path and restore cfg fpath: str = item[0] cfg.setConfig(item[1]) # Start time start_time = datetime.datetime.now() # Status print(f"Analyzing {fpath}", flush=True) try: # Open audio file and split into 3-second chunks chunks = getRawAudioFromFile(fpath) # If no chunks, show error and skip except Exception as ex: print(f"Error: Cannot open audio file {fpath}", flush=True) utils.writeErrorLog(ex) return False # Process each chunk try: start, end = 0, cfg.SIG_LENGTH results = {} samples = [] timestamps = [] for chunk_index, chunk in enumerate(chunks): # Add to batch samples.append(chunk) timestamps.append([start, end]) # Advance start and end start += cfg.SIG_LENGTH - cfg.SIG_OVERLAP end = start + cfg.SIG_LENGTH # Check if batch is full or last chunk if len(samples) < cfg.BATCH_SIZE and chunk_index < len(chunks) - 1: continue # Predict p = predict(samples) # Add to results for i in range(len(samples)): # Get timestamp s_start, s_end = timestamps[i] # Get prediction pred = p[i] # Assign scores to labels p_labels = zip(cfg.LABELS, pred) # Sort by score p_sorted = sorted(p_labels, key=operator.itemgetter(1), reverse=True) # Store top 5 results and advance indices results[str(s_start) + "-" + str(s_end)] = p_sorted # Clear batch samples = [] timestamps = [] except Exception as ex: # Write error log print(f"Error: Cannot analyze audio file {fpath}.\n", flush=True) utils.writeErrorLog(ex) return False # Save as selection table try: # We have to check if output path is a file or directory if not cfg.OUTPUT_PATH.rsplit(".", 1)[-1].lower() in ["txt", "csv"]: rpath = fpath.replace(cfg.INPUT_PATH, "") rpath = rpath[1:] if rpath[0] in ["/", "\\"] else rpath # Make target directory if it doesn't exist rdir = os.path.join(cfg.OUTPUT_PATH, os.path.dirname(rpath)) os.makedirs(rdir, exist_ok=True) if cfg.RESULT_TYPE == "table": rtype = ".BirdNET.selection.table.txt" elif cfg.RESULT_TYPE == "audacity": rtype = ".BirdNET.results.txt" else: rtype = ".BirdNET.results.csv" saveResultFile(results, os.path.join(cfg.OUTPUT_PATH, rpath.rsplit(".", 1)[0] + rtype), fpath) else: saveResultFile(results, cfg.OUTPUT_PATH, fpath) except Exception as ex: # Write error log print(f"Error: Cannot save result for {fpath}.\n", flush=True) utils.writeErrorLog(ex) return False delta_time = (datetime.datetime.now() - start_time).total_seconds() print("Finished {} in {:.2f} seconds".format(fpath, delta_time), flush=True) return True if __name__ == "__main__": # Freeze support for executable freeze_support() # Parse arguments parser = argparse.ArgumentParser(description="Analyze audio files with BirdNET") parser.add_argument( "--i", default="example/", help="Path to input file or folder. If this is a file, --o needs to be a file too." ) parser.add_argument( "--o", default="example/", help="Path to output file or folder. If this is a file, --i needs to be a file too." ) parser.add_argument("--lat", type=float, default=-1, help="Recording location latitude. Set -1 to ignore.") parser.add_argument("--lon", type=float, default=-1, help="Recording location longitude. Set -1 to ignore.") parser.add_argument( "--week", type=int, default=-1, help="Week of the year when the recording was made. Values in [1, 48] (4 weeks per month). Set -1 for year-round species list.", ) parser.add_argument( "--slist", default="", help='Path to species list file or folder. If folder is provided, species list needs to be named "species_list.txt". If lat and lon are provided, this list will be ignored.', ) parser.add_argument( "--sensitivity", type=float, default=1.0, help="Detection sensitivity; Higher values result in higher sensitivity. Values in [0.5, 1.5]. Defaults to 1.0.", ) parser.add_argument( "--min_conf", type=float, default=0.1, help="Minimum confidence threshold. Values in [0.01, 0.99]. Defaults to 0.1." ) parser.add_argument( "--overlap", type=float, default=0.0, help="Overlap of prediction segments. Values in [0.0, 2.9]. Defaults to 0.0." ) parser.add_argument( "--rtype", default="table", help="Specifies output format. Values in ['table', 'audacity', 'r', 'kaleidoscope', 'csv']. Defaults to 'table' (Raven selection table).", ) parser.add_argument("--threads", type=int, default=4, help="Number of CPU threads.") parser.add_argument( "--batchsize", type=int, default=1, help="Number of samples to process at the same time. Defaults to 1." ) parser.add_argument( "--locale", default="en", help="Locale for translated species common names. Values in ['af', 'de', 'it', ...] Defaults to 'en'.", ) parser.add_argument( "--sf_thresh", type=float, default=0.03, help="Minimum species occurrence frequency threshold for location filter. Values in [0.01, 0.99]. Defaults to 0.03.", ) parser.add_argument( "--classifier", default=None, help="Path to custom trained classifier. Defaults to None. If set, --lat, --lon and --locale are ignored.", ) args = parser.parse_args() # Set paths relative to script path (requested in #3) script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) cfg.MODEL_PATH = os.path.join(script_dir, cfg.MODEL_PATH) cfg.LABELS_FILE = os.path.join(script_dir, cfg.LABELS_FILE) cfg.TRANSLATED_LABELS_PATH = os.path.join(script_dir, cfg.TRANSLATED_LABELS_PATH) cfg.MDATA_MODEL_PATH = os.path.join(script_dir, cfg.MDATA_MODEL_PATH) cfg.CODES_FILE = os.path.join(script_dir, cfg.CODES_FILE) cfg.ERROR_LOG_FILE = os.path.join(script_dir, cfg.ERROR_LOG_FILE) # Load eBird codes, labels cfg.CODES = loadCodes() cfg.LABELS = utils.readLines(cfg.LABELS_FILE) # Set custom classifier? if args.classifier is not None: cfg.CUSTOM_CLASSIFIER = args.classifier # we treat this as absolute path, so no need to join with dirname cfg.LABELS_FILE = args.classifier.replace(".tflite", "_Labels.txt") # same for labels file cfg.LABELS = utils.readLines(cfg.LABELS_FILE) args.lat = -1 args.lon = -1 args.locale = "en" # Load translated labels lfile = os.path.join( cfg.TRANSLATED_LABELS_PATH, os.path.basename(cfg.LABELS_FILE).replace(".txt", "_{}.txt".format(args.locale)) ) if not args.locale in ["en"] and os.path.isfile(lfile): cfg.TRANSLATED_LABELS = utils.readLines(lfile) else: cfg.TRANSLATED_LABELS = cfg.LABELS ### Make sure to comment out appropriately if you are not using args. ### # Load species list from location filter or provided list cfg.LATITUDE, cfg.LONGITUDE, cfg.WEEK = args.lat, args.lon, args.week cfg.LOCATION_FILTER_THRESHOLD = max(0.01, min(0.99, float(args.sf_thresh))) if cfg.LATITUDE == -1 and cfg.LONGITUDE == -1: if not args.slist: cfg.SPECIES_LIST_FILE = None else: cfg.SPECIES_LIST_FILE = os.path.join(script_dir, args.slist) if os.path.isdir(cfg.SPECIES_LIST_FILE): cfg.SPECIES_LIST_FILE = os.path.join(cfg.SPECIES_LIST_FILE, "species_list.txt") cfg.SPECIES_LIST = utils.readLines(cfg.SPECIES_LIST_FILE) else: cfg.SPECIES_LIST_FILE = None cfg.SPECIES_LIST = species.getSpeciesList(cfg.LATITUDE, cfg.LONGITUDE, cfg.WEEK, cfg.LOCATION_FILTER_THRESHOLD) if not cfg.SPECIES_LIST: print(f"Species list contains {len(cfg.LABELS)} species") else: print(f"Species list contains {len(cfg.SPECIES_LIST)} species") # Set input and output path cfg.INPUT_PATH = args.i cfg.OUTPUT_PATH = args.o # Parse input files if os.path.isdir(cfg.INPUT_PATH): cfg.FILE_LIST = utils.collect_audio_files(cfg.INPUT_PATH) print(f"Found {len(cfg.FILE_LIST)} files to analyze") else: cfg.FILE_LIST = [cfg.INPUT_PATH] # Set confidence threshold cfg.MIN_CONFIDENCE = max(0.01, min(0.99, float(args.min_conf))) # Set sensitivity cfg.SIGMOID_SENSITIVITY = max(0.5, min(1.0 - (float(args.sensitivity) - 1.0), 1.5)) # Set overlap cfg.SIG_OVERLAP = max(0.0, min(2.9, float(args.overlap))) # Set result type cfg.RESULT_TYPE = args.rtype.lower() if not cfg.RESULT_TYPE in ["table", "audacity", "r", "kaleidoscope", "csv"]: cfg.RESULT_TYPE = "table" # Set number of threads if os.path.isdir(cfg.INPUT_PATH): cfg.CPU_THREADS = max(1, int(args.threads)) cfg.TFLITE_THREADS = 1 else: cfg.CPU_THREADS = 1 cfg.TFLITE_THREADS = max(1, int(args.threads)) # Set batch size cfg.BATCH_SIZE = max(1, int(args.batchsize)) # Add config items to each file list entry. # We have to do this for Windows which does not # support fork() and thus each process has to # have its own config. USE LINUX! flist = [(f, cfg.getConfig()) for f in cfg.FILE_LIST] # Analyze files if cfg.CPU_THREADS < 2: for entry in flist: analyzeFile(entry) else: with Pool(cfg.CPU_THREADS) as p: p.map(analyzeFile, flist) # A few examples to test # python3 analyze.py --i example/ --o example/ --slist example/ --min_conf 0.5 --threads 4 # python3 analyze.py --i example/soundscape.wav --o example/soundscape.BirdNET.selection.table.txt --slist example/species_list.txt --threads 8 # python3 analyze.py --i example/ --o example/ --lat 42.5 --lon -76.45 --week 4 --sensitivity 1.0 --rtype table --locale de