import os
import tqdm
import librosa
import math
import numpy as np
import soundfile as sf
from compiam.separation.singing_voice_extraction.cold_diff_sep.model.vad import VAD
from compiam.exceptions import ModelNotTrainedError
from compiam.utils import get_logger, WORKDIR
from compiam.utils.download import download_remote_model
logger = get_logger(__name__)
class ColdDiffSep(object):
"""Leakage-aware singing voice separation model for Carnatic Music."""
def __init__(
self, model_path=None, download_link=None, download_checksum=None, gpu="-1"
):
"""Leakage-aware singing voice separation init method.
:param model_path: path to the file containing the model weights.
:param download_link: link to the remote pre-trained model.
:param download_checksum: checksum of the model file.
:param gpu: Id of the available GPU to use (-1 by default, to run on CPU). Pass it as a string: '0', '1', etc.
"""
### IMPORTING OPTIONAL DEPENDENCIES
try:
global tf
import tensorflow as tf
global DiffWave
from compiam.separation.singing_voice_extraction.cold_diff_sep.model import (
DiffWave,
)
global UnetConfig
from compiam.separation.singing_voice_extraction.cold_diff_sep.model.config import (
Config as UnetConfig,
)
global get_mask
from compiam.separation.singing_voice_extraction.cold_diff_sep.model.clustering import (
get_mask,
)
global compute_stft, compute_signal_from_stft, next_power_of_2, get_overlap_window
from compiam.separation.singing_voice_extraction.cold_diff_sep.model.signal_processing import (
get_overlap_window,
compute_stft,
compute_signal_from_stft,
next_power_of_2,
)
        except ImportError:
raise ImportError(
"In order to use this tool you need to have tensorflow installed. "
"Please install tensorflow using: pip install tensorflow==2.7.2"
)
###
## Setting up GPU if specified
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu)
self.gpu = gpu
self.unet_config = UnetConfig()
self.model = DiffWave(self.unet_config)
self.sample_rate = self.unet_config.sr
self.trained = False
self.model_path = model_path
self.download_link = download_link
self.download_checksum = download_checksum
if self.model_path is not None:
self.load_model(self.model_path)
    def load_model(self, model_path):
        """Load the model weights from the given path, downloading them first if they are missing.
        :param model_path: Path to the model weights (TensorFlow checkpoint prefix).
        """
        path_to_check = model_path
        if ".data-00000-of-00001" not in model_path:
            path_to_check = model_path + ".data-00000-of-00001"
        if not os.path.exists(path_to_check):
self.download_model(model_path) # Downloading model weights
self.model.restore(model_path).expect_partial()
self.model_path = model_path
self.trained = True
    def separate(
self,
input_data,
input_sr=44100,
clusters=5,
scheduler=4,
chunk_size=3,
gpu="-1",
):
"""Separate singing voice from mixture.
:param input_data: Audio signal to separate.
:param input_sr: sampling rate of the input array of data (if any). This variable is only
relevant if the input is an array of data instead of a filepath.
        :param clusters: Number of clusters to use to build the separation masks.
        :param scheduler: Scheduler factor to weight the clusters to be more or less restrictive with the interferences.
        :param chunk_size: Duration (in seconds) of the chunks the input is split into for inference.
        :param gpu: Id of the available GPU to use (-1 by default, to run on CPU). Pass it as a string: '0', '1', etc.
:return: Singing voice signal.
"""
## Setting up GPU if any
if gpu != self.gpu:
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu)
self.gpu = gpu
if self.trained is False:
raise ModelNotTrainedError(
""" Model is not trained. Please load model before running inference!
You can load the pre-trained instance with the load_model wrapper."""
)
# Loading and resampling audio
if isinstance(input_data, str):
if not os.path.exists(input_data):
raise FileNotFoundError("Target audio not found.")
audio, _ = librosa.load(input_data, sr=self.sample_rate)
elif isinstance(input_data, np.ndarray):
            logger.warning(
                f"Resampling... (input sampling rate is assumed {input_sr}Hz, "
                "make sure this is correct and change input_sr otherwise)"
            )
audio = librosa.resample(
input_data, orig_sr=input_sr, target_sr=self.sample_rate
)
else:
raise ValueError("Input must be path to audio signal or an audio array")
mixture = tf.convert_to_tensor(audio, dtype=tf.float32)
        if len(mixture.shape) == 2:  # Stereo input: downmix (channels, samples) to mono
mixture = tf.reduce_mean(mixture, axis=0)
output_voc = np.zeros(mixture.shape)
        hopsized_chunk = int((chunk_size * self.sample_rate) / 2)
runs = math.floor(mixture.shape[0] / hopsized_chunk)
trim_low = 0
for trim in tqdm.tqdm(np.arange((runs * 2) - 1)):
try:
trim_high = int(trim_low + (hopsized_chunk * 2))
# Get input mixture spectrogram
mix_trim = mixture[trim_low:trim_high]
mix_mag, mix_phase = compute_stft(mix_trim[None], self.unet_config)
new_len = next_power_of_2(mix_mag.shape[1])
mix_mag_trim = mix_mag[:, :new_len, :]
mix_phase_trim = mix_phase[:, :new_len, :]
# Get and stack cold diffusion steps
diff_feat = self.model(mix_mag_trim, mode="train")
diff_feat = tf.transpose(diff_feat, [1, 0, 2, 3])
diff_feat_t = tf.squeeze(
tf.reshape(
diff_feat, [1, 8, diff_feat.shape[-2] * diff_feat.shape[-1]]
),
axis=0,
).numpy()
# Normalize features, all energy curves having same range
normalized_feat = []
for j in np.arange(diff_feat_t.shape[1]):
normalized_curve = diff_feat_t[:, j] / (
np.max(np.abs(diff_feat_t[:, j])) + 1e-6
)
normalized_feat.append(normalized_curve)
normalized_feat = np.array(normalized_feat, dtype=np.float32)
# Compute mask using unsupervised clustering and reshape to magnitude spec shape
mask = get_mask(normalized_feat, clusters, scheduler)
mask = tf.convert_to_tensor(
mask, dtype=tf.float32
) # Move mask to tensor and cast to float
mask = tf.reshape(mask, mix_mag_trim.shape)
# Getting last step of computed features and applying mask
diff_feat_t = tf.reshape(diff_feat_t[-1, :], mix_mag_trim.shape)
output_signal = tf.math.multiply(diff_feat_t, mask)
                # Reconstruct the time-domain signal from the masked magnitude and the mixture phase
output_signal = compute_signal_from_stft(
output_signal, mix_phase_trim, self.unet_config
)
# From here on, pred_audio is numpy
pred_audio = tf.squeeze(output_signal, axis=0).numpy()
vad = VAD(
pred_audio,
sr=22050,
nFFT=512,
win_length=0.025,
hop_length=0.01,
threshold=0.99,
)
                # Silence unvoiced regions: discard the chunk if less than 25% of frames are voiced
                if np.sum(vad) / len(vad) < 0.25:
pred_audio = np.zeros(pred_audio.shape)
# Get boundary
                if trim == 0:
                    boundary = "start"
                elif trim == runs - 2:
                    boundary = "end"
                else:
                    boundary = None
placehold_voc = np.zeros(output_voc.shape)
placehold_voc[trim_low : trim_low + pred_audio.shape[0]] = (
pred_audio * get_overlap_window(pred_audio, boundary=boundary)
)
output_voc += placehold_voc
trim_low += pred_audio.shape[0] // 2
            except Exception:
                # A chunk failed (typically the final, shorter-than-expected one):
                # normalize and return what has been computed so far
output_voc = output_voc * (
np.max(np.abs(mixture.numpy()))
/ (np.max(np.abs(output_voc)) + 1e-6)
)
output_voc = output_voc[:trim_low]
return output_voc
return output_voc * (
np.max(np.abs(mixture.numpy())) / (np.max(np.abs(output_voc)) + 1e-6)
)
# TODO: write a function to store audio
# Building intuitive filename with model config
# filefolder = os.path.join(args.input_signal.split("/")[:-1])
# filename = args.input_signal.split("/")[-1].split(".")[:-1]
# filename = filename[0] if len(filename) == 1 else ".".join(filename)
# filename = filename + "_" + str(clusters) + "_" + str(scheduler) + "pred_voc"
# sf.write(
# os.path.join(filefolder, filename + ".wav"),
# output_voc,
# 22050) # Writing to file
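    # A minimal sketch of the storing helper described in the TODO above. The method name and
    # signature are assumptions for illustration (not part of the published compiam API); it
    # writes the separated vocals next to the input file, encoding the model config in the name.
    def save_audio(self, audio, input_path, clusters=5, scheduler=4):
        """Write a separated vocal track to disk next to the input mixture (illustrative sketch)."""
        filefolder = os.path.dirname(input_path)
        filename = os.path.splitext(os.path.basename(input_path))[0]
        filename = filename + "_" + str(clusters) + "_" + str(scheduler) + "_pred_voc.wav"
        sf.write(os.path.join(filefolder, filename), audio, self.sample_rate)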
    def download_model(self, model_path=None, force_overwrite=False):
"""Download pre-trained model."""
download_path = (
os.sep + os.path.join(*model_path.split(os.sep)[:-2])
if model_path is not None
else os.path.join(WORKDIR, "models", "separation", "cold_diff_sep")
)
# Creating model folder to store the weights
if not os.path.exists(download_path):
os.makedirs(download_path)
download_remote_model(
self.download_link,
self.download_checksum,
download_path,
force_overwrite=force_overwrite,
)
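# A minimal usage sketch (paths and weight locations are placeholders, not shipped defaults):
#
#     model = ColdDiffSep(model_path="/path/to/cold_diff_sep/weights", gpu="-1")
#     vocals = model.separate("/path/to/mixture.wav", clusters=5, scheduler=4)
#     sf.write("/path/to/mixture_pred_voc.wav", vocals, model.sample_rate)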