# Source code for widgets.playerframe

import os
import json

import numpy as np
import pyqtgraph.dockarea as pgdock
from PyQt5.QtWidgets import QVBoxLayout, QFrame, QLayout, QSizePolicy
from PyQt5.Qt import pyqtSignal

from .playbackframe import PlaybackFrame
from .timeserieswidget import TimeSeriesWidget
from .waveformwidget import WaveformWidget
from .scoredialog import ScoreDialog
from .widgetutilities import cursor_pos_sample, current_pitch
from utilities.playback import Playback
from cultures.makam.featureparsers import (read_raw_audio, load_pitch, load_pd,
                                           load_tonic, get_feature_paths,
                                           load_notes, get_sections,
                                           generate_score_map,
                                           mp3_to_wav_converter)


# Directory that holds the per-recording feature documents, resolved relative
# to this module: <module dir>/../cultures/documents.
DOCS_PATH = os.path.join(
    os.path.dirname(__file__), '..', 'cultures', 'documents')

# Four-component colour tuples handed to the waveform widget, one per distinct
# section name (presumably RGBA with a shared alpha of 70 -- confirm against
# WaveformWidget.add_section).
COLORS_RGB = [
    (77, 157, 224, 70),
    (255, 217, 79, 70),
    (224, 76, 114, 70),
    (250, 240, 202, 70),
    (255, 89, 100, 70),
    (255, 196, 165, 70),
    (102, 0, 17, 70),
    (201, 149, 18, 70),
    (58, 1, 92, 70),
    (117, 112, 201, 70),
]


class DockAreaWidget(pgdock.DockArea):
    """
    Dock area used by the player frame.

    Sets ``allowedAreas`` to top/bottom, constrains the layout to its minimum
    size and replaces ``floatDock`` with a no-op.
    """

    def __init__(self, temporary=False, home=None):
        """
        :param temporary: forwarded unchanged to ``pgdock.DockArea``.
        :param home: forwarded unchanged to ``pgdock.DockArea``.
        """
        super(DockAreaWidget, self).__init__(temporary=temporary, home=home)
        self.layout.setSizeConstraint(QLayout.SetMinimumSize)
        self.allowedAreas = ['top', 'bottom']

    def floatDock(self, dock):
        """Do nothing when asked to float ``dock``."""
        return None


class PlayerFrame(QFrame):
    """
    Frame combining a waveform view, an optional time-series view and audio
    playback for a single recording.

    The waveform dock is created with the frame; the time-series dock is
    created lazily the first time a 1D feature or ROI item is plotted.
    """
    # Class-level default; replaced per instance by the value that
    # load_pitch returns (see __load_pitch).
    samplerate = 44100.
    # Emitted from player_pos_changed with the value returned by
    # current_pitch while ``hist_visible`` is set.
    update_histogram = pyqtSignal(float)

    # Value of ``playback.state()`` that is treated as "playing".
    # Presumably QMediaPlayer.PlayingState -- confirm against
    # utilities.playback.Playback.
    _PLAYING_STATE = 1

    def __init__(self, recid, parent=None):
        """
        :param recid: (str) Identifier of the recording. It is used as the
            directory name under DOCS_PATH and passed to get_feature_paths.
        :param parent: Optional parent widget, forwarded to QFrame.
        """
        QFrame.__init__(self, parent=parent)
        self.recid = recid

        self.__set_design()
        self.playback = Playback()
        self.load_file(self.recid)

        # flags
        self.score_visible = False
        self.hist_visible = False

        # Index of the note most recently reported by
        # find_current_note_index. -1 means "none yet", so that note 0 can be
        # reported as well (an initial value of 0 would suppress it).
        self.index = -1

        # signals
        self.playback.positionChanged.connect(self.player_pos_changed)
        region = self.waveform_widget.region_wf
        region.sigRegionChangeFinished.connect(self.wf_region_changed)
        region.clicked.connect(self.wf_region_item_clicked)

    def load_file(self, mbid):
        """
        Loads pitch, waveform and audio source of the given recording.

        :param mbid: (str) Identifier of the recording to load. It becomes
            the frame's ``recid`` so that every path built afterwards refers
            to the same recording.
        """
        # Keep recid in sync with the requested recording; otherwise the
        # pitch/histogram paths (built from self.recid) and the audio paths
        # (built from mbid) could point at different recordings.
        self.recid = mbid

        ftr = 'audioanalysis' + '--' + 'pitch_filtered' + '.json'
        self.__load_pitch(ftr)
        self.feature_paths = get_feature_paths(mbid)
        self.__set_waveform()

        self.playback.pause()
        self.playback.set_source(self.feature_paths['audio_path_wav'])

        # Only relevant on reload: the time-series widget does not exist yet
        # when this runs from __init__.
        if hasattr(self, 'ts_widget'):
            self.ts_widget.zoom_selection.clearPlots()
            self.ts_widget.right_axis.clearPlots()
            histogram = os.path.join(DOCS_PATH, self.recid,
                                     'audioanalysis--pitch_distribution.json')
            vals, bins = load_pd(histogram)
            self.ts_widget.plot_histogram_raxis(vals, bins)

    def __load_pitch(self, feature):
        """
        Reads a pitch feature file of the current recording and stores the
        values returned by load_pitch on the frame.

        :param feature: (str) File name of the feature inside the recording's
            directory, e.g. 'audioanalysis--pitch_filtered.json'.
        """
        feature_path = os.path.join(DOCS_PATH, self.recid, feature)
        (self.time_stamps, self.pitch_plot, self.max_pitch, self.min_pitch,
         self.samplerate, self.hopsize) = load_pitch(feature_path)

    def __set_design(self):
        """
        Sets general settings of frame widget, adds dock area and dock
        widgets.
        """
        self.setWindowTitle('Player')
        self.resize(1200, 550)

        self.dock_area = DockAreaWidget()

        # dock fixed waveform
        dock_waveform = pgdock.Dock(name="Waveform", area='Top',
                                    hideTitle=True, closable=False,
                                    autoOrientation=False)
        dock_waveform.setMinimumHeight(100)
        dock_waveform.layout.setSizeConstraint(QLayout.SetMinimumSize)
        dock_waveform.widgetArea.setSizePolicy(
            QSizePolicy(QSizePolicy.Minimum, QSizePolicy.Minimum))

        # initializing waveform widget
        self.waveform_widget = WaveformWidget()

        # adding waveform widget to waveform dock
        dock_waveform.addWidget(self.waveform_widget)
        dock_waveform.allowedAreas = ['top']
        dock_waveform.setAcceptDrops(False)

        # adding waveform dock to dock area
        self.dock_area.addDock(dock_waveform, position='top')

        # adding dock area to frame
        layout = QVBoxLayout(self)
        layout.addWidget(self.dock_area)

    def __set_waveform(self):
        """
        Reads the audio and plots the waveform.

        When the wav file is missing, mp3_to_wav_converter is called on the
        mp3 path first.
        """
        wav_path = self.feature_paths['audio_path_wav']
        if not os.path.exists(wav_path):
            mp3_to_wav_converter(self.feature_paths['audio_path_mp3'])

        # read_raw_audio returns four values; only the samples and the
        # minimum are used here.
        raw_audio, _, min_audio, _ = read_raw_audio(wav_path)
        self.waveform_widget.min_raw_audio = min_audio
        self.waveform_widget.plot_waveform(raw_audio)

    def wf_region_item_clicked(self):
        """Pauses the playback when the waveform region item is clicked."""
        self.playback_pause()

    def closeEvent(self, QCloseEvent):
        """
        Clears and closes the child widgets and pauses the playback.

        :param QCloseEvent: Close event delivered by Qt.
        """
        # super() must name this class; super(QFrame, self) starts the lookup
        # after QFrame instead of after PlayerFrame.
        super(PlayerFrame, self).closeEvent(QCloseEvent)

        if hasattr(self, 'waveform_widget'):
            self.waveform_widget.clear()
            self.waveform_widget.close()
        if hasattr(self, 'ts_widget'):
            self.ts_widget.clear()
            self.ts_widget.close()
        if hasattr(self, 'playback'):
            self.playback.pause()

        # NOTE(review): calling close() from inside closeEvent looks
        # redundant when the event originates from close() itself; kept
        # because callers may invoke closeEvent directly -- confirm before
        # removing.
        self.close()

    def __add_ts_widget(self):
        """
        Creates the time series widget, puts it in a closable dock and adds
        the dock to the dock area.
        """
        self.ts_widget = TimeSeriesWidget(self)
        self.ts_widget.add_1d_view()

        dock_ts = pgdock.Dock(name='Time Series', area='bottom',
                              closable=True)
        dock_ts.allowedAreas = ['top', 'bottom']
        dock_ts.addWidget(self.ts_widget)
        dock_ts.layout.setSizeConstraint(QLayout.SetMinimumSize)
        dock_ts.sigClosed.connect(self.__dock_ts_closed)
        self.dock_area.addDock(dock_ts)

        # signals
        self.ts_widget.wheel_event.connect(self.waveform_widget.wheelEvent)

    def __dock_ts_closed(self):
        """
        Slot for the time series dock's ``sigClosed`` signal.

        Intentionally does nothing. NOTE(review): ``ts_widget`` stays set on
        the frame after the dock is closed -- confirm this is intended.
        """
        pass

    def plot_1d_data(self, f_type, feature):
        """
        Plots 1D data.

        :param f_type: (str) Feature type, the part of the file name before
            '--' (e.g. 'audioanalysis').
        :param feature: (str) Feature subtype, the part of the file name
            after '--'. 'pitch', 'pitch_filtered' and 'tonic' are handled;
            any other value only ensures the time series widget exists.
        """
        if not hasattr(self, 'ts_widget'):
            self.__add_ts_widget()

        ftr = f_type + '--' + feature + '.json'
        feature_path = os.path.join(DOCS_PATH, self.recid, ftr)

        if feature in ('pitch', 'pitch_filtered'):
            self.__load_pitch(ftr)
            x_min, x_max = self.waveform_widget.get_waveform_region

            if hasattr(self.ts_widget, 'zoom_selection'):
                self.ts_widget.hopsize = self.hopsize
                self.ts_widget.samplerate = self.samplerate
                self.ts_widget.pitch_plot = self.pitch_plot
                self.ts_widget.plot_pitch(pitch_plot=self.pitch_plot,
                                          x_start=x_min, x_end=x_max,
                                          hop_size=self.hopsize)
                self.is_pitch_plotted = True
                # wf_region_changed reads this flag from the time series
                # widget (like is_notes_added), so set it there as well.
                self.ts_widget.is_pitch_plotted = True

                histogram = os.path.join(
                    DOCS_PATH, self.recid,
                    'audioanalysis--pitch_distribution.json')
                vals, bins = load_pd(histogram)
                self.ts_widget.plot_histogram_raxis(vals, bins)

        if feature == 'tonic':
            tonic_values = load_tonic(feature_path)
            self.ts_widget.add_tonic(tonic_values)

    def playback_play(self):
        """Starts the playback."""
        self.playback.play()

    def playback_pause(self):
        """Pauses the playback."""
        self.playback.pause()

    def wf_region_changed(self):
        """
        Updates the plots according to the change in waveform region item.

        When the playback position falls outside the new region, playback is
        paused and moved to the start of the region.
        """
        pos = self.playback.position() / 1000.  # playback pos in seconds
        x_min, x_max = self.waveform_widget.get_waveform_region

        if not x_min < pos < x_max:
            self.playback_pause()
            pos_sample = x_min * self.samplerate
            # The position is an integer number of milliseconds (see the
            # Qt QMediaPlayer reference), so do not hand over a float.
            self.playback.setPosition(int(round(x_min * 1000.)))
            self.waveform_widget.update_wf_vline(pos_sample)

        ts_widget = getattr(self, 'ts_widget', None)
        if ts_widget is None or not hasattr(ts_widget, 'zoom_selection'):
            return

        # getattr with a default: the flags may not exist before the first
        # pitch/notes plot.
        if getattr(ts_widget, 'is_pitch_plotted', False):
            ts_widget.update_plot(start=x_min, stop=x_max,
                                  hop_size=self.hopsize)
            ts_widget.vline.setPos([x_min, 0])

        if getattr(ts_widget, 'is_notes_added', False):
            ts_widget.update_notes(x_min, x_max)

    def player_pos_changed(self, playback_pos):
        """
        Updates the positions of cursors when playback position is changed.
        Changes the waveform region item according to the position of
        playback.

        Nothing happens unless the playback state equals _PLAYING_STATE.

        :param playback_pos: (int) Position of player in milliseconds.
        """
        if self.playback.state() != self._PLAYING_STATE:
            return

        playback_pos_sec = playback_pos / 1000.
        playback_pos_sample = playback_pos_sec * self.samplerate
        self.waveform_widget.update_wf_vline(playback_pos_sample)

        # Once the cursor enters the last 10% of the region, move the region
        # forward so that it starts at that 10% mark; the width is kept.
        xmin, xmax = self.waveform_widget.get_waveform_region
        width = xmax - xmin
        diff = width * 0.1
        if not playback_pos_sec <= xmax - diff:
            x_start = xmax - diff
            self.waveform_widget.change_wf_region(x_start, x_start + width)

        # checks if time series widget is initialized or not
        if hasattr(self, 'ts_widget'):
            self.ts_widget.vline.setPos([playback_pos_sec, 0])

            # checks if horizontal line of y-axis exists
            if hasattr(self.ts_widget, 'hline_histogram'):
                if self.ts_widget.pitch_plot is not None:
                    self.ts_widget.set_hist_cursor_pos(playback_pos_sec)

        if self.score_visible:
            self.__update_score(playback_pos_sec)

        pos_sample = cursor_pos_sample(playback_pos_sec, self.samplerate,
                                       self.hopsize)
        if self.hist_visible:
            pitch = current_pitch(pos_sample, self.pitch_plot)
            self.update_histogram.emit(pitch)

    def __update_score(self, playback_pos_sec):
        """
        Highlights the note under the playback cursor in the score dialog.

        Does nothing while no notes are loaded or when the current note has
        not changed since the previous call.

        :param playback_pos_sec: (float) Playback position in seconds.
        """
        ts_widget = getattr(self, 'ts_widget', None)
        starts = getattr(ts_widget, 'notes_start', None)
        ends = getattr(ts_widget, 'notes_end', None)
        metadata = getattr(self, 'metadata', None)
        # The score can be opened before notes are added.
        if starts is None or ends is None or metadata is None:
            return

        index = self.find_current_note_index(starts, ends, playback_pos_sec)
        # Compare against None explicitly: index 0 is a valid note.
        if index is None:
            return

        workid = metadata[index][0]
        score_index = metadata[index][1]
        svg_path = self.notes_map[workid][str(score_index)]
        self.score_dialog.score_widget.update_note(svg_path, score_index)

    def add_1d_roi_items(self, f_type, item):
        """
        Adds 1d roi item.

        :param f_type: (str) Feature type
        :param item: (str) Feature subtype. Only 'notes' is handled; any
            other value only ensures the time series widget exists.
        """
        if not hasattr(self, 'ts_widget'):
            self.__add_ts_widget()

        if item != 'notes':
            return

        ftr = f_type + '--' + item + '.json'
        feature_path = os.path.join(DOCS_PATH, self.recid, ftr)
        notes_dict = load_notes(feature_path)

        notes = []
        metadata = []
        for workid, work_notes in notes_dict.items():
            for dic in work_notes:
                interval = dic['interval']
                pitch = dic['performed_pitch']['value']
                notes.append([interval[0], interval[1], pitch])
                metadata.append([workid, dic['index_in_score'],
                                 dic['label'], dic['symbol']])

        # reshape keeps the array two-dimensional (n, 3) even when there are
        # no notes, so the column slices below cannot raise IndexError.
        self.ts_widget.notes = np.array(notes).reshape(-1, 3)
        self.ts_widget.notes_start = self.ts_widget.notes[:, 0]
        self.ts_widget.notes_end = self.ts_widget.notes[:, 1]
        self.metadata = metadata

        x_min, x_max = self.waveform_widget.get_waveform_region
        self.ts_widget.update_notes(x_min, x_max)
        self.ts_widget.is_notes_added = True

    def add_sections_to_waveform(self, feature_path):
        """
        Adds the sections read from ``feature_path`` to the waveform widget.

        Sections whose names share the part before '--' get the same color.

        :param feature_path: (str) Path passed to get_sections.
        """
        sections = get_sections(feature_path)

        colors = {}
        for work in sections:
            for section in sections[work]:
                sec_name = section['name'].split('--')[0]
                if sec_name not in colors:
                    # Wrap around so that more distinct names than palette
                    # entries reuse colors instead of raising IndexError.
                    colors[sec_name] = COLORS_RGB[
                        len(colors) % len(COLORS_RGB)]
                self.waveform_widget.add_section(np.array(section['time']),
                                                 section['name'],
                                                 section['title'],
                                                 colors[sec_name])

    def open_score_dialog(self, mbid):
        """
        Builds the score map of every work of the recording and shows the
        score dialog.

        :param mbid: (str) Identifier of the recording whose
            'audioanalysis--metadata.json' lists the works.
        """
        metadata_path = os.path.join(DOCS_PATH, mbid,
                                     'audioanalysis--metadata.json')
        # Context manager so the file handle is closed deterministically.
        with open(metadata_path) as metadata_file:
            works = json.load(metadata_file)['works']

        self.notes_map = {work['mbid']: generate_score_map(work['mbid'])
                          for work in works}

        self.score_dialog = ScoreDialog(self)
        self.score_dialog.show()
        self.score_visible = True

    def find_current_note_index(self, n_array_start, n_array_end, value):
        """
        Finds the note whose interval contains ``value``.

        :param n_array_start: Array of note start times.
        :param n_array_end: Array of note end times, parallel to
            ``n_array_start``.
        :param value: (float) Position to look up, in the unit of the arrays.
        :return: (int) Index of the note with ``start < value < end``, or
            None when no note contains ``value`` or when that note is the one
            reported by the previous successful call. On success the index is
            also stored in ``self.index``.
        """
        starts = np.asarray(n_array_start)
        ends = np.asarray(n_array_end)

        # Search among the notes that actually contain the position. Taking
        # the globally nearest start time would pick the *next* note during
        # the second half of the current one and then report nothing.
        candidates = np.flatnonzero((starts < value) & (value < ends))
        if candidates.size == 0:
            return None

        # With overlapping notes prefer the one that started closest.
        nearest = np.abs(starts[candidates] - value).argmin()
        index = int(candidates[nearest])

        if index == self.index:
            return None
        self.index = index
        return index