#!/usr/bin/env python3 # # This python script takes an audio annotation table (such as the ones generated by Raven) and one or more tables of audio segment durations, and re-defines the annotations w.r.t. to the original segments. # This may be useful if annotations were originally defined w.r.t. a composite audio (i.e., constructed by concatenating multiple audio segments), and shall now be redefined w.r.t. the original segments. # ATTENTION: Only annotations restricted within a single segment, i.e., not spanning multiple segments, can be redefined. Annotations violating this requirement will trigger an error by default. # Use as: # deconcatenate_audio_annotations.py -i "old_annotations.tsv" -s "segment_summaries/*.tsv" -f -v -o "new_annotations.tsv" # # Tested using Python 3.11, Mac OS 13.6.5. # # Stilianos Louca # Copyright 2024 # # LICENSE AGREEMENT # - - - - - - - - - # All rights reserved. # Use and redistributions of this code is permitted for commercial and non-commercial purposes, # under the following conditions: # # * Redistributions must retain the above copyright notice, this list of # conditions and the following disclaimer in the code itself, as well # as in documentation and/or other materials provided with the code. # * Neither the name of the original author (Stilianos Louca), nor the names # of its contributors may be used to endorse or promote products derived # from this code without specific prior written permission. # * Proper attribution must be given to the original author, including a # reference to any peer-reviewed publication through which the code was published. # # THIS CODE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" AND ANY # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
# IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS CODE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# - - - - - - - - -

# standard library
import time
import argparse
import math
import glob
import gzip    # BUGFIX: used by open_file() for ".gz" paths, but was never imported in the original
import shutil  # BUGFIX: needed for recursive directory removal in check_output_file() (os has no rmtree)
import os, sys
import re
import fnmatch

# third-party
import pandas
import numpy

# show full tables when printing pandas data frames (no row/column truncation)
pandas.set_option('display.max_rows', None)
pandas.set_option('display.min_rows', None)
pandas.set_option('display.max_colwidth', None)
pandas.set_option('display.max_columns', None)

# map a (lower-case) table-file extension to the column delimiter assumed for that format
FILE_EXTENSION_TO_COLUMN_DELIMITER = {"txt":"\t", "tsv":"\t", "tab":"\t", "csv":",", "dnl":"\t"}

###########################################
# AUXILIARY FUNCTIONS

# get the right-most extension of a file
# if skip_gz==True, and the file name ends with ".gz", then the ".gz" part is ignored (for example "data.tsv.gz" yields "tsv").
def file_extension(filepath, skip_gz):
    # Return the right-most extension of a file path, without the dot.
    # If skip_gz==True and the name ends with ".gz", the ".gz" part is ignored
    # (e.g. "data.tsv.gz" yields "tsv").
    if(skip_gz and filepath.lower().endswith(".gz")):
        filepath = filepath[:-3]
    extension = filepath.rsplit(".",1)[-1]
    return extension


def file_basename(filepath):
    # Return the file's basename without directories and without the last
    # extension; a trailing ".gz" is stripped first ("a/b.tsv.gz" -> "b").
    basename = os.path.split(filepath)[1]
    if(basename.lower().endswith(".gz")):
        basename = basename[:-3]
    basename = os.path.splitext(basename)[0]
    return basename


# guess the column delimiter of a classical table file, based on the file extension
def infer_file_delimiter(filepath, default=" "):
    extension = file_extension(filepath, skip_gz=True).lower()
    return FILE_EXTENSION_TO_COLUMN_DELIMITER.get(extension, default)


# open a local file for reading/writing, supporting optionally gzip compression (file extension ".gz")
# mode can be 'wt', 'rt', 'wb', 'rb' or other standard modes
def open_file(filepath, mode):
    # BUGFIX: the original called gzip.open() without gzip ever being imported,
    # raising a NameError for any ".gz" path; import it locally so this function
    # is self-contained.
    import gzip
    if(mode.startswith("w") or mode.startswith("a")):
        # ensure the parent directory exists before opening for writing/appending
        parent = os.path.dirname(filepath)
        if(parent!=""):
            os.makedirs(parent, exist_ok=True)
    if(filepath.lower().endswith(".gz")):
        return gzip.open(filepath,mode)
    else:
        return open(filepath,mode)


# return the set {0,..,N-1} \ subset
# subset may be a slice object or a 1D array/list of integers or a single integer
def integer_complement(N, subset):
    keep = numpy.ones(N, dtype=bool)
    keep[subset,...] = False # use ellipsis at the end to accommodate situations where subset is a tuple
    return numpy.nonzero(keep)[0]


def get_date_time():
    # current local date & time, formatted like "2024.01.31 13:45:59"
    return time.strftime("%Y.%m.%d") + " " + time.strftime("%H:%M:%S")


def get_shell_command():
    # Reconstruct (approximately) the shell command line used to invoke this script,
    # single-quoting arguments that are empty or contain whitespace/special characters.
    arguments = sys.argv
    arguments = [("'"+a+"'" if (a=="" or re.search(r"[\s*#:;]", a)) else ("'"+'\\\\t'+"'" if (a=='\\t') else a)).replace("\n","") for a in arguments]
    return ' '.join(arguments)


# similar to numpy.cumsum, but counting starts 1 later, i.e., cumsum0 at the k-th index is equal to the sum of X[0],..,X[k-1]
def cumsum0(X,axis=None):
    if(axis is None):
        # flat case: works for 1D arrays (and array-likes exposing .size/.dtype)
        C = numpy.empty(X.size, dtype=X.dtype)
        C[0] = 0
        C[1:] = numpy.cumsum(X)[:-1]
    else:
        C = numpy.zeros_like(X)
        if(axis==0):
            # BUGFIX: the original read "C[1,...] = ...", which assigns the whole
            # shifted-cumsum slab to a single row; "C[1:,...]" is the intended
            # shifted assignment, consistent with the generic-axis branch below.
            # (May also have been a ":" lost in extraction — verify against the original file.)
            C[1:,...] = numpy.cumsum(X,axis=0)[:-1,...]
        else:
            # generic axis: shift the cumulative sums by one slot along `axis`
            multislice = [slice(None)]*X.ndim
            multislice[axis] = slice(0,X.shape[axis]-1)
            cumvalues = numpy.cumsum(X[tuple(multislice)],axis=axis)
            multislice[axis] = slice(1,X.shape[axis])
            C[tuple(multislice)] = cumvalues
    return C


def check_output_file(file,force,verbose,verbose_prefix):
    # Ensure that the output path `file` can be written: if it already exists, either
    # remove it (when force==True) or print an error and exit; otherwise make sure
    # its parent directory exists.
    if(os.path.exists(file)):
        if(force):
            if(os.path.isdir(file)):
                # BUGFIX: the original called os.rmtree(), which does not exist;
                # recursive directory removal lives in shutil.
                import shutil
                shutil.rmtree(file)
            else:
                os.remove(file)
            if(verbose):
                print("%sNote: Replacing output file '%s'" % (verbose_prefix,file))
        else:
            print("%sERROR: Output file '%s' already exists\n%s Cowardly refusing to continue.\n%s Use --force to ignore this message" % (verbose_prefix,file,verbose_prefix,verbose_prefix))
            sys.exit(1)
    else:
        parent = os.path.dirname(file)
        if(parent!=""):
            os.makedirs(parent,exist_ok=True)


# split a string on a single-character separator such as ':' or ',' or ';', unless it is escaped
# NOTE(review): the body of this function was corrupted in the extracted source
# (the regex was cut off mid-pattern); the implementation below follows the
# documented contract — split on `separator` wherever it is not preceded by a
# backslash. TODO: confirm against the original file whether escaped separators
# should additionally be un-escaped in the returned parts.
def split_escaped(haystack, separator):
    parts = list(re.split(r'(?<!\\)' + re.escape(separator), haystack))
    return parts


# Return the indices of entries in `names` passing the only/omit filters.
# only_names/omit_names may each be None, "", a comma-separated string (escaped
# commas supported via split_escaped) or a list of names; filter entries may
# contain fnmatch-style wildcards when allow_wildcards==True.
# NOTE(review): the opening of this function was corrupted in the extracted
# source; the preamble and the only_names branch below were reconstructed to
# mirror the fully-visible omit_names branch. Verify against the original file.
def filter_name_list(names, only_names, omit_names, case_sensitive, allow_wildcards):
    if(not case_sensitive):
        names = [name.lower() for name in names]
    items_to_keep = list(range(len(names)))
    if((only_names is not None) and (only_names!="")):
        if(isinstance(only_names, str)):
            only_names = split_escaped(only_names, ",")
        if(not case_sensitive):
            only_names = [name.lower() for name in only_names]
        if(allow_wildcards):
            items_to_keep = [i for i in items_to_keep if (next((n for n,name in enumerate(only_names) if fnmatch.fnmatchcase(names[i],name)),-1)>=0)]
        else:
            only_names = set(only_names)
            items_to_keep = [i for i in items_to_keep if (names[i] in only_names)]
    if((omit_names is not None) and (omit_names!="")):
        if(isinstance(omit_names, str)):
            omit_names = split_escaped(omit_names, ",")
        if(not case_sensitive):
            omit_names = [name.lower() for name in omit_names]
        if(allow_wildcards):
            items_to_keep = [i for i in items_to_keep if (next((n for n,name in enumerate(omit_names) if fnmatch.fnmatchcase(names[i],name)),-1)<0)]
        else:
            omit_names = set(omit_names)
            items_to_keep = [i for i in items_to_keep if (names[i] not in omit_names)]
    return items_to_keep


###########################################
# MAIN BODY

# parse command line arguments
parser = argparse.ArgumentParser(description="Redefine annotations to apply to the original segments of a composite audio.", epilog="")
# NOTE(review): the help text of the following argument was cut mid-string by the
# source extraction; the string is closed early here so this block stays
# syntactically valid. Its continuation sits on the next source line — restore
# the full help text when reconciling against the original file.
parser.add_argument('-i','--in_annotations', required=True, type=str, help="Path to an audio annotation table, listing annotations w.r.t. one or more composite audios. ")
This table may be in CSV, TSV or Excel format, similar to the annotation tables generated by Raven. Audio files referenced in the annotations will be matched to segment lists (--in_segment_infos) based on their basenames.") parser.add_argument('-s','--in_segment_infos', required=True, type=str, help="Paths or shell wildcards to input tables listing audio segment durations. Multiple paths/wildcards must be separated by a colon. Each file must be in CSV/TSV/Excel format, and must include a header line that specifies column names. Each file corresponds to a separate composite audio, and the name of the file and the audio must match. In each file, each row corresponds to a separate segment, i.e. a separate constituent of the concatenated audio, in the order of concatenation.") parser.add_argument('-o','--output', required=True, type=str, help="Path to output table file, where the redefined annotations are to be saved. The column delimiter will be determined automatically based on the file extension."); # input/output options parser.add_argument('--segment_paths_column', default="path", type=str, help="Column name listing segment file paths in the segment info tables. These paths will be used to specify the audio files in the converted annotations. (default: '%(default)s)'") parser.add_argument('--segment_durations_column', default="duration", type=str, help="Column name listing segment durations in the segment info tables. Durations must be measured in the same time unit as the start & end times listed in the input annotations (typically seconds). (default: '%(default)s)'") parser.add_argument('--annotation_start_times_column', default="Begin Time (s)", type=str, help="Column name listing annotation start times in the input annotations tables. Time units must be the same as the segment durations listed in --in_segment_infos. 
(default: '%(default)s)'") parser.add_argument('--annotation_end_times_column', default="End Time (s)", type=str, help="Column name listing annotation end times in the input annotations tables. Time units must be the same as the segment durations listed in --in_segment_infos. (default: '%(default)s)'") parser.add_argument('--annotation_audio_file_column', default="Begin File", type=str, help="Column name listing audio file paths or names in the input annotations tables. Entries in this column will be replaced to point to the constituent segment paths. Only the file basenames are relevant for mapping composite audios to segment info tables. (default: '%(default)s)'") parser.add_argument('--prepend_to_segment_info_names', default="", type=str, help="Optional string to prepend as a prefix to the basenames of segment info files, before interpreting them as composite audio names. For example, if your segment info files are names 'audio1.tsv' & 'audio2.tsv' while the annotations table refers to 'composite_audio1.wav' and 'composite_audio2.wav', this may be set to 'composite_'.") parser.add_argument('--append_to_segment_info_names', default="", type=str, help="Optional string to append as a suffix to the basenames of segment info files, before interpreting them as composite audio names. 
For example, if your segment info files are names 'audio1.tsv' & 'audio2.tsv' while the annotations table refers to 'audio1_composite.wav' and 'audio2_composite.wav', this may be set to '_composite'.") parser.add_argument('--only_annotation_colums', default="", type=str, help="Optional comma-separated list of column names (or wildcards) to restrict to in the output annotations.") parser.add_argument('--omit_annotation_colums', default="", type=str, help="Optional comma-separated list of column names (or wildcards) to omit from the output annotations.") parser.add_argument('--include_comments_in_output', action='store_true', dest="include_comments_in_output", default=False, help='Whether to include basic overview comments in the output annotations table.'); # options for exception handling parser.add_argument('--missing_segment_info', default="error", type=str, choices=["error", "keep_unchanged", "omit"], help="How to handle cases where a segment infos table was not found for some audio sources referenced in the input annotations. 'omit' means that affected annotations will be omitted from the output. 'keep_unchanged' means that affected annotations will not be converted."); parser.add_argument('--multisegment_annotations', default="error", type=str, choices=["error", "keep_unchanged", "omit"], help="How to handle cases where an input annotation spans multiple segments. 'omit' means that affected annotations will be omitted from the output. 'keep_unchanged' means that affected annotations will not be converted."); parser.add_argument('-f','--force', action='store_true', dest="force", default=False, help='Replace existing output files without asking.'); parser.add_argument('--verbose_prefix', default=" ", help="Line prefix to be used for standard output messages. This may be useful if the script is part of another pipeline. 
(default: '%(default)s)'"); parser.add_argument('-v','--verbose', action='store_true', dest="verbose", default=False, help='Show lots of information.'); args = parser.parse_args() def abort(message, exit_code=1): print(message) sys.exit(exit_code) # find input segment info tables segment_info_paths = [path for path_spec in args.in_segment_infos.split(":") for path in glob.glob(path_spec)] if(args.verbose): print("%sNote: Found %d segment info tables"%(args.verbose_prefix,len(segment_info_paths))) if(len(segment_info_paths)==0): abort("%sNothing to be done"%(args.verbose_prefix), 0) # assuming that the basename of each segment-info table is the name of the corresponding composite audio composite_name2info_path = {(args.prepend_to_segment_info_names+file_basename(path)+args.append_to_segment_info_names):path for path in segment_info_paths} # load annotations if(args.verbose): print("%sLoading annotations from file.."%(args.verbose_prefix)) if(args.in_annotations.lower().endswith(".xlsx")): annotations = pandas.read_excel(args.in_annotations, na_filter=False) else: annotations = pandas.read_csv(args.in_annotations, delimiter=infer_file_delimiter(args.in_annotations), na_filter=False, comment="#") annotations["composite_audio_name"] = [file_basename(path) for path in annotations[args.annotation_audio_file_column]] unique_composite_names = sorted(set(annotations["composite_audio_name"])) if(args.verbose): print("%s Note: Found %d annotations, referencing %d distinct audios"%(args.verbose_prefix,annotations.shape[0],len(unique_composite_names))) # iterate through annotations, one composite audio at a time, loading the corresponding segment_infos table and performing the conversion omit_composite_names = set() omit_annotations = [] for composite_name in unique_composite_names: if(args.verbose): print("%sConverting annotations w.r.t. 
composite audio '%s'.."%(args.verbose_prefix,composite_name)) if(composite_name not in composite_name2info_path): if(args.missing_segment_info=="error"): abort("%s ERROR: Missing segment info table corresponding to composite audio '%s'"%(args.verbose_prefix,composite_name)) elif(args.missing_segment_info=="keep_unchanged"): if(args.verbose): print("%s Note: Missing segment info table corresponding to this composite audio, so affected annotations will be kept unchanged."%(args.verbose_prefix)) continue elif(args.missing_segment_info=="omit"): if(args.verbose): print("%s Note: Missing segment info table corresponding to this composite audio, so affected annotations will be omitted."%(args.verbose_prefix)) omit_composite_names.add(composite_name) # load segment infos for this composite audio segment_info_path = composite_name2info_path[composite_name] if(segment_info_path.lower().endswith(".xlsx")): segment_infos = pandas.read_excel(segment_info_path, na_filter=False) else: segment_infos = pandas.read_csv(segment_info_path, delimiter=infer_file_delimiter(segment_info_path), na_filter=False, comment="#") if(args.verbose): print("%s Note: Composite audio comprises %d segments, according to the loaded segment info table"%(args.verbose_prefix,segment_infos.shape[0])) # determine the segments spanned by annotations referencing this composite audio segment_starts = cumsum0(segment_infos["duration"]) # position of each segment's start in the composite audio focal_annotations = numpy.where(annotations["composite_audio_name"]==composite_name)[0] start_segments = segment_starts.searchsorted(annotations[args.annotation_start_times_column].iloc[focal_annotations], side="right")-1 end_segments = segment_starts.searchsorted(annotations[args.annotation_end_times_column].iloc[focal_annotations], side="right")-1 if(numpy.any(start_segments!=end_segments)): multisegment_annotations = focal_annotations[start_segments!=end_segments] if(args.multisegment_annotations=="error"): abort("%s 
ERROR: %d annotations span multiple segments, which is not allowed"%(args.verbose_prefix,len(multisegment_annotations))) elif(args.multisegment_annotations=="omit"): if(args.verbose): print("%s WARNING: %d annotations span multiple segments, which is not allowed. These will be omitted from the output."%(args.verbose_prefix,len(multisegment_annotations))) omit_annotations += list(multisegment_annotations) elif(args.multisegment_annotations=="keep_unchanged"): if(args.verbose): print("%s WARNING: %d annotations span multiple segments, which is not allowed. These will be kept unchanged."%(args.verbose_prefix,len(multisegment_annotations))) unisegment_annotations = integer_complement(N=focal_annotations.size, subset=multisegment_annotations) focal_annotations = focal_annotations[unisegment_annotations] start_segments = start_segments[unisegment_annotations] end_segments = end_segments[unisegment_annotations] # convert annotations referencing this composite audio annotations.iloc[focal_annotations,annotations.columns.get_loc(args.annotation_start_times_column)] -= segment_starts[start_segments] annotations.iloc[focal_annotations,annotations.columns.get_loc(args.annotation_end_times_column)] -= segment_starts[start_segments] annotations.iloc[focal_annotations,annotations.columns.get_loc(args.annotation_audio_file_column)] = segment_infos[args.segment_paths_column].iloc[start_segments] # remove any annotations flagged for removal if(len(omit_composite_names)>0): omit_annotations += [a for a,composite_name in enumerate(annotations["composite_audio_name"]) if (composite_name in omit_composite_names)] if(len(omit_annotations)>0): keep_annotations = integer_complement(N=annotations.shape[0],subset=numpy.unique(omit_annotations)) if(args.verbose): print("%sNote: Omitting %d annotations that could not be converted"%(args.verbose_prefix,len(omit_annotations))) annotations = annotations.iloc[keep_annotations,:] if((args.only_annotation_colums!="") or 
(args.omit_annotation_colums!="")): keep_columns = filter_name_list(names = list(annotations.columns), only_names = args.only_annotation_colums, omit_names = args.omit_annotation_colums, case_sensitive = False, allow_wildcards = True) if(len(keep_columns)