forked from salu133445/musegan
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lmd2lpd.py
250 lines (226 loc) · 11.4 KB
/
lmd2lpd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
from __future__ import print_function
import os
import json
import warnings
import numpy as np
import scipy.sparse
from config import settings
from midi2pianoroll import midi_to_pianorolls
import pickle
if settings['multicore'] > 1:
import joblib
warnings.filterwarnings('ignore')
def msd_id_to_dirs(msd_id):
    """Given an MSD ID, generate the path prefix.
    E.g. TRABCD12345678 -> A/B/C/TRABCD12345678"""
    # The 3rd, 4th and 5th characters of the ID select the nested directories.
    level_1, level_2, level_3 = msd_id[2], msd_id[3], msd_id[4]
    return os.path.join(level_1, level_2, level_3, msd_id)
def get_midi_path(msd_id, midi_md5):
    """Given an MSD ID and MIDI MD5, return path to a MIDI file.
    kind should be one of 'matched' or 'aligned'. """
    # File lives under <dataset_path>/<A/B/C/msd_id>/<midi_md5>.mid
    midi_filename = midi_md5 + '.mid'
    return os.path.join(settings['dataset_path'], msd_id_to_dirs(msd_id), midi_filename)
def make_sure_path_exists(path):
    """Create all intermediate-level directories if the given path does not exist."""
    print("make path")
    print(path)
    # EAFP: attempt the creation directly instead of checking first, which
    # removes the race between os.path.exists() and os.makedirs() when
    # several workers create the same directory concurrently.
    try:
        os.makedirs(path)
    except OSError:
        # Either the path already exists (fine) or it genuinely cannot be
        # created -- only complain in the latter case, as the original did.
        if not os.path.isdir(path):
            print('Cannot make dir ' + path)
def save_npz(filepath, arrays=None, sparse_matrices=None):
    """Save the given arrays and sparse matrices into one single '.npz' file.

    arrays : dict or iterable of numpy arrays; an iterable gets the default
        names 'arr_0', 'arr_1', ...
    sparse_matrices : dict or iterable of matrices; each is converted to CSC
        form and stored as four component arrays whose names carry the
        '<name>_csc_{data,indices,indptr,shape}' suffixes so that load_npz()
        can rebuild them.
    """
    arrays_dict = {}
    if arrays:
        if isinstance(arrays, dict):
            arrays_dict = arrays
        else:
            # iterable of arrays: assign default names 'arr_0', 'arr_1', ...
            for idx, array in enumerate(arrays):
                arrays_dict['arr_' + str(idx)] = array
    if sparse_matrices:
        # normalize both accepted forms to (name, matrix) pairs; .items()
        # instead of the Python-2-only .iteritems() so this runs on 2 and 3
        if isinstance(sparse_matrices, dict):
            named_matrices = list(sparse_matrices.items())
        else:
            named_matrices = [(str(idx), matrix)
                              for idx, matrix in enumerate(sparse_matrices)]
        for sparse_matrix_name, sparse_matrix in named_matrices:
            csc_matrix = scipy.sparse.csc_matrix(sparse_matrix)
            # embed the component kind into the key for future use when loading
            arrays_dict['_'.join([sparse_matrix_name, 'csc_data'])] = csc_matrix.data
            arrays_dict['_'.join([sparse_matrix_name, 'csc_indices'])] = csc_matrix.indices
            arrays_dict['_'.join([sparse_matrix_name, 'csc_indptr'])] = csc_matrix.indptr
            arrays_dict['_'.join([sparse_matrix_name, 'csc_shape'])] = csc_matrix.shape
    # save to a compressed npz file, ensuring the '.npz' extension
    if not filepath.endswith('.npz'):
        filepath = filepath + '.npz'
    np.savez_compressed(filepath, **arrays_dict)
def load_npz(filepath):
    """Load the file and return the numpy arrays and scipy csc_matrices.

    Returns (arrays, csc_matrices): two dicts keyed by the names used when
    the file was written with save_npz().
    """
    with np.load(filepath) as loaded:
        # search for non-sparse arrays
        arrays_name = [filename for filename in loaded.files if "_csc_" not in filename]
        arrays = {array_name: loaded[array_name] for array_name in arrays_name}
        # search for csc matrix components; sorting groups every matrix's
        # four parts consecutively as (csc_data, csc_indices, csc_indptr,
        # csc_shape) -- alphabetical order
        csc_matrices_name = sorted([filename for filename in loaded.files if "_csc_" in filename])
        csc_matrices = {}
        # floor division: plain '/' yields a float on Python 3 and breaks range()
        for idx in range(len(csc_matrices_name) // 4):
            # strip the trailing '_csc_data' (9 chars) to recover the name
            csc_matrix_name = csc_matrices_name[4*idx][:-9]
            csc_matrices[csc_matrix_name] = scipy.sparse.csc_matrix(
                (loaded[csc_matrices_name[4*idx]],
                 loaded[csc_matrices_name[4*idx+1]],
                 loaded[csc_matrices_name[4*idx+2]]),
                shape=loaded[csc_matrices_name[4*idx+3]])
        return arrays, csc_matrices
def get_piano_roll_statistics(piano_roll, onset_array, midi_data):
    """Get the statistics of a piano-roll.

    piano_roll : 2D array, (num_time_steps, 128) -- rows are time steps.
    onset_array : array marking note onsets.
    midi_data : dict providing 'num_beats', 'num_bars' and 'time_signature'
        ('time_signature' may be None).
    Returns a dict of scalar statistics; bar-based statistics fall back to
    None/0.0 when no time signature is known.
    """
    # get the binarized version of the piano_roll
    piano_roll_bool = (piano_roll > 0)
    # per-time-step activity flag (True if any pitch sounds at that step)
    sum_rhythm_bool = piano_roll_bool.sum(dtype=bool, axis=1)
    # occurrence beat ratio: beats with at least one active step / all beats.
    # Reshape to (num_beats, beat_resolution) so each ROW is one beat; the
    # original (beat_resolution, -1) orientation interleaved the beats, and
    # it also returned an array instead of a scalar count.
    occ_beats = sum_rhythm_bool.reshape(
        (-1, settings['beat_resolution'])).sum(dtype=bool, axis=1).sum()
    occurrence_beat_ratio = occ_beats / float(midi_data['num_beats'])
    # occurrence bar ratio
    if midi_data['time_signature'] is not None:
        # NOTE(review): bar length uses time_signature[0] here but [-1] below
        # for pitch complexity, as in the original -- confirm which index
        # actually holds the numerator
        num_step_bar = settings['beat_resolution'] * int(midi_data['time_signature'][0])
        occ_bars = sum_rhythm_bool.reshape((-1, num_step_bar)).sum(dtype=bool, axis=1).sum()
        occurrence_bar_ratio = occ_bars / float(midi_data['num_bars'])
    else:
        occ_bars = 0  # original left this undefined and crashed below
        occurrence_bar_ratio = None
    # average notes sounding simultaneously, over the occupied time steps
    # (the original divided the sum by itself, which always yielded 1.0)
    sum_rhythm_int = piano_roll_bool.sum(axis=1)
    num_occupied_steps = np.count_nonzero(sum_rhythm_int)
    avg_notes_simultaneously = (sum_rhythm_int.sum() / float(num_occupied_steps)
                                if num_occupied_steps > 0 else 0.0)
    # max notes simultaneously (guard the empty piano-roll)
    max_notes_simultaneously = int(sum_rhythm_int.max()) if sum_rhythm_int.size > 0 else 0
    # rhythm complexity: onsets per occupied bar
    rhythm_complexity = float(np.sum(onset_array)) / float(occ_bars) if occ_bars > 0 else 0.0
    # pitch complexity: distinct pitches per bar, averaged over occupied bars
    if midi_data['time_signature'] is not None:
        sum_pitch_bar = piano_roll_bool.reshape(
            -1, settings['beat_resolution'] * int(midi_data['time_signature'][-1]), 128) \
            .sum(axis=1)
        pitch_complexity_bar = (sum_pitch_bar > 0).sum(axis=1)
        pitch_complexity = np.sum(pitch_complexity_bar) / float(occ_bars) if occ_bars > 0 else 0.0
    else:
        # the original returned an undefined name (NameError) in this case
        pitch_complexity = 0.0
    return {'occurrence beat ratio': occurrence_beat_ratio,
            'occurrence bar ratio': occurrence_bar_ratio,
            'average notes simultaneously': avg_notes_simultaneously,
            'max notes simultaneously': max_notes_simultaneously,
            'rhythm complexity': rhythm_complexity,
            'pitch complexity': pitch_complexity}
def save_dict_to_json(data, filepath):
    """Serialize the data dictionary as JSON into the given filepath."""
    with open(filepath, 'w') as json_file:
        json_file.write(json.dumps(data))
def converter(filepath):
    """Given the midi_filepath, convert it to piano-rolls and save the
    piano-rolls along with other side products. Return a key value pair for
    storing midi info to a dictionary, or None when the conversion fails."""
    # get the msd_id and midi_md5
    print('filepath:', filepath)
    midi_md5 = os.path.splitext(os.path.basename(filepath))[0]
    if settings['link_to_msd']:
        # the parent directory name is the MSD ID when linked to the MSD
        msd_id = os.path.basename(os.path.dirname(filepath))
    # convert the midi file into piano-rolls; conversion is best-effort, so
    # any parsing failure is reported and the file is skipped
    try:
        piano_rolls, onset_rolls, info_dict, chords, key, key_from_signature = \
            midi_to_pianorolls(filepath, beat_resolution=settings['beat_resolution'])
    except Exception as err:
        print(err)
        return None
    # get the path to save the results
    if settings['link_to_msd']:
        result_midi_dir = os.path.join(settings['result_path'], msd_id_to_dirs(msd_id), midi_md5)
    else:
        result_midi_dir = os.path.join(settings['result_path'], midi_md5[0], midi_md5)
    # save the piano-rolls and the onset-rolls into files
    make_sure_path_exists(result_midi_dir)
    print("save npz")
    save_npz(os.path.join(result_midi_dir, 'piano_rolls.npz'), sparse_matrices=piano_rolls)
    save_npz(os.path.join(result_midi_dir, 'onset_rolls.npz'), sparse_matrices=onset_rolls)
    # save the midi arrays into files; the listed keys are stored sparsely.
    # .items() instead of the Python-2-only .iteritems(), and k/v loop names
    # so the comprehension no longer shadows the 'key' unpacked above.
    sparse_matrices_keys = ['tempo_array', 'beat_array', 'downbeat_array']
    sparse_matrices = {k: v for k, v in info_dict['midi_arrays'].items()
                       if k in sparse_matrices_keys}
    arrays = {k: v for k, v in info_dict['midi_arrays'].items()
              if k not in sparse_matrices_keys}
    save_npz(os.path.join(result_midi_dir, 'arrays.npz'), arrays=arrays,
             sparse_matrices=sparse_matrices)
    # save the instrument dictionary into a json file
    save_dict_to_json(info_dict['instrument_info'], os.path.join(result_midi_dir, 'instruments.json'))
    # add a key value pair storing the midi_md5 of the selected midi file if
    # link_to_msd is set True
    if settings['link_to_msd']:
        return (msd_id, {midi_md5: info_dict['midi_info']})
    else:
        return (midi_md5, info_dict['midi_info'])
def main():
    """Walk the dataset directory, convert every '.mid' file found and
    collect the resulting (key, midi-info) pairs."""
    num_songs = 0
    make_sure_path_exists(settings['result_path'])
    # traverse from dataset root directory and search for midi files
    midi_filepaths = []
    for dirpath, subdirs, filenames in os.walk(settings['dataset_path']):
        for filename in filenames:
            if filename.endswith('.mid'):
                print(filename)
                midi_filepaths.append(os.path.join(dirpath, filename))
    midi_filepaths.reverse()
    # parallelize the converter if in multicore mode
    if settings['multicore'] > 1:
        kv_pairs = joblib.Parallel(n_jobs=settings['multicore'], verbose=5)(
            joblib.delayed(converter)(midi_filepath) for midi_filepath in midi_filepaths)
        # drop failed conversions (converter returns None on failure)
        kv_pairs = [kv_pair for kv_pair in kv_pairs if kv_pair is not None]
        num_songs += len(kv_pairs)
        # merge the per-file info dicts, grouping by the first key (msd_id)
        midi_dict = {}
        for first_key in set(kv_pair[0] for kv_pair in kv_pairs):
            midi_dict[first_key] = {}
        for kv_pair in kv_pairs:
            midi_dict[kv_pair[0]].update(kv_pair[1])
    else:
        midi_dict = {}
        for midi_filepath in midi_filepaths:
            print(midi_filepath)
            kv_pair = converter(midi_filepath)
            if kv_pair is None:
                continue
            # count one song per successful conversion; the original added
            # len(kv_pair) == 2 here, double-counting every song
            num_songs += 1
            midi_dict[kv_pair[0]] = kv_pair[1]
    print("the number of songs: %d" % num_songs)


if __name__ == "__main__":
    main()