import os
import warnings
import numpy as np
import compiam
from compiam.melody.raga_recognition.deepsrgm.raga_mapping import create_mapping
from compiam.exceptions import (
ModelNotTrainedError,
DatasetNotLoadedError,
)
from compiam.utils import get_logger, WORKDIR
from compiam.utils.download import download_remote_model
logger = get_logger(__name__)
[docs]class DEEPSRGM(object):
"""DEEPSRGM model for raga classification. This DEEPSGRM implementation has been
kindly provided by Shubham Lohiya and Swarada Bharadwaj.
"""
def __init__(
self,
model_path=None,
download_link=None,
download_checksum=None,
rnn="lstm",
mapping_path=None,
sample_rate=44100,
device=None,
):
"""DEEPSRGM init method.
:param model_path: path to file to the model weights
:param download_link: link to the remote pre-trained model.
:param download_checksum: checksum of the model file.
:param rnn: type of rnn used "lstm" or "gru"
:param mapping_path: path to raga to id JSON mapping
:param sample_rate: sampling rate which the model is trained
:param device: torch CUDA config to route model to GPU
"""
### IMPORTING OPTIONAL DEPENDENCIES
try:
global torch
import torch
global deepsrgmModel
from compiam.melody.raga_recognition.deepsrgm.model import deepsrgmModel
except:
raise ImportError(
"In order to use this tool you need to have torch installed. "
"Please install torch using: pip install torch==1.8.0"
)
###
if not device:
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.rnn = rnn
# To prevent CUDNN_STATUS_NOT_INITIALIZED error in case of incompatible GPU
try:
self.model = self._build_model(rnn=self.rnn)
except:
self.device = "cpu"
self.model = self._build_model(rnn=self.rnn)
self.model_path = model_path
self.download_link = download_link
self.download_checksum = download_checksum
self.sample_rate = sample_rate
self.trained = False
## Loading LSTM model by default
if self.model_path is not None:
self.load_model(model_path=self.model_path[self.rnn], rnn=self.rnn)
self.mapping_path = mapping_path
self.selected_ragas = [
5,
8,
10,
13,
17,
20,
22,
23,
24,
28,
] # pre-defined for release 0.1.0
if (mapping_path is not None) and (self.selected_ragas is not None):
self.load_mapping(self.selected_ragas)
self.dataset = None
def _build_model(self, rnn="lstm"):
"""Building DEEPSRM
:param rnn: lstm (default) or gru.
"""
return deepsrgmModel(rnn=rnn).to(self.device)
[docs] def load_mapping(self, selection=None):
"""Loading raga mapping for DEEPSRGM
:param selection: Selection of ragas for the DEEPSRGM model. A default selection
is initialized by default in compiam v1.0. Flexible selection and training of this
model is under development at this moment and will be available in the next release.
"""
selected_ragas = self.selected_ragas if selection is None else selection
self.mapping = create_mapping(self.mapping_path, selected_ragas)
[docs] def load_model(self, model_path, rnn="lstm"):
"""Loading weights for DEEPSRGM
:param model_path: path to model.
:param rnn: lstm (default) or gru.
"""
if not os.path.exists(model_path):
self.download_model(model_path)
if rnn == "gru":
self.model = self._build_model(rnn="gru")
self.model_path = model_path
weights = torch.load(model_path, weights_only=True, map_location=self.device)
new_weights = weights.copy()
keys_to_fix = [
".weight_ih_l0",
".weight_hh_l0",
".bias_ih_l0",
".bias_hh_l0",
]
keys_to_fix = [rnn + x for x in keys_to_fix]
for i in keys_to_fix:
new_weights[i.replace(rnn, "rnn")] = weights[i]
del new_weights[i]
self.model.load_state_dict(new_weights)
self.trained = True
[docs] def download_model(self, model_path=None, force_overwrite=False):
"""Download pre-trained model."""
download_path = (
os.sep + os.path.join(*model_path.split(os.sep)[:-2])
if model_path is not None
else os.path.join(WORKDIR, "models", "melody", "deepsrgm")
)
# Creating model folder to store the weights
if not os.path.exists(download_path):
os.makedirs(download_path)
download_remote_model(
self.download_link,
self.download_checksum,
download_path,
force_overwrite=force_overwrite,
)
[docs] def load_raga_dataset(self, data_home=None, download=False):
"""Load an instance of the Compmusic raga dataset to assist the tool
:param data_home: path where to store the dataset data
:param download:
"""
self.dataset = compiam.load_dataset("compmusic_raga", data_home=data_home)
if download:
self.dataset.download()
warnings.warn(
"""
The audio of this dataset is private. Please request it in the
Zenodo link provided in the DOWNLOAD_INFO of the dataloader,
and download and unzip it following the instructions.
"""
)
[docs] def get_features(
self,
input_data=None,
input_sr=44100,
pitch_path=None,
tonic_path=None,
from_mirdata=False,
track_id=None,
k=5,
):
"""Computing features for prediction of DEEPSRM
:param input_data: path to audio file or numpy array like audio signal.
:param input_sr: sampling rate of the input array of data (if any). This variable is only
relevant if the input is an array of data instead of a filepath.
:param pitch_path: path to pre-computed pitch file (if available).
:param tonic_path: path to pre-computed tonic file (if available).
:param from_mirdata: boolean to indicate if the features are parsed from the mirdata loader of
Indian Art Music Raga Recognition Dataset (must be specifically this one).
:param track_id: track id for the Indian Art Music Raga Recognition Dataset if from_mirdata is
set to True.
:param k: k indicating the precision of the pitch feature.
"""
if (pitch_path is not None) and (tonic_path is not None):
freqs = open(pitch_path).read().strip().split("\n")
tonic = eval(open(tonic_path).read().strip())
elif from_mirdata:
if self.dataset is None:
raise DatasetNotLoadedError(
"Dataloader is not initialized. Have you run .load_raga_dataset()?"
)
if track_id is None:
raise ValueError(
"To load a track we need a track id. See mirdata instructions \
to know how to list the available ids."
)
track = self.dataset.track(track_id)
pitch_path = track.pitch_post_processed_path
tonic_path = track.tonic_fine_tuned_path
freqs = open(pitch_path).read().strip().split("\n")
tonic = eval(open(tonic_path).read().strip())
else:
try:
import essentia.standard as estd
melodia = compiam.melody.pitch_extraction.Melodia
melodia = melodia(sample_rate=self.sample_rate)
tonic_extraction = (
compiam.melody.tonic_identification.TonicIndianMultiPitch
)
tonic_extraction = tonic_extraction(sample_rate=self.sample_rate)
except:
raise ImportError(
"In order to use these tools to extract the features you need to have essentia installed."
"Please install essentia using: pip install essentia"
)
if isinstance(input_data, str):
if not os.path.exists(input_data):
raise FileNotFoundError("Target audio not found.")
audio = estd.MonoLoader(
filename=input_data, sampleRate=self.sample_rate
)()
elif isinstance(input_data, np.ndarray):
logger.warning(
"Resampling... (input sampling rate is {input_sr}Hz, make sure this is correct)"
)
resampling = estd.Resample(
inputSampleRate=input_sr, outputSampleRate=self.sample_rate
)
audio = resampling(input_data)
else:
raise ValueError("Input must be path to audio signal or an audio array")
logger.info("Extracting pitch track using melodia...")
freqs = melodia.extract(audio)[:, 1]
logger.info("Extracting tonic using multi-pitch approach...")
tonic = tonic_extraction.extract(audio)
# Normalise pitch
feature = np.round(1200 * np.log2(freqs / tonic) * (k / 100)).clip(0)
N = 200
a = []
if len(feature) <= 5000:
raise ValueError(
"""
Audio signal is not longer enough for a proper estimation. Please provide a larger audio.
"""
)
for i in range(N):
c = np.random.randint(0, len(feature) - 5000)
a.append(feature[c : c + 5000])
return np.array(a)
[docs] def predict(self, features, threshold=0.6, gpu="-1"):
"""Predict raga for recording
:param features: all subsequences for a certain music recording
:param threshold: majority voting threshold
:param gpu: Id of the available GPU to use (-1 by default, to run on CPU)
:return: recognition result
"""
## Setting up GPU if any
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu)
if isinstance(features, str):
raise ValueError(
"Please first extract features using .get_features() and use \
these as input for this predict function."
)
# Make sure model is loaded
if self.trained is False:
raise ModelNotTrainedError(
"""
Model is not trained. Please load model before running inference!
You can load the pre-trained instance with the load_model wrapper.
"""
)
# Make sure mapping is loaded
if self.mapping is None:
self.load_mapping(self.selected_ragas)
list_of_ragas = list(self.mapping.values())
# Predict
logger.info(
"Performing prediction for the following {} ragas: {}".format(
len(list_of_ragas), list_of_ragas
)
)
with torch.no_grad():
out = self.model.forward(torch.from_numpy(features).to(self.device).long())
preds = torch.argmax(out, axis=-1)
majority, _ = torch.mode(preds)
majority = int(majority)
votes = float(torch.sum(preds == majority)) / features.shape[0]
if votes >= threshold:
logger.info(
"Input music sample belongs to the {} raga".format(
self.mapping[majority]
)
)
logger.info(
"CONFUSED - Closest raga predicted is {} with {} votes".format(
self.mapping[majority], (votes * 100)
)
)
return self.mapping[majority]