Source code for compiam.structure.segmentation.dhrupad_bandish_segmentation

import os
import glob
import gdown
import librosa
import zipfile

import numpy as np
import matplotlib.pyplot as plt
from compiam.exceptions import ModelNotTrainedError

from compiam.utils import get_logger
from compiam.data import WORKDIR

logger = get_logger(__name__)


[docs]class DhrupadBandishSegmentation:
    """Dhrupad Bandish Segmentation"""

    def __init__(
        self,
        mode="net",
        fold=0,
        model_path=None,
        splits_path=None,
        annotations_path=None,
        features_path=None,
        original_audios_path=None,
        processed_audios_path=None,
        device=None,
    ):
        """Dhrupad Bandish Segmentation init method.

        :param mode: net, voc, or pakh, indicating the source for s.t.m. estimation.
            Use net if the audio is a mixture signal, else use voc or pakh for
            clean/source-separated vocal or pakhawaj tracks.
        :param fold: 0, 1 or 2, the validation fold to use during training.
        :param model_path: path to the model weights.
        :param splits_path: path to the audio splits.
        :param annotations_path: path to the annotations.
        :param features_path: path to the computed features.
        :param original_audios_path: path to the original audios from the dataset
            (see README.md in compIAM/models/structure/dhrupad_bandish_segmentation/audio_original)
        :param processed_audios_path: path to the processed audio files.
        :param device: indicate whether the model will run on the GPU.
        """
        ### IMPORTING OPTIONAL DEPENDENCIES
        try:
            global torch
            import torch

            global split_audios
            from compiam.structure.segmentation.dhrupad_bandish_segmentation.audio_processing import (
                split_audios,
            )

            global extract_features, makechunks
            from compiam.structure.segmentation.dhrupad_bandish_segmentation.feature_extraction import (
                extract_features,
                makechunks,
            )

            global class_to_categorical, categorical_to_class, build_model, smooth_boundaries
            from compiam.structure.segmentation.dhrupad_bandish_segmentation.model_utils import (
                class_to_categorical,
                categorical_to_class,
                build_model,
                smooth_boundaries,
            )

            global pars
            import compiam.structure.segmentation.dhrupad_bandish_segmentation.params as pars

        except:
            raise ImportError(
                "In order to use this tool you need to have torch installed. "
                "Please install torch using: pip install torch"
            )
        ###

        # Use the requested device if given, otherwise fall back to CUDA when available
        self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")

        # Load mode by default: update with self.update_mode()
        self.mode = mode
        # Load fold by default: update with self.update_fold()
        self.fold = fold
        self.classes = pars.classes_dict[self.mode]

        # To prevent CUDNN_STATUS_NOT_INITIALIZED error in case of incompatible GPU
        try:
            self.model = self._build_model()
        except:
            self.device = "cpu"
            self.model = self._build_model()

        self.model_path = model_path
        self.loaded_model_path = None
        self.trained = False
        if self.model_path is not None:
            path_to_model = os.path.join(
                self.model_path[self.mode], "saved_model_fold_" + str(self.fold) + ".pt"
            )
            self.load_model(path_to_model)  # Loading pre-trained model for given mode

        self.splits_path = (
            splits_path
            if splits_path is not None
            else os.path.join(
                WORKDIR, "models", "structure", "dhrupad_bandish_segmentation", "splits"
            )
        )
        self.annotations_path = (
            annotations_path
            if annotations_path is not None
            else os.path.join(
                WORKDIR,
                "models",
                "structure",
                "dhrupad_bandish_segmentation",
                "annotations",
            )
        )
        self.features_path = (
            features_path
            if features_path is not None
            else os.path.join(
                WORKDIR,
                "models",
                "structure",
                "dhrupad_bandish_segmentation",
                "features",
            )
        )
        self.original_audios_path = (
            original_audios_path
            if original_audios_path is not None
            else os.path.join(
                WORKDIR,
                "models",
                "structure",
                "dhrupad_bandish_segmentation",
                "audio_original",
            )
        )
        self.processed_audios_path = (
            processed_audios_path
            if processed_audios_path is not None
            else os.path.join(
                WORKDIR,
                "models",
                "structure",
                "dhrupad_bandish_segmentation",
                "audio_sections",
            )
        )

    def _build_model(self):
        """Building non-trained model"""
        return (
            build_model(pars.input_height, pars.input_len, len(self.classes))
            .float()
            .to(self.device)
        )
[docs]    def load_model(self, model_path):
        """Loading weights for model, given self.mode and self.fold

        :param model_path: path to model weights
        """
        if not os.path.exists(model_path):
            self.download_model(model_path)
        self.model = self._build_model()
        self.model.load_state_dict(torch.load(model_path, map_location=self.device))
        self.model.eval()
        self.loaded_model_path = model_path
        self.trained = True
[docs]    def download_model(self, model_path=None):
        """Download pre-trained model."""
        url = "https://drive.google.com/uc?id=1SVkvHFjL5yh5M7cjnM98JK-b1QnHYw7a&export=download"
        unzip_path = (
            os.sep + os.path.join(*model_path.split(os.sep)[:-4])
            if model_path is not None
            else os.path.join(
                WORKDIR, "models", "structure", "dhrupad_bandish_segmentation"
            )
        )
        if not os.path.exists(unzip_path):
            os.makedirs(unzip_path)
        output = os.path.join(unzip_path, "baseline.zip")
        gdown.download(url, output, quiet=False)

        # Unzip file
        with zipfile.ZipFile(output, "r") as zip_ref:
            zip_ref.extractall(unzip_path)

        # Delete zip file after extraction
        os.remove(output)
        logger.warning("Files downloaded and extracted successfully.")
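    # Note: download_model() fetches baseline.zip and extracts it into the base
    # directory above, after which the weights for a given mode and fold are
    # loaded from model_path[mode]/saved_model_fold_<fold>.pt (the path built in
    # __init__, update_mode and update_fold). If load_model() is called with a
    # path that does not exist yet, it triggers this download first.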
[docs]    def update_mode(self, mode):
        """Update mode for the training and sampling. Mode is one of net, voc,
        pakh, indicating the source for s.t.m. estimation. Use the net mode if
        the audio is a mixture signal, else use voc or pakh for
        clean/source-separated vocal or pakhawaj tracks.

        :param mode: new mode to use
        """
        self.mode = mode
        self.classes = pars.classes_dict[mode]
        path_to_model = os.path.join(
            self.model_path[self.mode], "saved_model_fold_" + str(self.fold) + ".pt"
        )
        self.load_model(path_to_model)
[docs]    def update_fold(self, fold):
        """Update data fold for the training and sampling

        :param fold: new fold to use
        """
        self.fold = fold
        path_to_model = os.path.join(
            self.model_path[self.mode], "saved_model_fold_" + str(self.fold) + ".pt"
        )
        self.load_model(path_to_model)
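    # Both setters reload the corresponding pre-trained weights, e.g. (assuming a
    # model_path mapping was provided at construction time):
    #
    #   seg.update_mode("pakh")   # switch the s.t.m. source to the pakhawaj stream
    #   seg.update_fold(2)        # switch to the weights validated on fold 2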
[docs]    def train(self, verbose=True):
        """Train the Dhrupad Bandish Segmentation model

        :param verbose: showing details of the model
        """
        logger.info("Splitting audios...")
        split_audios(
            save_dir=self.processed_audios_path,
            annotations_path=self.annotations_path,
            audios_path=self.original_audios_path,
        )
        logger.info("Extracting features...")
        extract_features(
            self.processed_audios_path,
            self.annotations_path,
            self.features_path,
            self.mode,
        )

        # generate cross-validation folds for training
        songlist = os.listdir(self.features_path)
        labels_stm = np.load(
            os.path.join(self.features_path, "labels_stm.npy"), allow_pickle=True
        ).item()
        partition = {"train": [], "validation": []}
        n_folds = 3
        all_folds = []
        for i_fold in range(n_folds):
            all_folds.append(
                np.loadtxt(
                    os.path.join(
                        self.splits_path, self.mode, "fold_" + str(i_fold) + ".csv"
                    ),
                    delimiter=",",
                    dtype=str,
                )
            )
        val_fold = all_folds[self.fold]
        train_fold = np.array([])
        for i_fold in np.delete(np.arange(0, n_folds), self.fold):
            if len(train_fold) == 0:
                train_fold = all_folds[i_fold]
            else:
                train_fold = np.append(train_fold, all_folds[i_fold])

        for song in songlist:
            try:
                ids = glob.glob(os.path.join(self.features_path, song, "*.pt"))
            except:
                continue
            section_name = "_".join(song.split("_")[0:4])
            if section_name in val_fold:
                partition["validation"].extend(ids)
            elif section_name in train_fold:
                partition["train"].extend(ids)

        # generators: the dataset objects wrap the pre-computed feature chunks
        # (features dir, list of chunk ids, labels); the DataLoader keyword
        # arguments (batch size, shuffling, workers) are assumed to be defined
        # as the dict `params` in params.py
        training_set = torch.utils.data.Dataset(
            self.features_path, partition["train"], labels_stm
        )
        training_generator = torch.utils.data.DataLoader(training_set, **pars.params)
        validation_set = torch.utils.data.Dataset(
            self.features_path, partition["validation"], labels_stm
        )
        validation_generator = torch.utils.data.DataLoader(validation_set, **pars.params)

        # model definition and training
        criterion = torch.nn.CrossEntropyLoss(reduction="mean")
        optimizer = torch.optim.Adam(self.model.parameters(), lr=0.0001)

        if verbose:
            logger.info(self.model)
            n_params = 0
            for param in self.model.parameters():
                n_params += torch.prod(torch.tensor(param.shape))
            logger.info("Num of trainable params: %d\n" % n_params)

        ##training epochs loop
        train_loss_epoch = []
        train_acc_epoch = []
        val_loss_epoch = []
        val_acc_epoch = []
        n_idle = 0

        if not os.path.exists(os.path.join(self.model_path, self.mode)):
            os.mkdir(os.path.join(self.model_path, self.mode))

        for epoch in range(pars.max_epochs):
            if n_idle == 50:
                break
            train_loss_epoch += [0]
            train_acc_epoch += [0]
            val_loss_epoch += [0]
            val_acc_epoch += [0]

            n_iter = 0
            ##training
            self.model.train()
            for local_batch, local_labels, _ in training_generator:
                # map labels to class ids
                local_labels = class_to_categorical(local_labels, self.classes)

                # add channel dimension
                if len(local_batch.shape) == 3:
                    local_batch = local_batch.unsqueeze(1)

                # transfer to GPU
                local_batch, local_labels = local_batch.float().to(
                    self.device
                ), local_labels.to(self.device)

                # update weights
                optimizer.zero_grad()
                outs = self.model(local_batch).squeeze()
                loss = criterion(outs, local_labels.long())
                loss.backward()
                optimizer.step()

                # append loss and acc to arrays
                train_loss_epoch[-1] += loss.item()
                acc = (
                    np.sum(
                        (
                            np.argmax(outs.detach().cpu().numpy(), 1)
                            == local_labels.detach().cpu().numpy()
                        )
                    )
                    / pars.batch_size
                )
                train_acc_epoch[-1] += acc
                n_iter += 1

            train_loss_epoch[-1] /= n_iter
            train_acc_epoch[-1] /= n_iter

            n_iter = 0
            ##validation
            self.model.eval()
            with torch.set_grad_enabled(False):
                for local_batch, local_labels, _ in validation_generator:
                    # map labels to class ids
                    local_labels = class_to_categorical(local_labels, self.classes)

                    # add channel dimension
                    if len(local_batch.shape) == 3:
                        local_batch = local_batch.unsqueeze(1)

                    # transfer to GPU
                    local_batch, local_labels = local_batch.float().to(
                        self.device
                    ), local_labels.to(self.device)

                    # evaluate model
                    outs = self.model(local_batch).squeeze()
                    loss = criterion(outs, local_labels.long())

                    # append loss and acc to arrays
                    val_loss_epoch[-1] += loss.item()
                    acc = (
                        np.sum(
                            (
                                np.argmax(outs.detach().cpu().numpy(), 1)
                                == local_labels.detach().cpu().numpy()
                            )
                        )
                        / pars.batch_size
                    )
                    val_acc_epoch[-1] += acc
                    n_iter += 1

            val_loss_epoch[-1] /= n_iter
            val_acc_epoch[-1] /= n_iter

            # save if val_loss reduced
            if val_loss_epoch[-1] == min(val_loss_epoch):
                torch.save(
                    self.model.state_dict(),
                    os.path.join(
                        self.model_path, self.mode, "saved_model_fold_%d.pt" % self.fold
                    ),
                )
                n_idle = 0
            else:
                n_idle += 1

            # log loss in current epoch
            logger.info(
                "Epoch no: %d/%d\tTrain loss: %f\tTrain acc: %f\tVal loss: %f\tVal acc: %f"
                % (
                    epoch,
                    pars.max_epochs,
                    train_loss_epoch[-1],
                    train_acc_epoch[-1],
                    val_loss_epoch[-1],
                    val_acc_epoch[-1],
                )
            )

        self.trained = True
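    # Training therefore assumes the original recordings and annotations are
    # available under original_audios_path and annotations_path; it writes the
    # section-wise audios to processed_audios_path, the features (and
    # labels_stm.npy) to features_path, and the best checkpoint to
    # <model_path>/<mode>/saved_model_fold_<fold>.pt, keeping whichever epoch
    # minimises the validation loss (stopping after 50 epochs without improvement).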
[docs]    def predict_stm(
        self, input_data, input_sr=44100, save_output=False, output_path=None
    ):
        """Predict Dhrupad Bandish Segmentation

        :param input_data: path to audio file or numpy array like audio signal.
        :param input_sr: sampling rate of the input array of data (if any). This
            variable is only relevant if the input is an array of data instead of
            a filepath.
        :param save_output: boolean indicating whether the output figure for the
            estimation is stored.
        :param output_path: if the input is an array and the user wants to save
            the estimation, the output_path must be provided, path/to/picture.png.
        """
        if not isinstance(save_output, bool):
            raise ValueError("save_output must be a boolean")

        if isinstance(input_data, str):
            if not os.path.exists(input_data):
                raise FileNotFoundError("Target audio not found.")
            audio, sr = librosa.load(input_data, sr=pars.fs)
            if output_path is None:
                output_path = os.path.basename(input_data).replace(
                    input_data.split(".")[-1], "png"
                )
        elif isinstance(input_data, np.ndarray):
            logger.warning(
                f"Resampling... (input sampling rate is {input_sr}Hz, make sure this is correct)"
            )
            audio = librosa.resample(input_data, orig_sr=input_sr, target_sr=pars.fs)
            if (save_output is True) and (output_path is None):
                raise ValueError(
                    "Please provide an output_path in order to save the estimation"
                )
        else:
            raise ValueError("Input must be path to audio signal or an audio array")

        if save_output is True:
            # make sure the output directory exists (no-op if output_path has no directory part)
            output_dir = os.path.dirname(output_path)
            if output_dir and not os.path.exists(output_dir):
                os.makedirs(output_dir)

        if self.trained is False:
            raise ModelNotTrainedError(
                """
                Model is not trained. Please load model before running inference!
                You can load the pre-trained instance with the load_model wrapper.
                """
            )

        # convert to mel-spectrogram
        melgram = librosa.feature.melspectrogram(
            y=audio,
            sr=pars.fs,
            n_fft=pars.nfft,
            hop_length=pars.hopsize,
            win_length=pars.winsize,
            n_mels=pars.input_height,
            fmin=20,
            fmax=8000,
        )
        melgram = 10 * np.log10(1e-10 + melgram)
        melgram_chunks = makechunks(melgram, pars.input_len, pars.input_hop)

        # predict s.t.m. versus time
        stm_vs_time = []
        self.model.to(self.device)
        for chunk in melgram_chunks:
            model_in = (
                (torch.tensor(chunk).unsqueeze(0)).unsqueeze(1).float().to(self.device)
            )
            model_out = self.model.forward(model_in)
            model_out = torch.nn.Softmax(1)(model_out).detach().cpu().numpy()
            stm_vs_time.append(np.argmax(model_out))

        # smooth predictions with a minimum section duration of 5s
        stm_vs_time = smooth_boundaries(stm_vs_time, pars.min_sec_dur)

        # plot
        plt.plot(np.arange(len(stm_vs_time)) * 0.5, stm_vs_time)
        plt.yticks(np.arange(-1, 6), [""] + ["1", "2", "4", "8", "16"] + [""])
        plt.grid("on", linestyle="--", axis="y")
        plt.xlabel("Time (s)", fontsize=12)
        plt.ylabel("Surface tempo multiple", fontsize=12)
        if save_output is True:
            plt.savefig(output_path)
        else:
            plt.show()

        return stm_vs_time
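# Example usage (a minimal sketch; the file locations below are hypothetical
# placeholders, not part of the library):
#
#   from compiam.structure.segmentation.dhrupad_bandish_segmentation import (
#       DhrupadBandishSegmentation,
#   )
#
#   seg = DhrupadBandishSegmentation(mode="net", fold=0)
#   seg.load_model("/path/to/net/saved_model_fold_0.pt")   # downloads weights if missing
#   stm = seg.predict_stm(
#       "/path/to/dhrupad_recording.wav",
#       save_output=True,
#       output_path="stm_estimate.png",
#   )  # returns the smoothed surface tempo multiple estimate per 0.5 s frame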