#!/usr/bin/env python3
#
# This python script takes one or more audio annotation tables (such as the ones generated by Raven) and selects from each referenced audio file new time intervals (new annotations) as close as possible to (but not overlapping with) the input annotations.
# Hence, each newly created annotation is close to one of the input annotations.
# This may be useful for example for constructing "negative control" audio clips, i.e., that capture the background soundscape without the sounds specific to the input annotations (e.g., bird calls).
# When loading audios (e.g. for determining durations, if needed), various formats are supported, such as wav, flac or ogg.
# Example usage:
#   create_nearby_audio_annotations.py -i "existing_annotations/*.tsv" -f -v -o new_annotations.tsv
#
# Tested using Python 3.11, Mac OS 13.6.5.
#
# Stilianos Louca
# Copyright 2024
#
# LICENSE AGREEMENT
# - - - - - - - - -
# All rights reserved.
# Use and redistributions of this code is permitted for commercial and non-commercial purposes,
# under the following conditions:
#
# * Redistributions must retain the above copyright notice, this list of
#   conditions and the following disclaimer in the code itself, as well
#   as in documentation and/or other materials provided with the code.
# * Neither the name of the original author (Stilianos Louca), nor the names
#   of its contributors may be used to endorse or promote products derived
#   from this code without specific prior written permission.
# * Proper attribution must be given to the original author, including a
#   reference to any peer-reviewed publication through which the code was published.
#
# THIS CODE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS CODE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# - - - - - - - - -

# standard library
import argparse
import fnmatch
import glob
import gzip    # needed by open_file() for ".gz" support
import math
import os, sys
import re
import shutil  # needed by check_output_file() for removing directories
import time

# third-party
import librosa
import numpy
import pandas
import scipy
import scipy.io.wavfile  # explicitly import the submodule; bare "import scipy" does not guarantee scipy.io.wavfile is available

# show full (untruncated) pandas tables when printing
pandas.set_option('display.max_rows', None)
pandas.set_option('display.min_rows', None)
pandas.set_option('display.max_colwidth', None)
pandas.set_option('display.max_columns', None)

FILE_EXTENSION_TO_COLUMN_DELIMITER = {"txt":"\t", "tsv":"\t", "tab":"\t", "csv":",", "dnl":"\t"}

###########################################
# AUXILIARY FUNCTIONS

# get the right-most extension of a file
# if skip_gz==True, and the file name ends with ".gz", then the ".gz" part is ignored (for example "data.tsv.gz" yields "tsv").
def file_extension(filepath, skip_gz):
    """Return the right-most extension of a file (without the dot).
    If skip_gz==True and the file name ends with ".gz", the ".gz" part is ignored
    (for example "data.tsv.gz" yields "tsv")."""
    if(skip_gz and filepath.lower().endswith(".gz")): filepath = filepath[:-3]
    extension = filepath.rsplit(".",1)[-1]
    return extension


def file_basename(filepath):
    """Return the file name without directories, without a trailing ".gz" and without the last remaining extension."""
    basename = os.path.split(filepath)[1]
    if(basename.lower().endswith(".gz")): basename = basename[:-3]
    basename = os.path.splitext(basename)[0]
    return basename


# guess the column delimiter of a classical table file, based on the file extension
def infer_file_delimiter(filepath, default=" "):
    extension = file_extension(filepath, skip_gz=True).lower()
    return FILE_EXTENSION_TO_COLUMN_DELIMITER.get(extension, default)


# open a local file for reading/writing, supporting optionally gzip compression (file extension ".gz")
# mode can be 'wt', 'rt', 'wb', 'rb' or other standard modes
def open_file(filepath, mode):
    if(mode.startswith("w") or mode.startswith("a")):
        # make sure the parent directory exists before writing
        parent = os.path.dirname(filepath)
        if(parent!=""): os.makedirs(parent, exist_ok=True)
    if(filepath.lower().endswith(".gz")):
        return gzip.open(filepath,mode)
    else:
        return open(filepath,mode)


# return the set {0,..,N-1} \ subset
# subset may be a slice object or a 1D array/list of integers or a single integer
def integer_complement(N, subset):
    keep = numpy.ones(N, dtype=bool)
    keep[subset,...] = False # use ellipsis at the end to accommodate situations where subset is a tuple
    return numpy.nonzero(keep)[0]


def get_date_time():
    """Return the current local date & time formatted as 'YYYY.MM.DD HH:MM:SS'."""
    return time.strftime("%Y.%m.%d") + " " + time.strftime("%H:%M:%S")


def get_shell_command():
    """Approximately reconstruct the shell command used to invoke this script,
    quoting arguments that are empty or contain shell-special characters."""
    arguments = sys.argv
    arguments = [("'"+a+"'" if (a=="" or re.search(r"[\s*#:;]", a)) else ("'"+'\\\\t'+"'" if (a=='\\t') else a)).replace("\n","") for a in arguments]
    return ' '.join(arguments)


# similar to numpy.cumsum, but counting starts 1 later, i.e., cumsum0 at the k-th index is equal to the sum of X[0],..,X[k-1]
def cumsum0(X,axis=None):
    if(axis is None):
        # flattened cumulative sum, shifted right by one
        C = numpy.empty(X.size, dtype=X.dtype)
        C[0] = 0
        C[1:] = numpy.cumsum(X)[:-1]
    else:
        C = numpy.zeros_like(X)
        if(axis==0):
            # bugfix: was "C[1,...] = ..." which assigns a single row (shape mismatch);
            # the shifted cumulative sums must fill all rows from index 1 onward
            C[1:,...] = numpy.cumsum(X,axis=0)[:-1,...]
        else:
            # generic axis: compute cumsum over X[...,0:n-1,...] and place it at [...,1:n,...]
            multislice = [slice(None)]*X.ndim
            multislice[axis] = slice(0,X.shape[axis]-1)
            cumvalues = numpy.cumsum(X[tuple(multislice)],axis=axis)
            multislice[axis] = slice(1,X.shape[axis])
            C[tuple(multislice)] = cumvalues
    return C


# sort one or more synchronized 1D numpy arrays, based on the values in the first passed array
def sort_synchronized_arrays(*arrays):
    if(len(arrays)==0): return
    new_arrays = [None]*len(arrays)
    order = numpy.argsort(arrays[0])
    for a in range(len(arrays)):
        new_arrays[a] = arrays[a][order]
    return (*new_arrays,)


def check_output_file(file,force,verbose,verbose_prefix):
    """Prepare for writing the output file 'file'.
    If the file already exists, delete it when force==True, otherwise abort the script.
    If the file does not exist, make sure its parent directory exists."""
    if(os.path.exists(file)):
        if(force):
            # bugfix: os.rmtree does not exist; shutil.rmtree is the correct function for removing directory trees
            if(os.path.isdir(file)): shutil.rmtree(file)
            else: os.remove(file)
            if(verbose): print("%sNote: Replacing output file '%s'" % (verbose_prefix,file))
        else:
            print("%sERROR: Output file '%s' already exists\n%s Cowardly refusing to continue.\n%s Use --force to ignore this message" % (verbose_prefix,file,verbose_prefix,verbose_prefix))
            sys.exit(1)
    else:
        parent = os.path.dirname(file)
        if(parent!=""): os.makedirs(parent,exist_ok=True)


# split a string on a single-character separator such as ':' or ',' or ';', unless the separator is escaped with a backslash
# NOTE(review): the original body of this function was garbled in the source (only the fragment
# "parts = list(re.split(r'(?" survived); reconstructed to split on non-escaped separators
# using a negative lookbehind, and to unescape any escaped separators within the parts. Confirm against original intent.
def split_escaped(haystack, separator):
    parts = list(re.split(r'(?<!\\)'+re.escape(separator), haystack))
    return [part.replace("\\"+separator, separator) for part in parts]


# filter a list of names based on an optional whitelist (only_names) and an optional blacklist (omit_names),
# returning the list of indices of names to keep.
# only_names & omit_names may each be a comma-separated string (supporting escaped commas) or a list of names,
# optionally containing fnmatch-style wildcards (when allow_wildcards==True).
# NOTE(review): the function header and first half were garbled in the source; the signature and the
# only_names handling were reconstructed from the surviving omit_names half and from the call sites
# (which pass names=..., only_names=..., case_sensitive=False, allow_wildcards=True). Confirm against original intent.
def filter_name_list(names, only_names=None, omit_names=None, case_sensitive=True, allow_wildcards=False):
    if(not case_sensitive): names = [name.lower() for name in names]
    items_to_keep = list(range(len(names)))
    if((only_names is not None) and (only_names!="")):
        if(isinstance(only_names, str)): only_names = split_escaped(only_names, ",")
        if(not case_sensitive): only_names = [name.lower() for name in only_names]
        if(allow_wildcards):
            items_to_keep = [i for i in items_to_keep if (next((n for n,name in enumerate(only_names) if fnmatch.fnmatchcase(names[i],name)),-1)>=0)]
        else:
            only_names = set(only_names)
            items_to_keep = [i for i in items_to_keep if (names[i] in only_names)]
    if((omit_names is not None) and (omit_names!="")):
        if(isinstance(omit_names, str)): omit_names = split_escaped(omit_names, ",")
        if(not case_sensitive): omit_names = [name.lower() for name in omit_names]
        if(allow_wildcards):
            items_to_keep = [i for i in items_to_keep if (next((n for n,name in enumerate(omit_names) if fnmatch.fnmatchcase(names[i],name)),-1)<0)]
        else:
            omit_names = set(omit_names)
            items_to_keep = [i for i in items_to_keep if (names[i] not in omit_names)]
    return items_to_keep
###########################################
# MAIN BODY

# parse command line arguments
parser = argparse.ArgumentParser(description="Define new annotations in one or more audio files, avoiding (but staying nearby) existing annotations. The needed durations of the audios can either be computed from scratch from the audios themselves, or specified in separate tables, or assumed to be the maximum annotation end time found in the input annotations.", epilog="")
parser.add_argument('-i','--in_annotations', required=True, type=str, help="Paths or wildcards to audio annotation tables. Multiple paths/wildcards must be separated by a colon. Each table may be in CSV, TSV or Excel format, similar to the annotation tables generated by Raven.")
parser.add_argument('-d','--in_audio_durations', default="", type=str, help="Optional colon-separated list of paths or wildcards to input tables listing durations for the input audio files. Each table may be in CSV, TSV or Excel format, and must include a header row listing column names. These files should list the durations of considered audio files (i.e., referenced in the annotations) in seconds, one row per audio file, and can be used to avoid loading audio files just to determine their duration. Audios are identified by their file names (with extension), not their full paths. Audios not listed in any of these tables will be loaded from scratch (if existent) to determine their durations; for missing audio files the maximum annotation end time is assumed to be the duration.")
parser.add_argument('-o','--output', required=True, type=str, help="Path to output table file, where the newly created annotations are to be saved. The column delimiter will be determined automatically based on the file extension.")
# input/output options
# bugfix: the help text previously claimed file names are used when --match_audio_paths IS set, contradicting the actual logic below
parser.add_argument('--audio_durations_paths_column', default="path", type=str, help="Column name listing audio file names or file paths in the --in_audio_durations table. If paths are listed and --match_audio_paths is not set, only their file names are used. (default: '%(default)s')")
parser.add_argument('--audio_durations_column', default="duration", type=str, help="Column name listing audio durations in the --in_audio_durations tables, in seconds. (default: '%(default)s')")
parser.add_argument('--start_times_column', default="Begin Time (s)", type=str, help="Column name listing annotation start times (seconds) in the input & output annotations tables. (default: '%(default)s')")
parser.add_argument('--end_times_column', default="End Time (s)", type=str, help="Column name listing annotation end times (seconds) in the input & output annotations tables. (default: '%(default)s')")
parser.add_argument('--annotation_audio_paths_column', default="Begin File", type=str, help="Column name listing audio file names or paths in the input & output annotations tables. If audio durations are to be determined from the audio files themselves (as opposed to being specified in --in_audio_durations), then these should be file paths, not just names. (default: '%(default)s')")
parser.add_argument('--match_audio_paths', action='store_true', dest="match_audio_paths", default=False, help='Match audios listed in the --in_audio_durations tables and in the annotation tables based on their full paths, instead of just their file names.')
parser.add_argument('--adopt_annotation_fields', default="", help="Optional comma-separated list of column names (or wildcards) to adopt as-is from the old annotations to the new annotations. Note that this may not always make sense. This may be used for example to make the new annotations have the same frequency window as the old annotations.")
parser.add_argument('--include_comments_in_output', action='store_true', dest="include_comments_in_output", default=False, help='Whether to include basic overview comments in the output annotations table.')
parser.add_argument('--Nnew_per_old', default=1, type=int, help="Number of new annotations to create for each of the input annotations. (default: '%(default)s')")
parser.add_argument('--min_new_length', default=None, type=float, help="Minimum acceptable length (in seconds) of new annotations. If left unspecified, then each input annotation's length is used as the minimum acceptable length for the associated new annotations. The minimum length affects the gaps that are available for placing an annotation.")
parser.add_argument('--inadequate_gaps', default="error", type=str, choices=["error", "skip", "shorten"], help="How to handle cases where an audio file lacks sufficient gaps for inserting the requested number of new annotations. If 'skip', it means that the audio file is skipped once no more annotations can be inserted, so the total number of new annotations may end up being less than requested. 'shorten' means that pending new annotations for that audio file will be made shorter to fit.")
parser.add_argument('-f','--force', action='store_true', dest="force", default=False, help='Replace existing output files without asking.')
parser.add_argument('--verbose_prefix', default="  ", help="Line prefix to be used for standard output messages. This may be useful if the script is part of another pipeline. (default: '%(default)s')")
parser.add_argument('-v','--verbose', action='store_true', dest="verbose", default=False, help='Show lots of information.')
args = parser.parse_args()


def abort(message, exit_code=1):
    # print a message and terminate the script with the given exit code
    print(message)
    sys.exit(exit_code)


# find input annotations tables
annotation_paths = [path for path_spec in args.in_annotations.split(":") for path in glob.glob(path_spec)]
if(args.verbose): print("%sNote: Found %d annotation tables"%(args.verbose_prefix,len(annotation_paths)))
if(len(annotation_paths)==0): abort("%sNothing to be done"%(args.verbose_prefix), 0)

# load & merge annotation tables
if(args.verbose): print("%sLoading %d annotation tables.."%(args.verbose_prefix,len(annotation_paths)))
old_annotations = [None]*len(annotation_paths)
for t,table_path in enumerate(annotation_paths):
    if(table_path.lower().endswith(".xlsx")):
        table = pandas.read_excel(table_path, na_filter=False)
    else:
        table = pandas.read_csv(table_path, delimiter=infer_file_delimiter(table_path, default="\t"), na_filter=False, comment="#")
    table["original_annotation_table"] = table_path # remember which table each annotation came from
    old_annotations[t] = table
old_annotations = pandas.concat(old_annotations, axis=0, join='outer', ignore_index=True, copy=False)
unique_audio_paths = numpy.unique(old_annotations[args.annotation_audio_paths_column])
if(args.verbose): print("%s Note: Found a total of %d annotations, covering %d unique audio files"%(args.verbose_prefix,old_annotations.shape[0],len(unique_audio_paths)))

# find & load audio durations from text files, if available
durations = {} # map: audio_id --> duration in seconds
if(args.in_audio_durations!=""):
    duration_paths = [path for path_spec in args.in_audio_durations.split(":") for path in glob.glob(path_spec)]
    if(len(duration_paths)==0):
        if(args.verbose): print("%sNote: No duration tables found, or none specified"%(args.verbose_prefix))
    else:
        # bugfix: report the number of duration tables (was len(annotation_paths))
        if(args.verbose): print("%sLoading audio durations from %d input tables.."%(args.verbose_prefix,len(duration_paths)))
        for t,table_path in enumerate(duration_paths):
            if(table_path.lower().endswith(".xlsx")):
                table = pandas.read_excel(table_path, na_filter=False)
            else:
                table = pandas.read_csv(table_path, delimiter=infer_file_delimiter(table_path, default="\t"), na_filter=False, comment="#")
            for path,duration in zip(table[args.audio_durations_paths_column],table[args.audio_durations_column]):
                audio_id = (path if args.match_audio_paths else os.path.basename(path))
                durations[audio_id] = duration
        if(args.verbose): print("%s Note: Found durations for %s%d audios"%(args.verbose_prefix,("all " if (len(durations)==len(unique_audio_paths)) else ""),len(durations)))

# determine durations of any audios not listed in the input duration tables
unique_audio_ids = [(path if args.match_audio_paths else os.path.basename(path)) for path in unique_audio_paths]
missing_audio_ids = set.difference(set(unique_audio_ids),durations.keys())
if(len(missing_audio_ids)>0):
    if(args.verbose): print("%sAttempting to determine durations of %d remaining audios, by loading the audios themselves.."%(args.verbose_prefix,len(missing_audio_ids)))
    for audio_path,audio_id in zip(unique_audio_paths,unique_audio_ids):
        if(audio_id in durations): continue
        if(os.path.exists(audio_path)):
            if(audio_path.lower().endswith(".wav")):
                # use scipy to load the wav file, which is faster than librosa
                sampling_rate, audio = scipy.io.wavfile.read(audio_path)
            else:
                # use librosa to load non-wav file, since scipy only supports wav
                audio, sampling_rate = librosa.load(audio_path, sr=None)
            durations[audio_id] = audio.shape[0]/sampling_rate
    # NOTE(review): the following verbose report was garbled in the original source; reconstructed in the same style
    if(args.verbose):
        if(len(durations)<len(unique_audio_ids)):
            print("%s Note: Determined durations for %d out of %d audios; for the remaining audios the maximum annotation end time will be assumed as duration"%(args.verbose_prefix,len(durations),len(unique_audio_ids)))
        else:
            print("%s Note: Determined durations for all %d audios"%(args.verbose_prefix,len(unique_audio_ids)))

# precompute annotation times & audio paths as numpy arrays, for fast per-audio slicing
all_old_starts = numpy.asarray(old_annotations[args.start_times_column], dtype=float)
all_old_ends   = numpy.asarray(old_annotations[args.end_times_column], dtype=float)
all_old_paths  = numpy.asarray(old_annotations[args.annotation_audio_paths_column])

# for audios whose duration could still not be determined (e.g., audio file missing),
# assume the duration to be the maximum annotation end time, as promised in the --in_audio_durations help text
# NOTE(review): this fallback was garbled in the original source; reconstructed from the documented behavior
for audio_path,audio_id in zip(unique_audio_paths,unique_audio_ids):
    if(audio_id not in durations):
        durations[audio_id] = float(numpy.max(all_old_ends[all_old_paths==audio_path]))

# create new annotations for each audio, placed in gaps between the old annotations
# NOTE(review): the loop header, annotation sorting, overlap detection and interval merging below were
# garbled in the original source; reconstructed so that gaps are the complement of the (merged) annotations
new_annotations = [] # list of dictionaries, one per newly created annotation
Nshortened = 0 # number of new annotations that had to be shortened (only relevant when --inadequate_gaps=shorten)
for audio_path,audio_id in zip(unique_audio_paths,unique_audio_ids):
    audio_duration = durations[audio_id]
    old_rows   = numpy.nonzero(all_old_paths==audio_path)[0] # row indices (in old_annotations) of annotations in this audio
    old_starts = all_old_starts[old_rows]
    old_ends   = all_old_ends[old_rows]
    old_starts,old_ends,old_rows = sort_synchronized_arrays(old_starts,old_ends,old_rows)
    nontrivials = numpy.nonzero(old_ends>old_starts)[0] # only annotations of positive length block gaps
    if(numpy.any(old_starts[nontrivials][1:]<old_ends[nontrivials][:-1])):
        # some annotations overlap, so first merge overlapping annotations into disjoint intervals
        merged_starts = []
        merged_ends   = []
        for s,e in zip(old_starts[nontrivials],old_ends[nontrivials]):
            if(merged_ends and (s<=merged_ends[-1])):
                merged_ends[-1] = max(merged_ends[-1],e)
            else:
                merged_starts.append(s)
                merged_ends.append(e)
        gap_starts = numpy.concatenate(([0],merged_ends))
        gap_ends   = numpy.concatenate((merged_starts,[audio_duration]))
    else:
        # annotations are not overlapping, so finding gaps is straightforward
        gap_starts = numpy.concatenate(([0],old_ends[nontrivials]))
        gap_ends   = numpy.concatenate((old_starts[nontrivials],[audio_duration]))
    # create new annotations in available (sufficiently large) gaps, updating gaps on the fly
    new_annotations_this_audio = [] # list of dictionaries, listing the start & end times and other info for newly added annotations for this audio
    move_to_next_audio = False
    for old_start,old_end,old_row in zip(old_starts,old_ends,old_rows):
        focal_time = 0.5*(old_start+old_end) # place new annotations as close as possible to this time point
        for n in range(args.Nnew_per_old):
            min_length = ((old_end-old_start) if (args.min_new_length is None) else args.min_new_length)
            # create a new annotation in one of the available gaps close to (old_start,old_end)
            # determine gaps large enough to fit a new annotation
            available_gaps = numpy.where(gap_ends-gap_starts>=min_length)[0]
            if(len(available_gaps)==0):
                if(args.inadequate_gaps=="error"):
                    abort("%s ERROR: Audio '%s' lacks sufficient gaps for adding more annotations of length >=%g"%(args.verbose_prefix,audio_id,min_length))
                elif(args.inadequate_gaps=="skip"):
                    if(args.verbose): print("%s WARNING: Audio '%s' lacks sufficient gaps for adding more annotations of length >=%g. Skipping remainder and moving to the next audio"%(args.verbose_prefix,audio_id,min_length))
                    move_to_next_audio = True
                    break
                elif(args.inadequate_gaps=="shorten"):
                    if(args.verbose): print("%s Note: Audio '%s' lacks sufficient gaps for adding more annotations of length >=%g. Shortening the new annotation as needed"%(args.verbose_prefix,audio_id,min_length))
                    Nshortened += 1
                    available_gaps = numpy.asarray([numpy.argmax(gap_ends-gap_starts)]) # fall back to the single largest gap
            # find closest available gap
            if(len(available_gaps)==1):
                # only one gap available, so pick this one
                gap = available_gaps[0]
                align = (-1 if (gap_starts[gap]>focal_time) else +1) # align left or right depending on the location of the gap relative to the focal_time
            elif(gap_starts[available_gaps[-1]]<focal_time):
                # NOTE(review): this branch's condition was garbled in the original source; reconstructed:
                # no gaps available on the right of focal_time, so simply pick the right-most gap and align the new annotation rightwards
                gap = available_gaps[-1]
                align = +1
            elif(gap_ends[available_gaps[0]]>focal_time):
                # no gaps available on the left of focal_time, so simply pick the left-most gap and align the new annotation leftwards
                gap = available_gaps[0]
                align = -1
            else:
                # gaps are available both on the left & right of focal_time, so pick whichever is closest
                # we slightly prefer gaps on the left, to save space for later annotations further right
                right_gap = available_gaps[gap_starts[available_gaps].searchsorted(focal_time, side="left")]
                left_gap  = available_gaps[gap_ends[available_gaps].searchsorted(focal_time, side="right")-1]
                # bugfix: the 0.9999 preference factor must scale the full left-distance, i.e. 0.9999*(focal_time-gap_ends[left_gap]);
                # previously operator precedence made it 0.9999*focal_time - gap_ends[left_gap]
                align = (-1 if (gap_starts[right_gap]-focal_time<0.9999*(focal_time-gap_ends[left_gap])) else +1)
                gap = (right_gap if (align<0) else left_gap)
            # define new annotation in the chosen gap
            # depending on whether align is +1 or -1, place the annotation on the right or left corner of the gap
            length = min(min_length, gap_ends[gap]-gap_starts[gap])
            if(align<0):
                # align annotation leftward within the gap
                new_annotations_this_audio.append({
                    args.start_times_column:gap_starts[gap],
                    args.end_times_column:(gap_starts[gap]+length),
                    args.annotation_audio_paths_column:audio_path,
                    "old_annotation_row":old_row})
                gap_starts[gap] += length # shorten gap to account for new annotation within it
            else:
                # align annotation rightward within the gap
                new_annotations_this_audio.append({
                    args.start_times_column:(gap_ends[gap]-length),
                    args.end_times_column:gap_ends[gap],
                    args.annotation_audio_paths_column:audio_path,
                    "old_annotation_row":old_row})
                gap_ends[gap] -= length # shorten gap to account for new annotation within it
        if(move_to_next_audio): break
    new_annotations += new_annotations_this_audio
if(args.verbose): print("%s Note: Created in total %d new annotations"%(args.verbose_prefix,len(new_annotations)))
if(Nshortened>0): print("%s Note: %d of the %d new annotations were shortened to fit in the available gaps"%(args.verbose_prefix,Nshortened,len(new_annotations)))

# convert new annotations from a list of dictionaries to a dataframe
new_annotations = pandas.DataFrame(new_annotations)
new_annotations["Selection"] = range(1,new_annotations.shape[0]+1)
if(args.adopt_annotation_fields!=""):
    adopt_annotation_fields = old_annotations.columns[filter_name_list(names=list(old_annotations.columns), only_names=args.adopt_annotation_fields, case_sensitive=False, allow_wildcards=True)]
    # robustness: only adopt fields when some new annotations were actually created (an empty dataframe lacks the old_annotation_row column)
    if((len(adopt_annotation_fields)>0) and (new_annotations.shape[0]>0)):
        if(args.verbose): print("%sAdopting %d fields from the old annotations table as-is.."%(args.verbose_prefix,len(adopt_annotation_fields)))
        for field in adopt_annotation_fields:
            new_annotations[field] = list(old_annotations.loc[new_annotations["old_annotation_row"],field])

# save new annotations
if(args.output!=""):
    if(args.verbose): print("%sSaving new annotations to file.."%(args.verbose_prefix))
    delimiter = infer_file_delimiter(args.output, default="\t")
    check_output_file(file=args.output, force=args.force, verbose=args.verbose, verbose_prefix=args.verbose_prefix+" ")
    with open_file(args.output, "wt") as fout:
        if(args.include_comments_in_output):
            fout.write("# New annotations, inserted in gaps close to old annotations\n# Generated on: %s\n# Used command: %s\n#\n"%(get_date_time(),get_shell_command()))
        new_annotations.to_csv(fout,sep=delimiter,header=True,index=False)