Source code for compiam.structure.segmentation.dhrupad_bandish_segmentation

import os
import glob
import gdown
import librosa
import zipfile

import numpy as np
import matplotlib.pyplot as plt
from compiam.exceptions import ModelNotTrainedError

from compiam.data import WORKDIR
from compiam.utils import get_logger
from compiam.utils.download import download_remote_model

logger = get_logger(__name__)


[docs]class DhrupadBandishSegmentation:
    """Dhrupad Bandish Segmentation"""

    def __init__(
        self,
        mode="net",
        fold=0,
        model_path=None,
        splits_path=None,
        annotations_path=None,
        features_path=None,
        original_audios_path=None,
        processed_audios_path=None,
        download_link=None,
        download_checksum=None,
        device=None,
    ):
        """Dhrupad Bandish Segmentation init method.

        :param mode: net, voc, or pakh, indicating the source used for s.t.m.
            estimation. Use net if the audio is a mixture signal, and voc or pakh
            for clean/source-separated vocal or pakhawaj tracks.
        :param fold: 0, 1 or 2, the validation fold to use during training.
        :param model_path: path to the model weights.
        :param splits_path: path to the audio splits.
        :param annotations_path: path to the annotations.
        :param features_path: path to the computed features.
        :param original_audios_path: path to the original audios from the dataset
            (see README.md in compIAM/models/structure/dhrupad_bandish_segmentation/audio_original)
        :param processed_audios_path: path to the processed audio files.
        :param download_link: link to the remote pre-trained model.
        :param download_checksum: checksum of the model file.
        :param device: compute device to run the model on (defaults to CUDA if
            available, otherwise CPU).
        """
        ### IMPORTING OPTIONAL DEPENDENCIES
        try:
            global torch
            import torch

            global split_audios
            from compiam.structure.segmentation.dhrupad_bandish_segmentation.audio_processing import (
                split_audios,
            )

            global extract_features, makechunks
            from compiam.structure.segmentation.dhrupad_bandish_segmentation.feature_extraction import (
                extract_features,
                makechunks,
            )

            global class_to_categorical, categorical_to_class, build_model, smooth_boundaries
            from compiam.structure.segmentation.dhrupad_bandish_segmentation.model_utils import (
                class_to_categorical,
                categorical_to_class,
                build_model,
                smooth_boundaries,
            )

            global pars
            import compiam.structure.segmentation.dhrupad_bandish_segmentation.params as pars

        except:
            raise ImportError(
                "In order to use this tool you need to have torch installed. "
                "Please install torch using: pip install torch"
            )
        ###

        self.device = device
        if not self.device:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Load mode by default: update with self.update_mode()
        self.mode = mode
        # Load fold by default: update with self.update_fold()
        self.fold = fold
        self.classes = pars.classes_dict[self.mode]

        # To prevent CUDNN_STATUS_NOT_INITIALIZED error in case of incompatible GPU
        try:
            self.model = self._build_model()
        except:
            self.device = "cpu"
            self.model = self._build_model()

        self.model_path = model_path
        self.download_link = download_link
        self.download_checksum = download_checksum
        self.loaded_model_path = None
        self.trained = False
        if self.model_path is not None:
            path_to_model = os.path.join(
                self.model_path[self.mode],
                "saved_model_fold_" + str(self.fold) + ".pt",
            )
            # Loading pre-trained model for given mode
            self.load_model(path_to_model)

        self.splits_path = (
            splits_path
            if splits_path is not None
            else os.path.join(
                WORKDIR, "models", "structure", "dhrupad_bandish_segmentation", "splits"
            )
        )
        self.annotations_path = (
            annotations_path
            if annotations_path is not None
            else os.path.join(
                WORKDIR,
                "models",
                "structure",
                "dhrupad_bandish_segmentation",
                "annotations",
            )
        )
        self.features_path = (
            features_path
            if features_path is not None
            else os.path.join(
                WORKDIR,
                "models",
                "structure",
                "dhrupad_bandish_segmentation",
                "features",
            )
        )
        self.original_audios_path = (
            original_audios_path
            if original_audios_path is not None
            else os.path.join(
                WORKDIR,
                "models",
                "structure",
                "dhrupad_bandish_segmentation",
                "audio_original",
            )
        )
        self.processed_audios_path = (
            processed_audios_path
            if processed_audios_path is not None
            else os.path.join(
                WORKDIR,
                "models",
                "structure",
                "dhrupad_bandish_segmentation",
                "audio_sections",
            )
        )

    def _build_model(self):
        """Building non-trained model"""
        return (
            build_model(pars.input_height, pars.input_len, len(self.classes))
            .float()
            .to(self.device)
        )
[docs]    def load_model(self, model_path):
        """Loading weights for model, given self.mode and self.fold

        :param model_path: path to model weights
        """
        if not os.path.exists(model_path):
            self.download_model(model_path)

        self.model = self._build_model()
        self.model.load_state_dict(
            torch.load(model_path, weights_only=True, map_location=self.device)
        )
        self.model.eval()
        self.loaded_model_path = model_path
        self.trained = True
[docs]    def download_model(self, model_path=None, force_overwrite=False):
        """Download pre-trained model."""
        download_path = (
            os.sep + os.path.join(*model_path.split(os.sep)[:-4])
            if model_path is not None
            else os.path.join(
                WORKDIR, "models", "structure", "dhrupad_bandish_segmentation"
            )
        )
        # Creating model folder to store the weights
        if not os.path.exists(download_path):
            os.makedirs(download_path)
        download_remote_model(
            self.download_link,
            self.download_checksum,
            download_path,
            force_overwrite=force_overwrite,
        )
[docs]    def update_mode(self, mode):
        """Update mode for the training and sampling. Mode is one of net, voc,
        or pakh, indicating the source for s.t.m. estimation. Use net if the
        audio is a mixture signal, and voc or pakh for clean/source-separated
        vocal or pakhawaj tracks.

        :param mode: new mode to use
        """
        self.mode = mode
        self.classes = pars.classes_dict[mode]
        path_to_model = os.path.join(
            self.model_path[self.mode], "saved_model_fold_" + str(self.fold) + ".pt"
        )
        self.load_model(path_to_model)
[docs]    def update_fold(self, fold):
        """Update data fold for the training and sampling

        :param fold: new fold to use
        """
        self.fold = fold
        path_to_model = os.path.join(
            self.model_path[self.mode], "saved_model_fold_" + str(self.fold) + ".pt"
        )
        self.load_model(path_to_model)
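    # Illustrative only (assumed workflow, not part of the original module):
    # switching the s.t.m. source or the validation fold reloads the matching
    # pre-trained weights, e.g.
    #   segmenter.update_mode("voc")  # use the vocals-only estimation source
    #   segmenter.update_fold(2)      # loads saved_model_fold_2.pt for the current mode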
[docs]    def train(self, verbose=True):
        """Train the Dhrupad Bandish Segmentation model

        :param verbose: showing details of the model
        """
        logger.info("Splitting audios...")
        split_audios(
            save_dir=self.processed_audios_path,
            annotations_path=self.annotations_path,
            audios_path=self.original_audios_path,
        )
        logger.info("Extracting features...")
        extract_features(
            self.processed_audios_path,
            self.annotations_path,
            self.features_path,
            self.mode,
        )

        # generate cross-validation folds for training
        songlist = os.listdir(self.features_path)
        labels_stm = np.load(
            os.path.join(self.features_path, "labels_stm.npy"), allow_pickle=True
        ).item()
        partition = {"train": [], "validation": []}
        n_folds = 3
        all_folds = []
        for i_fold in range(n_folds):
            all_folds.append(
                np.loadtxt(
                    os.path.join(
                        self.splits_path, self.mode, "fold_" + str(i_fold) + ".csv"
                    ),
                    delimiter=",",
                    dtype=str,
                )
            )
        val_fold = all_folds[self.fold]
        train_fold = np.array([])
        for i_fold in np.delete(np.arange(0, n_folds), self.fold):
            if len(train_fold) == 0:
                train_fold = all_folds[i_fold]
            else:
                train_fold = np.append(train_fold, all_folds[i_fold])

        for song in songlist:
            try:
                ids = glob.glob(os.path.join(self.features_path, song, "*.pt"))
            except:
                continue
            section_name = "_".join(song.split("_")[0:4])
            if section_name in val_fold:
                partition["validation"].extend(ids)
            elif section_name in train_fold:
                partition["train"].extend(ids)

        # generators
        training_set = torch.utils.data.Dataset(
            self.features_path, partition["train"], labels_stm
        )
        training_generator = torch.utils.data.DataLoader(training_set, **pars)
        validation_set = torch.utils.data.Dataset(
            self.features_path, partition["validation"], labels_stm
        )
        validation_generator = torch.utils.data.DataLoader(validation_set, **pars)

        # model definition and training
        criterion = torch.nn.CrossEntropyLoss(reduction="mean")
        optimizer = torch.optim.Adam(self.model.parameters(), lr=0.0001)

        if verbose:
            logger.info(self.model)
            n_params = 0
            for param in self.model.parameters():
                n_params += torch.prod(torch.tensor(param.shape))
            logger.info("Num of trainable params: %d\n" % n_params)

        ## training epochs loop
        train_loss_epoch = []
        train_acc_epoch = []
        val_loss_epoch = []
        val_acc_epoch = []
        n_idle = 0

        if not os.path.exists(os.path.join(self.model_path, self.mode)):
            os.mkdir(os.path.join(self.model_path, self.mode))

        for epoch in range(pars.max_epochs):
            if n_idle == 50:
                break
            train_loss_epoch += [0]
            train_acc_epoch += [0]
            val_loss_epoch += [0]
            val_acc_epoch += [0]

            n_iter = 0
            ## training
            self.model.train()
            for local_batch, local_labels, _ in training_generator:
                # map labels to class ids
                local_labels = class_to_categorical(local_labels, self.classes)

                # add channel dimension
                if len(local_batch.shape) == 3:
                    local_batch = local_batch.unsqueeze(1)

                # transfer to GPU
                local_batch, local_labels = local_batch.float().to(
                    self.device
                ), local_labels.to(self.device)

                # update weights
                optimizer.zero_grad()
                outs = self.model(local_batch).squeeze()
                loss = criterion(outs, local_labels.long())
                loss.backward()
                optimizer.step()

                # append loss and acc to arrays
                train_loss_epoch[-1] += loss.item()
                acc = (
                    np.sum(
                        (
                            np.argmax(outs.detach().cpu().numpy(), 1)
                            == local_labels.detach().cpu().numpy()
                        )
                    )
                    / pars.batch_size
                )
                train_acc_epoch[-1] += acc
                n_iter += 1

            train_loss_epoch[-1] /= n_iter
            train_acc_epoch[-1] /= n_iter

            n_iter = 0
            ## validation
            self.model.eval()
            with torch.set_grad_enabled(False):
                for local_batch, local_labels, _ in validation_generator:
                    # map labels to class ids
                    local_labels = class_to_categorical(local_labels, self.classes)

                    # add channel dimension
                    if len(local_batch.shape) == 3:
                        local_batch = local_batch.unsqueeze(1)

                    # transfer to GPU
                    local_batch, local_labels = local_batch.float().to(
                        self.device
                    ), local_labels.to(self.device)

                    # evaluate model
                    outs = self.model(local_batch).squeeze()
                    loss = criterion(outs, local_labels.long())

                    # append loss and acc to arrays
                    val_loss_epoch[-1] += loss.item()
                    acc = (
                        np.sum(
                            (
                                np.argmax(outs.detach().cpu().numpy(), 1)
                                == local_labels.detach().cpu().numpy()
                            )
                        )
                        / pars.batch_size
                    )
                    val_acc_epoch[-1] += acc
                    n_iter += 1

            val_loss_epoch[-1] /= n_iter
            val_acc_epoch[-1] /= n_iter

            # save if val_loss reduced
            if val_loss_epoch[-1] == min(val_loss_epoch):
                torch.save(
                    self.model.state_dict(),
                    os.path.join(
                        self.model_path, self.mode, "saved_model_fold_%d.pt" % self.fold
                    ),
                )
                n_idle = 0
            else:
                n_idle += 1

            # log loss in current epoch
            logger.info(
                "Epoch no: %d/%d\tTrain loss: %f\tTrain acc: %f\tVal loss: %f\tVal acc: %f"
                % (
                    epoch,
                    pars.max_epochs,
                    train_loss_epoch[-1],
                    train_acc_epoch[-1],
                    val_loss_epoch[-1],
                    val_acc_epoch[-1],
                )
            )
        self.trained = True
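    # Illustrative only (assumed workflow): training expects the original
    # recordings, annotations and split files to be present at the paths
    # configured in __init__ (defaults under WORKDIR), e.g.
    #   segmenter = DhrupadBandishSegmentation(mode="net", fold=0)
    #   segmenter.train(verbose=True)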
[docs]    def predict_stm(
        self, input_data, input_sr=44100, save_output=False, output_path=None
    ):
        """Predict Dhrupad Bandish Segmentation

        :param input_data: path to audio file or numpy array like audio signal.
        :param input_sr: sampling rate of the input array of data (if any). This
            variable is only relevant if the input is an array of data instead of
            a filepath.
        :param save_output: boolean indicating whether the output figure for the
            estimation is stored.
        :param output_path: if the input is an array, and the user wants to save
            the estimation, the output_path must be provided, path/to/picture.png.
        """
        if not isinstance(save_output, bool):
            raise ValueError("save_output must be a boolean")

        if isinstance(input_data, str):
            if not os.path.exists(input_data):
                raise FileNotFoundError("Target audio not found.")
            audio, sr = librosa.load(input_data, sr=pars.fs)
            if output_path is None:
                output_path = os.path.basename(input_data).replace(
                    input_data.split(".")[-1], "png"
                )
        elif isinstance(input_data, np.ndarray):
            logger.warning(
                f"Resampling... (input sampling rate is {input_sr}Hz, make sure this is correct)"
            )
            audio = librosa.resample(input_data, orig_sr=input_sr, target_sr=pars.fs)
            if (save_output is True) and (output_path is None):
                raise ValueError(
                    "Please provide an output_path in order to save the estimation"
                )
        else:
            raise ValueError("Input must be path to audio signal or an audio array")

        if save_output is True:
            output_dir = os.path.dirname(output_path)
            if output_dir and not os.path.exists(output_dir):
                os.makedirs(output_dir)

        if self.trained is False:
            raise ModelNotTrainedError(
                """
                Model is not trained. Please load model before running inference!
                You can load the pre-trained instance with the load_model wrapper.
                """
            )

        # convert to mel-spectrogram
        melgram = librosa.feature.melspectrogram(
            y=audio,
            sr=pars.fs,
            n_fft=pars.nfft,
            hop_length=pars.hopsize,
            win_length=pars.winsize,
            n_mels=pars.input_height,
            fmin=20,
            fmax=8000,
        )
        melgram = 10 * np.log10(1e-10 + melgram)
        melgram_chunks = makechunks(melgram, pars.input_len, pars.input_hop)

        # predict s.t.m. versus time
        stm_vs_time = []
        for chunk in melgram_chunks:
            model_in = (
                (torch.tensor(chunk).unsqueeze(0)).unsqueeze(1).float().to(self.device)
            )
            self.model.to(self.device)
            model_out = self.model.forward(model_in)
            model_out = torch.nn.Softmax(1)(model_out).detach().cpu().numpy()
            stm_vs_time.append(np.argmax(model_out))

        # smooth predictions with a minimum section duration of 5s
        stm_vs_time = smooth_boundaries(stm_vs_time, pars.min_sec_dur)

        # plot
        plt.plot(np.arange(len(stm_vs_time)) * 0.5, stm_vs_time)
        plt.yticks(np.arange(-1, 6), [""] + ["1", "2", "4", "8", "16"] + [""])
        plt.grid("on", linestyle="--", axis="y")
        plt.xlabel("Time (s)", fontsize=12)
        plt.ylabel("Surface tempo multiple", fontsize=12)
        if save_output is True:
            plt.savefig(output_path)
        else:
            plt.show()
        return stm_vs_time
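
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original module):
# the weights path and the audio path below are hypothetical placeholders.
# Pre-trained weights are normally resolved through compiam's model-loading
# utilities rather than hard-coded paths.
if __name__ == "__main__":
    segmenter = DhrupadBandishSegmentation(mode="net", fold=0)
    # hypothetical path to existing pre-trained weights for mode "net", fold 0
    segmenter.load_model("/path/to/saved_model_fold_0.pt")
    stm_track = segmenter.predict_stm(
        "/path/to/dhrupad_bandish_recording.wav", save_output=False
    )
    # stm_track holds one predicted surface tempo multiple class per 0.5 s hop
    print(stm_track)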