#!/usr/bin/env python3
#
# This python script examines and summarizes multiple audio files, for example in terms of their duration and RMS.
# Optionally, the script can also plot spectrograms of the audio files as PDFs.
# Audio files are either specified using a wildcard (e.g. pointing to all wav or flac files in a directory) or using a text file that lists audio file paths (one file path per row).
# CAUTION: Non-wav files are loaded using the librosa module, which sometimes normalizes audio to be within [-1,1]. Hence, the computed RMS will not be the true RMS of the original audio.
#
# The script can also apply more sophisticated normalization to loaded audios, for example normalizing the RMS within a specific time interval and even restricted to a specific frequency range.
# This allows for harmonizing data from different microphones that may have different sensitivities, for example by adding a strong known harmonic sound at a specific frequency for a specific limited time.
#
# Example usage, normalizing audios based on an "injected" calibration sine wave at 100 Hz played during the first 30 sec:
#   summarize_audios.py -i "my_audio_files/*.wav" -f -v -o summaries.tsv --normalize "rms" --normalize_within_frequencies "99:101" --normalize_start_time 0 --normalize_end_time 30 --start_time 30
#
# Tested using Python 3.11, Mac OS 14.5
#
# Stilianos Louca
# Copyright 2024
#
# LICENSE AGREEMENT
# - - - - - - - - -
# All rights reserved.
# Use and redistributions of this code is permitted for commercial and non-commercial purposes,
# under the following conditions:
#
# * Redistributions must retain the above copyright notice, this list of
#   conditions and the following disclaimer in the code itself, as well
#   as in documentation and/or other materials provided with the code.
# * Neither the name of the original author (Stilianos Louca), nor the names
#   of its contributors may be used to endorse or promote products derived
#   from this code without specific prior written permission.
# * Proper attribution must be given to the original author, including a
#   reference to any peer-reviewed publication through which the code was published.
#
# THIS CODE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS CODE,
# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# - - - - - - - - -

# standard library
import argparse
import copy     # needed by IDict.copy_and_update (was missing)
import fnmatch
import glob
import gzip     # needed by open_file for ".gz" files (was missing)
import math
import os, sys
import random
import re
import shutil   # needed by check_output_file (was missing)
import time

# third-party
import librosa # for importing non-wav files
import matplotlib
import matplotlib.pyplot as pyplot
import numpy
from numpy import nan, inf
import pandas
import scipy
import scipy.signal # needed by filter_frequencies (a plain "import scipy" does not guarantee submodule availability)
import scipy.fft    # needed by get_spectrum
from joblib import Parallel, delayed
from joblib import parallel_backend

FILE_EXTENSION_TO_COLUMN_DELIMITER = {"txt":"\t", "tsv":"\t", "tab":"\t", "csv":",", "dnl":"\t"}

###########################################
# AUXILIARY FUNCTIONS

# get the right-most extension of a file
# if skip_gz==True, and the file name ends with ".gz", then the ".gz" part is ignored (for example "data.tsv.gz" yields "tsv").
# return the right-most extension of a file path
# if skip_gz==True and the name ends with ".gz", that suffix is ignored (e.g. "data.tsv.gz" yields "tsv")
def file_extension(filepath, skip_gz):
    if(skip_gz and filepath.lower().endswith(".gz")): filepath = filepath[:-3]
    extension = filepath.rsplit(".",1)[-1]
    return extension


# guess the column delimiter of a classical table file, based on the file extension
def infer_file_delimiter(filepath, default=" "):
    extension = file_extension(filepath, skip_gz=True).lower()
    return FILE_EXTENSION_TO_COLUMN_DELIMITER.get(extension, default)


# return the set {0,..,N-1} \ subset
# subset may be a slice object or a 1D array/list of integers or a single integer
def integer_complement(N, subset):
    keep = numpy.ones(N, dtype=bool)
    keep[subset,...] = False # use ellipsis at the end to accommodate situations where subset is a tuple
    return numpy.nonzero(keep)[0]


# open a local file for reading/writing, optionally supporting gzip compression (file extension ".gz")
# mode can be 'wt', 'rt', 'wb', 'rb' or other standard modes
def open_file(filepath, mode):
    if(mode.startswith("w") or mode.startswith("a")):
        # make sure the parent directory exists before writing
        parent = os.path.dirname(filepath)
        if(parent!=""): os.makedirs(parent, exist_ok=True)
    if(filepath.lower().endswith(".gz")):
        return gzip.open(filepath,mode) # requires the gzip module (now imported at the top of the file)
    else:
        return open(filepath,mode)


# load the first column of a table file, ignoring comments and (optionally) a single header line
def load_first_column_from_file(filepath, delimiter=None, comment_prefix="#", header=False):
    if(delimiter is None): delimiter = infer_file_delimiter(filepath, default=" ")
    with open_file(filepath,"rt") as fin:
        lines = list(filter(len,[line.split(comment_prefix,1)[0] for line in fin]))
    if(header): lines = lines[1:] # omit the first data line, as it contains the table header
    first_column_entries = [(line.split(None,1) if (delimiter=="") else line.split(delimiter,1))[0].strip() for line in lines]
    return first_column_entries


# current local date & time, formatted as "YYYY.MM.DD HH:MM:SS"
def get_date_time():
    return time.strftime("%Y.%m.%d") + " " + time.strftime("%H:%M:%S")


# reconstruct (approximately) the shell command used to invoke this script,
# quoting arguments that are empty or contain whitespace/special characters
def get_shell_command():
    arguments = sys.argv
    arguments = [("'"+a+"'" if (a=="" or re.search(r"[\s*#:;]", a)) else ("'"+'\\\\t'+"'" if (a=='\\t') else a)).replace("\n","") for a in arguments]
    return ' '.join(arguments)


# parse a string as a float, returning nan if parsing fails
def float_or_nan(s):
    try:
        return float(s)
    except ValueError:
        return nan


# ensure that the given output path can be written: if it already exists, either delete it (when force==True)
# or abort the script with an explanatory message; also create the parent directory if needed
def check_output_file(file,force,verbose,verbose_prefix):
    if(os.path.exists(file)):
        if(force):
            if(os.path.isdir(file)): shutil.rmtree(file) # requires the shutil module (now imported at the top of the file)
            else: os.remove(file)
            if(verbose): print(("%sNote: Replacing output file '%s'" % (verbose_prefix,file)))
        else:
            print(("%sERROR: Output file '%s' already exists\n%s Cowardly refusing to continue.\n%s Use --force to ignore this message" % (verbose_prefix,file,verbose_prefix,verbose_prefix)))
            sys.exit(1)
    else:
        parent = os.path.dirname(file)
        if(parent!=""): os.makedirs(parent, exist_ok=True)


# split a string at the given delimiter, ignoring delimiters escaped by a preceding backslash
# NOTE(review): this helper was referenced by filter_names() but missing from the file (NameError at runtime);
# reconstructed here from its usage — confirm against the original implementation if available.
def split_escaped(text, delimiter):
    parts = re.split(r"(?<!\\)"+re.escape(delimiter), text)
    return [part.replace("\\"+delimiter, delimiter) for part in parts]


##############################################################
# improved dictionary, with more flexible syntax for assigning & querying members.
# Use e.g. as:
#   mydict = IDict(x=10, y=20, city="Vancouver")
#   z = mydict.x + mydict.y
# Since it is a subclass of the standard dict, all classical dict syntax also works,
# e.g. mydict["city"], mydict.keys(), ("city" in mydict), myfunction(**mydict).
class IDict(dict):
    __delattr__ = dict.__delitem__

    # add or modify member elements in-place, and return self
    # (same as dict.update(), the only difference being that it returns self instead of None)
    def update(self,**kwargs):
        super().update(kwargs)
        return self

    # create a shallow copy of this IDict and add (or modify) one or more member elements,
    # e.g. newdict = olddict.copy_and_update(nearest_town="Whistler")
    # Note that copying is not deep, i.e. parent & child may both still refer to the same value objects.
    def copy_and_update(self,**kwargs):
        new = copy.copy(self) # requires the copy module (now imported at the top of the file)
        new.update(**kwargs)
        return new

    # enable retrieving a member value using the syntax <object>.<key>
    # standard dictionary attributes take precedence over stored keys, so avoid storing keys
    # likely to collide with standard dict attributes
    def __getattr__(self, key):
        if(key.startswith("__")): return dict.__getattribute__(self, key)
        try:
            return dict.__getattribute__(self, key)
        except:
            return self[key]

    # enable setting a member value using the syntax <object>.<key> = <value>
    __setattr__ = dict.__setitem__


# execute a target function multiple times, in parallel if reasonable
#   function : a callable taking a single positional argument (wrap with lambda for multi-argument functions)
#   arguments: a sequence of length N; each element is the single input argument for one function call
#   Nthreads : number of parallel threads; if <=0, counted backwards from the number of CPU cores.
#              If 1, a simple serial loop is used on purpose, to facilitate debugging.
#   fast     : whether a single call is fast compared to the job-dispatch overhead; if so, jobs are
#              first grouped into batches (for fast functions this can speed up computation by ~25%)
#   prefer   : either None, "processes" or "threads"
#   randomize: randomize the order in which tasks are executed when parallelized (may help load balancing)
# The returned list of results is always in the same order as arguments[].
def run_parallel(function, arguments, Nthreads=None, fast=False, prefer=None, randomize=False):
    Nthreads = (Nthreads if (Nthreads>0) else max(1, os.cpu_count()+Nthreads))
    N = len(arguments)
    order = list(range(N))
    if(randomize): random.shuffle(order)
    if((Nthreads>1) and (N>1)):
        if(fast):
            # group jobs into batches to reduce the overhead of dispatching lots of parallel jobs
            batch_size = max(1,int(0.2*N/Nthreads))
            Nbatches = math.ceil(N/batch_size)
            def aux_run_batch(b):
                return [function(arguments[order[a]]) for a in range(b*batch_size,min((b+1)*batch_size,N))]
            results = Parallel(n_jobs=min(N,Nthreads), prefer=prefer, verbose=0)(delayed(aux_run_batch)(b) for b in range(Nbatches))
            results = [r for rr in results for r in rr] # flatten results, i.e. unwrap batches
        else:
            # run standard parallelized
            results = Parallel(n_jobs=min(N,Nthreads), prefer=prefer, verbose=0)(delayed(function)(arguments[order[a]]) for a in range(N))
        if(randomize):
            # restore the original task order
            # fix: the original called an undefined invert_permutation(); argsort of a permutation yields its inverse
            inverse_order = numpy.argsort(order)
            results = [results[k] for k in inverse_order]
        return results
    else:
        # serial loop (used on purpose when Nthreads==1, to facilitate debugging)
        results = [None]*len(arguments)
        for a,argument in enumerate(arguments):
            results[a] = function(argument)
        return results


# return a list of indices (into names[]) corresponding to names to keep
# If start_pool[] is provided, its order is preserved in the returned subset;
# otherwise the returned list is sorted in ascending order.
#   only_names/omit_names: optional list/set of strings, or a single comma-separated string;
#                          None/""/empty disables that filter. Entries may be wildcards if allow_wildcards==True.
def filter_names(names, start_pool=None, only_names=None, omit_names=None, case_sensitive=False, allow_wildcards=False):
    items_to_keep = (list(range(len(names))) if (start_pool is None) else start_pool)
    if(not case_sensitive): names = [name.lower() for name in names]
    if((only_names is not None) and (len(only_names)>0)):
        if(isinstance(only_names,str)): only_names = split_escaped(only_names,',')
        if(not case_sensitive): only_names = [name.lower() for name in only_names]
        if(allow_wildcards):
            items_to_keep = [i for i in items_to_keep if (next((n for n,name in enumerate(only_names) if fnmatch.fnmatchcase(names[i],name)),-1)>=0)]
        else:
            only_names = set(only_names)
            items_to_keep = [i for i in items_to_keep if (names[i] in only_names)]
    if((omit_names is not None) and (len(omit_names)>0)):
        if(isinstance(omit_names,str)): omit_names = split_escaped(omit_names,',')
        if(not case_sensitive): omit_names = [name.lower() for name in omit_names]
        if(allow_wildcards):
            items_to_keep = [i for i in items_to_keep if (next((n for n,name in enumerate(omit_names) if fnmatch.fnmatchcase(names[i],name)),-1)<0)]
        else:
            omit_names = set(omit_names)
            items_to_keep = [i for i in items_to_keep if (names[i] not in omit_names)]
    return items_to_keep


# apply a lowpass/highpass/bandpass filter to a single- or multi-channel audio stream
#   audio        : 1D or 2D numpy array of size Ntimes x Nchannels
#   sampling_rate: number of sampling points per second
#   min_frequency: minimum frequency (Hz) to retain; set to <=0 to only apply a low-pass filter
#   max_frequency: maximum frequency (Hz) to retain; set to inf to only apply a high-pass filter
def filter_frequencies(audio, sampling_rate, min_frequency=0, max_frequency=inf):
    if((min_frequency<=0) and (not numpy.isfinite(max_frequency))):
        return audio # nothing to do
    elif(min_frequency<=0):
        Wn, btype = max_frequency, "lowpass"
    elif(not numpy.isfinite(max_frequency)):
        Wn, btype = min_frequency, "highpass"
    else:
        Wn, btype = (min_frequency, max_frequency), "bandpass"
    # construct & apply a 10th-order Butterworth filter in second-order-sections form
    filt = scipy.signal.butter(N=10, Wn=Wn, btype=btype, analog=False, fs=sampling_rate, output="sos")
    return scipy.signal.sosfilt(sos=filt, x=audio, axis=0)


# Simple trapezoid integration of a tabulated function along a single axis
#   X: 1D array of abscissa values; Y: array whose 'axis' dimension corresponds to X
# Only the sub-interval [minX,maxX] is integrated; if average==True the integral is divided by the interval length.
def integrate_along_axis(X, Y, minX=-inf, maxX=inf, axis=0, average=False):
    start = numpy.searchsorted(X, minX, side="left")
    end = numpy.searchsorted(X, maxX, side="right")
    if(end<=start+1): return (Y[start] if average else 0) # degenerate interval, no area to integrate
    non_integrated_axes = tuple(range(axis))+tuple(range(axis+1,Y.ndim))
    integral = numpy.sum(numpy.expand_dims(numpy.diff(X[start:end]),axis=non_integrated_axes)*0.5*(numpy.take(Y, indices=range(start,end-1), axis=axis)+numpy.take(Y, indices=range(start+1,end), axis=axis)),axis=axis)
    if(average): integral /= (X[end-1]-X[start])
    return integral


# estimate the power spectral density (PSD) or the modulus of the Fourier spectrum of a real-valued regularly sampled signal
# The input may have multiple channels (2nd axis), in which case a separate spectrum is computed per channel.
#   Nsegments   : number of Bartlett segments (consecutive non-overlapping segments). Increasing this sacrifices
#                 frequency resolution in favor of accuracy. If <=0, chosen automatically based on freq_step.
#   Ninterleaves: number of interleaved subseries per Bartlett segment. Increasing this sacrifices frequency span
#                 in favor of accuracy. If <=0, chosen automatically based on max_frequency.
#   PSD         : whether to estimate the PSD rather than the modulus of the Fourier spectrum.
# The classical (basic) spectrum is obtained for Nsegments=1 and Ninterleaves=1.
# Returns an IDict with members: frequencies, spectrum, Nsegments, Ninterleaves.
def get_spectrum(audio, sampling_rate, Nsegments=1, Ninterleaves=1, min_frequency=0, max_frequency=inf, freq_step=0, PSD=True, Nthreads=1):
    audio = audio - numpy.mean(audio, axis=0, keepdims=True) # centralize audio to avoid the zero-frequency
    duration = audio.shape[0]/sampling_rate
    Nsegments = min(audio.shape[0]//2, (max(1,int(duration*freq_step-1)) if (Nsegments<=0) else Nsegments))
    Ninterleaves = min(audio.shape[0]//2, (max(1,int(0.5*sampling_rate/max_frequency)-1) if (Ninterleaves<=0) else Ninterleaves))
    if((Ninterleaves<=1) and (Nsegments<=1)):
        # basic estimator, no splitting of data
        # fix: the original passed workers=args.Nthreads (a global), ignoring this function's Nthreads parameter
        spectrum = numpy.abs(scipy.fft.rfft(x=audio, axis=0, workers=Nthreads, norm="backward"))
        if(PSD): spectrum = numpy.abs(spectrum)**2
        spectrum /= sampling_rate * audio.shape[0] # apply proper normalization factor
        frequencies = (sampling_rate/audio.shape[0])*numpy.arange(spectrum.shape[0])
    elif(Ninterleaves==1):
        # split signal into non-overlapping consecutive segments (no interleaving), compute spectrum for each
        # segment, then average across segments. Averaging must be done AFTER taking the modulus (Fourier
        # spectrum) or squared modulus (PSD).
        segment_length = audio.shape[0]//Nsegments
        partial_spectra = run_parallel(function = lambda s: numpy.abs(scipy.fft.rfft(x=audio[(s*segment_length):((s+1)*segment_length),:], axis=0, workers=1, norm="backward")),
                                       arguments = range(Nsegments),
                                       Nthreads  = Nthreads,
                                       fast      = True)
        if(PSD): spectrum = numpy.mean([numpy.abs(ps)**2 for ps in partial_spectra],axis=0)
        else: spectrum = numpy.mean(partial_spectra,axis=0)
        spectrum /= sampling_rate * segment_length # apply proper normalization factor
        frequencies = (sampling_rate/segment_length)*numpy.arange(spectrum.shape[0])
    elif(Nsegments==1):
        # split signal into multiple interleaved sub-series of equal length, compute spectrum for each
        # sub-series, then average across sub-series (again, averaging AFTER taking the (squared) modulus)
        min_length = (audio.shape[0]-Ninterleaves+1)//Ninterleaves
        partial_spectra = run_parallel(function = lambda s: numpy.abs(scipy.fft.rfft(x=audio[numpy.arange(s,audio.shape[0],Ninterleaves)[:min_length],:], axis=0, workers=1, norm="backward")),
                                       arguments = range(Ninterleaves),
                                       Nthreads  = Nthreads,
                                       fast      = True)
        if(PSD): spectrum = numpy.mean([numpy.abs(ps)**2 for ps in partial_spectra],axis=0)
        else: spectrum = numpy.mean(partial_spectra,axis=0)
        spectrum /= sampling_rate * min_length # apply proper normalization factor
        frequencies = (sampling_rate/audio.shape[0])*numpy.arange(spectrum.shape[0]) # each sub-series spans (roughly) the full duration
    else:
        # combination of 'Bartlett' & 'interleaved': split signal into non-overlapping consecutive segments,
        # then split each segment into interleaving sub-series (via recursion)
        segment_length = audio.shape[0]//Nsegments
        partial_spectra = run_parallel(function = lambda s: get_spectrum(audio[(s*segment_length):((s+1)*segment_length),:], sampling_rate=sampling_rate, Nsegments=1, Ninterleaves=Ninterleaves, PSD=PSD, Nthreads=1).spectrum,
                                       arguments = range(Nsegments),
                                       Nthreads  = Nthreads,
                                       fast      = True)
        spectrum = numpy.mean(partial_spectra, axis=0)
        frequencies = (sampling_rate/segment_length)*numpy.arange(spectrum.shape[0])
    # restrict to the requested frequency range if needed
    # NOTE(review): this condition was corrupted in the source ("max_frequency0"); reconstructed as
    # below based on the analogous filtering elsewhere in the file — confirm against the original.
    if((max_frequency<inf) or (min_frequency>0)):
        keep_freqs = (frequencies<=max_frequency) & (frequencies>=min_frequency)
        frequencies, spectrum = frequencies[keep_freqs], spectrum[keep_freqs,:]
    return IDict(frequencies=frequencies, spectrum=spectrum, Nsegments=Nsegments, Ninterleaves=Ninterleaves)


# compute (and optionally plot) the Fourier spectrum of a single- or multi-channel audio,
# and integrate its power within optional focal frequency intervals (averaged over channels).
# Relies on the global command-line 'args' (Nthreads, plot dimensions, log-axis flags).
#   max_value      : optional maximum value shown on the vertical axis
#   plot_file      : optional file path for plotting the spectrum (one subplot per channel)
#   focal_intervals: optional sequence of 2-tuples specifying focal frequency intervals
# Returns the list of integrated powers (one per focal interval).
def analyze_spectrum(audio, sampling_rate, Nsegments=1, Ninterleaves=1, freq_step=0, min_frequency=0, max_frequency=inf, max_value=None, PSD=True, plot_file="", plot_title="", focal_intervals=None):
    if(audio.ndim==1): audio = numpy.expand_dims(audio,axis=1) # make 2D even if only one channel, for consistency below
    duration = audio.shape[0]/sampling_rate
    spectrum = get_spectrum(audio=audio, sampling_rate=sampling_rate, Nsegments=Nsegments, Ninterleaves=Ninterleaves, freq_step=freq_step, min_frequency=min_frequency, max_frequency=max_frequency, PSD=PSD, Nthreads=args.Nthreads)
    plot_title += ("\nAll %d channels"%(audio.shape[1]) if (audio.shape[1]>1) else "\nSingle-channel audio") + ", duration %g s\nNsegments=%d, Ninterleaves=%d"%(duration,spectrum.Nsegments,spectrum.Ninterleaves)
    # compute integrated powers in focal frequency intervals if needed, averaged over all channels
    if((focal_intervals is None) or (len(focal_intervals)==0)):
        focal_powers = []
    else:
        power = (spectrum.spectrum if PSD else numpy.abs(spectrum.spectrum)**2)
        focal_powers = [numpy.mean(integrate_along_axis(X=spectrum.frequencies, Y=power, minX=interval[0], maxX=interval[1], axis=0, average=False)) for interval in focal_intervals]
        plot_title += "\nPowers in frequency intervals: "+", ".join("%.3g in [%g,%g]"%(focal_powers[i],interval[0],interval[1]) for i,interval in enumerate(focal_intervals))
    # plot spectrum as a curve (one subplot per channel)
    if(plot_file!=""):
        fig, axes = pyplot.subplots(nrows=audio.shape[1], ncols=1, figsize=(args.plot_width+2, args.plot_height+2), sharex=True, squeeze=False)
        axes = axes.ravel()
        for channel,ax in enumerate(axes):
            ax.plot(spectrum.frequencies, spectrum.spectrum[:,channel], linewidth=1, color="#1f598c")
            if(channel==0): ax.set_title(plot_title)
            if(channel==audio.shape[1]-1): ax.set_xlabel("frequency (Hz)")
            ax.set_ylabel(("PSD" if PSD else "|Fourier spectrum|"))
            if(args.log_frequency): ax.set_xscale("log")
            if(args.log_spectrum): ax.set_yscale("log")
            if(max_value is not None): ax.set_ylim((0,max_value))
        fig.savefig(plot_file, dpi=200, format=os.path.splitext(plot_file)[1][1:], bbox_inches='tight')
        pyplot.close(fig)
    return focal_powers


# plot a "powergram" (rolling-window RMS over time) of a single- or multi-channel audio,
# one subplot per channel. Relies on the global command-line 'args' for plot dimensions.
#   window_span: rolling window span (seconds); if <=0, defaults to 10% of the audio duration
def save_powergram(audio, sampling_rate, window_span=0, plot_file="", plot_title=""):
    if(audio.ndim==1): audio = numpy.expand_dims(audio,axis=1) # make 2D even if only one channel, for consistency below
    if(window_span<=0): window_span = 0.1*audio.shape[0]/sampling_rate
    window_length = min(audio.shape[0],int(window_span*sampling_rate))
    window_starts = numpy.arange(0,audio.shape[0]-window_length+1,window_length) # non-overlapping windows
    RMSs = numpy.asarray([numpy.sqrt(numpy.mean(audio[start:(start+window_length),...]**2,axis=0)) for start in window_starts])
    # plot powergram as a curve over time (one subplot per channel)
    plot_title += "\nRolling window span %g"%(window_span) + ("\nAll %d channels"%(audio.shape[1]) if (audio.shape[1]>1) else "\nSingle-channel audio")
    if(plot_file!=""):
        fig, axes = pyplot.subplots(nrows=audio.shape[1], ncols=1, figsize=(args.plot_width+2, args.plot_height+2), sharex=True, squeeze=False)
        axes = axes.ravel()
        for channel,ax in enumerate(axes):
            ax.plot((window_starts+0.5*window_length)/sampling_rate, RMSs[:,channel], linewidth=1, color="#1f598c")
            if(channel==0): ax.set_title(plot_title)
            if(channel==audio.shape[1]-1): ax.set_xlabel("time (sec)")
            ax.set_ylabel("RMS")
        fig.savefig(plot_file, dpi=200, format=os.path.splitext(plot_file)[1][1:], bbox_inches='tight')
        pyplot.close(fig)
# Normalize an entire multichannel audio, based on the RMS (or some other intensity metric)
# measured in a specific time and frequency interval.
# For example, an audio may be infused with a temporary "calibration" sine wave at a specific
# frequency, and you wish to rescale the entire audio so that the infused signal's RMS is 1.
# Returns (rescaled_audio, applied_scaling_factor), or (None, nan) on failure.
def normalize_audio(audio,
                    sampling_rate,
                    normalize = "none",            # either "none" or "rms" or "max_abs" or "mean_abs"
                    normalize_start_time = 0,      # optional start time (sec) to focus on when computing the normalization factor; negative values count from the end
                    normalize_end_time = inf,      # optional end time (sec) to focus on when computing the normalization factor; negative values count from the end
                    normalize_within_frequencies = ""): # optional frequency range (Hz) to focus on when computing the normalization factor, in the format <min>:<max>
    if(normalize=="none"): return audio, 1 # nothing to do
    focal_audio = audio
    # trim audio if needed
    if((normalize_start_time!=0) or (normalize_end_time!=inf)):
        duration = focal_audio.shape[0]/sampling_rate
        normalize_start_time = max(0,min(duration,(normalize_start_time if (normalize_start_time>=0) else duration+normalize_start_time)))
        normalize_end_time = max(0,min(duration,(normalize_end_time if (normalize_end_time>=0) else duration+normalize_end_time)))
        if(normalize_end_time<=normalize_start_time):
            if(args.verbose): print("%s WARNING: Audio duration (%g sec) is too short for normalization"%(args.verbose_prefix,duration))
            return None, nan
        elif((normalize_start_time>0) or (normalize_end_time<duration)):
            # NOTE(review): the original source was corrupted from this point up to get_spectrogram()
            # (text lost during extraction). The trimming, frequency filtering and metric computation
            # below are a best-effort reconstruction based on the function's documented contract —
            # confirm against the original file.
            focal_audio = focal_audio[int(round(normalize_start_time*sampling_rate)):int(round(normalize_end_time*sampling_rate)),...]
    # restrict to a specific frequency range if requested
    if(normalize_within_frequencies!=""):
        parts = normalize_within_frequencies.split(":")
        min_f = (0.0 if (parts[0].strip()=="") else float(parts[0]))
        max_f = (inf if ((len(parts)<2) or (parts[1].strip()=="")) else float(parts[1]))
        focal_audio = filter_frequencies(focal_audio, sampling_rate, min_frequency=min_f, max_frequency=max_f)
    # compute the intensity metric on the focal (trimmed & filtered) audio, pooled across all channels
    if(normalize=="rms"):
        metric = float(numpy.sqrt(numpy.mean(numpy.asarray(focal_audio)**2)))
    elif(normalize=="max_abs"):
        metric = float(numpy.max(numpy.abs(focal_audio)))
    elif(normalize=="mean_abs"):
        metric = float(numpy.mean(numpy.abs(focal_audio)))
    else:
        raise ValueError("Unknown normalization method '%s'"%(normalize))
    if(metric<=0): return None, nan # cannot normalize a silent signal
    scale = 1.0/metric
    return audio*scale, scale


# compute a rolling-window spectrum ("spectrogram") of a single- or multi-channel audio.
# Windows overlap by 75% (hop = window_length/4). Relies on the global 'args' for Nthreads.
# NOTE(review): the original def-line of this function was lost in the corrupted source;
# the signature below was reconstructed from the function body and its call sites — confirm.
#   window_span : rolling window span (seconds)
#   Nsegments   : integer >=1, number of Bartlett segments per rolling window (frequency resolution vs accuracy trade-off)
#   Ninterleaves: integer >=1, number of interleaved subseries per Bartlett segment (frequency span vs accuracy trade-off)
#   PSD         : whether to estimate the PSD rather than the modulus of the Fourier spectrum
# Returns an IDict with members: frequencies, spectrogram (Nwindows x Nfrequencies x Nchannels),
# window_centers, Nsegments, Ninterleaves.
def get_spectrogram(audio, sampling_rate, window_span, Nsegments=1, Ninterleaves=1, freq_step=0, min_frequency=0, max_frequency=inf, PSD=True):
    if(audio.ndim==1): audio = numpy.expand_dims(audio,axis=1) # make 2D even if only one channel, for consistency below
    # determine rolling window locations
    window_length = min(audio.shape[0],int(window_span*sampling_rate))
    hop_length = int(window_length/4)
    window_starts = numpy.arange(0,audio.shape[0]-window_length+1,hop_length)
    window_centers = window_starts+int(window_length/2)
    # compute the spectrum in each rolling window, then combine all into a 3D array
    spectra = run_parallel(function = lambda start: get_spectrum(audio          = audio[start:(start+window_length),...],
                                                                 sampling_rate  = sampling_rate,
                                                                 Nsegments      = Nsegments,
                                                                 Ninterleaves   = Ninterleaves,
                                                                 freq_step      = freq_step,
                                                                 min_frequency  = min_frequency,
                                                                 max_frequency  = max_frequency,
                                                                 PSD            = PSD,
                                                                 Nthreads       = 1),
                           arguments = window_starts,
                           Nthreads  = args.Nthreads,
                           fast      = False)
    frequencies = spectra[0].frequencies
    spectrogram = numpy.asarray([spectrum.spectrum for spectrum in spectra])
    return IDict(frequencies=frequencies, spectrogram=spectrogram, window_centers=window_centers, Nsegments=spectra[0].Nsegments, Ninterleaves=spectra[0].Ninterleaves)


# compute (and optionally plot as a contourplot, one subplot per channel) the spectrogram of an audio.
# Relies on the global command-line 'args' (plot dimensions, log-axis flags).
#   window_span: rolling window span (seconds); longer windows improve spectral accuracy/resolution at the
#                expense of temporal resolution. If <=0, chosen heuristically (10% of the audio duration).
def analyze_spectrogram(audio, sampling_rate, window_span, Nsegments=1, Ninterleaves=1, freq_step=0, min_frequency=0, max_frequency=inf, PSD=True, plot_file="", plot_title=""):
    if(audio.ndim==1): audio = numpy.expand_dims(audio,axis=1) # make 2D even if only one channel, for consistency below
    audio = audio - numpy.mean(audio, axis=0, keepdims=True) # centralize audio to avoid the zero-frequency
    duration = audio.shape[0]/sampling_rate
    window_span = (0.1*duration if (window_span<=0) else window_span)
    spectrogram = get_spectrogram(audio          = audio,
                                  sampling_rate  = sampling_rate,
                                  window_span    = window_span,
                                  Nsegments      = Nsegments,
                                  Ninterleaves   = Ninterleaves,
                                  freq_step      = freq_step,
                                  min_frequency  = min_frequency,
                                  max_frequency  = max_frequency,
                                  PSD            = PSD)
    # plot spectrogram as contourplot (one subplot per channel)
    if(plot_file!=""):
        plot_title += ("\nAll %d channels"%(audio.shape[1]) if (audio.shape[1]>1) else "\nSingle-channel audio") + ", duration %g s\nNsegments=%d, Ninterleaves=%d, window span %g sec"%(duration,spectrogram.Nsegments,spectrogram.Ninterleaves,window_span)
        fig, axes = pyplot.subplots(nrows=audio.shape[1], ncols=1, figsize=(args.plot_width+2, args.plot_height+2), sharex=True, squeeze=False)
        axes = axes.ravel()
        for channel,ax in enumerate(axes):
            channel_spectrum = numpy.abs(spectrogram.spectrogram[...,channel])
            min_nonzero = numpy.min(channel_spectrum[channel_spectrum>0])
            if(args.log_spectrum):
                # logarithmic color levels, with an explicit 0 level so that zero-power cells are still covered
                color_levels = numpy.concatenate(([0],numpy.geomspace(min_nonzero, numpy.max(channel_spectrum), num=500)))
            else:
                color_levels = 500
            cplot = ax.contourf(spectrogram.window_centers/sampling_rate,
                                spectrogram.frequencies,
                                channel_spectrum.T,
                                norm   = ("log" if args.log_spectrum else "linear"),
                                levels = color_levels,
                                cmap   = "inferno")
            cplot.set_edgecolor("face") # avoid white seams between contour levels in vector output
            if(channel==0): ax.set_title(plot_title)
            if(channel==audio.shape[1]-1): ax.set_xlabel("time (s)")
            ax.set_ylabel("frequency (Hz)")
            if(args.log_frequency): ax.set_yscale("log")
        fig.savefig(plot_file, dpi=200, format=os.path.splitext(plot_file)[1][1:], bbox_inches='tight')
        pyplot.close(fig)


###########################################
# MAIN BODY

# parse command line arguments
parser = argparse.ArgumentParser(description="Summarize audio clips.", epilog="")
parser.add_argument('-i','--in_audios', required=True, type=str, help="Either a shell wildcard pointing to one or more audio files (e.g., .wav or .flac files), or a single path to a text file listing audio file paths in the first column (one path per line, both absolute and relative paths are allowed). A text file is assumed if this path ends with a typical tabular text file extension such as csv or tsv; the column delimiter (if needed) is inferred based on the file extension.")
parser.add_argument('--in_config', default="", type=str, help="Optional path to an input table specifying settings for individual audios, such as specific start_time & end_time. Each row specifies an audio name or a wildcard representing multiple audios (in column 'name') and the corresponding settings to use for that audio. Columns considered (by default and only if present) include 'start_time', 'end_time', 'min_frequency', 'max_frequency', 'normalize', 'normalize_start_time', 'normalize_end_time', 'normalize_within_frequencies', 'spectrum_Nsegments', 'spectrum_Ninterleaves', 'spectrum_freq_step', 'spectrum_focal_intervals' and a few others. To use different column names see --config_namings. The table must have a non-comment header line containing column names. Argument values specified in this table take priority over the command line arguments provided to the script.")
# NOTE(review): the help text of --config_namings continues beyond the visible portion of this file; it is truncated here.
parser.add_argument('--config_namings', default="", type=str, help="Optional comma-separated list of argname:colname pairs specifying in which column to seek which argument, in the in_config table.")
# ---------------------------------------------------------------------------
# Command-line options (continued). NOTE(review): this region of the script
# was mangled during extraction (newlines collapsed, several text spans eaten
# by what looks like an HTML-tag stripper). The code below reconstructs and
# reformats the surviving text; spans marked TODO(review) could not be
# recovered and must be restored from the original script.
# ---------------------------------------------------------------------------
parser.add_argument('-o', '--out_summaries', default="", type=str,
	help="Optional path to output table where all summaries should be written.")
parser.add_argument('-a', '--out_spectra', default="", type=str,
	help="Optional path to output directory where all Fourier spectra (modulus only) should be saved as PDFs (one file per input audio). For multi-channel audios, a separate spectrum is computed for each channel and saved as a separate subplot.")
parser.add_argument('-s', '--out_spectrograms', default="", type=str,
	help="Optional path to output directory where all spectrograms should be saved as PDFs (one file per input audio). For multi-channel audios, a separate spectrogram is computed for each channel and saved as a separate subplot.")
parser.add_argument('-p', '--out_powergrams', default="", type=str,
	help="Optional path to output directory for plotting powergrams (rolling-window RMS over time) as PDFs (one file per input audio). For multi-channel audios, a separate powergram is computed for each channel and saved as a separate subplot.")
parser.add_argument('--in_audios_table_has_header', action='store_true', dest="in_audios_table_has_header", default=False,
	help='Specify that the --in_audios table (if applicable) has a header line listing column names. Only relevant if --in_audios points to a text file.')
# NOTE(review): the option name 'interpred_audio_paths' looks like a typo of
# 'interpret_audio_paths', but renaming it would break existing invocations.
parser.add_argument('--interpred_audio_paths', default="relative_to_cd", type=str, choices=["relative_to_cd", "relative_to_list", "names_near_list"],
	help="How to interpret the file paths listed in --in_audios (if applicable). 'relative_to_cd' means that relative paths are interpreted relative to the current working directory and absolute paths are interpreted as-is. 'relative_to_list' means that relative paths are interpreted relative to the --in_audios file's parent directory and absolute paths are interpreted as-is. 'names_near_list' means that only the provided file names should be considered, with audio clips assumed to be located in the same directory as --in_audios. Only relevant if --in_audios points to a text file.")
parser.add_argument('--missing_audios', default="error", type=str, choices=["error", "omit", "blank"],
	help="How to handle missing audio files, specified in --in_audios. 'omit' means that such audios are omitted from the analysis. 'blank' means that such audios are kept, but their summaries are essentially blank. Only relevant if --in_audios points to a text file.")
# BUGFIX(review): help text below said '--in_adios'; corrected to '--in_audios'.
parser.add_argument('--output_order', default="given", type=str, choices=["given", "given_r", "alphabetical_paths", "alphabetical_paths_r", "alphabetical_names", "alphabetical_names_r"],
	help="How to sort audios in the output summaries table. 'given' retains the original order, i.e. as specified in the --in_audios file or as returned by the operating system when seeking the audio files. 'alphabetical_paths' sorts audios according to their full paths. 'alphabetical_names' sorts audios according to their file names.")
parser.add_argument('--audio_naming', default="file_name", type=str, choices=["file_name","file_basename"],
	help="How to define the names of input audios.")
parser.add_argument('--only_audio_names', default="", type=str,
	help="Optional comma-separated list of audio names (or wildcards) to restrict the analysis to.")
parser.add_argument('--omit_audio_names', default="", type=str,
	help="Optional comma-separated list of audio names (or wildcards) to omit from the analysis.")
parser.add_argument('--start_time', default=0, type=float,
	help="Optional start time (sec) for analysis, omitting any audio data prior to this time. May also be a negative number, in which case it is counted from the end. Note that this does not reduce the time interval that can be considered for normalization (options --normalize_start_time & --normalize_end_time), i.e., trimming is done after normalization.")
parser.add_argument('--end_time', default=inf, type=float,
	help="Optional end time (sec) for analysis, omitting any audio data past this time. May also be a negative number, in which case it is counted from the end. Note that this does not reduce time interval that can be considered for normalization (options --normalize_start_time & --normalize_end_time), i.e., trimming is done after normalization.")
parser.add_argument('--min_frequency', default=0, type=float,
	help="Optional minimum frequency (Hz) to keep in the audio. If >0, a low-pass or bandpass filter will be applied prior to computing any audio summaries or spectra.")
# NOTE(review): the remainder of this help string was lost in the mangling;
# it presumably mirrored --min_frequency — confirm against the original.
parser.add_argument('--max_frequency', default=inf, type=float,
	help="Optional maximum frequency (Hz) to keep in the audio.")
# TODO(review): lost span — the original defined further options here (later
# code references e.g. args.normalize, args.window_span, args.spectrum_*,
# args.spectrogram_*, args.plot_format/plot_width/plot_height, args.log_spectrum,
# args.log_frequency, args.verbose, args.force, args.spectrum_max, args.Nthreads),
# and validated the requested time window (surviving fragments show a check
# involving args.start_time>=0 and args.end_time). Restore from the original.
args = parser.parse_args()

# Resolve the number of worker threads; a non-positive value counts down from the CPU count.
args.Nthreads = (args.Nthreads if (args.Nthreads > 0) else max(1, os.cpu_count() + args.Nthreads))

# Nsegments and freq_step are mutually exclusive ways of setting the spectral resolution.
if (args.spectrum_Nsegments <= 0) and (args.spectrum_freq_step <= 0):
	args.spectrum_Nsegments = 1
elif (args.spectrum_Nsegments > 0) and (args.spectrum_freq_step > 0):
	abort("%sERROR: Cannot provide both --spectrum_freq_step and --spectrum_Nsegments; must only provide one of the two"%(args.verbose_prefix))
# Ninterleaves and max_frequency are mutually exclusive ways of setting the frequency span.
if (args.spectrum_Ninterleaves <= 0) and (not numpy.isfinite(args.max_frequency)):
	args.spectrum_Ninterleaves = 1
elif (args.spectrum_Ninterleaves > 0) and numpy.isfinite(args.max_frequency):
	abort("%sERROR: Cannot provide both --spectrum_Ninterleaves and --max_frequency; must only provide one of the two"%(args.verbose_prefix))
if (args.spectrogram_Nsegments <= 0) and (args.spectrogram_freq_step <= 0):
	args.spectrogram_Nsegments = 1
elif (args.spectrogram_Nsegments > 0) and (args.spectrogram_freq_step > 0):
	abort("%sERROR: Cannot provide both --spectrogram_freq_step and --spectrogram_Nsegments; must only provide one of the two"%(args.verbose_prefix))
# Spectrogram interleaving: auto (=1) unless --max_frequency was given; the two are mutually exclusive.
if (args.spectrogram_Ninterleaves <= 0) and (not numpy.isfinite(args.max_frequency)):
	args.spectrogram_Ninterleaves = 1
elif (args.spectrogram_Ninterleaves > 0) and numpy.isfinite(args.max_frequency):
	abort("%sERROR: Cannot provide both --spectrogram_Ninterleaves and --max_frequency; must only provide one of the two"%(args.verbose_prefix))

# Parse the comma-separated list of focal frequency intervals ("min:max" pairs).
spectrum_focal_intervals = ([] if (args.spectrum_focal_intervals == "") else
	[(float_or_nan(interval.split(":",1)[0]), float_or_nan(interval.split(":",1)[1])) for interval in args.spectrum_focal_intervals.split(",")])

# find input audios: --in_audios is either a table of file paths (recognized by
# a tabular-text extension, optionally gzipped) or a shell wildcard.
if any((args.in_audios.lower().endswith("."+extension) or args.in_audios.lower().endswith("."+extension+".gz")) for extension in FILE_EXTENSION_TO_COLUMN_DELIMITER):
	if args.verbose:
		print("%sReading audio file paths from file.."%(args.verbose_prefix))
	audio_filepaths = load_first_column_from_file(filepath=args.in_audios, header=args.in_audios_table_has_header)
	if args.verbose:
		print("%s Note: Input file specified %d audio files"%(args.verbose_prefix,len(audio_filepaths)))
	if args.interpred_audio_paths == "relative_to_cd":
		pass  # keep paths exactly as listed
	elif args.interpred_audio_paths == "relative_to_list":
		list_dir = os.path.dirname(args.in_audios)
		audio_filepaths = [(filepath if filepath.startswith("/") else os.path.join(list_dir, filepath)) for filepath in audio_filepaths]
	elif args.interpred_audio_paths == "names_near_list":
		list_dir = os.path.dirname(args.in_audios)
		audio_filepaths = [os.path.join(list_dir, os.path.basename(filepath)) for filepath in audio_filepaths]
	# check for missing audio files
	if args.missing_audios in ["error","omit"]:
		if args.verbose:
			print("%s Checking existence of audios at specified paths.."%(args.verbose_prefix))
		missings = [f for f,filepath in enumerate(audio_filepaths) if (not os.path.exists(filepath))]
		if len(missings) > 0:
			if args.missing_audios == "error":
				abort("%s ERROR: %d out of %d loaded audio file paths do not exist, for example: %s"%(args.verbose_prefix,len(missings),len(audio_filepaths),audio_filepaths[missings[0]]))
			else:
				if args.verbose:
					print("%s Note: %d out of %d loaded audio file paths do not exist. These audios will be omitted"%(args.verbose_prefix,len(missings),len(audio_filepaths)))
				keep = integer_complement(N=len(audio_filepaths), subset=missings)
				audio_filepaths = [audio_filepaths[k] for k in keep]
else:
	if args.verbose:
		print("%sSearching for audio files.."%(args.verbose_prefix))
	audio_filepaths = glob.glob(args.in_audios)
	if args.verbose:
		print("%s Note: Found %d audio files"%(args.verbose_prefix,len(audio_filepaths)))

# determine audio names
if args.audio_naming == "file_name":
	audio_names = [os.path.basename(filepath) for filepath in audio_filepaths]
elif args.audio_naming == "file_basename":
	audio_names = [os.path.splitext(os.path.basename(filepath))[0] for filepath in audio_filepaths]

# filter audios if needed
if (args.only_audio_names != "") or (args.omit_audio_names != ""):
	keep_audios = filter_names(names=audio_names, only_names=args.only_audio_names, omit_names=args.omit_audio_names, case_sensitive=False, allow_wildcards=True)
	# TODO(review): lost span — a surviving fragment "if(len(keep_audios)" suggests the
	# original subset audio_names & audio_filepaths here when keep_audios is smaller.
# TODO(review): lost span — the original then allocated the per-audio summary
# arrays (original_durations, durations, original_RMS, RMS, Nchannels,
# sampling_rates, normalizations, focal_powers, Naudios) and entered the
# per-audio loop: loading each audio, applying the requested normalization and
# per-audio --in_config settings; fragments of the subsequent start_time/end_time
# clamping survive in the next region. Restore from the original script.
# ---------------------------------------------------------------------------
# Per-audio processing. NOTE(review): the enclosing loop header and the audio
# loading/normalization code were lost in the source mangling; the loop header
# below is reconstructed from surviving fragments (the summary-writing loop at
# the bottom iterates "for a,filepath in enumerate(audio_filepaths)", and both
# audio_name & filepath are used per audio). Per-audio settings (start_time,
# end_time, min_frequency, max_frequency, window_span, spectrum_* and
# spectrogram_* variables) were resolved in the lost span — restore from the
# original script and confirm the header.
# ---------------------------------------------------------------------------
for a, (audio_name, filepath) in enumerate(zip(audio_names, audio_filepaths)):
	# TODO(review): lost span — load the audio (audio, sampling_rate), record
	# original_durations[a] & original_RMS[a], apply the requested normalization
	# (recording normalizations[a]) and compute `duration`.
	# Clamp the requested analysis window to [0,duration]; negative times count from the end.
	start_time = max(0, min(duration, (start_time if (start_time >= 0) else duration + start_time)))
	end_time = max(0, min(duration, (end_time if (end_time >= 0) else duration + end_time)))
	if end_time <= start_time:
		if args.verbose:
			print("%s WARNING: Audio duration (%g sec) is too short for trimming. Skipping this audio"%(args.verbose_prefix,duration))
		continue
	elif (start_time > 0) or (end_time < duration):
		# TODO(review): lost span — trim the audio to [start_time, end_time]
		# (sample-index slicing); restore from the original.
		pass
	# band-limit the audio before computing any summaries, if requested
	if (min_frequency > 0) or numpy.isfinite(max_frequency):
		if args.verbose:
			print("%s Filtering frequencies.."%(args.verbose_prefix))
		audio = filter_frequencies(audio=audio, sampling_rate=sampling_rate, min_frequency=min_frequency, max_frequency=max_frequency)

	# keep track of basic summary stats
	durations[a] = duration
	Nchannels[a] = (1 if (audio.ndim == 1) else audio.shape[1])
	RMS[a] = numpy.sqrt(numpy.mean(audio**2))
	sampling_rates[a] = sampling_rate

	# save spectrogram if needed
	if args.out_spectrograms != "":
		if args.verbose:
			print("%s Generating spectrogram.."%(args.verbose_prefix))
		spectrogram_file = os.path.join(args.out_spectrograms, os.path.splitext(audio_name)[0]+"."+args.plot_format)
		check_output_file(file=spectrogram_file, force=args.force, verbose=args.verbose, verbose_prefix=args.verbose_prefix+" ")
		analyze_spectrogram(audio			= audio,
							sampling_rate	= sampling_rate,
							window_span		= window_span,
							Nsegments		= spectrogram_Nsegments,
							Ninterleaves	= spectrogram_Ninterleaves,
							freq_step		= spectrogram_freq_step,
							min_frequency	= min_frequency,
							max_frequency	= max_frequency,
							PSD				= (spectrogram_type=="PSD"),
							plot_file		= spectrogram_file,
							plot_title		= "Spectrogram of audio '%s'"%(audio_name))

	# save PSD or Fourier spectrum if needed
	if args.out_spectra != "":
		if args.verbose:
			print("%s Generating %s spectrum.."%(args.verbose_prefix,spectrum_type))
		spectrum_file = os.path.join(args.out_spectra, os.path.splitext(audio_name)[0]+"."+args.plot_format)
		check_output_file(file=spectrum_file, force=args.force, verbose=args.verbose, verbose_prefix=args.verbose_prefix+" ")
		focal_powers[a,:] = analyze_spectrum(audio				= audio,
											 sampling_rate		= sampling_rate,
											 Nsegments			= spectrum_Nsegments,
											 Ninterleaves		= spectrum_Ninterleaves,
											 freq_step			= spectrum_freq_step,
											 min_frequency		= min_frequency,
											 max_frequency		= max_frequency,
											 max_value			= args.spectrum_max,
											 plot_file			= spectrum_file,
											 plot_title			= "%s of audio '%s'"%(("Fourier spectrum" if (spectrum_type=="Fourier") else "PSD"),audio_name),
											 PSD				= (spectrum_type=="PSD"),
											 focal_intervals	= spectrum_focal_intervals)

	# save powergram if needed
	if args.out_powergrams != "":
		if args.verbose:
			print("%s Generating powergram.."%(args.verbose_prefix))
		powergram_file = os.path.join(args.out_powergrams, os.path.splitext(audio_name)[0]+"."+args.plot_format)
		check_output_file(file=powergram_file, force=args.force, verbose=args.verbose, verbose_prefix=args.verbose_prefix+" ")
		save_powergram(audio			= audio,
					   sampling_rate	= sampling_rate,
					   window_span		= window_span,
					   plot_file		= powergram_file,
					   plot_title		= "Powergram (rolling-window RMS) of audio '%s'"%(audio_name))

# save summaries to file
if args.out_summaries != "":
	if args.verbose:
		print("%sSaving summaries to output file.."%(args.verbose_prefix))
	check_output_file(file=args.out_summaries, force=args.force, verbose=args.verbose, verbose_prefix=args.verbose_prefix+" ")
	delimiter = infer_file_delimiter(args.out_summaries, default="\t")
	with open_file(args.out_summaries,"wt") as fout:
		fout.write("# Summaries of %d audios, in frequency band [%g, %g] Hz\n# Generated on: %s\n# Used command: %s\n# Times are measured in seconds, frequencies in Hz\n"%(Naudios,args.min_frequency,args.max_frequency,get_date_time(),get_shell_command()))
		if Naudios > 0:
			fout.write("# Average duration = %g sec (min %g, max %g)\n"%(numpy.nanmean(durations),numpy.nanmin(durations),numpy.nanmax(durations)))
		fout.write("#\nname%spath%soriginal_duration%sduration%ssampling_rate%sNchannels%soriginal_RMS%sRMS%snormalization_factor%s\n"%(delimiter,delimiter,delimiter,delimiter,delimiter,delimiter,delimiter,delimiter,"".join("%spower_from_%g_to_%g_Hz"%(delimiter,interval[0],interval[1]) for interval in spectrum_focal_intervals)))
		for a,filepath in enumerate(audio_filepaths):
			# BUGFIX(review): the original wrote RMS[a] under the 'original_RMS'
			# header column and original_RMS[a] under 'RMS'; the order below now
			# matches the header line written above.
			fout.write(delimiter.join((audio_names[a],
									   filepath,
									   "%f"%(original_durations[a]),
									   "%f"%(durations[a]),
									   (str(int(sampling_rates[a])) if numpy.isfinite(sampling_rates[a]) else "nan"),
									   str(Nchannels[a]),
									   "%.5g"%(original_RMS[a]),
									   "%.5g"%(RMS[a]),
									   "%.5g"%(normalizations[a]))
									  + tuple("%g"%(pw) for pw in focal_powers[a,:]))+"\n")

###########
# TO DO:
# - Add Ninterleaves and Nsegments and step to spectrogram calculation