Source code for compiam.melody.pattern.sancara_search.extraction.self_sim

import os
import shutil
import skimage
import matplotlib.pyplot as plt
import numpy as np

from scipy.spatial.distance import pdist, squareform
from scipy.signal import convolve2d

from compiam.melody.pattern.sancara_search.extraction.img import (
    remove_diagonal,
    convolve_array,
    binarize,
    diagonal_gaussian,
    apply_bin_op,
    make_symmetric,
    edges_to_contours,
)
from compiam.utils import create_if_not_exists, run_or_cache

from compiam.melody.pattern.sancara_search.extraction.sequence import (
    convert_seqs_to_timestep,
    remove_below_length,
    add_center_to_mask,
)
from compiam.melody.pattern.sancara_search.extraction.evaluation import (
    evaluate,
    get_coverage,
    get_grouping_accuracy,
)
from compiam.melody.pattern.sancara_search.extraction.visualisation import (
    plot_all_sequences,
    plot_pitch,
    flush_matplotlib,
)
from compiam.melody.pattern.sancara_search.extraction.io import (
    load_sim_matrix,
    write_all_sequence_audio,
    load_yaml,
    load_pkl,
    write_pkl,
)
from compiam.melody.pattern.sancara_search.extraction.pitch import (
    cents_to_pitch,
    pitch_seq_to_cents,
    pitch_to_cents,
    get_timeseries,
    interpolate_below_length,
)
from compiam.melody.pattern.sancara_search.extraction.segments import (
    line_through_points,
    trim_silence,
    break_all_segments,
    remove_short,
    extend_segments,
    join_all_segments,
    extend_groups_to_mask,
    group_segments,
    group_overlapping,
    group_by_distance,
    trim_silence,
)
from compiam.utils import get_logger

logger = get_logger(__name__)


def self_similarity(
    features, exclusion_mask=None, timestep=None, hop_length=None, sr=44100
):
    """
    Compute self similarity matrix between features in <features>.

    If an <exclusion_mask> is passed, regions corresponding to that mask are
    excluded from the computation and the returned matrix corresponds only to
    those regions marked as 0 in the mask. If no mask is passed, every element
    of <features> is used.

    :param features: array of features extracted from audio
    :type features: np.ndarray
    :param exclusion_mask: array of 0 and 1, should be masked or not? [Optional]
    :type exclusion_mask: np.ndarray or None
    :param timestep: time in seconds between elements of <exclusion_mask>
        Only required if <exclusion_mask> is passed
    :type timestep: float or None
    :param hop_length: number of audio frames corresponding to one element in <features>
        Only required if <exclusion_mask> is passed
    :type hop_length: int or None
    :param sr: sampling rate of audio corresponding to <features>
        Only required if <exclusion_mask> is passed
    :type sr: int or None

    :returns:
        if exclusion mask is passed return...
            matrix - self similarity matrix
            orig_sparse_lookup - dict of {index in orig array: index of same element in sparse array}
            sparse_orig_lookup - dict of {index in sparse array: index of same element in orig array}
            boundaries_orig - list of boundaries between wanted and unwanted regions in orig array
            boundaries_sparse - list of boundaries between formally separated wanted regions in sparse array
        else return
            matrix - self similarity matrix
    :rtype: (np.ndarray, dict, dict, list, list) or np.ndarray
    """
    if exclusion_mask is None:
        # Nothing to exclude: use every feature frame. The mask-derived index
        # array only exists on the masked path, so it must not be touched here.
        return normalise_self_sim(create_ss_matrix(features))

    assert all(
        [not timestep is None, not hop_length is None, not sr is None]
    ), "To use exclusion mask, <timestep>, <hop_length> and <sr> must also be passed"

    # Bring the mask onto the same time grid as <features>
    features_mask = convert_mask(features, exclusion_mask, timestep, hop_length, sr)
    (
        orig_sparse_lookup,
        sparse_orig_lookup,
        boundaries_orig,
        boundaries_sparse,
    ) = get_conversion_mappings(features_mask)

    # Indices we want to keep
    good_ix = np.where(features_mask == 0)[0]

    # Compute self similarity on the retained frames only, then normalise
    sparse_features = features[good_ix]
    matrix_norm = normalise_self_sim(create_ss_matrix(sparse_features))

    return (
        matrix_norm,
        orig_sparse_lookup,
        sparse_orig_lookup,
        boundaries_orig,
        boundaries_sparse,
    )
def convert_mask(arr, mask, timestep, hop_length, sr):
    """
    Resample <mask> so that it has one value per element along the first
    axis of <arr>.

    :param arr: array corresponding to features extracted from audio
    :type arr: np.ndarray
    :param mask: Mask indicating whether element should be excluded
        (different dimensions to <arr>)
    :type mask: np.ndarray
    :param timestep: time in seconds between each element in <mask>
    :type timestep: float
    :param hop_length: how many frames of audio correspond to each element in <arr>
    :type hop_length: int
    :param sr: sampling rate of audio from which <arr> was computed
    :type sr: int

    :returns: array of mask values equal in length to one dimension of <arr> - 0/1 is masked?
    :rtype: np.ndarray
    """
    n_elements = arr.shape[0]

    # Time in seconds associated with each element of <arr>
    element_times = (
        (position + 1) * hop_length / sr for position in range(n_elements)
    )

    # Nearest <mask> index for each of those times, then the mask value there
    return np.array([mask[round(time / timestep)] for time in element_times])
def get_conversion_mappings(mask):
    """
    Record how indices of an array relate to indices of the reduced (sparse)
    array obtained by dropping every element flagged in <mask>.

    :param mask: mask of 0/1 - is element to be excluded
    :type mask: np.ndarray

    :returns:
        orig_sparse_lookup - dict of {index in orig array: index of same element in sparse array}
        sparse_orig_lookup - dict of {index in sparse array: index of same element in orig array}
        boundaries_orig - list of boundaries between wanted and unwanted regions in orig array
        boundaries_sparse - list of boundaries between formally separated wanted regions in sparse array
    :rtype: (dict, dict, list, list)
    """
    # Positions that survive the reduction
    kept = np.where(mask == 0)[0]

    # Two-way mapping between original and sparse positions
    orig_sparse_lookup = {orig: sparse for sparse, orig in enumerate(kept)}
    sparse_orig_lookup = {
        sparse: orig for orig, sparse in orig_sparse_lookup.items()
    }

    # Kept positions adjacent to an excluded region in the original array:
    # the first kept element after a gap, or the last kept element before one
    boundaries_orig = []
    for position in range(1, len(mask)):
        before, here = mask[position - 1], mask[position]
        if before == 1 and here == 0:
            boundaries_orig.append(position)
        elif before == 0 and here == 1:
            boundaries_orig.append(position - 1)

    # The same boundaries expressed as sparse indices
    sparse_bounds = np.array([orig_sparse_lookup[b] for b in boundaries_orig])

    # Each interior gap contributes two consecutive sparse indices; keep only
    # the earlier one of such a pair. A boundary that is not directly preceded
    # by its neighbour is kept as is. The first boundary is always kept.
    reduced = set()
    if len(sparse_bounds) > 0:
        reduced.add(sparse_bounds[0])
    for earlier, later in zip(sparse_bounds[:-1], sparse_bounds[1:]):
        reduced.add(earlier if later - earlier == 1 else later)

    boundaries_sparse = np.array(sorted(reduced))

    return orig_sparse_lookup, sparse_orig_lookup, boundaries_orig, boundaries_sparse
def create_ss_matrix(feats, mode="cosine"):
    """
    Compute self similarity matrix between features in <feats>
    using distance measure, <mode>

    :param feats: array of features, one feature vector per row. Either a
        numpy array (or sequence of arrays) or an object exposing ``detach()``
        such as a torch tensor.
    :type feats: np.ndarray or torch.Tensor
    :param mode: name of distance measure (recognised by scipy.spatial.distance)
    :type mode: str

    :returns: self similarity matrix
    :rtype: np.ndarray
    """
    if hasattr(feats, "detach"):
        # Tensor input: drop it from the autograd graph and move it to host
        # memory before converting, so tensors on an accelerator also work.
        feats = feats.detach().cpu().numpy()

    # Pairwise distances between rows, expanded to a full square matrix
    return squareform(pdist(np.vstack(feats), metric=mode))
def normalise_self_sim(matrix):
    """
    Normalise self similarity matrix: invert and convolve

    The distance matrix is inverted, a band of 8 diagonals either side of the
    main diagonal is zeroed, the result is convolved with a 10x10
    tri-diagonal kernel, the main diagonal is zeroed and the matrix is
    finally rescaled with :func:`zero_normalise`.

    :param matrix: self similarity matrix (square)
    :type matrix: np.ndarray

    :returns: matrix normalized, same dimensions
    :rtype: np.ndarray
    """
    # Invert so that small distances become large values; the offset avoids
    # division by zero.
    matrix = 1 / (matrix + 1e-6)

    # Zero every element within 8 diagonals of the main diagonal
    rows, cols = np.indices(matrix.shape)
    outside_band = np.abs(cols - rows) > 8
    matrix = matrix * outside_band

    # Convolve with a kernel that is 1 on its main diagonal and on the
    # diagonals directly above and below it
    flength = 10
    ey = np.eye(flength) + np.eye(flength, k=1) + np.eye(flength, k=-1)
    matrix = convolve2d(matrix, ey, mode="same")

    # The convolution spreads values back onto the main diagonal; clear it
    # (np.bool no longer exists in current NumPy, so no boolean mask via that
    # alias is built here).
    np.fill_diagonal(matrix, 0)

    return zero_normalise(matrix)


def zero_normalise(matrix):
    """
    Shift <matrix> so its minimum is 0 and scale it by its (shifted) maximum.

    :param matrix: array to rescale
    :type matrix: np.ndarray

    :returns: rescaled copy of <matrix>
    :rtype: np.ndarray
    """
    matrix = matrix - matrix.min()
    # the small offset avoids division by zero for a constant matrix
    matrix = matrix / (matrix.max() + 1e-8)
    return matrix
def get_report_paths(out_dir):
    """
    Build the filepaths used for the progress images written while
    emphasizing diagonals.

    :params out_dir: directory path to save plots in. If falsy, every
        returned path is None.
    :type out_dir: str

    :returns: dict of filepaths keyed by processing step
    :rtype: dict
    """
    # processing step -> image filename
    filenames = {
        "sim": "1_simsave.png",
        "conv": "2_conv.png",
        "binar": "3_binary.png",
        "diag": "4_diag.png",
        "gauss": "5_gauss.png",
        "cont": "6_cont.png",
        "close": "6_close.png",
        "binop": "7_binop.png",
    }

    report_paths = {}
    for step, filename in filenames.items():
        if out_dir:
            report_paths[step] = os.path.join(out_dir, filename)
        else:
            report_paths[step] = None
    return report_paths
def save_matrix(X, filepath):
    """
    Write <X> as an image to <filepath>; do nothing when <filepath> is falsy.

    :param X: matrix to save
    :type X: np.ndarray
    :param filepath: filepath
    :type filepath: str or None
    """
    if not filepath:
        return

    create_if_not_exists(filepath)
    skimage.io.imsave(filepath, X)
def get_param_hash_filepath(out_dir, *params):
    """
    Build a ``.pkl`` filepath inside <out_dir> whose name is the string
    representation of the tuple of <params>.

    :params out_dir: directory path
    :type out_dir: str
    :params params: arguments, any type
    :type params: arguments, any type

    :returns: filepath unique to input params in <out_dir>, or None if
        <out_dir> is None
    :rtype: str
    """
    if out_dir is not None:
        filename = str(params) + ".pkl"
        return os.path.join(out_dir, filename)
    return None
def sparse_to_original(all_segments, boundaries_sparse, lookup):
    """
    Convert indices corresponding to segments in <all_segments>
    to their non-sparse form using mapping in <lookup>

    Segments that cross a boundary are first split at that boundary, along x
    and then along y. Segment endpoints that still sit exactly on a boundary
    are then moved inwards along the segment, and finally every coordinate is
    translated through <lookup>.

    :param all_segments: list of segments, [(x0,y0),(x1,y1),...]
    :type all_segments: list
    :param boundaries_sparse: list indices in sparse array corresponding to splits in original array
    :type boundaries_sparse: list
    :param lookup: dict of sparse_index:non-sparse index
    :type lookup: dict

    :returns: <all_segments> with indices replaced according to lookup
    :rtype: list
    """
    # A boundary at index 0 cannot split anything
    boundaries_sparse = [x for x in boundaries_sparse if x != 0]
    # set for the repeated membership tests in the final pass
    boundary_set = set(boundaries_sparse)

    # Pass 1: split every segment at the boundaries it crosses in x
    all_segments_scaled_x = []
    for seg in all_segments:
        ((x0, y0), (x1, y1)) = seg
        get_x, get_y = line_through_points(x0, y0, x1, y1)

        boundaries_in_x = sorted(
            [i for i in boundaries_sparse if i >= x0 and i <= x1]
        )

        if not boundaries_in_x:
            all_segments_scaled_x.append(((x0, y0), (x1, y1)))
            continue

        current_x0 = x0
        for b in boundaries_in_x:
            x0_ = current_x0
            x1_ = b - 1
            y0_ = int(get_y(x0_))
            y1_ = int(get_y(x1_))
            all_segments_scaled_x.append(((x0_, y0_), (x1_, y1_)))
            current_x0 = b + 1

        # Remainder after the last boundary. The comparison mirrors the y pass
        # below; it was previously inverted, which discarded this remainder.
        if current_x0 < x1:
            x0_ = current_x0
            x1_ = x1
            y0_ = int(get_y(x0_))
            y1_ = int(get_y(x1_))
            all_segments_scaled_x.append(((x0_, y0_), (x1_, y1_)))

    all_segments_scaled_x_reduced = remove_short(all_segments_scaled_x, 1)

    # Pass 2: split the result at the boundaries it crosses in y
    all_segments_scaled = []
    for seg in all_segments_scaled_x_reduced:
        ((x0, y0), (x1, y1)) = seg
        get_x, get_y = line_through_points(x0, y0, x1, y1)

        boundaries_in_y = sorted(
            [i for i in boundaries_sparse if i >= y0 and i <= y1]
        )

        if not boundaries_in_y:
            all_segments_scaled.append(((x0, y0), (x1, y1)))
            continue

        current_y0 = y0
        for b in boundaries_in_y:
            y0_ = current_y0
            y1_ = b - 1
            x0_ = int(get_x(y0_))
            x1_ = int(get_x(y1_))
            all_segments_scaled.append(((x0_, y0_), (x1_, y1_)))
            current_y0 = b + 1

        if current_y0 < y1:
            y0_ = current_y0
            y1_ = y1
            x0_ = int(get_x(y0_))
            x1_ = int(get_x(y1_))
            all_segments_scaled.append(((x0_, y0_), (x1_, y1_)))

    all_segments_scaled_reduced = remove_short(all_segments_scaled, 1)

    # Pass 3: nudge endpoints off boundaries, then translate via <lookup>
    all_segments_converted = []
    for seg in all_segments_scaled_reduced:
        ((x0, y0), (x1, y1)) = seg

        # Each adjustment moves one endpoint by a single step and recomputes
        # the other coordinate from the line through the current endpoints.
        # NOTE(review): termination relies on the segment never collapsing
        # onto consecutive boundaries - confirm against remove_short.
        while (
            (x0 in boundary_set)
            or (x1 in boundary_set)
            or (y0 in boundary_set)
            or (y1 in boundary_set)
        ):
            if x0 in boundary_set:
                get_x, get_y = line_through_points(x0, y0, x1, y1)
                x0 = x0 + 1
                y0 = round(get_y(x0))
                y1 = round(get_y(x1))
            if x1 in boundary_set:
                get_x, get_y = line_through_points(x0, y0, x1, y1)
                x1 = x1 - 1
                y0 = round(get_y(x0))
                y1 = round(get_y(x1))
            if y0 in boundary_set:
                get_x, get_y = line_through_points(x0, y0, x1, y1)
                y0 = y0 + 1
                x0 = round(get_x(y0))
                x1 = round(get_x(y1))
            if y1 in boundary_set:
                get_x, get_y = line_through_points(x0, y0, x1, y1)
                y1 = y1 - 1
                x0 = round(get_x(y0))
                x1 = round(get_x(y1))

        all_segments_converted.append(
            ((lookup[x0], lookup[y0]), (lookup[x1], lookup[y1]))
        )

    return all_segments_converted
def zero_norm_matrix(X):
    """
    Shift <X> so its minimum is 0 and scale it by its (shifted) maximum.

    The input is not modified. Non floating point input (integer or boolean
    arrays, nested lists) is promoted to float first, since the in-place
    division below cannot store fractional results in such arrays.

    :param X: array to rescale
    :type X: np.ndarray

    :returns: rescaled copy of <X>
    :rtype: np.ndarray
    """
    X = np.asarray(X)
    if not np.issubdtype(X.dtype, np.inexact):
        X = X.astype(float)

    # the subtraction allocates a new array, so dividing in place is safe
    X = X - X.min()
    # the small offset avoids division by zero for a constant matrix
    X /= X.max() + 1e-8
    return X
class segmentExtractor:
    """
    Manipulate and extract segments from self similarity matrix

    Intended order of use: :meth:`emphasize_diagonals`, then
    :meth:`extract_segments`, then :meth:`group_segments`. Each step stores
    its parameters and intermediate results on the instance and later steps
    read them from there.
    """

    def __init__(self, X, window_size, sr=44100, cache_dir=None):
        """
        :param X: self similarity matrix
        :type X: np.ndarray
        :param window_size: used together with <sr> to convert between matrix
            elements and seconds (seconds * sr / window_size)
        :type window_size: int
        :param sr: sampling rate of the original audio
        :type sr: int
        :param cache_dir: directory in which intermediate results are cached.
            If None, no cache paths are generated.
        :type cache_dir: str or None
        """
        self.X = X
        self.shape = X.shape
        self.window_size = window_size
        self.sr = sr
        self.cache_dir = cache_dir

        # initialise arrays
        self.X_conv = None
        self.X_bin = None
        self.X_diag = None
        self.X_gauss = None
        self.X_cont = None
        self.X_sym = None
        self.X_fill = None
        self.X_binop = None
        self.X_proc = None

        # Initialize status
        self.emphasized = False
        self.extracted = False

        # cache directories, one per processing stage
        self._cache_base = os.path.join(cache_dir, "{0}", "") if cache_dir else None
        self._segment_convolve_cache = self._cache_subdir("convolve")
        self._segment_cache = self._cache_subdir("segments")
        self._segment_ext_cache = self._cache_subdir("segments_extended")
        self._segment_join_cache = self._cache_subdir("segments_joined")
        self._segment_group_cache = self._cache_subdir("segments_groups")
        self._segment_group_overlap_cache = self._cache_subdir("segment_overlap")

        # cache file paths; filled in as each processing step runs. Defined
        # here so that cache_paths() can be called at any time.
        self.conv_path = None
        self.seg_path = None
        self.seg_ext_path = None
        self.seg_join_path = None
        self.group_path = None
        self.segment_overlap_path = None

    def _cache_subdir(self, name):
        """Return the cache directory for stage <name>, or None if caching is off."""
        if not self.cache_dir:
            return None
        return os.path.join(self.cache_dir, name, "")

    def emphasize_diagonals(
        self,
        bin_thresh=0.025,
        gauss_sigma=None,
        cont_thresh=None,
        etc_kernel_size=10,
        binop_dim=3,
        image_report=False,
        verbose=False,
    ):
        """
        From self similarity matrix, self.X. Emphasize diagonals using a series
        of image processing steps.

        :param bin_thresh: Threshold for binarization of self similarity array. Values below this
            threshold are set to 0 (not significant), those above or equal too are set to 1.
            Very important parameter
        :type bin_thresh: float
        :param gauss_sigma: If not None, sigma for diagonal gaussian blur to apply to matrix
        :type gauss_sigma: float or None
        :param cont_thresh: Only applicable if <gauss_sigma>. This binary threshold is reapplied
            after gaussian blur to ensure matrix of 0 and 1. if None, equal to <bin_thresh>
        :type cont_thresh: float or None
        :param etc_kernel_size: Kernel size for morphological closing
        :type etc_kernel_size: int
        :param binop_dim: square dimension of binary opening structure (square matrix of zeros with 1 across the diagonal)
        :type binop_dim: int
        :param image_report: folder to save progress images in. If falsy, no images are saved.
        :type image_report: str, None or False
        :param verbose: Display progress
        :type verbose: bool

        :returns: processed matrix, also available at self.X_proc
        :rtype: np.ndarray
        """
        self.bin_thresh = bin_thresh
        self.gauss_sigma = gauss_sigma
        self.etc_kernel_size = etc_kernel_size
        self.binop_dim = binop_dim
        self.image_report = image_report

        self.report_fns = get_report_paths(image_report)

        # Save original self similarity matrix
        save_matrix(self.X, self.report_fns["sim"])

        ####################
        ## Convert params ##
        ####################
        self.cont_thresh = self.bin_thresh if not cont_thresh else cont_thresh

        #########################
        ## Emphasize Diagonals ##
        #########################
        if verbose:
            logger.info("Convolving similarity matrix")
        # the convolution takes no parameters, so neither does its cache key
        self.conv_path = get_param_hash_filepath(self._segment_convolve_cache)
        self.X_conv = run_or_cache(convolve_array, [self.X], self.conv_path)
        self.X_conv = zero_norm_matrix(self.X_conv)
        save_matrix(self.X_conv, self.report_fns["conv"])

        if verbose:
            logger.info("Binarizing convolved array")
        self.X_bin = binarize(self.X_conv, self.bin_thresh)
        save_matrix(self.X_bin, self.report_fns["binar"])

        if verbose:
            logger.info("Removing diagonal")
        self.X_diag = remove_diagonal(self.X_bin)
        save_matrix(self.X_diag, self.report_fns["diag"])

        if self.gauss_sigma:
            if verbose:
                logger.info("Applying diagonal gaussian filter")
            self.X_gauss = diagonal_gaussian(self.X_diag, self.gauss_sigma)
            save_matrix(self.X_gauss, self.report_fns["gauss"])

            if verbose:
                logger.info("Binarize gaussian blurred similarity matrix")
            self.X_cont = binarize(self.X_gauss, self.cont_thresh)
            save_matrix(self.X_cont, self.report_fns["cont"])
        else:
            # no blur requested: later stages work on the unblurred matrix
            self.X_gauss = self.X_diag
            self.X_cont = self.X_gauss

        if verbose:
            logger.info("Ensuring symmetry between upper and lower triangle in array")
        self.X_sym = make_symmetric(self.X_cont)

        if verbose:
            logger.info("Identifying and isolating regions between edges")
        self.X_fill = edges_to_contours(self.X_sym, self.etc_kernel_size)
        save_matrix(self.X_fill, self.report_fns["close"])

        if verbose:
            logger.info(
                "Cleaning isolated non-directional regions using morphological opening"
            )
        self.X_binop = apply_bin_op(self.X_fill, self.binop_dim)

        if verbose:
            logger.info("Ensuring symmetry between upper and lower triangle in array")
        self.X_proc = make_symmetric(self.X_binop)
        save_matrix(self.X_proc, self.report_fns["binop"])

        self.emphasized = True

        return self.X_proc

    def extract_segments(
        self,
        etc_kernel_size=10,
        binop_dim=3,
        perc_tail=0.5,
        bin_thresh_segment=None,
        min_diff_trav=0.5,
        min_pattern_length_seconds=2,
        boundaries=None,
        lookup=None,
        break_mask=None,
        timestep=None,
        verbose=False,
    ):
        """
        From self similarity matrix, <self.X_proc>. Return list of segments,
        each corresponding to two regions of the input axis.

        :param etc_kernel_size: Kernel size for morphological closing. Not read by
            this method; the value given to emphasize_diagonals is the one used.
        :type etc_kernel_size: int
        :param binop_dim: square dimension of binary opening structure (square matrix of zeros
            with 1 across the diagonal). Not read by this method; the value given to
            emphasize_diagonals is the one used.
        :type binop_dim: int
        :param perc_tail: Percentage either size of a segment along its trajectory considered
            for lower threshold for significance
        :type perc_tail: int
        :param bin_thresh_segment: Reduced <bin_thresh> threshold for areas neighbouring identified segments.
            If None, use 0.5*<bin_thresh>
        :type bin_thresh_segment: float
        :param min_diff_trav: Min time difference in seconds between two segments for them to be joined to one.
        :type min_diff_trav: float
        :param min_pattern_length_seconds: Minimum length of any returned pattern in seconds
        :type min_pattern_length_seconds: float
        :param boundaries: list of boundaries in <X> corresponding to breaks due to sparsity
        :type boundaries: list or None
        :param lookup: Lookup of sparse index (in X): non-sparse index
        :type lookup: dict
        :param break_mask: any segment that traverses a non-zero element in <break_mask>
            is broken into two according to this non-zero value
        :type break_mask: array
        :param timestep: Time in seconds between each element in <break_mask>
        :type timestep: float or None
        :param verbose: Display progress
        :type verbose: bool

        :returns: list of segments in the form [((x0,y0),(x1,y1)),..]
        :rtype: list

        :raises Exception: if emphasize_diagonals has not been run
        """
        ############
        ## Checks ##
        ############
        if not self.emphasized:
            raise Exception(
                "Please run self.emphasize_diagonals before attempting to extract segments."
            )

        if break_mask is not None:
            assert (
                timestep is not None
            ), "If <break_mask> is passed, timestep too must be specified"

        if boundaries is not None:
            assert (
                lookup is not None
            ), "If <boundaries> is passed, lookup too must be specified"

        ############
        ## Params ##
        ############
        self.min_diff_trav = min_diff_trav

        # in terms of elements matrix elements
        self.min_length_cqt = min_pattern_length_seconds * self.sr / self.window_size

        # translate min_diff_trav to corresponding diagonal distance
        self.min_diff_trav_hyp = (2 * min_diff_trav**2) ** 0.5
        self.min_diff_trav_seq = self.min_diff_trav_hyp * self.sr / self.window_size
        self.bin_thresh_segment = (
            self.bin_thresh * 0.5 if not bin_thresh_segment else bin_thresh_segment
        )
        self.perc_tail = perc_tail
        self.min_pattern_length_seconds = min_pattern_length_seconds
        self.boundaries = boundaries
        self.lookup = lookup
        self.break_mask = break_mask
        self.timestep = timestep

        ######################
        ## Extract segments ##
        ######################
        # parameters of emphasize_diagonals; they prefix every cache key below
        emphasis_params = (
            self.bin_thresh,
            self.gauss_sigma,
            self.cont_thresh,
            self.etc_kernel_size,
            self.binop_dim,
        )

        if verbose:
            logger.info("Extracting segments")
        self.seg_path = get_param_hash_filepath(self._segment_cache, *emphasis_params)
        # NOTE(review): segments_from_matrix is not among the imports visible
        # in this file - confirm it is in scope.
        self.all_segments = run_or_cache(
            segments_from_matrix, [self.X_bin], self.seg_path
        )

        if verbose:
            logger.info("Extending Segments")
        self.seg_ext_path = get_param_hash_filepath(
            self._segment_ext_cache,
            *emphasis_params,
            self.perc_tail,
            self.bin_thresh_segment,
        )
        args = [
            self.all_segments,
            self.X_sym,
            self.X_conv,
            self.perc_tail,
            self.bin_thresh_segment,
        ]
        self.all_segments_extended = run_or_cache(
            extend_segments, args, self.seg_ext_path
        )
        if verbose:
            logger.info(f" {len(self.all_segments_extended)} extended segments...")

        self.all_segments_extended_reduced = remove_short(self.all_segments_extended, 1)

        if verbose:
            logger.info("Converting sparse segment indices to original")
        if self.boundaries is not None:
            self.all_segments_converted = sparse_to_original(
                self.all_segments_extended_reduced, self.boundaries, self.lookup
            )
        else:
            self.all_segments_converted = self.all_segments_extended_reduced

        if verbose:
            logger.info("Joining segments that are sufficiently close")
        self.seg_join_path = get_param_hash_filepath(
            self._segment_join_cache,
            *emphasis_params,
            self.perc_tail,
            self.bin_thresh_segment,
            self.min_diff_trav_seq,
        )
        args = [self.all_segments_converted, self.min_diff_trav_seq]
        self.all_segments_joined = run_or_cache(
            join_all_segments, args, self.seg_join_path
        )
        if verbose:
            logger.info(f" {len(self.all_segments_joined)} joined segments...")

        if verbose:
            logger.info("Breaking segments with silent/stable regions")
        if self.break_mask is not None:
            self.all_broken_segments = break_all_segments(
                self.all_segments_joined,
                self.break_mask,
                self.window_size,
                self.sr,
                self.timestep,
            )
        else:
            self.all_broken_segments = self.all_segments_joined
        if verbose:
            logger.info(f" {len(self.all_broken_segments)} broken segments...")

        if verbose:
            logger.info("Reducing Segments")
        self.all_segments_reduced = remove_short(
            self.all_broken_segments, self.min_length_cqt
        )
        if verbose:
            logger.info(
                f" {len(self.all_segments_reduced)} segments above minimum length of {self.min_pattern_length_seconds}s..."
            )

        self.extracted = True

        return self.all_segments_reduced

    def group_segments(
        self,
        all_segments,
        break_mask,
        pitch,
        ext_mask_tol=0.5,
        match_tol=1,
        dupl_perc_overlap_inter=0.9,
        dupl_perc_overlap_intra=0.55,
        group_len_var=1.0,
        n_dtw=10,
        thresh_dtw=10,
        thresh_cos=None,
        min_pattern_length_seconds=2,
        min_in_group=2,
        verbose=False,
    ):
        """
        Group <all_segments> into groups of patterns and return them as
        start points and lengths.

        Reads attributes that are set by emphasize_diagonals and
        extract_segments (thresholds, self.min_length_cqt, self.timestep, ...),
        so both must have been run first.

        :param all_segments: list of segments in the form [((x0,y0),(x1,y1)),..]
        :type all_segments: list
        :param break_mask: mask passed on to the grouping and
            extend-to-mask steps
        :type break_mask: array
        :param pitch: pitch track passed on to the grouping, silence trimming
            and distance based joining steps
        :type pitch: array
        :param ext_mask_tol: tolerance passed to extend_groups_to_mask
        :type ext_mask_tol: float
        :param match_tol: tolerance passed to the initial grouping
        :type match_tol: float
        :param dupl_perc_overlap_inter: overlap value passed to group_overlapping
        :type dupl_perc_overlap_inter: float
        :param dupl_perc_overlap_intra: overlap value passed to remove_group_duplicates
        :type dupl_perc_overlap_intra: float
        :param group_len_var: passed to group_overlapping and group_by_distance
        :type group_len_var: float
        :param n_dtw: passed to group_by_distance
        :type n_dtw: int
        :param thresh_dtw: passed to group_by_distance. If falsy, the distance
            based joining step is skipped.
        :type thresh_dtw: float or None
        :param thresh_cos: passed to group_by_distance
        :type thresh_cos: float or None
        :param min_pattern_length_seconds: Minimum length of any returned pattern in seconds
        :type min_pattern_length_seconds: float
        :param min_in_group: groups with fewer members than this are dropped
        :type min_in_group: int
        :param verbose: Display progress
        :type verbose: bool

        :returns: (starts, lengths), one list per group, as returned by
            convert_seqs_to_timestep and filtered by length and group size
        :rtype: (list, list)
        """
        ############
        ## Params ##
        ############
        self.pitch = pitch
        self.ext_mask_tol = ext_mask_tol
        self.match_tol = match_tol
        self.dupl_perc_overlap_inter = dupl_perc_overlap_inter
        self.dupl_perc_overlap_intra = dupl_perc_overlap_intra
        self.group_len_var = group_len_var
        self.n_dtw = n_dtw
        self.thresh_dtw = thresh_dtw
        self.thresh_cos = thresh_cos
        self.min_pattern_length_seconds = min_pattern_length_seconds
        self.min_in_group = min_in_group

        # parameters of the earlier steps; they prefix both cache keys below
        upstream_params = (
            self.bin_thresh,
            self.gauss_sigma,
            self.cont_thresh,
            self.etc_kernel_size,
            self.binop_dim,
            self.perc_tail,
            self.bin_thresh_segment,
            self.min_diff_trav_seq,
            self.min_length_cqt,
            self.match_tol,
        )

        if verbose:
            logger.info("Identifying Segment Groups")
        self.group_path = get_param_hash_filepath(
            self._segment_group_cache, *upstream_params
        )
        args = [
            all_segments,
            self.min_length_cqt,
            self.match_tol,
            break_mask,
            self.window_size,
            self.timestep,
            self.sr,
            self.pitch,
        ]
        # inside this method the bare name refers to the module level
        # group_segments function, not to this method
        all_groups = run_or_cache(group_segments, args, self.group_path)

        if verbose:
            logger.info("Extending segments to silence/stability")
        all_groups_ext = extend_groups_to_mask(
            all_groups,
            break_mask,
            self.window_size,
            self.sr,
            self.timestep,
            toler=self.ext_mask_tol,
        )

        if verbose:
            logger.info("Trimming Silence")
        all_groups_sil = trim_silence(
            all_groups_ext, self.pitch, self.window_size, self.sr, self.timestep
        )
        # drop members whose end does not lie after their start
        all_groups_sil = [[(i, j) for i, j in x if j > i] for x in all_groups_sil]

        # NOTE(review): remove_group_duplicates is not among the imports
        # visible in this file - confirm it is in scope.
        all_groups_sil = [
            remove_group_duplicates(g, self.dupl_perc_overlap_intra)
            for g in all_groups_sil
        ]

        if verbose:
            logger.info("Identifying Segment Groups")
        self.segment_overlap_path = get_param_hash_filepath(
            self._segment_group_overlap_cache,
            *upstream_params,
            self.dupl_perc_overlap_inter,
            self.group_len_var,
        )
        all_groups = run_or_cache(
            group_overlapping,
            [all_groups_sil, self.dupl_perc_overlap_inter, self.group_len_var],
            self.segment_overlap_path,
        )

        if self.thresh_dtw:
            if verbose:
                logger.info("Joining geometrically close groups using pitch tracks")
            all_groups_dtw = group_by_distance(
                all_groups,
                self.pitch,
                self.n_dtw,
                self.thresh_dtw,
                self.thresh_cos,
                self.group_len_var,
                self.window_size,
                self.sr,
                self.timestep,
            )
            if verbose:
                logger.info(f" {len(all_groups_dtw)} groups after join...")
        else:
            all_groups_dtw = all_groups

        all_groups_rgd = [
            remove_group_duplicates(g, self.dupl_perc_overlap_intra)
            for g in all_groups_dtw
        ]

        if verbose:
            logger.info("Grouping overlapping")
        all_groups_dov = group_overlapping(
            all_groups_rgd, self.dupl_perc_overlap_inter, self.group_len_var
        )
        if verbose:
            logger.info(f" {len(all_groups_dov)} groups after join...")

        if verbose:
            logger.info("Extending to mask")
        all_groups_extdov = extend_groups_to_mask(
            all_groups_dov,
            break_mask,
            self.window_size,
            self.sr,
            self.timestep,
            toler=self.ext_mask_tol,
        )

        if verbose:
            logger.info("Trimming Silence")
        all_groups_ts = trim_silence(
            all_groups_extdov, self.pitch, self.window_size, self.sr, self.timestep
        )

        all_groups_final = [
            remove_group_duplicates(g, self.dupl_perc_overlap_intra)
            for g in all_groups_ts
        ]

        if verbose:
            logger.info("Convert sequences to pitch track timesteps")
        starts_seq, lengths_seq = convert_seqs_to_timestep(
            all_groups_final, self.window_size, self.sr, self.timestep
        )

        if verbose:
            logger.info("Applying exclusion functions")
        starts_seq_exc, lengths_seq_exc = remove_below_length(
            starts_seq, lengths_seq, self.timestep, self.min_pattern_length_seconds
        )

        # keep only groups with enough members
        starts = [p for p in starts_seq_exc if len(p) >= self.min_in_group]
        lengths = [p for p in lengths_seq_exc if len(p) >= self.min_in_group]

        return starts, lengths

    def display_matrix(self, X, title=None, title_size=9, figsize=(3, 3)):
        """
        Show <X> as an image with matplotlib.

        :param X: matrix to display
        :type X: np.ndarray
        :param title: plot title [Optional]
        :type title: str or None
        :param title_size: font size of title
        :type title_size: int
        :param figsize: figure size passed to matplotlib
        :type figsize: tuple

        :raises ValueError: if <X> is not a numpy array
        """
        if not isinstance(X, np.ndarray):
            raise ValueError("X must be a 2d numpy array")

        fig, ax = plt.subplots(figsize=figsize)
        if title:
            plt.title(title, fontsize=title_size)
        ax.imshow(X, interpolation="nearest")
        plt.axis("off")
        plt.tight_layout()
        plt.show()

    def display_all_matrices(self, title_size=9, figsize=(3, 3)):
        """
        Show the matrices produced by each step of emphasize_diagonals.

        :param title_size: font size of titles
        :type title_size: int
        :param figsize: figure size passed to matplotlib
        :type figsize: tuple

        :raises Exception: if emphasize_diagonals has not been run
        """
        if not self.emphasized:
            raise Exception(
                "Please run self.emphasize_diagonals before attempting to display matrices."
            )

        # (matrix, title) in processing order
        stages = [
            (self.X, "Self Similarity"),
            (self.X_conv, "Convolved"),
            (self.X_diag, f"Binarized (threshold={self.bin_thresh})"),
        ]
        if self.gauss_sigma:
            stages.append(
                (self.X_gauss, f"Diagonal gaussian blur (sigma={self.gauss_sigma})")
            )
            # X_cont is thresholded with cont_thresh in emphasize_diagonals
            stages.append(
                (self.X_cont, f"Gaussian binarized (threshold={self.cont_thresh})")
            )
        stages.extend(
            [
                (
                    self.X_fill,
                    f"Morphological closing (kernel size={self.etc_kernel_size})",
                ),
                (
                    self.X_binop,
                    f"Morphological opening (square dimension={self.binop_dim})",
                ),
                (self.X_proc, "Final Matrix"),
            ]
        )

        for matrix, title in stages:
            self.display_matrix(
                matrix, title, title_size=title_size, figsize=figsize
            )

    def print_steps(self):
        """Log which processing steps have been run and where their results are stored."""
        logger.info("Current Status")
        logger.info("--------------")
        logger.info(f"Input matrix of shape: {self.shape}")
        logger.info(f"Windows size: {self.window_size}")
        logger.info(f"Sampling rate of original audio: {self.sr}\n")

        logger.info("Segment extraction")
        logger.info("------------------")
        if self.emphasized:
            logger.info("Convolved matrix available at self.X_conv")
            logger.info(
                f"Binarized matrix available at self.X_bin (threshold={self.bin_thresh})"
            )
            if self.gauss_sigma:
                logger.info(
                    f"Gaussian smoothed matrix available at self.X_gauss (sigma={self.gauss_sigma})"
                )
            else:
                logger.info("No gaussian smoothing was applied")
            logger.info(
                f"Morphologically closed matrix available at self.X_fill (kernel size={self.etc_kernel_size})"
            )
            logger.info(
                f"Morphologically opened matrix available at self.X_binop (square dimension of binary opening structure={self.binop_dim})"
            )
            logger.info(
                "Final matrix after all steps applied, available at self.X_proc"
            )
        else:
            logger.info(
                "No processes have been applied to the input matrix (see self.emphasize_diagonals)"
            )

        if self.extracted:
            logger.info("")
        else:
            logger.info("No segments have been extracted (see self.extract_segments)")

    def cache_paths(self):
        """
        Return the cache file paths of the steps run so far.

        :returns: dict of step name to filepath; a value is None if the step
            has not been run or caching is disabled
        :rtype: dict
        """
        return {
            "convolved": self.conv_path,
            "extracted_segments": self.seg_path,
            "extended_segments": self.seg_ext_path,
            "joined_segments": self.seg_join_path,
        }

    def clear_cache(self):
        """Delete the cache directory, if one was configured and it exists."""
        if self.cache_dir and os.path.isdir(self.cache_dir):
            shutil.rmtree(self.cache_dir)

    def __repr__(self):
        return f"segmentExtractor(X={self.shape}, window_size={self.window_size}, sr={self.sr}, cache_dir={self.cache_dir})"