Source code for compiam.timbre.stroke_classification.mridangam_stroke_classification

import os
import random

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn import svm
from sklearn.neural_network import MLPClassifier

from compiam.exceptions import ModelNotTrainedError, DatasetNotLoadedError
from compiam.data import WORKDIR
from compiam.utils import get_logger

logger = get_logger(__name__)


class MridangamStrokeClassification:
    """Mridangam stroke classification."""

    def __init__(self):
        """Mridangam stroke classification init method."""
        ### IMPORTING OPTIONAL DEPENDENCIES
        try:
            global features_for_pred, process_strokes, normalise_features
            from compiam.timbre.stroke_classification.mridangam_stroke_classification.stroke_features import (
                features_for_pred,
                process_strokes,
                normalise_features,
            )
        except ImportError:
            raise ImportError(
                "In order to use this tool you need to have essentia installed. "
                "Please install essentia using: pip install essentia"
            )
        ###

        self.dataset = None
        self.model = None
        self.feature_list = None
        self.computed_features_path = os.path.join(
            WORKDIR,
            "models",
            "timbre",
            "mridangam_stroke_classification",
            "pre-computed_features.csv",
        )
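    # Usage sketch: instantiation is lightweight but requires essentia to be
    # importable; dataset loading and training happen in the methods below.
    # The import path assumes the package re-exports the class:
    #
    #     from compiam.timbre.stroke_classification import MridangamStrokeClassification
    #     msc = MridangamStrokeClassification()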
    def load_mridangam_dataset(self, data_home=None, version="default", download=True):
        """Load the mirdata dataloader for the mridangam stroke dataset.

        :param data_home: folder where the dataset is found.
        :param version: version of the dataset to use.
        :param download: if True, the dataset is downloaded.
        :returns: None, but initializes the dataset of the class and the file dict of strokes.
        """
        from compiam import (
            load_dataset,
        )  # Importing the load function here to avoid circular imports

        self.dataset = load_dataset(
            "mridangam_stroke", data_home=data_home, version=version
        )
        self.data_home = self.dataset.data_home
        if download:
            self.dataset.download()
            self.dataset.validate()
        else:
            if not os.path.exists(os.path.join(self.data_home, "mridangam_stroke_1.5")):
                raise ValueError(
                    "Dataset not found, please re-run load_mridangam_dataset with download=True"
                )
        self.mridangam_ids = self.dataset.track_ids  # Load mridangam IDs
        self.mridangam_tracks = self.dataset.load_tracks()  # Load mridangam data
        self.stroke_names = self.list_strokes()
        self.stroke_dict = {item: [] for item in self.stroke_names}
        for i in self.mridangam_ids:
            self.stroke_dict[self.mridangam_tracks[i].stroke_name].append(
                self.mridangam_tracks[i].audio_path
            )
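    # Loader sketch (the data_home path is hypothetical; mirdata handles the
    # actual download and validation):
    #
    #     msc.load_mridangam_dataset(data_home="/tmp/mir_datasets", download=True)
    #     msc.stroke_dict  # stroke name -> list of audio paths for that stroke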
    def list_strokes(self):
        """List available mridangam strokes in the dataset.

        :returns: list of strokes in the dataset.
        """
        if self.dataset is None:
            raise DatasetNotLoadedError(
                """
                Please load the dataset using the .load_mridangam_dataset() method,
                otherwise the strokes cannot be listed.
            """
            )
        stroke_names = []
        for i in self.mridangam_ids:
            stroke_names.append(self.mridangam_tracks[i].stroke_name)
        return list(np.unique(stroke_names))
    def dict_strokes(self):
        """List and convert to an indexed dict the available mridangam strokes in the dataset.

        :returns: dict with unique integers as keys and stroke names as values.
        """
        if self.dataset is None:
            raise DatasetNotLoadedError(
                """
                Please load the dataset using the .load_mridangam_dataset() method,
                otherwise the strokes cannot be listed.
            """
            )
        stroke_names = []
        for i in self.mridangam_ids:
            stroke_names.append(self.mridangam_tracks[i].stroke_name)
        stroke_names = np.unique(stroke_names)
        return {idx: x for idx, x in enumerate(stroke_names)}
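    # Both helpers operate on the same unique stroke names; dict_strokes()
    # simply wraps them in an integer-indexed dict, which predict() uses to
    # decode the model's label-encoded outputs. Output should look roughly
    # like this (exact names depend on the dataset version):
    #
    #     msc.list_strokes()  # ['bheem', 'cha', 'dheem', 'dhin', ...]
    #     msc.dict_strokes()  # {0: 'bheem', 1: 'cha', 2: 'dheem', ...}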
    def train_model(
        self, model_type="svm", load_computed=False, balance=False, balance_ref="random"
    ):
        """Train a stroke classification model (SVM or MLP).

        :param model_type: type of model to train ("svm" or "mlp").
        :param load_computed: bool to indicate if the features are computed or loaded from file.
        :param balance: balance the number of instances per class to prevent biases.
        :param balance_ref: reference class for data balancing.
        :returns: accuracy in percentage, rounded to two decimals.
        """
        if (self.dataset is None) and (load_computed is False):
            raise DatasetNotLoadedError(
                "Dataset not found, please run load_mridangam_dataset"
            )
        if (load_computed is True) and not os.path.exists(self.computed_features_path):
            raise ValueError(
                """
                Training data not found. Please check that you set the path correctly,
                otherwise run the .train_model() function with load_computed=False
            """
            )

        file_dict = {}
        if self.dataset is not None:
            # Map each stroke name to its list of audio paths; when features are
            # loaded from the pre-computed CSV, no dataset needs to be loaded
            file_dict = {item: [] for item in self.list_strokes()}
            for i in self.mridangam_ids:
                file_dict[self.mridangam_tracks[i].stroke_name].append(
                    self.mridangam_tracks[i].audio_path
                )

        training_data, self.feature_list = process_strokes(
            file_dict, load_computed=load_computed
        )

        # Let's use sklearn's preprocessing tools for applying normalisation to features
        data_modif = normalise_features(training_data, self.feature_list)

        if balance:
            strokes = training_data.stroke.unique()
            count_dict = training_data["stroke"].value_counts().to_dict()
            min_stroke = min(count_dict, key=count_dict.get)
            min_number = (
                data_modif.stroke.value_counts()[min_stroke]
                if balance_ref == "lower"
                else data_modif.stroke.value_counts()[random.choice(strokes)]
            )
            reshaped_stroke_list = []
            for strk in strokes:
                if count_dict[strk] > min_number:
                    reshaped_stroke_list.append(
                        data_modif[data_modif.stroke == strk].sample(n=min_number)
                    )
                else:
                    reshaped_stroke_list.append(data_modif[data_modif.stroke == strk])
            # Merging after downsampling
            data_modif = pd.concat(reshaped_stroke_list)

        X = data_modif.iloc[:, : len(self.feature_list) - 1].values
        # Creating output values
        data_modif.stroke = pd.Categorical(
            data_modif.stroke
        )  # convert to categorical data
        y = np.array(data_modif.stroke.cat.codes)  # create label-encoded outputs
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.33, random_state=42
        )

        if model_type == "svm":
            self.model = svm.SVC(gamma=1 / (X_train.shape[-1] * X_train.var()))
        elif model_type == "mlp":
            self.model = MLPClassifier(alpha=1, max_iter=1000)
        else:
            raise ValueError(
                "Model not available. Please check the available options in the documentation."
            )

        # Fit model with training data
        self.model.fit(X_train, y_train)

        # Evaluate on the testing set
        y_pred = self.model.predict(X_test)
        accuracy = round(np.sum(y_test == y_pred) / len(y_pred) * 100, 2)
        logger.info(
            "{} model successfully trained with accuracy {}% in the testing set".format(
                model_type.upper(), accuracy
            )
        )
        return accuracy
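    # Note: the SVC gamma above reproduces scikit-learn's gamma="scale"
    # default, i.e. 1 / (n_features * X.var()), computed on the training
    # split. A minimal training sketch (dataset already loaded):
    #
    #     acc = msc.train_model(model_type="svm", balance=True, balance_ref="lower")
    #     # acc is the test-set accuracy in %, rounded to two decimals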
    def predict(self, file_list):
        """Predict stroke type from a list of files.

        :param file_list: list of files for prediction.
        :returns: dict containing filenames as keys and estimated strokes as values.
        """
        if self.model is None:
            raise ModelNotTrainedError(
                "The model is not trained. Please run train_model()."
            )
        if isinstance(file_list, str):
            file_list = [file_list]  # Wrap a single file path into a list

        list_of_feats = []
        for input_file in file_list:
            list_of_feats.append(features_for_pred(input_file=input_file))

        list_of_feats = pd.DataFrame(list_of_feats, columns=self.feature_list[:-1])
        data_modif = normalise_features(list_of_feats)
        input_feat = data_modif.iloc[:, :].values
        pred_strokes = self.model.predict(input_feat)
        return {
            x: y
            for x, y in zip(file_list, [self.dict_strokes()[x] for x in pred_strokes])
        }
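# ---------------------------------------------------------------------------
# Minimal end-to-end usage sketch, assuming essentia is installed; the path
# "stroke.wav" is a hypothetical single-stroke mridangam recording. Training
# computes features for the whole dataset, so the first run can take a while.
if __name__ == "__main__":
    msc = MridangamStrokeClassification()
    msc.load_mridangam_dataset(download=True)  # fetch and validate via mirdata
    accuracy = msc.train_model(model_type="svm")  # % accuracy on held-out split
    print("Test accuracy: {}%".format(accuracy))
    print(msc.predict(["stroke.wav"]))  # e.g. {"stroke.wav": "thom"}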