#!/usr/bin/env python3 # # This script computes pairwise similarities/differences between multiple audio files. # Various audio formats are supported, such as wav, flac or ogg. # Optionally, audios may be lowpass/highpass/bandpass filtered prior to any analysis. # # Tested on python 3.11, MacOS 14.5 # # Stilianos Louca # Copyright 2024 # # LICENSE AGREEMENT # - - - - - - - - - # All rights reserved. # Use and redistributions of this code is permitted for commercial and non-commercial purposes, # under the following conditions: # # * Redistributions must retain the above copyright notice, this list of # conditions and the following disclaimer in the code itself, as well # as in documentation and/or other materials provided with the code. # * Neither the name of the original author (Stilianos Louca), nor the names # of its contributors may be used to endorse or promote products derived # from this code without specific prior written permission. # * Proper attribution must be given to the original author, including a # reference to the peer-reviewed publication through which the code was published. # # THIS CODE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS" AND ANY # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS CODE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # - - - - - - - - - import os, sys import numpy import glob import re import librosa import time import fnmatch import math import gzip import warnings import argparse import scipy.signal, scipy.special from joblib import Parallel, delayed from joblib import parallel_backend import random import itertools from numpy import NaN, Inf FILE_EXTENSION_TO_COLUMN_DELIMITER = {"txt":"\t", "tsv":"\t", "tab":"\t", "csv":",", "dnl":"\t"} ################################# # MARK Misc auxiliary functions # convert an integer to a string, padded with as many zeros as needed for obtaining the same total width up until a specific maximum value # for example int2zero_padded_str(3,78) yields "03", and int2zero_padded_str(3,1673) yields "0003". def int2zero_padded_str(value,max_value): return ("%0"+str(1+int(math.log10(max(1,abs(max_value)))))+"d")%(value) # return the set {0,..,N-1} \ subset def get_complement(N,subset): keep = numpy.ones(N, dtype=bool) keep[subset] = 0 return numpy.nonzero(keep)[0] def get_date_time(): return time.strftime("%Y.%m.%d") + " " + time.strftime("%H:%M:%S") def get_shell_command(): arguments = sys.argv; arguments = [("'"+a+"'" if (a=="" or re.search(r"[\s*#:;]", a)) else ("'"+'\\\\t'+"'" if (a=='\\t') else a)) for a in arguments]; return ' '.join(arguments); # get the right-most extension of a file # if skip_gz==True, and the file name ends with ".gz", then the ".gz" part is ignored (for example "data.tsv.gz" yields "tsv"). def file_extension(file_path, skip_gz): if(skip_gz and file_path.lower().endswith(".gz")): file_path = file_path[:-3] extension = file_path.rsplit(".",1)[-1] return extension # guess the column delimiter of a classical table file, based on the file extension def infer_file_delimiter(file_path, default=" "): extension = file_extension(file_path, skip_gz=True).lower() return FILE_EXTENSION_TO_COLUMN_DELIMITER.get(extension, default) def check_output_file(file,force,verbose,verbose_prefix): if(os.path.exists(file)): if(force): if(os.path.isdir(file)): shutil.rmtree(file) else: os.remove(file) if(verbose): print(("%sNote: Replacing output file '%s'" % (verbose_prefix,file))) else: print(("%sERROR: Output file '%s' already exists\n%s Cowardly refusing to continue.\n%s Use --force to ignore this message" % (verbose_prefix,file,verbose_prefix,verbose_prefix))) sys.exit(1) else: parent = os.path.dirname(file); if(parent!=""): os.makedirs(parent, exist_ok=True) # open a local file for reading/writing, supporting optionally gzip compression (file extension ".gz") # mode can be 'wt', 'rt', 'wb', 'rb' or other standard modes def open_file(filepath, mode): if(mode.startswith("w") or mode.startswith("a")): parent = os.path.dirname(filepath) if(parent!=""): os.makedirs(parent, exist_ok=True) if(filepath.lower().endswith(".gz")): return gzip.open(filepath,mode) else: return open(filepath,mode) def load_first_column_from_file(filepath, delimiter=None, comment_prefix="#", header=False): if(delimiter is None): delimiter = infer_file_delimiter(filepath, default=" ") with open_file(filepath,"rt") as fin: lines = list(filter(len,[line.split(comment_prefix,1)[0] for line in fin])) if(header): lines = lines[1:] # omit the first data line, as it contains the table header first_column_entries = [(line.split(None,1) if (delimiter=="") else line.split(delimiter,1))[0].strip() for line in lines] return first_column_entries # execute a target function multiple times in parallel (if reasonable) # For fast functions jobs are first grouped into batches to reduce the overhead of dispatching lots of parallel jobs def run_parallel(function, # a callable (e.g., a function), to be executed multiple times. This function should take a single positional argument. If you want to run a function that takes multiple argument, use lambda as a wrapper. arguments, # a sequence (e.g., list) of length N, where N is the number of times the function should be executed. Each element of arguments[] is the single input argument for a single function call. Nthreads = None, # integer, number of parallel threads to use. Interpreted as specified in parse_Nthreads(). If 1, a simple loop is used without parallelization; a loop i used on purpose to facilitate debugging. fast = False, # whether the function is fast, i.e., it takes a short time to execute a single call compared to the overhead associated with dispatching jobs to processes. prefer = None, # either None, "processes" or "threads" randomize= False): # randomize the order in which tasks are executed, if parallelized. This may help with load balancing across threads. Nthreads = (Nthreads if (Nthreads>0) else max(1, os.cpu_count()+Nthreads)) N = len(arguments) order = list(range(N)) if(randomize): random.shuffle(order) if((Nthreads>1) and (N>1)): if(fast): # group jobs into batches # for fast functions this can speed up computation by ~ 25%. batch_size = max(1,int(0.2*N/Nthreads)) Nbatches = math.ceil(N/batch_size) def aux_run_batch(b): return [function(arguments[order[a]]) for a in range(b*batch_size,min((b+1)*batch_size,N))] results = Parallel(n_jobs=min(N,Nthreads), prefer=prefer, verbose=0)(delayed(aux_run_batch)(b) for b in range(Nbatches)) results = [r for rr in results for r in rr] # flatten results, i.e. unwrap batches else: # run standard parallelized results = Parallel(n_jobs=min(N,Nthreads), prefer=prefer, verbose=0)(delayed(function)(arguments[order[a]]) for a in range(N)) if(randomize): inverse_order = invert_permutation(order) results = [results[k] for k in inverse_order] return results else: results = [None]*len(arguments) for a,argument in enumerate(arguments): results[a] = function(argument) return results # return a list of indices corresponding to names to keep # if start_pool[] is provided, the order is preserved in the returned subset. Otherwise, the returned list will be sorted in ascending order. def filter_name_list( names, start_pool = None, # starting pool to further filter. This will typically be range(len(names)), but can also be a subset thereof in case of iterative filtering. Can also be None (equivalent to range(len(names))) only_names = None, # list or set of strings, can be None. Name wildcards to keep. omit_names = None, # list or set of strings, can be None. Name wildcards to exclude. case_sensitive = False, allow_wildcards = False): items_to_keep = (list(range(len(names))) if (start_pool is None) else start_pool) if(not case_sensitive): names = [name.lower() for name in names]; if(only_names is not None): if(not case_sensitive): only_names = [name.lower() for name in only_names]; if(allow_wildcards): items_to_keep = [i for i in items_to_keep if (next((n for n,name in enumerate(only_names) if fnmatch.fnmatchcase(names[i],name)),-1)>=0)] else: only_names = set(only_names) items_to_keep = [i for i in items_to_keep if (names[i] in only_names)] if(omit_names is not None): if(not case_sensitive): omit_names = [name.lower() for name in omit_names]; if(allow_wildcards): items_to_keep = [i for i in items_to_keep if (next((n for n,name in enumerate(omit_names) if fnmatch.fnmatchcase(names[i],name)),-1)<0)] else: omit_names = set(omit_names) items_to_keep = [i for i in items_to_keep if (names[i] not in omit_names)] return items_to_keep # apply a lowpass/highpass/bandpass filter to a single- or multi-channel audio stream def filter_frequencies( audio, # 1D or 2D numpy array of size Ntimes x Nchannels sampling_rate, # integer, number of sampling points per second min_frequency = 0, # float, minimum frequency (Hz) to retain. Set to <=0 to only apply a low-pass filter. max_frequency = Inf): # float, maximum frequency (Hz) to retain. Set to Inf to only apply a high-pass filter if((min_frequency<=0) and (not numpy.isfinite(max_frequency))): # nothing to do return audio elif(min_frequency<=0): # low-pass filter Wn = max_frequency btype = "lowpass" elif(not numpy.isfinite(max_frequency)): # high-pass filter Wn = min_frequency btype = "highpass" else: # bandpass filter Wn = (min_frequency, max_frequency) btype = "bandpass" # construct Butterworth filter filt = scipy.signal.butter(N=10, Wn=Wn, btype=btype, analog=False, fs=sampling_rate, output="sos") # apply filter filtered = scipy.signal.sosfilt(sos=filt, x=audio, axis=0) return filtered # Given an array F representing the upper-triangular part of a N1 x N2 x ... x Nn x NR x NR array, return the full corresponding array A. # Entries below the diagonal in the returned array A can either be set to zero (zero_lower=True) or set equal to the upper triangular part so that the returned matrix is symmetric (zero_lower=False). # Note that here "diagonal", "upper triangular", "lower triangular" and "symmetric" refer exclusively to the last two axes of the array A. def unflatten_upper_triangular( Ashape, # a tuple specifying the shape of the final (non-flattened) array A F, # Either a 1D numeric array, or a (n+1)-dim array of size N1 x N2 x .. x Nn x (NR*(NR+1)/2), listing the flattened values to be used to construct A. If an (n+1)-dim array, then F[i1,..,in,:] will be the upper-triangular values of A[i1,..,in,:,:]. zero_lower = True): # boolean, specifying how to fill the part of A below the diagonal. If True, A will be zero below the diagonal. If False, A will be symmetric. NR = Ashape[-1] upper_indices = numpy.triu_indices(NR) lower_indices = numpy.tril_indices(NR) if(len(Ashape)==2): A = numpy.zeros((NR,NR),dtype=F.dtype) A[upper_indices] = F if(not zero_lower): A[lower_indices] = (A.T)[lower_indices] else: if(F.ndim>2): F = F.reshape(numpy.prod(F.shape[:-1])+(F.shape[-1],)) # flatten the first n axes of F # first construct A with all but the last two axes flattened, for convenience A = numpy.zeros((numpy.prod(Ashape[:-2]),NR,NR), dtype=F.dtype) Nupper = int(NR*(NR+1)/2) # number of elements in the upper triangular part of a NR x NR matrix for l in range(A.shape[0]): if(F.ndim==1): A[l,...][upper_indices] = F[range(l*Nupper,(l+1)*Nupper)] else: A[l,...][upper_indices] = F[l,:] if(not zero_lower): A[l,...][lower_indices] = (A[l,...].T)[lower_indices] A = A.reshape(Ashape) # unflatten all axes return A # similar to numpy.cumsum, but counting starts 1 later, i.e., cumsum0 at the k-th index is equal to the sum of X[0],..,X[k-1] def cumsum0(X,axis=None): if(axis is None): C = numpy.empty(X.size, dtype=X.dtype) C[0] = 0 C[1:] = numpy.cumsum(X)[:-1] else: C = numpy.zeros_like(X) if(axis==0): C[1,...] = numpy.cumsum(X,axis=0)[:-1,...] else: multislice = [slice(None)]*X.ndim multislice[axis] = slice(0,X.shape[axis]-1) cumvalues = numpy.cumsum(X[tuple(multislice)],axis=axis) multislice[axis] = slice(1,X.shape[axis]) C[tuple(multislice)] = cumvalues return C ################################# # MARK Computing audio properties # calculate the modulus of the Fourier spectrum of a real-valued regularly sampled signal # The input signal may have multiple channels, in which case a separate spectrum is computed for each channel def get_spectrum(audio, # 1D numpy array of size NR or 2D numpy array of size NR x Nchannels, the signal to be processed method = "basic", # either "basic" or "Bartlett" or "interleaved". Note that "Bartlett" sacrifices frequency resolution (i.e., the fundamental frequency is higher) while "interleaved" sacrifices frequency range (i.e., the maximum frequency is lower). Nsegments = 5): # integer >=1, number of segments into which to split the signal, for Welch's or Bartlett's method. Irrelevant for "base" method Nsegments = min(audio.shape[0]//2,Nsegments) if((method=="basic") or (Nsegments<=1)): # basic estimator, no splitting of data spectrum = numpy.abs(scipy.fft.rfft(x=audio, axis=0, workers=args.Nthreads, norm="backward")) spectrum /= audio.shape[0]*args.sampling_rate # apply proper normalization frequencies = (args.sampling_rate/audio.shape[0])*numpy.arange(spectrum.shape[0]) elif(method=="Bartlett"): # split signal into non-overlapping consecutive segments, compute spectrum for each segment, then average across segments # note that averaging should be done AFTER taking the modulus (in the case of a Fourier spectrum) or squared modulus (in the case of a PSD) segment_length = audio.shape[0]//Nsegments spectrum = numpy.mean([numpy.abs(scipy.fft.rfft(x=audio[(s*segment_length):((s+1)*segment_length),:], axis=0, workers=args.Nthreads, norm="backward")) for s in range(Nsegments)],axis=0) spectrum /= segment_length*args.sampling_rate # apply proper normalization frequencies = (args.sampling_rate/segment_length)*numpy.arange(spectrum.shape[0]) elif(method=="interleaved"): # split signal into multiple interleaved sub-series of equal length, compute spectrum for each sub-series, then average across sub-series # note that averaging should be done AFTER taking the modulus (in the case of a Fourier spectrum) or squared modulus (in the case of a PSD) min_length = (audio.shape[0]-Nsegments+1)//Nsegments spectrum = numpy.mean([numpy.abs(scipy.fft.rfft(x=audio[numpy.arange(s,audio.shape[0],Nsegments)[:min_length],:], axis=0, workers=args.Nthreads, norm="backward")) for s in range(Nsegments)],axis=0) spectrum /= min_length*args.sampling_rate # apply proper normalization frequencies = (args.sampling_rate/audio.shape[0])*numpy.arange(spectrum.shape[0]) return frequencies, spectrum # compute the dominant mel frequencies for a set of audios def get_dominant_mel_freqs(audios): domfreq = numpy.zeros(len(audios)) n_mels = 64 frequencies = librosa.mel_frequencies(n_mels=n_mels, fmin=args.min_frequency, fmax=args.max_frequency) for w,audio in enumerate(audios): if(audio.ndim>1): audio = audio[:,0] # keep only first channel, if there are multiple # compute mel FFT mffts = librosa.feature.melspectrogram( y = audio, sr = args.sampling_rate, n_mels = n_mels, fmin = args.min_frequency, fmax = args.max_frequency) # determine the dominant frequency in each frame, then average across frames domfreq[w] = numpy.mean(frequencies[numpy.argmax(mffts,axis=0)]) return domfreq # compute MFFTs and MFCCs for a set of audios # the returned MFFTs will be a 2D array of size NW x NMFFTs # the returned MFCCs will be a 2D array of size NW x NMFCCs # Note that the returned MFFTs & MFCCs may include NaNs, for example if some clips were too short def get_MFCCs_and_MFFTs(audios): NW = len(audios) keep_length = int(args.sampling_rate*args.trim) frame_size = int(args.trim*args.sampling_rate/(args.Nframes+2)) MFCCs = numpy.full((NW,args.NMFCCs*args.Nframes), NaN, dtype=float) MFFTs = numpy.full((NW,args.NMFFTs*args.Nframes), NaN, dtype=float) if(args.min_frequency): mfft_freqs = librosa.mel_frequencies(n_mels=args.NMFFTs+1, fmin=args.min_frequency, fmax=args.max_frequency)[1:] else: mfft_freqs = librosa.mel_frequencies(n_mels=args.NMFFTs, fmin=args.min_frequency, fmax=args.max_frequency) for w,audio in enumerate(audios): if(audio.ndim>1): audio = audio[:,0] # keep only first channel, if there are multiple if(len(audio)0): mffts = librosa.feature.melspectrogram( y = audio, sr = args.sampling_rate, n_mels = args.NMFFTs+(1 if (args.min_frequency==0) else 0), n_fft = frame_size, hop_length = frame_size+1, fmin = args.min_frequency, fmax = args.max_frequency) mffts = mffts[:,1:(args.Nframes+1)] # discard the first and last frame to avoid edge effects if(args.min_frequency==0): mffts = mffts[1:,:] # discard the first MFFT MFFTs[w,:] = mffts.flat # store the flattened MFFTs # compute MFCCs if(args.NMFCCs>0): mfccs = librosa.feature.mfcc(y = audio, sr = args.sampling_rate, n_mels = max(128,1+args.NMFCCs), # need to manually increase n_mels when requesting a lot of MFCCs n_mfcc = args.NMFCCs, n_fft = frame_size, hop_length = frame_size+1, fmin = args.min_frequency, fmax = args.max_frequency) mfccs = mfccs[:,1:(args.Nframes+1)] # discard the first and last frame to avoid edge effects MFCCs[w,:] = mfccs.flat # store the flattened MFCCs return MFCCs, MFFTs, mfft_freqs def get_shannon_entropy(weights, axis): probabilities = weights/numpy.sum(weights,axis=axis,keepdims=True) with warnings.catch_warnings(): warnings.filterwarnings(action='ignore', message='invalid value encountered in log') warnings.filterwarnings(action='ignore', message='divide by zero encountered in log') warnings.filterwarnings(action='ignore', message='invalid value encountered in multiply') entropy = -numpy.nansum(probabilities*numpy.log2(probabilities),axis=axis) return entropy # get rolling-window averages along the 0-th axis of an array, with non-overlapping box windows # the last bit of the signal will be discarded if the signal's length is not an integer multiple of the desired window length def get_rolling_box_means(values,window_length): NT = values.shape[0] cumsums = numpy.pad(numpy.cumsum(values,axis=0),pad_width=[(1,0)]+[(0,0)]*(values.ndim-1),mode="constant",constant_values=0) window_rights = numpy.arange(0,NT+1,window_length) # right-most point of each window. The first window is a dummy window, for easier calculations below. if(window_rights[-1]>NT): window_rights = window_rights[:-1] # omit the last window, since it is too short return (cumsums[window_rights[1:],...] - cumsums[window_rights[:-1],...])/window_length # compute various acoustic parameters for one or more audios # In the case of multi-channel audios, each acoustic parameter is first computed separately for each channel, and then averaged across channels def get_acoustic_parameters(audios): NW = len(audios) freq_means = numpy.empty(NW) freq_stds = numpy.empty(NW) freq_modes = numpy.empty(NW) spectral_entropies = numpy.empty(NW) modulation_rates = numpy.empty(NW) # rates of change of the dominant frequency, i.e., average absolute rate of change evaluated between consecutive rolling windows. Closely related to the 'modulation_index' in warbleR, but here we ealuate the average absolute rate of change rather than the cumulative absolute change, thus accounting for audio duration. time_entropies = numpy.empty(NW) for a,audio in enumerate(audios): if(audio.ndim==1): audio = numpy.expand_dims(audio,axis=1) # make 2D even if only one channel, for consistency below duration = audio.shape[0]/args.sampling_rate window_span = (max(512/args.sampling_rate,duration/100) if (args.window_span is None) else args.window_span) window_length = min(audio.shape[0],int(window_span*args.sampling_rate)) time_entropies[a] = numpy.mean(get_shannon_entropy(get_rolling_box_means(values=audio**2,window_length=window_length),axis=0)) # compute a single Fourier spectrum of the entire audio (separately for each channel) frequencies, spectrum = get_spectrum(audio=audio, method=args.fourier_method, Nsegments=args.fourier_Nsegments) # filter frequencies in the spectrum if needed keep_freqs = (frequencies<=args.max_frequency) & (frequencies>=args.min_frequency) frequencies, spectrum = frequencies[keep_freqs], spectrum[keep_freqs,:] # compute acoustic params based on the power spectrum powers = numpy.abs(spectrum)**2 normalized_powers = powers/numpy.sum(powers,axis=0,keepdims=True) # 2D array of size NFT x Nchannels, normalized power spectrum freq_mean = numpy.sum(normalized_powers*numpy.expand_dims(frequencies,axis=1),axis=0) # 1D array of size Nchannels freq_means[a] = numpy.mean(freq_mean) freq_stds[a] = numpy.mean(numpy.sqrt(numpy.sum(normalized_powers*numpy.expand_dims(frequencies-freq_mean,axis=1)**2,axis=0))) freq_modes[a] = numpy.mean(frequencies[numpy.argmax(powers,axis=0)]) spectral_entropies[a] = numpy.mean(get_shannon_entropy(powers,axis=0)) # spectral entropy = Shannon entropy of the normalized power spectrum # compute rolling-window spectrum, aka. spectrogram rolling_spectrum = numpy.swapaxes(librosa.stft(y=audio.T, win_length=window_length,n_fft=window_length,window="boxcar",hop_length=window_length), axis1=0, axis2=2) # rolling_spectrum will be a 3D aray of size Nwindows x Nfrequencies x Nchannels rolling_frequencies = (1/window_span)*numpy.arange(rolling_spectrum.shape[-2]) # filter frequencies in the spectrogram if needed keep_freqs = (rolling_frequencies<=args.max_frequency) & (rolling_frequencies>=args.min_frequency) rolling_frequencies, rolling_spectrum = rolling_frequencies[keep_freqs], rolling_spectrum[:,keep_freqs,:] # compute acoustic params based on the spectrogram rolling_powers = numpy.abs(rolling_spectrum)**2 rolling_dominant_freqs = rolling_frequencies[numpy.argmax(rolling_powers,axis=1)] modulation_rates[a] = numpy.mean(numpy.sum(numpy.abs(numpy.diff(rolling_dominant_freqs,axis=0)/window_span),axis=0)) param_values = numpy.column_stack((freq_means,freq_stds,freq_modes,time_entropies,spectral_entropies,modulation_rates)) param_names = ["freq_mean", "freq_std", "freq_mode", "time_entropy", "spectral_entropy", "modulation_rate"] return param_values, param_names # compute all pairwise cross-correlations between a set of audios # The CCs are returned as a 2D numeric array of size NW x NW def pairwise_cross_correlations(audios, normalize=True): NW = len(audios) CCs = numpy.empty((NW,NW), dtype=float) def aux_single_CC(audio_pair): return numpy.max(numpy.abs(scipy.signal.correlate(in1=audios[audio_pair[0]], in2=audios[audio_pair[1]], mode="valid", method="auto"))) audio_pairs = [(a1,a2) for a1 in range(NW) for a2 in range(a1,NW)] Npairs = len(audio_pairs) CCs = numpy.asarray(run_parallel(function=aux_single_CC, arguments=audio_pairs, Nthreads=args.Nthreads, fast=True), dtype=float) # CCs = numpy.asarray(Parallel(n_jobs=min(Npairs,args.Nthreads), prefer="processes", pre_dispatch="all", batch_size=max(1,min(1000,int(0.5*Npairs/args.Nthreads))), verbose=0)(delayed(aux_single_CC)(audio1=audios[a1],audio2=audios[a2]) for (a1,a2) in audio_pairs), dtype=float) CCs = unflatten_upper_triangular(Ashape=(NW,NW), F=CCs, zero_lower=False) # normalize CCs if needed if(normalize): self_CCs = numpy.diag(CCs) CCs = (CCs/numpy.sqrt(numpy.expand_dims(self_CCs,axis=0)))/numpy.sqrt(numpy.expand_dims(self_CCs,axis=1)) return CCs # compute pairwise acoustic differences/similarities between multiple audio clips # The returned simdiffs[] will be a 3D numeric array of size NW x NW x Nmetrics, where NW is the number of input audios, listing pairwise differences/similarities # Some simdiffs may be NaN, for example if some audios were too short for some computations def get_audio_simdiffs( audios, # Sequence of length NW. Each element in audios[] should be an already loaded audio at args.sampling_rate, as a 1D numpy array (e.g. as loaded using librosa.load) audio_names = None, # Optional sequence of length NW, listing the name of each audio. Only needed when saving data to optional output files. verbosity = 0, verbose_prefix = ""): # get differences based on acoustic parameters if(verbosity>=1): print("%sComputing acoustic parameter differences.."%(verbose_prefix)) param_values, param_names = get_acoustic_parameters(audios) param_differences = numpy.abs(numpy.expand_dims(param_values,axis=0)-numpy.expand_dims(param_values,axis=1)) # get dominant frequencies if(verbosity>=1): print("%sComputing dominant frequency differences.."%(verbose_prefix)) domfreqs = get_dominant_mel_freqs(audios) domfreq_differences= numpy.abs(numpy.expand_dims(domfreqs,axis=0)-numpy.expand_dims(domfreqs,axis=1)) # get differences based on cross-correlations if(verbosity>=1): print("%sComputing cross correlations.."%(verbose_prefix)) CCs = pairwise_cross_correlations(audios) if(args.out_cross_correlations!=""): # save cross-correlations in matrix format to a file if(verbosity>=2): print("%s Saving cross correlations to file.."%(verbose_prefix)) delimiter = infer_file_delimiter(args.out_cross_correlations, default="\t") with open_file(args.out_cross_correlations, "wt") as fout: fout.write("audio"+delimiter+delimiter.join(audio_names)+"\n") for r in range(len(audios)): fout.write(audio_names[r]) for c in range(len(audios)): fout.write("%s%.10g"%(delimiter,CCs[r,c])) fout.write("\n") # get differences based on MFCCs & MFFTs if(verbosity>=1): print("%sComputing MFCC and MFFT differences.."%(verbose_prefix)) MFCCs, MFFTs, mfft_freqs = get_MFCCs_and_MFFTs(audios) MFCC_differences = numpy.abs(numpy.expand_dims(MFCCs,axis=0)-numpy.expand_dims(MFCCs,axis=1)) MFFT_differences = numpy.abs(numpy.expand_dims(MFFTs,axis=0)-numpy.expand_dims(MFFTs,axis=1)) # concatenate all distance matrixes along the 3rd axis simdiffs = numpy.concatenate((param_differences,numpy.expand_dims(domfreq_differences,axis=2),numpy.expand_dims(CCs,axis=2),MFCC_differences,MFFT_differences), axis=2) # construct names for all metrics metrics = param_names\ + ["dom_mel_freq", "CC"]\ + ["MFCC%s"%(int2zero_padded_str(1+k,MFCCs.shape[1])) for k in range(MFCCs.shape[1])]\ + ["MFFT%g"%(freq) for freq in mfft_freqs] return simdiffs, metrics ############################################################################## # MAIN SCRIPT BODY # parse command line arguments parser = argparse.ArgumentParser(description="Compute various pairwise audio similarities/differences, such as cross-correlations. These may be used e.g. for subsequent diarization.", epilog="") parser.add_argument('-i','--in_audios', required=True, type=str, help="Either a shell wildcard pointing to one or more audio files (e.g., .wav or .flac files), or a single path to a text file listing audio file paths in the first column (one path per line, both absolute and relative paths are allowed). Recursive wildcards (e.g. 'audios/**/*.wav') are supported. A text file is assumed if this path ends with a typical tabular text file extension; the column delimiter (if needed) is inferred based on the file extension.") parser.add_argument('-o','--out_simdiffs', default="", type=str, help="Optional path to a file for writing pairwise audio similarities & differences (one row per audio pair, one column per metric). The column delimiter will be automatically determined based on the file extension (e.g., CSV or TSV)."); parser.add_argument('--out_cross_correlations', default="", type=str, help="Optional path to a file for writing pairwise cross-correlations in matrix format. The column delimiter will be automatically determined based on the file extension (e.g., CSV or TSV)."); parser.add_argument('--out_audio_parents', default="", type=str, help="Optional path to a file for writing the names of the parent directories of the input audios (one row per audio). The column delimiter will be automatically determined based on the file extension (e.g., CSV or TSV)."); parser.add_argument('--out_same_parents', default="", type=str, help="Optional path to a file for writing for each pair of input audios whether they were in the same directory or not (one row per audio pair). The column delimiter will be automatically determined based on the file extension (e.g., CSV or TSV)."); # options for input audios table parser.add_argument('--in_audios_table_has_header', action='store_true', dest="in_audios_table_has_header", default=False, help='Specify that the --in_audios table (if applicable) has a header line listing column names. Only relevant if --in_audios points to a text file.'); parser.add_argument('--interpred_audio_paths', default="relative_to_cd", type=str, choices=["relative_to_cd", "relative_to_list", "names_near_list"], help="How to interpret the file paths listed in --in_audios (if applicable). 'relative_to_cd' means that relative paths are interpreted relative to the current working directory and absolute paths are interpreted as-is. 'relative_to_list' means that relative paths are interpreted relative to the --in_audios file's parent directory and absolute paths are interpreted as-is. 'names_near_list' means that only the provided file names should be considered, with audio clips assumed to be located in the same directory as --in_audios. Only relevant if --in_audios points to a text file."); parser.add_argument('--missing_audios', default="error", type=str, choices=["error", "omit", "blank"], help="How to handle missing audio files, specified in --in_audios. 'omit' means that such audios are omitted from the analysis. 'blank' means that such audios are kept, but their summaries are essentially blank. Only relevant if --in_audios points to a text file."); # technical parameters affecting how the similarities/differences are computed parser.add_argument('--NMFCCs', default=100, type=int, help="Number of MFCCs to compute. (default: %(default)s)"); parser.add_argument('--NMFFTs', default=100, type=int, help="Number of MFFTs to compute. (default: %(default)s)"); parser.add_argument('--Nframes', default=1, type=int, help="number of non-overlapping frames (sliding windows) for which to compute MFCCs & MFFTs, per clip. A larger number of frames implies a smaller window size for FFT, since frames are non-overlapping. (default: %(default)s)"); parser.add_argument('--trim', default=Inf, type=float, help="Optional maximum number of seconds to keep from each audio (counting from the audio's start) when computing MFCCs and MFFTs. This can be used to ensure that every clip generates the same number of features. Does not affect the calculation of other metrics, such as cross-correlations and dominant frequencies. Also see option --min_duration. This may, but does not need to be, equal to --min_duration. Making --trim longer than --min_duration is problematic, since trimming would not ensure that all clips have the same length for calculating MFCCs & MFFTs."); parser.add_argument('--sampling_rate', default=44100, type=int, help="Sampling rate to apply to each loaded audio, regardless of its original sampling rate. Audios with a different sampling rate will be resampled when loaded. (default: %(default)s)"); parser.add_argument('--min_frequency', default=0, type=float, help="Optional minimum frequency (Hz) to consider in each audio. If >0, a low-pass or bandpass will be applied to each audio prior to any analysis."); parser.add_argument('--max_frequency', default=Inf, type=float, help="Optional maximum frequency (Hz) to consider in each audio. If 0) else max(1, os.cpu_count()+args.Nthreads)) if(args.seed<=0): random.seed() args.seed = random.randrange(1,100000) logger.print("Note: Seeding random number generator with seed %d"%(args.seed)) random.seed(args.seed) ################################################ # FIND INPUT AUDIOS if(any((args.in_audios.lower().endswith("."+extension) or args.in_audios.lower().endswith("."+extension+".gz")) for extension in FILE_EXTENSION_TO_COLUMN_DELIMITER.keys())): if(args.verbosity): print("%sReading audio file paths from file.."%(args.verbose_prefix)) audio_files = load_first_column_from_file(filepath=args.in_audios, header=args.in_audios_table_has_header) if(args.verbosity>=2): print("%s Note: Input file specified %d audio files"%(args.verbose_prefix,len(audio_files))) if(args.interpred_audio_paths=="relative_to_cd"): pass elif(args.interpred_audio_paths=="relative_to_list"): list_dir = os.path.dirname(args.in_audios) audio_files = [(filepath if filepath.startswith("/") else os.path.join(list_dir, filepath)) for filepath in audio_files] elif(args.interpred_audio_paths=="names_near_list"): list_dir = os.path.dirname(args.in_audios) audio_files = [os.path.join(list_dir, os.path.basename(filepath)) for filepath in audio_files] # check for missing audio files if(args.missing_audios in ["error","omit"]): if(args.verbosity>=2): print("%s Checking existence of audios at specified paths.."%(args.verbose_prefix)) missings = [f for f,filepath in enumerate(audio_files) if (not os.path.exists(filepath))] if(len(missings)>0): if(args.missing_audios=="error"): abort("%s ERROR: %d out of %d loaded audio file paths do not exist"%(args.verbose_prefix,len(missings),len(audio_files))) else: if(args.verbosity>=2): print("%s Note: %d out of %d loaded audio file paths do not exist. These audios will be omitted"%(args.verbose_prefix,len(missings),len(audio_files))) keep = integer_complement(N=len(audio_files), subset=missings) audio_files = [audio_files[k] for k in keep] else: if(args.verbosity): print("%sSearching for audio files.."%(args.verbose_prefix)) audio_files = glob.glob(args.in_audios, recursive=True) if(args.verbosity>=2): print("%s Note: Found %d audio files"%(args.verbose_prefix,len(audio_files))) if(len(audio_files)==0): abort("%s ERROR: No audio files found"%(args.verbose_prefix)) audio_files = sorted(audio_files) # define names of input audios if(args.audio_naming=="file_basename"): audio_names = [os.path.splitext(os.path.basename(filepath))[0] for filepath in audio_files] elif(args.audio_naming=="parent_and_file_basename"): audio_names = [(os.path.basename(os.path.dirname(filepath))+"."+os.path.splitext(os.path.basename(filepath))[0]) for filepath in audio_files] elif(args.audio_naming=="file_path"): audio_names = audio_files if((args.only_audios!="") or (args.omit_audios!="")): keep = filter_name_list(names = audio_names, only_names = (None if (args.only_audios=="") else args.only_audios.split(",")), omit_names = (None if (args.omit_audios=="") else args.omit_audios.split(",")), case_sensitive = False, allow_wildcards = True) if(len(keep)=2): print("%s Note: Keeping only %d out of %d audios, based on name filters"%(args.verbose_prefix,len(keep),len(audio_names))) audio_files = [audio_files[a] for a in keep] audio_names = [audio_names[a] for a in keep] if((args.only_random_subset>0) and (args.only_random_subset=1): print("%sKeeping only a subset of %d audios, as requested.."%(args.verbose_prefix,args.only_random_subset)) keep = random.sample(range(len(audio_files)), k=args.only_random_subset) audio_files = [audio_files[a] for a in keep] audio_names = [audio_names[a] for a in keep] #################################################### # PROCESS AUDIOS if(args.verbosity>=1): print("%sLoading all %d audio files, resampling at rate %g Hz if needed.."%(args.verbose_prefix,len(audio_files),args.sampling_rate)) audios = [librosa.load(filepath, sr=args.sampling_rate)[0] for filepath in audio_files] if(args.min_duration>0): keep = [w for w,audio in enumerate(audios) if ((len(audio)/args.sampling_rate)>=args.min_duration)] if(len(keep)=2): print("%s Note: Omitting %d out of %d audios that are shorter than %g sec"%(args.verbose_prefix,len(audios)-len(keep),len(audios),args.min_duration)) audios = [audios[w] for w in keep] audio_files = [audio_files[w] for w in keep] audio_names = [audio_names[w] for w in keep] NW = len(audio_files) if((args.min_frequency>0) or numpy.isfinite(args.max_frequency)): if(args.verbosity>=1): print("%sBandpass-filtering all %d audios to within the range [%g, %g] Hz.."%(args.verbose_prefix,NW,args.min_frequency,args.max_frequency)) audios = [filter_frequencies(audio, sampling_rate=args.sampling_rate, min_frequency=args.min_frequency, max_frequency=args.max_frequency) for audio in audios] if(args.normalize_audio): if(args.verbosity>=1): print("%sNormalizing all %d audios to unit RMS.."%(args.verbose_prefix,NW)) audios = [audio/numpy.sqrt(numpy.mean(audio**2)) for audio in audios] # normalize RMS if(args.verbosity>=1): print("%sComputing pairwise similarities/differences between %d audios.."%(args.verbose_prefix,NW)) simdiffs, metrics = get_audio_simdiffs(audios=audios, audio_names=audio_names, verbosity=max(0,args.verbosity-1), verbose_prefix=args.verbose_prefix+" ") if(args.out_simdiffs!=""): if(args.verbosity>=1): print("%sWriting similarities/differences for %d audio pairs to output file.."%(args.verbose_prefix,NW*(NW+1)/2)) check_output_file(file=args.out_simdiffs,force=args.force,verbose=(args.verbosity>=2),verbose_prefix=(args.verbose_prefix+" ")) delimiter = infer_file_delimiter(args.out_simdiffs, default="\t") with open_file(args.out_simdiffs,"wt") as fout: fout.write("# Pairwise similarities & differences between %d audios\n# Generated on: %s\n# Used command: %s\n#\n"%(NW,get_date_time(),get_shell_command())) fout.write("audio1%saudio2%s%s\n"%(delimiter,delimiter,delimiter.join(metrics))) for w1 in range(NW): for w2 in range(w1,NW): fout.write("%s%s%s%s%s\n"%(audio_names[w1],delimiter,audio_names[w2],delimiter,delimiter.join("%.10g"%(simdiffs[w1,w2,m]) for m in range(simdiffs.shape[2])))) if(args.out_audio_parents!=""): if(args.verbosity>=1): print("%sWriting audio parent dirnames to output file.."%(args.verbose_prefix)) check_output_file(file=args.out_audio_parents,force=args.force,verbose=(args.verbosity>=2),verbose_prefix=(args.verbose_prefix+" ")) delimiter = infer_file_delimiter(args.out_audio_parents, default="\t") with open_file(args.out_audio_parents,"wt") as fout: fout.write("# Parent directory names for %d audios\n# Generated on: %s\n# Used command: %s\n#\n"%(NW,get_date_time(),get_shell_command())) fout.write("audio%sdir_name\n"%(delimiter)) for w,filepath in enumerate(audio_files): fout.write(audio_names[w]+delimiter+os.path.basename(os.path.dirname(filepath))+"\n") if(args.out_same_parents!=""): if(args.verbosity>=1): print("%sWriting audio same-parenthoods to output file.."%(args.verbose_prefix)) check_output_file(file=args.out_same_parents,force=args.force,verbose=(args.verbosity>=2),verbose_prefix=(args.verbose_prefix+" ")) delimiter = infer_file_delimiter(args.out_same_parents, default="\t") with open_file(args.out_same_parents,"wt") as fout: fout.write("# Same-parenthood status for %d audios, i.e., specifying for each pair of audios whether they were located in the same parent directory\n# Generated on: %s\n# Used command: %s\n#\n"%(NW,get_date_time(),get_shell_command())) fout.write("audio1%saudio2%ssame_dir\n"%(delimiter,delimiter)) dirpaths = [os.path.dirname(audio_file) for audio_file in audio_files] for w1 in range(NW): for w2 in range(w1,NW): fout.write("%s%s%s%s%s\n"%(audio_names[w1],delimiter,audio_names[w2],delimiter,("yes" if (dirpaths[w1]==dirpaths[w2]) else "no")))