diff --git a/stepcovnet/config/TrainingConfig.py b/stepcovnet/config.py similarity index 54% rename from stepcovnet/config/TrainingConfig.py rename to stepcovnet/config.py index 81210b2..3e65658 100644 --- a/stepcovnet/config/TrainingConfig.py +++ b/stepcovnet/config.py @@ -1,20 +1,71 @@ -from typing import Type -from typing import Union +from abc import ABC +from typing import Type, Union import numpy as np from sklearn.model_selection import train_test_split +from stepcovnet.common.constants import NUM_ARROW_COMBS from stepcovnet.common.utils import get_channel_scalers -from stepcovnet.config.AbstractConfig import AbstractConfig from stepcovnet.dataset.ModelDataset import ModelDataset from stepcovnet.training.TrainingHyperparameters import TrainingHyperparameters +class AbstractConfig(ABC, object): + def __init__(self, dataset_config, lookback, difficulty, *args, **kwargs): + self.dataset_config = dataset_config + self.lookback = lookback + self.difficulty = difficulty + + @property + def arrow_input_shape(self): + return (None,) + + @property + def arrow_mask_shape(self): + return (None,) + + @property + def audio_input_shape(self): + return ( + self.lookback, + self.dataset_config["NUM_TIME_BANDS"], + self.dataset_config["NUM_FREQ_BANDS"], + 1, + ) + + @property + def label_shape(self): + return (NUM_ARROW_COMBS,) + + +class InferenceConfig(AbstractConfig): + def __init__( + self, audio_path, file_name, dataset_config, lookback, difficulty, scalers=None + ): + super(InferenceConfig, self).__init__( + dataset_config=dataset_config, lookback=lookback, difficulty=difficulty + ) + self.audio_path = audio_path + self.file_name = file_name + self.scalers = scalers + + class TrainingConfig(AbstractConfig): - def __init__(self, dataset_path: str, dataset_type: Type[ModelDataset], dataset_config, - hyperparameters: TrainingHyperparameters, all_scalers=None, limit: int = -1, - lookback: int = 1, difficulty: str = "challenge", tokenizer_name: str = None): - super(TrainingConfig, self).__init__(dataset_config=dataset_config, lookback=lookback, difficulty=difficulty) + def __init__( + self, + dataset_path: str, + dataset_type: Type[ModelDataset], + dataset_config, + hyperparameters: TrainingHyperparameters, + all_scalers=None, + limit: int = -1, + lookback: int = 1, + difficulty: str = "challenge", + tokenizer_name: str = None, + ): + super(TrainingConfig, self).__init__( + dataset_config=dataset_config, lookback=lookback, difficulty=difficulty + ) self.dataset_path = dataset_path self.dataset_type = dataset_type self.hyperparameters = hyperparameters @@ -23,7 +74,11 @@ def __init__(self, dataset_path: str, dataset_type: Type[ModelDataset], dataset_ self.tokenizer_name = tokenizer_name # Combine some of these to reduce the number of loops and save I/O reads - self.all_indexes, self.train_indexes, self.val_indexes = self.get_train_val_split() + ( + self.all_indexes, + self.train_indexes, + self.val_indexes, + ) = self.get_train_val_split() self.num_samples = self.get_num_samples(self.all_indexes) self.num_train_samples = self.get_num_samples(self.train_indexes) self.num_val_samples = self.get_num_samples(self.val_indexes) @@ -39,19 +94,16 @@ def get_train_val_split(self) -> Union[np.array, np.array, np.array]: total_samples = 0 index = 0 for song_start_index, song_end_index in dataset.song_index_ranges: - if not any(dataset.labels[song_start_index: song_end_index] < 0): + if not any(dataset.labels[song_start_index:song_end_index] < 0): all_indexes.append(index) total_samples += song_end_index - song_start_index if 0 < self.limit < total_samples: break index += 1 all_indexes = np.array(all_indexes) - train_indexes, val_indexes, _, _ = \ - train_test_split(all_indexes, - all_indexes, - test_size=0.1, - shuffle=True, - random_state=42) + train_indexes, val_indexes, _, _ = train_test_split( + all_indexes, all_indexes, test_size=0.1, shuffle=True, random_state=42 + ) return all_indexes, train_indexes, val_indexes def get_class_weights(self, indexes) -> dict: @@ -59,19 +111,29 @@ def get_class_weights(self, indexes) -> dict: with self.enter_dataset as dataset: for index in indexes: song_start_index, song_end_index = dataset.song_index_ranges[index] - encoded_arrows = dataset.onehot_encoded_arrows[song_start_index:song_end_index] + encoded_arrows = dataset.onehot_encoded_arrows[ + song_start_index:song_end_index + ] if labels is None: labels = encoded_arrows else: labels = np.concatenate((labels, encoded_arrows), axis=0) - class_counts = [labels[:, class_index].sum() for class_index in range(labels.shape[1])] + class_counts = [ + labels[:, class_index].sum() for class_index in range(labels.shape[1]) + ] - class_weights = dict(zip( - list(range(len(class_counts))), - list(0 if class_count == 0 else (len(labels) / class_count) / len(class_counts) - for class_count in class_counts) - )) + class_weights = dict( + zip( + list(range(len(class_counts))), + list( + 0 + if class_count == 0 + else (len(labels) / class_count) / len(class_counts) + for class_count in class_counts + ), + ) + ) return dict(enumerate(class_weights)) @@ -93,8 +155,10 @@ def get_train_scalers(self): with self.enter_dataset as dataset: for index in self.train_indexes: song_start_index, song_end_index = dataset.song_index_ranges[index] - features = dataset.features[song_start_index: song_end_index] - training_scalers = get_channel_scalers(features, existing_scalers=training_scalers) + features = dataset.features[song_start_index:song_end_index] + training_scalers = get_channel_scalers( + features, existing_scalers=training_scalers + ) return training_scalers def get_num_samples(self, indexes) -> int: @@ -107,4 +171,6 @@ def get_num_samples(self, indexes) -> int: @property def enter_dataset(self): - return self.dataset_type(self.dataset_path, difficulty=self.difficulty).__enter__() + return self.dataset_type( + self.dataset_path, difficulty=self.difficulty + ).__enter__() diff --git a/stepcovnet/config/AbstractConfig.py b/stepcovnet/config/AbstractConfig.py deleted file mode 100644 index ce7e496..0000000 --- a/stepcovnet/config/AbstractConfig.py +++ /dev/null @@ -1,26 +0,0 @@ -from abc import ABC - -from stepcovnet.common.constants import NUM_ARROW_COMBS - - -class AbstractConfig(ABC, object): - def __init__(self, dataset_config, lookback, difficulty, *args, **kwargs): - self.dataset_config = dataset_config - self.lookback = lookback - self.difficulty = difficulty - - @property - def arrow_input_shape(self): - return (None,) - - @property - def arrow_mask_shape(self): - return (None,) - - @property - def audio_input_shape(self): - return self.lookback, self.dataset_config["NUM_TIME_BANDS"], self.dataset_config["NUM_FREQ_BANDS"], 1, - - @property - def label_shape(self): - return (NUM_ARROW_COMBS,) diff --git a/stepcovnet/config/InferenceConfig.py b/stepcovnet/config/InferenceConfig.py deleted file mode 100644 index ee0fd12..0000000 --- a/stepcovnet/config/InferenceConfig.py +++ /dev/null @@ -1,9 +0,0 @@ -from stepcovnet.config.AbstractConfig import AbstractConfig - - -class InferenceConfig(AbstractConfig): - def __init__(self, audio_path, file_name, dataset_config, lookback, difficulty, scalers=None): - super(InferenceConfig, self).__init__(dataset_config=dataset_config, lookback=lookback, difficulty=difficulty) - self.audio_path = audio_path - self.file_name = file_name - self.scalers = scalers diff --git a/stepcovnet/config/__init__.py b/stepcovnet/config/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/stepcovnet/executor/TrainingExecutor.py b/stepcovnet/executor/TrainingExecutor.py index ee3adf8..67d2b39 100644 --- a/stepcovnet/executor/TrainingExecutor.py +++ b/stepcovnet/executor/TrainingExecutor.py @@ -2,11 +2,10 @@ import os import joblib -from tensorflow.keras.callbacks import EarlyStopping -from tensorflow.keras.callbacks import TensorBoard +from tensorflow.keras.callbacks import EarlyStopping, TensorBoard from tensorflow.python.keras.callbacks import ModelCheckpoint -from stepcovnet.config.TrainingConfig import TrainingConfig +from stepcovnet import config from stepcovnet.executor.AbstractExecutor import AbstractExecutor from stepcovnet.inputs.TrainingInput import TrainingInput from stepcovnet.training.TrainingHyperparameters import TrainingHyperparameters @@ -19,11 +18,17 @@ def __init__(self, stepcovnet_model): def execute(self, input_data: TrainingInput): hyperparameters = input_data.config.hyperparameters - weights = self.stepcovnet_model.model.get_weights() if hyperparameters.retrain else None + weights = ( + self.stepcovnet_model.model.get_weights() + if hyperparameters.retrain + else None + ) - self.stepcovnet_model.model.compile(loss=hyperparameters.loss, - metrics=hyperparameters.metrics, - optimizer=hyperparameters.optimizer) + self.stepcovnet_model.model.compile( + loss=hyperparameters.loss, + metrics=hyperparameters.metrics, + optimizer=hyperparameters.optimizer, + ) self.stepcovnet_model.model.summary() # Saving scalers and metadata in the case of errors during training self.save(input_data.config, pretrained=True, retrained=False) @@ -31,12 +36,16 @@ def execute(self, input_data: TrainingInput): self.save(input_data.config, training_history=history, retrained=False) if hyperparameters.retrain: - epochs_final = len(history.history['val_loss']) - retraining_history = self.retrain(input_data, - saved_original_weights=weights, - epochs=epochs_final, - callbacks=self.get_retraining_callbacks(hyperparameters)) - self.save(input_data.config, training_history=retraining_history, retrained=True) + epochs_final = len(history.history["val_loss"]) + retraining_history = self.retrain( + input_data, + saved_original_weights=weights, + epochs=epochs_final, + callbacks=self.get_retraining_callbacks(hyperparameters), + ) + self.save( + input_data.config, training_history=retraining_history, retrained=True + ) return self.stepcovnet_model def get_training_callbacks(self, hyperparameters: TrainingHyperparameters): @@ -45,15 +54,27 @@ def get_training_callbacks(self, hyperparameters: TrainingHyperparameters): log_path = hyperparameters.log_path patience = hyperparameters.patience callbacks = [ - ModelCheckpoint(filepath=os.path.join(model_out_path, model_name + '_callback'), monitor='val_loss', - verbose=0, save_best_only=True)] + ModelCheckpoint( + filepath=os.path.join(model_out_path, model_name + "_callback"), + monitor="val_loss", + verbose=0, + save_best_only=True, + ) + ] if patience > 0: - callbacks.append(EarlyStopping(monitor='val_loss', patience=patience, verbose=0)) + callbacks.append( + EarlyStopping(monitor="val_loss", patience=patience, verbose=0) + ) if log_path is not None: os.makedirs(os.path.join(log_path, "split_dataset"), exist_ok=True) - callbacks.append(TensorBoard(log_dir=os.path.join(log_path, "split_dataset"), - histogram_freq=1, profile_batch=100000000)) + callbacks.append( + TensorBoard( + log_dir=os.path.join(log_path, "split_dataset"), + histogram_freq=1, + profile_batch=100000000, + ) + ) return callbacks @staticmethod @@ -63,57 +84,85 @@ def get_retraining_callbacks(hyperparameters: TrainingHyperparameters): if log_path is not None: os.makedirs(os.path.join(log_path, "whole_dataset"), exist_ok=True) - callbacks.append(TensorBoard(log_dir=os.path.join(log_path, "whole_dataset"), - histogram_freq=1, profile_batch=100000000)) + callbacks.append( + TensorBoard( + log_dir=os.path.join(log_path, "whole_dataset"), + histogram_freq=1, + profile_batch=100000000, + ) + ) return callbacks def train(self, input_data, callbacks): - print("Training on %d samples (%d songs) and validating on %d samples (%d songs)" % ( - input_data.train_feature_generator.num_samples, - len(input_data.train_feature_generator.train_indexes), - input_data.val_feature_generator.num_samples, - len(input_data.val_feature_generator.train_indexes))) + print( + "Training on %d samples (%d songs) and validating on %d samples (%d songs)" + % ( + input_data.train_feature_generator.num_samples, + len(input_data.train_feature_generator.train_indexes), + input_data.val_feature_generator.num_samples, + len(input_data.val_feature_generator.train_indexes), + ) + ) print("\nStarting training...") - history = self.stepcovnet_model.model.fit(x=input_data.train_generator, - epochs=input_data.config.hyperparameters.epochs, - steps_per_epoch=len(input_data.train_feature_generator), - validation_steps=len(input_data.val_feature_generator), - callbacks=callbacks, - class_weight=input_data.config.train_class_weights, - validation_data=input_data.val_generator, - verbose=1) + history = self.stepcovnet_model.model.fit( + x=input_data.train_generator, + epochs=input_data.config.hyperparameters.epochs, + steps_per_epoch=len(input_data.train_feature_generator), + validation_steps=len(input_data.val_feature_generator), + callbacks=callbacks, + class_weight=input_data.config.train_class_weights, + validation_data=input_data.val_generator, + verbose=1, + ) print("\n*****************************") print("***** TRAINING FINISHED *****") print("*****************************\n") return history def retrain(self, input_data, saved_original_weights, epochs, callbacks): - print("Training on %d samples (%d songs)" % (input_data.all_feature_generator.num_samples, - len(input_data.all_feature_generator.train_indexes))) + print( + "Training on %d samples (%d songs)" + % ( + input_data.all_feature_generator.num_samples, + len(input_data.all_feature_generator.train_indexes), + ) + ) print("\nStarting retraining...") self.stepcovnet_model.model.set_weights(saved_original_weights) - history = self.stepcovnet_model.model.fit(x=input_data.all_generator, - epochs=epochs, - steps_per_epoch=len(input_data.all_feature_generator), - callbacks=callbacks, - class_weight=input_data.config.all_class_weights, - verbose=1) + history = self.stepcovnet_model.model.fit( + x=input_data.all_generator, + epochs=epochs, + steps_per_epoch=len(input_data.all_feature_generator), + callbacks=callbacks, + class_weight=input_data.config.all_class_weights, + verbose=1, + ) print("\n*******************************") print("***** RETRAINING FINISHED *****") print("*******************************\n") return history - def save(self, training_config: TrainingConfig, retrained, training_history=None, pretrained=False): + def save( + self, + training_config: config.TrainingConfig, + retrained, + training_history=None, + pretrained=False, + ): model_out_path = self.stepcovnet_model.model_root_path model_name = self.stepcovnet_model.model_name if pretrained: if training_config.all_scalers is not None: - joblib.dump(training_config.all_scalers, - open(os.path.join(model_out_path, model_name + '_scaler.pkl'), 'wb')) + joblib.dump( + training_config.all_scalers, + open( + os.path.join(model_out_path, model_name + "_scaler.pkl"), "wb" + ), + ) elif retrained: - model_name += '_retrained' + model_name += "_retrained" if not pretrained: - print("Saving model \"%s\" at %s" % (model_name, model_out_path)) + print('Saving model "%s" at %s' % (model_name, model_out_path)) self.stepcovnet_model.model.save(os.path.join(model_out_path, model_name)) if self.stepcovnet_model.metadata is None: self.stepcovnet_model.build_metadata_from_training_config(training_config) @@ -121,5 +170,5 @@ def save(self, training_config: TrainingConfig, retrained, training_history=None history_name = "retraining_history" if retrained else "training_history" self.stepcovnet_model.metadata[history_name] = training_history.history print("Saving model metadata at %s" % model_out_path) - with open(os.path.join(model_out_path, 'metadata.json'), 'w') as json_file: + with open(os.path.join(model_out_path, "metadata.json"), "w") as json_file: json_file.write(json.dumps(self.stepcovnet_model.metadata)) diff --git a/stepcovnet/inputs/InferenceInput.py b/stepcovnet/inputs/InferenceInput.py index a7115a7..7f43636 100644 --- a/stepcovnet/inputs/InferenceInput.py +++ b/stepcovnet/inputs/InferenceInput.py @@ -1,20 +1,24 @@ import numpy as np +from stepcovnet import config from stepcovnet.common.utils import get_samples_ngram_with_mask -from stepcovnet.config.InferenceConfig import InferenceConfig from stepcovnet.data_collection.sample_collection_helper import get_audio_features from stepcovnet.inputs.AbstractInput import AbstractInput class InferenceInput(AbstractInput): - def __init__(self, inference_config: InferenceConfig): + def __init__(self, inference_config: config.InferenceConfig): super(InferenceInput, self).__init__(config=inference_config) - self.audio_features = get_audio_features(wav_path=self.config.audio_path, - file_name=self.config.file_name, - config=self.config.dataset_config) - self.arrow_input_init, self.arrow_mask_init = get_samples_ngram_with_mask(samples=np.array([0]), - lookback=self.config.lookback, - reshape=True, - mask_padding_value=0) + self.audio_features = get_audio_features( + wav_path=self.config.audio_path, + file_name=self.config.file_name, + config=self.config.dataset_config, + ) + self.arrow_input_init, self.arrow_mask_init = get_samples_ngram_with_mask( + samples=np.array([0]), + lookback=self.config.lookback, + reshape=True, + mask_padding_value=0, + ) self.arrow_input_init = self.arrow_input_init[:-1, 1:] self.arrow_mask_init = self.arrow_mask_init[:-1, 1:] diff --git a/stepcovnet/inputs/TrainingInput.py b/stepcovnet/inputs/TrainingInput.py index 18b8595..b61e9f1 100644 --- a/stepcovnet/inputs/TrainingInput.py +++ b/stepcovnet/inputs/TrainingInput.py @@ -1,57 +1,67 @@ import tensorflow as tf -from stepcovnet.config.TrainingConfig import TrainingConfig +from stepcovnet import config from stepcovnet.inputs.AbstractInput import AbstractInput from stepcovnet.training.TrainingFeatureGenerator import TrainingFeatureGenerator class TrainingInput(AbstractInput): - def __init__(self, training_config: TrainingConfig): + def __init__(self, training_config: config.TrainingConfig): super(TrainingInput, self).__init__(config=training_config) self.output_types = ( - {"arrow_input": tf.dtypes.int32, - "arrow_mask": tf.dtypes.int32, - "audio_input": tf.dtypes.float64}, + { + "arrow_input": tf.dtypes.int32, + "arrow_mask": tf.dtypes.int32, + "audio_input": tf.dtypes.float64, + }, tf.dtypes.int8, # labels - tf.dtypes.float16 # sample weights + tf.dtypes.float16, # sample weights ) self.output_shape = ( - {"arrow_input": tf.TensorShape((None,) + self.config.arrow_input_shape), - "arrow_mask": tf.TensorShape((None,) + self.config.arrow_mask_shape), - "audio_input": tf.TensorShape((None,) + self.config.audio_input_shape)}, + { + "arrow_input": tf.TensorShape((None,) + self.config.arrow_input_shape), + "arrow_mask": tf.TensorShape((None,) + self.config.arrow_mask_shape), + "audio_input": tf.TensorShape((None,) + self.config.audio_input_shape), + }, tf.TensorShape((None,) + self.config.label_shape), # labels - tf.TensorShape((None,)) # sample weights + tf.TensorShape((None,)), # sample weights + ) + self.train_feature_generator = TrainingFeatureGenerator( + dataset_path=self.config.dataset_path, + dataset_type=self.config.dataset_type, + lookback=self.config.lookback, + batch_size=self.config.hyperparameters.batch_size, + indexes=self.config.train_indexes, + num_samples=self.config.num_train_samples, + scalers=self.config.train_scalers, + difficulty=self.config.difficulty, + warmup=True, + tokenizer_name=self.config.tokenizer_name, + ) + self.val_feature_generator = TrainingFeatureGenerator( + dataset_path=self.config.dataset_path, + dataset_type=self.config.dataset_type, + lookback=self.config.lookback, + batch_size=self.config.hyperparameters.batch_size, + indexes=self.config.val_indexes, + num_samples=self.config.num_val_samples, + scalers=self.config.train_scalers, + difficulty=self.config.difficulty, + shuffle=False, + tokenizer_name=self.config.tokenizer_name, + ) + self.all_feature_generator = TrainingFeatureGenerator( + dataset_path=self.config.dataset_path, + dataset_type=self.config.dataset_type, + lookback=self.config.lookback, + batch_size=self.config.hyperparameters.batch_size, + indexes=self.config.all_indexes, + num_samples=self.config.num_samples, + scalers=self.config.all_scalers, + difficulty=self.config.difficulty, + warmup=True, + tokenizer_name=self.config.tokenizer_name, ) - self.train_feature_generator = TrainingFeatureGenerator(dataset_path=self.config.dataset_path, - dataset_type=self.config.dataset_type, - lookback=self.config.lookback, - batch_size=self.config.hyperparameters.batch_size, - indexes=self.config.train_indexes, - num_samples=self.config.num_train_samples, - scalers=self.config.train_scalers, - difficulty=self.config.difficulty, - warmup=True, - tokenizer_name=self.config.tokenizer_name) - self.val_feature_generator = TrainingFeatureGenerator(dataset_path=self.config.dataset_path, - dataset_type=self.config.dataset_type, - lookback=self.config.lookback, - batch_size=self.config.hyperparameters.batch_size, - indexes=self.config.val_indexes, - num_samples=self.config.num_val_samples, - scalers=self.config.train_scalers, - difficulty=self.config.difficulty, - shuffle=False, - tokenizer_name=self.config.tokenizer_name) - self.all_feature_generator = TrainingFeatureGenerator(dataset_path=self.config.dataset_path, - dataset_type=self.config.dataset_type, - lookback=self.config.lookback, - batch_size=self.config.hyperparameters.batch_size, - indexes=self.config.all_indexes, - num_samples=self.config.num_samples, - scalers=self.config.all_scalers, - difficulty=self.config.difficulty, - warmup=True, - tokenizer_name=self.config.tokenizer_name) def get_tf_dataset(self, generator): return tf.data.Dataset.from_generator( diff --git a/stepcovnet/model/ArrowModel.py b/stepcovnet/model/ArrowModel.py index 4e4c986..ba7cf4f 100644 --- a/stepcovnet/model/ArrowModel.py +++ b/stepcovnet/model/ArrowModel.py @@ -1,22 +1,31 @@ from abc import abstractmethod import tensorflow as tf -from tensorflow.keras.layers import Input -from tensorflow.keras.layers import Layer +from tensorflow.keras.layers import Input, Layer -from stepcovnet.config.TrainingConfig import TrainingConfig +from stepcovnet import config from stepcovnet.model.AbstractModel import AbstractModel class ArrowModel(AbstractModel): - def __init__(self, training_config: TrainingConfig, name: str = "StepCOVNetArrowModel"): - arrow_input = Input(shape=training_config.arrow_input_shape, name="arrow_input", dtype=tf.int32) - arrow_mask = Input(shape=training_config.arrow_mask_shape, name="arrow_mask", dtype=tf.int32) + def __init__( + self, training_config: config.TrainingConfig, name: str = "StepCOVNetArrowModel" + ): + arrow_input = Input( + shape=training_config.arrow_input_shape, name="arrow_input", dtype=tf.int32 + ) + arrow_mask = Input( + shape=training_config.arrow_mask_shape, name="arrow_mask", dtype=tf.int32 + ) model_input = [arrow_input, arrow_mask] - model_output = self._create_arrow_model(arrow_input=arrow_input, arrow_mask=arrow_mask) + model_output = self._create_arrow_model( + arrow_input=arrow_input, arrow_mask=arrow_mask + ) - super(ArrowModel, self).__init__(model_input=model_input, model_output=model_output, name=name) + super(ArrowModel, self).__init__( + model_input=model_input, model_output=model_output, name=name + ) @abstractmethod def _create_arrow_model(self, arrow_input: Input, arrow_mask: Input) -> Layer: diff --git a/stepcovnet/model/AudioModel.py b/stepcovnet/model/AudioModel.py index 4dd4411..bcdaf71 100644 --- a/stepcovnet/model/AudioModel.py +++ b/stepcovnet/model/AudioModel.py @@ -1,21 +1,30 @@ from abc import abstractmethod import tensorflow as tf -from tensorflow.keras.layers import Input -from tensorflow.keras.layers import Layer +from tensorflow.keras.layers import Input, Layer -from stepcovnet.config.TrainingConfig import TrainingConfig +from stepcovnet import config from stepcovnet.model.AbstractModel import AbstractModel class AudioModel(AbstractModel): - def __init__(self, training_config: TrainingConfig, name: str = "StepCOVNetAudioModel"): - model_input = Input(shape=training_config.audio_input_shape, name="audio_input", dtype=tf.float64) + def __init__( + self, training_config: config.TrainingConfig, name: str = "StepCOVNetAudioModel" + ): + model_input = Input( + shape=training_config.audio_input_shape, + name="audio_input", + dtype=tf.float64, + ) model_output = self._create_audio_model(training_config, model_input) - super(AudioModel, self).__init__(model_input=model_input, model_output=model_output, name=name) + super(AudioModel, self).__init__( + model_input=model_input, model_output=model_output, name=name + ) @abstractmethod - def _create_audio_model(self, training_config: TrainingConfig, model_input: Input) -> Layer: + def _create_audio_model( + self, training_config: config.TrainingConfig, model_input: Input + ) -> Layer: ... diff --git a/stepcovnet/model/ClassifierModel.py b/stepcovnet/model/ClassifierModel.py index 4de1ebc..bdb8d6c 100644 --- a/stepcovnet/model/ClassifierModel.py +++ b/stepcovnet/model/ClassifierModel.py @@ -1,36 +1,49 @@ import tensorflow as tf -from tensorflow.keras.initializers import Constant -from tensorflow.keras.initializers import glorot_uniform -from tensorflow.keras.layers import Activation -from tensorflow.keras.layers import BatchNormalization -from tensorflow.keras.layers import concatenate -from tensorflow.keras.layers import Dense -from tensorflow.keras.layers import Dropout +from tensorflow.keras.initializers import Constant, glorot_uniform +from tensorflow.keras.layers import ( + Activation, + BatchNormalization, + concatenate, + Dense, + Dropout, +) +from stepcovnet import config from stepcovnet.common.constants import NUM_ARROW_COMBS -from stepcovnet.config.TrainingConfig import TrainingConfig from stepcovnet.model.AbstractModel import AbstractModel from stepcovnet.model.ArrowModel import ArrowModel from stepcovnet.model.AudioModel import AudioModel class ClassifierModel(AbstractModel): - def __init__(self, training_config: TrainingConfig, arrow_model: ArrowModel, audio_model: AudioModel, - name="StepCOVNet"): + def __init__( + self, + training_config: config.TrainingConfig, + arrow_model: ArrowModel, + audio_model: AudioModel, + name="StepCOVNet", + ): model_input = [arrow_model.input, audio_model.input] feature_concat = concatenate([arrow_model.output, audio_model.output]) - model = Dense(256, - kernel_initializer=tf.keras.initializers.he_uniform(42), - bias_initializer=tf.keras.initializers.Zeros(), - )(feature_concat) + model = Dense( + 256, + kernel_initializer=tf.keras.initializers.he_uniform(42), + bias_initializer=tf.keras.initializers.Zeros(), + )(feature_concat) model = BatchNormalization()(model) - model = Activation('relu')(model) + model = Activation("relu")(model) model = Dropout(0.5)(model) - model_output = Dense(NUM_ARROW_COMBS, activation="softmax", - bias_initializer=Constant(value=training_config.init_bias_correction), - kernel_initializer=glorot_uniform(42), dtype=tf.float32, name="onehot_encoded_arrows" - )(model) + model_output = Dense( + NUM_ARROW_COMBS, + activation="softmax", + bias_initializer=Constant(value=training_config.init_bias_correction), + kernel_initializer=glorot_uniform(42), + dtype=tf.float32, + name="onehot_encoded_arrows", + )(model) - super(ClassifierModel, self).__init__(model_input=model_input, model_output=model_output, name=name) + super(ClassifierModel, self).__init__( + model_input=model_input, model_output=model_output, name=name + ) diff --git a/stepcovnet/model/SimpleAudioModel.py b/stepcovnet/model/SimpleAudioModel.py index 20d5031..14abefe 100644 --- a/stepcovnet/model/SimpleAudioModel.py +++ b/stepcovnet/model/SimpleAudioModel.py @@ -1,10 +1,11 @@ -from tensorflow.keras.layers import Input -from tensorflow.keras.layers import Layer +from tensorflow.keras.layers import Input, Layer -from stepcovnet.config.TrainingConfig import TrainingConfig +from stepcovnet import config from stepcovnet.model.AudioModel import AudioModel class SimpleAudioModel(AudioModel): - def _create_audio_model(self, training_config: TrainingConfig, model_input: Input) -> Layer: + def _create_audio_model( + self, training_config: config.TrainingConfig, model_input: Input + ) -> Layer: raise NotImplementedError diff --git a/stepcovnet/model/StepCOVNetModel.py b/stepcovnet/model/StepCOVNetModel.py index d12d63e..624e629 100644 --- a/stepcovnet/model/StepCOVNetModel.py +++ b/stepcovnet/model/StepCOVNetModel.py @@ -2,21 +2,27 @@ import os from datetime import datetime -from tensorflow.keras.models import load_model -from tensorflow.keras.models import Model +from tensorflow.keras.models import load_model, Model -from stepcovnet.config.TrainingConfig import TrainingConfig +from stepcovnet import config class StepCOVNetModel(object): - def __init__(self, model_root_path: str, model_name: str = "StepCOVNet", model: Model = None, - metadata: dict = None): + def __init__( + self, + model_root_path: str, + model_name: str = "StepCOVNet", + model: Model = None, + metadata: dict = None, + ): self.model_root_path = os.path.abspath(model_root_path) self.model_name = model_name self.model: Model = model self.metadata = metadata - def build_metadata_from_training_config(self, training_config: TrainingConfig) -> dict: + def build_metadata_from_training_config( + self, training_config: config.TrainingConfig + ) -> dict: self.metadata = { "model_name": self.model_name, "creation_time": datetime.utcnow().strftime("%b %d %Y %H:%M:%S UTC"), @@ -25,17 +31,27 @@ def build_metadata_from_training_config(self, training_config: TrainingConfig) - "lookback": training_config.lookback, "difficulty": training_config.difficulty, "tokenizer_name": training_config.tokenizer_name, - "hyperparameters": str(training_config.hyperparameters) + "hyperparameters": str(training_config.hyperparameters), }, - "dataset_config": training_config.dataset_config + "dataset_config": training_config.dataset_config, } return self.metadata @classmethod - def load(cls, input_path: str, retrained: bool = False, compile_model: bool = False): - metadata = json.load(open(os.path.join(input_path, "metadata.json"), 'r')) + def load( + cls, input_path: str, retrained: bool = False, compile_model: bool = False + ): + metadata = json.load(open(os.path.join(input_path, "metadata.json"), "r")) model_name = metadata["model_name"] - model_path = os.path.join(input_path, model_name + '_retrained') if retrained \ + model_path = ( + os.path.join(input_path, model_name + "_retrained") + if retrained else os.path.join(input_path, model_name) + ) model = load_model(model_path, compile=compile_model) - return cls(model_root_path=input_path, model_name=model_name, model=model, metadata=metadata) + return cls( + model_root_path=input_path, + model_name=model_name, + model=model, + metadata=metadata, + ) diff --git a/stepcovnet/model/VggishAudioModel.py b/stepcovnet/model/VggishAudioModel.py index 76a0511..6853667 100644 --- a/stepcovnet/model/VggishAudioModel.py +++ b/stepcovnet/model/VggishAudioModel.py @@ -1,33 +1,47 @@ import tensorflow as tf -from tensorflow.keras.initializers import glorot_uniform -from tensorflow.keras.initializers import he_uniform -from tensorflow.keras.layers import BatchNormalization -from tensorflow.keras.layers import Bidirectional -from tensorflow.keras.layers import Conv2D -from tensorflow.keras.layers import Input -from tensorflow.keras.layers import LSTM -from tensorflow.keras.layers import TimeDistributed +from tensorflow.keras.initializers import glorot_uniform, he_uniform +from tensorflow.keras.layers import ( + BatchNormalization, + Bidirectional, + Conv2D, + Input, + LSTM, + TimeDistributed, +) -from stepcovnet.config.TrainingConfig import TrainingConfig +from stepcovnet import config from stepcovnet.model.AudioModel import AudioModel from stepcovnet.model.PretrainedModels import PretrainedModels class VggishAudioModel(AudioModel): - def _create_audio_model(self, training_config: TrainingConfig, model_input: Input) -> tf.keras.layers.Layer: + def _create_audio_model( + self, training_config: config.TrainingConfig, model_input: Input + ) -> tf.keras.layers.Layer: # Channel reduction if training_config.dataset_config["NUM_CHANNELS"] > 1: - vggish_input = TimeDistributed(Conv2D(1, (1, 1), strides=(1, 1), activation='linear', - padding='same', kernel_initializer=he_uniform(42), - bias_initializer=tf.keras.initializers.Zeros(), - image_shape=model_input.shape[1:], data_format='channels_last', - name='channel_reduction') - )(model_input) + vggish_input = TimeDistributed( + Conv2D( + 1, + (1, 1), + strides=(1, 1), + activation="linear", + padding="same", + kernel_initializer=he_uniform(42), + bias_initializer=tf.keras.initializers.Zeros(), + image_shape=model_input.shape[1:], + data_format="channels_last", + name="channel_reduction", + ) + )(model_input) else: vggish_input = model_input vggish_input = BatchNormalization()(vggish_input) - vggish_model = PretrainedModels.vggish_model(input_shape=training_config.audio_input_shape, - input_tensor=vggish_input, lookback=training_config.lookback) + vggish_model = PretrainedModels.vggish_model( + input_shape=training_config.audio_input_shape, + input_tensor=vggish_input, + lookback=training_config.lookback, + ) model_output = vggish_model(vggish_input) # VGGish model returns feature maps for avg/max pooling. Using LSTM for additional feature extraction. # Might be able to replace this with another method in the future diff --git a/stepmania_note_generator.py b/stepmania_note_generator.py index ba83b0b..99b091d 100644 --- a/stepmania_note_generator.py +++ b/stepmania_note_generator.py @@ -8,6 +8,7 @@ import joblib +from stepcovnet import config from stepcovnet.common.utils import ( get_bpm, get_filename, @@ -15,7 +16,6 @@ standardize_filename, write_file, ) -from stepcovnet.config.InferenceConfig import InferenceConfig from stepcovnet.executor.InferenceExecutor import InferenceExecutor from stepcovnet.inputs.InferenceInput import InferenceInput from stepcovnet.model.StepCOVNetModel import StepCOVNetModel @@ -106,7 +106,7 @@ def generate_notes( "Generating notes for %s\n-----------------------------------------\n" % audio_file_name ) - inference_config = InferenceConfig( + inference_config = config.InferenceConfig( audio_path=audio_files_path, file_name=audio_file_name, dataset_config=dataset_config, diff --git a/train.py b/train.py index 428f83b..cf337f0 100644 --- a/train.py +++ b/train.py @@ -4,7 +4,7 @@ import joblib -from stepcovnet.config.TrainingConfig import TrainingConfig +from stepcovnet import config from stepcovnet.data.ModelDatasetTypes import ModelDatasetTypes from stepcovnet.data.Tokenizers import Tokenizers from stepcovnet.executor.TrainingExecutor import TrainingExecutor @@ -40,7 +40,7 @@ def run_training( dataset_path, dataset_type, scalers, dataset_config = load_training_data(input_path) hyperparameters = TrainingHyperparameters(log_path=log_path) - training_config = TrainingConfig( + training_config = config.TrainingConfig( dataset_path=dataset_path, dataset_type=dataset_type, dataset_config=dataset_config,