Source code for compiam.melody.pitch_extraction.ftanet_carnatic

import os
import math
import gdown
import zipfile
import librosa

import numpy as np
from compiam.exceptions import ModelNotTrainedError

from compiam.utils.pitch import normalisation, resampling
from compiam.melody.pitch_extraction.ftanet_carnatic.pitch_processing import (
    batchize_test,
    get_est_arr,
)
from compiam.melody.pitch_extraction.ftanet_carnatic.cfp import cfp_process
from compiam.io import write_csv
from compiam.utils import get_logger, WORKDIR

logger = get_logger(__name__)


[docs]class FTANetCarnatic(object): """FTA-Net melody extraction tuned to Carnatic Music.""" def __init__(self, model_path=None, sample_rate=8000, gpu="-1"): """FTA-Net melody extraction init method. :param model_path: path to file to the model weights. :param sample_rate: Sample rate to which the audio is sampled for extraction. """ ### IMPORTING OPTIONAL DEPENDENCIES try: global tf import tensorflow as tf except: raise ImportError( "In order to use this tool you need to have tensorflow installed. " "Please install tensorflow using: pip install tensorflow==2.7.2" ) ### ## Setting up GPU if specified os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu) self.gpu = gpu self.model = self._build_model() self.sample_rate = sample_rate self.trained = False self.model_path = model_path if self.model_path is not None: self.load_model(self.model_path)
[docs] @staticmethod def SF_Module(x_list, n_channel, reduction, limitation): """Selection and fusion module. Implementation taken from https://github.com/yushuai/FTANet-melodic :param x_list: list of tensor inputs. :param n_channel: number of feature channels. :param reduction: the rate to which the data is compressed. :param limitation: setting a compressing limit. :returns: a tensor with the fused and selected feature map. """ ## Split fused = None for x_s in x_list: if fused == None: fused = x_s else: fused = tf.keras.layers.Add()([fused, x_s]) ## Fuse fused = tf.keras.layers.GlobalAveragePooling2D()(fused) fused = tf.keras.layers.BatchNormalization()(fused) fused = tf.keras.layers.Dense( max(n_channel // reduction, limitation), activation="selu" )(fused) ## Select masks = [] for i in range(len(x_list)): masks.append(tf.keras.layers.Dense(n_channel)(fused)) mask_stack = tf.keras.layers.Lambda( tf.keras.backend.stack, arguments={"axis": -1} )(masks) # (n_channel, n_kernel) mask_stack = tf.keras.layers.Softmax(axis=-2)(mask_stack) selected = None for i, x_s in enumerate(x_list): mask = tf.keras.layers.Lambda(lambda z: z[:, :, i])(mask_stack) mask = tf.keras.layers.Reshape((1, 1, n_channel))(mask) x_s = tf.keras.layers.Multiply()([x_s, mask]) if selected == None: selected = x_s else: selected = tf.keras.layers.Add()([selected, x_s]) return selected
[docs] @staticmethod def FTA_Module(x, shape, kt, kf): """Selection and fusion module. Implementation taken from https://github.com/yushuai/FTANet-melodic :param x: input tensor. :param shape: the shape of the input tensor. :param kt: kernel size for time attention. :param kf: kernel size for frequency attention. :returns: the resized input, the time-attention map, and the frequency-attention map. """ x = tf.keras.layers.BatchNormalization()(x) ## Residual x_r = tf.keras.layers.Conv2D( shape[2], (1, 1), padding="same", activation="relu" )(x) ## Time Attention # Attn Map (1, T, C), FC a_t = tf.keras.layers.Lambda(tf.keras.backend.mean, arguments={"axis": -3})(x) a_t = tf.keras.layers.Conv1D(shape[2], kt, padding="same", activation="selu")( a_t ) a_t = tf.keras.layers.Conv1D(shape[2], kt, padding="same", activation="selu")( a_t ) # 2 a_t = tf.keras.layers.Softmax(axis=-2)(a_t) a_t = tf.keras.layers.Reshape((1, shape[1], shape[2]))(a_t) # Reweight x_t = tf.keras.layers.Conv2D( shape[2], (3, 3), padding="same", activation="selu" )(x) x_t = tf.keras.layers.Conv2D( shape[2], (5, 5), padding="same", activation="selu" )(x_t) x_t = tf.keras.layers.Multiply()([x_t, a_t]) # Frequency Attention # Attn Map (F, 1, C), Conv1D a_f = tf.keras.layers.Lambda(tf.keras.backend.mean, arguments={"axis": -2})(x) a_f = tf.keras.layers.Conv1D(shape[2], kf, padding="same", activation="selu")( a_f ) a_f = tf.keras.layers.Conv1D(shape[2], kf, padding="same", activation="selu")( a_f ) a_f = tf.keras.layers.Softmax(axis=-2)(a_f) a_f = tf.keras.layers.Reshape((shape[0], 1, shape[2]))(a_f) # Reweight x_f = tf.keras.layers.Conv2D( shape[2], (3, 3), padding="same", activation="selu" )(x) x_f = tf.keras.layers.Conv2D( shape[2], (5, 5), padding="same", activation="selu" )(x_f) x_f = tf.keras.layers.Multiply()([x_f, a_f]) return x_r, x_t, x_f
def _build_model(self, input_shape=(320, 128, 3)): """Building the entire FTA-Net. Implementation taken from https://github.com/yushuai/FTANet-melodic :param input_shape: input shape. :returns: a tensorflow Model instance of the FTA-Net. """ visible = tf.keras.layers.Input(shape=input_shape) x = tf.keras.layers.BatchNormalization()(visible) ## Bottom # bm = BatchNormalization()(x) bm = x bm = tf.keras.layers.Conv2D( 16, (4, 1), padding="valid", strides=(4, 1), activation="selu" )( bm ) # 80 bm = tf.keras.layers.Conv2D( 16, (4, 1), padding="valid", strides=(4, 1), activation="selu" )( bm ) # 20 bm = tf.keras.layers.Conv2D( 16, (4, 1), padding="valid", strides=(4, 1), activation="selu" )( bm ) # 5 bm = tf.keras.layers.Conv2D( 1, (5, 1), padding="valid", strides=(5, 1), activation="selu" )( bm ) # 1 shape = input_shape x_r, x_t, x_f = self.FTA_Module(x, (shape[0], shape[1], 32), 3, 3) x = self.SF_Module([x_r, x_t, x_f], 32, 4, 4) x = tf.keras.layers.MaxPooling2D((2, 2))(x) x_r, x_t, x_f = self.FTA_Module(x, (shape[0] // 2, shape[1] // 2, 64), 3, 3) x = self.SF_Module([x_r, x_t, x_f], 64, 4, 4) x = tf.keras.layers.MaxPooling2D((2, 2))(x) x_r, x_t, x_f = self.FTA_Module(x, (shape[0] // 4, shape[1] // 4, 128), 3, 3) x = self.SF_Module([x_r, x_t, x_f], 128, 4, 4) x_r, x_t, x_f = self.FTA_Module(x, (shape[0] // 4, shape[1] // 4, 128), 3, 3) x = self.SF_Module([x_r, x_t, x_f], 128, 4, 4) x = tf.keras.layers.UpSampling2D((2, 2))(x) x_r, x_t, x_f = self.FTA_Module(x, (shape[0] // 2, shape[1] // 2, 64), 3, 3) x = self.SF_Module([x_r, x_t, x_f], 64, 4, 4) x = tf.keras.layers.UpSampling2D((2, 2))(x) x_r, x_t, x_f = self.FTA_Module(x, (shape[0], shape[1], 32), 3, 3) x = self.SF_Module([x_r, x_t, x_f], 32, 4, 4) x_r, x_t, x_f = self.FTA_Module(x, (shape[0], shape[1], 1), 3, 3) x = self.SF_Module([x_r, x_t, x_f], 1, 4, 4) x = tf.keras.layers.Concatenate(axis=1)([bm, x]) # Softmax x = tf.keras.layers.Lambda(tf.keras.backend.squeeze, arguments={"axis": -1})(x) x = tf.keras.layers.Softmax(axis=-2)(x) return tf.keras.models.Model(inputs=visible, outputs=x) def load_model(self, model_path): if ".data-00000-of-00001" not in model_path: path_to_check = model_path + ".data-00000-of-00001" if not os.path.exists(path_to_check): self.download_model(model_path) # Dowloading model weights self.model.load_weights(model_path).expect_partial() self.model_path = model_path self.trained = True
[docs] def download_model(self, model_path=None): """Download pre-trained model.""" url = "https://drive.google.com/uc?id=1YxJKyaNg7_4T_P-BmK6AJMZ8FgRQsgie&export=download" unzip_path = ( os.sep + os.path.join(*model_path.split(os.sep)[:-2]) if model_path is not None else os.path.join(WORKDIR, "models", "melody", "ftanet") ) if not os.path.exists(unzip_path): os.makedirs(unzip_path) output = os.path.join(unzip_path, "carnatic.zip") gdown.download(url, output, quiet=False) # Unzip file with zipfile.ZipFile(output, "r") as zip_ref: zip_ref.extractall(unzip_path) # Delete zip file after extraction os.remove(output) logger.warning("Files downloaded and extracted successfully.")
[docs] def predict( self, input_data, input_sr=44100, hop_size=80, batch_size=5, out_step=None, gpu="-1", ): """Extract melody from input_data. Implementation taken (and slightly adapted) from https://github.com/yushuai/FTANet-melodic. :param input_data: path to audio file or numpy array like audio signal. :param input_sr: sampling rate of the input array of data (if any). This variable is only relevant if the input is an array of data instead of a filepath. :param hop_size: hop size between frequency estimations. :param batch_size: batches of seconds that are passed through the model (defaulted to 5, increase if enough computational power, reduce if needed). :param out_step: particular time-step duration if needed at output :param gpu: Id of the available GPU to use (-1 by default, to run on CPU) :returns: a 2-D list with time-stamps and pitch values per timestamp. """ ## Setting up GPU if any if gpu != self.gpu: os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu) self.gpu = gpu if self.trained is False: raise ModelNotTrainedError( """Model is not trained. Please load model before running inference! You can load the pre-trained instance with the load_model wrapper.""" ) # Loading and resampling audio if isinstance(input_data, str): if not os.path.exists(input_data): raise FileNotFoundError("Target audio not found.") audio, _ = librosa.load(input_data, sr=self.sample_rate) elif isinstance(input_data, np.ndarray): logger.warning( f"Resampling... (input sampling rate is assumed {input_sr}Hz, \ make sure this is correct and change input_sr otherwise)" ) audio = librosa.resample( input_data, orig_sr=input_sr, target_sr=self.sample_rate ) else: raise ValueError("Input must be path to audio signal or an audio array") audio_shape = audio.shape if len(audio_shape) > 1: audio_channels = min(audio_shape) if audio_channels == 1: audio = audio.flatten() else: audio = np.mean(audio, axis=np.argmin(audio_shape)) xlist = [] timestamps = [] audio_len = len(audio) batch_min = self.sample_rate * 60 * batch_size freqs = [] if audio_len > batch_min: iters = math.ceil(audio_len / batch_min) for i in np.arange(iters): if i < iters - 1: audio_in = audio[batch_min * i : batch_min * (i + 1)] if i == iters - 1: audio_in = audio[batch_min * i :] feature, _, time_arr = cfp_process( audio_in, sr=self.sample_rate, hop=hop_size ) data = batchize_test(feature, size=128) xlist.append(data) timestamps.append(time_arr) estimation = get_est_arr(self.model, xlist, timestamps, batch_size=16) if i == 0: freqs = estimation[:, 1] else: freqs = np.concatenate((freqs, estimation[:, 1])) else: feature, _, time_arr = cfp_process(audio, sr=self.sample_rate, hop=hop_size) data = batchize_test(feature, size=128) xlist.append(data) timestamps.append(time_arr) # Getting estimatted pitch estimation = get_est_arr(self.model, xlist, timestamps, batch_size=16) freqs = estimation[:, 1] TStamps = np.linspace(0, audio_len / self.sample_rate, len(freqs)) freqs[freqs < 50] = 0 output = np.array([TStamps, freqs]).transpose() if out_step is not None: new_len = int((audio_len / self.sample_rate) // out_step) return resampling(output, new_len) return output
[docs] @staticmethod def normalise_pitch(pitch, tonic, bins_per_octave=120, max_value=4): """Normalise pitch given a tonic. :param pitch: a 2-D list with time-stamps and pitch values per timestamp. :param tonic: recording tonic to normalize the pitch to. :param bins_per_octave: number of frequency bins per octave. :param max_value: maximum value to clip the normalized pitch to. :returns: a 2-D list with time-stamps and normalised to a given tonic pitch values per timestamp. """ return normalisation( pitch, tonic, bins_per_octave=bins_per_octave, max_value=max_value )
[docs] @staticmethod def save_pitch(data, output_path): """Calling the write_csv function in compiam.io to write the output pitch curve in a fle :param data: the data to write :param output_path: the path where the data is going to be stored :returns: None """ return write_csv(data, output_path)