-
Notifications
You must be signed in to change notification settings - Fork 34
/
pick-tracks
executable file
·400 lines (348 loc) · 16 KB
/
pick-tracks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
#!/usr/bin/env python
import dataclasses as dcs, collections as cs, subprocess as sp, contextlib as cl
import os, sys, re, random, math, time, logging, pathlib as pl, functools as ft
try: import mutagen
except ImportError: mutagen = None
try: from unidecode import unidecode
except ImportError: unidecode = None
@dcs.dataclass
class CopyConf:
src: pl.Path; dst: pl.Path; df_min: float
file_list: bool; ext_filter: bool; order: bool; src_rm: bool
@dcs.dataclass
class CleanupConf: p: pl.Path; df_min: float; df_pre: float; as_needed: bool; order: bool
@dcs.dataclass
class RenameConf:
fmt: str; fmt_fs: bool; max_len: int;
tags_check: list; tags_map: list; field_proc: list
MiB = float(2**20)
err_fmt = lambda err: f'[{err.__class__.__name__}] {err}'
log = logging.getLogger('pt')
_size_units=list(
reversed(list((u, 2 ** (i * 10))
for i, u in enumerate('BKMGT'))) )
def size_human(size):
for u, u1 in _size_units:
if size > u1: break
return f'{size / float(u1):.1f}{u}'
def size_human_parse(size):
if not size or not isinstance(size, str): return size
if size[-1].isdigit(): return float(size)
for u, u1 in _size_units:
if size[-1] == u: break
else: raise ValueError(f'Unrecognized units: {size[-1]} (value: {size!r})')
return float(size[:-1]) * u1
def time_diff_str( ts, ts0=None, now='now', ext=None,
_units=dict( h=3600, m=60, s=1,
y=365.25*86400, mo=30.5*86400, w=7*86400, d=1*86400 ) ):
res, s = list(), abs( (ts - ts0) if ts0 is not None
and not getattr(ts, 'total_seconds', False) else ts )
if not isinstance(s, (int, float)): s = s.total_seconds()
if s <= 0: return now
for unit, unit_s in sorted(_units.items(), key=lambda v: v[1], reverse=True):
val = math.floor(s / float(unit_s))
if not val: continue
res.append(f'{val:.0f}{unit}')
if len(res) >= 2: break
s -= val * unit_s
if not res: return now
if ext: res.append(ext)
return ' '.join(res)
tags_default = dict(x='x', xxx='xxx')
tags_subs = {
r'[\\/]': '_', r'^\.+': '_', r'[\x00-\x1f]': '_', r':': '-_',
r'<': '(', r'>': ')', r'\*': '+', r'[|!"]': '-', r'[\?\*]': '_',
'[\'’]': '', r'\.+$': '_', r'\s+$': '', r'\s': '_' }
def track_name( rename, p,
_track_name_subs = list((re.compile(k), v) for k,v in tags_subs.items()) ):
name = p.name
if not rename: return name
try: tags_raw = mutagen.File(p).tags
except: return name
if not tags_raw: return name
tags = tags_default.copy()
for k, v in tags_raw.items():
if not isinstance(v, list): v = getattr(v, 'text', None)
if not v: continue
tags[k] = str(v[0] if isinstance(v, list) else v)
if not tags: return name
tags = cs.defaultdict(str, tags.items())
for k, vs in rename.tags_map.items():
with cl.suppress(StopIteration):
tags[k] = next(tags[v] for v in vs if tags[v])
for k, a, b in rename.field_proc:
if k not in tags: continue
tags[k] = re.sub(a, b, tags[k])
if ( rename.tags_check and
not all(list(tags.get(k) for k in rename.tags_check)) ): return name
name, ext = rename.fmt.format(**tags), os.path.splitext(name)[-1]
if rename.max_len > 0: name = name[:rename.max_len]
if rename.fmt_fs:
for sub_re, sub in _track_name_subs: name = sub_re.sub(sub, name)
name += ext
if unidecode: name = unidecode(name)
return name
def build_file_list(copy):
if copy.src:
if copy.file_list:
src_lists, src_files = list(), list()
for src in copy.src:
with open(src) as src: src_lists.append(src.read())
for path in '\0'.join(src_lists).replace('\n', '\0').split('\0'):
path = pl.Path(path.rstrip('\r'))
if not path: continue
if not path.is_file():
log.warn('Skipping non-file path: %s', path)
continue
src_files.append(path)
else:
src_files = ['-type', 'f']
if copy.ext_filter: src_files += '( -name *.mp3 -o -name *.ogg )'.split()
cmd = list(map(str, ['find', '-L', *copy.src, *src_files]))
log.debug('Source-file discovery cmd: %s', ' '.join(cmd))
cmd = sp.run(cmd, check=True, stdout=sp.PIPE)
src_files = cmd.stdout.decode().splitlines()
else: src_files = list()
src_files.sort()
# Same files accessible via multiple symlinks get removed here
src_files = dict((pl.Path(p).resolve(), pl.Path(p)) for p in src_files)
log.debug('Source-file list size: %s file(s)', len(src_files))
return list(src_files.values())
def df_get(path):
df = os.statvfs(path or copy.dst)
return float(df.f_bavail * df.f_bsize)
def pop_path(path_list):
idx = random.randint(0, len(path_list) - 1)
return path_list.pop(idx)
def cleanup_run(cleanup, paths, df=None, dry_run=False):
if df is None: df = cleanup.df_min
elif df <= 0: return
if dry_run:
return log.debug('Cleanup dry-run (goal: %.0f MiB)', df / MiB)
while paths:
if (df := df_get(cleanup.p)) >= df: break
p = pop_path(paths)
log.debug('Removing file (df: %.1f / %.1f MiB): %s', df / MiB, df / MiB, p)
try: p.unlink()
except OSError as err:
log.warning('Failed to remove file (%s): %s', p, err_fmt(err))
def run_copy(copy, cleanup, rename, dry_run):
src_files = build_file_list(copy)
### Build a list of files for removal, if any cleanup is requested
if cleanup:
cleanup_paths = list()
cleanup_func = ft.partial(cleanup_run, cleanup, cleanup_paths, dry_run=dry_run)
if not cleanup.p.exists():
if not (dry_run and cleanup.p == copy.dst):
log.info('Cleanup path does not exists: %s', cleanup.p)
else:
res = sp.run(
['find', cleanup.p, '-type', 'f'], check=True, stdout=sp.PIPE )
cleanup_paths = res.stdout.decode().splitlines()
if cleanup.df_pre:
log.debug( 'Doing cleanup (up to: %.1f MiB)'
' of the path: %r', cleanup.df_pre / MiB, cleanup.p )
cleanup_func(cleanup.df_pre)
# Run --clean-as-needed cleanup to specified --df-min, even with no sources
if cleanup.as_needed and cleanup.df_min and not src_files: cleanup_func(cleanup.df_min)
free_path = copy.dst # can be nx dir with --dry-run
while not free_path.exists():
if free_path == free_path.parent: break
free_path = free_path.parent
free = ft.partial(df_get, free_path) #copy.dst if copy.dst.exists() else )
stats = lambda: f'{size_human(df0 - df)}, rate: {rate / MiB:.2f} MiB/s'
rate_a = 0.3 # ewma exponent for per-file rate values, 0 < rate_a < 1
### Copy files
df_last, ts_last = df0, ts0 = free(), time.monotonic()
df, ts, rate = free(), time.monotonic(), 0
while src_files:
# ewma is used here reflect changing speeds
# (due to cached writes and/or other activity) faster.
df, ts = free(), time.monotonic()
rate_n = (df_last - df) / ((ts - ts_last) or 0.001)
rate = ( rate_n if not rate else
(rate_a * rate_n + (1 - rate_a) * rate) )
df_last, ts_last = df, ts
if copy.df_min:
if df < copy.df_min: break
to_fill = df - copy.df_min
time_left = time_diff_str(int(to_fill / rate) if rate != 0 else 0)
log.debug(
' - space left to fill: %.1f MiB, rate: %.2f MiB/s, left: %s',
to_fill / MiB, rate / MiB, time_left )
src = pop_path(src_files)
dst = copy.dst / track_name(rename, src)
log.debug('Copy: %s -> %s', src, dst.name)
try:
if cleanup and cleanup.as_needed: # free space for a file, if requested
src_df = (os.stat(src).st_size + 1*2**20) * 1.2 # for any possible fs overhead
if df < src_df: cleanup_func(src_df)
if not dry_run:
rsync_opts = ['--inplace', '--size-only', '--append-verify']
if copy.src_rm: rsync_opts += ['--remove-source-files']
rsync_opts.extend([src, dst])
try: sp.run(['rsync', *rsync_opts], check=True)
except sp.CalledProcessError as err:
if not opts.ignore_errors: raise
log.warning('File transfer error: %s', err_fmt(err))
except KeyboardInterrupt:
return log.info('Aborted: %s', stats()) or 1
### Success
rate = (df0 - (df := free())) / (time.monotonic() - ts0) # mean rate
log.info( 'Finished%s: %s, df-after: %s',
' [dry-run]' if dry_run else '', stats(), size_human(df) )
def run_rename(copy, rename, dry_run):
src_files = build_file_list(copy)
for p in src_files:
dst_name = track_name(rename, p)
if p.name == dst_name: continue
dst = p.with_name(dst_name)
log.debug('Rename: %s -> %s', p, dst.name)
if not dry_run: p.rename(dst)
def main(argv=None):
default_name_fmt_tags_repr = '''
artist=albumartist,artist,TPE2,
TPE1,TPE2,TSOP,TXXX:Album Artist Credit,TXXX:Artist Credit,xxx|
title=title,TIT2| album=album,TALB,xxx| tracknumber=tracknumber,TRCK,x|'''
default_name_fmt_tags = (
default_name_fmt_tags_repr.replace('\t', '').replace('| ', '|').replace('\n', '') )
default_name_proc = ['tracknumber=/.*$--']
import argparse, textwrap
dd = lambda text: re.sub( r' \t+', ' ',
textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
parser = argparse.ArgumentParser(
usage='%(prog)s [opts] src [src...] dst',
formatter_class=argparse.RawTextHelpFormatter,
description='Randomly pick and copy tracks from source(s) to destination.')
group = parser.add_argument_group('Source/destination')
group.add_argument('src', nargs='*', help='Source path(s).')
group.add_argument('dst', nargs='?', help='Destination path.')
group.add_argument('-l', '--src-list', action='store_true',
help='Treat "src" argument(s) as a list of files to copy (either newline or zero-delimeted).')
group.add_argument('-M', '--rename', action='store_true', help=dd('''
Rename all src file(s) in dirs they are in, using name formatting options.
All path args are treated as "src" with this option.'''))
group.add_argument('-m', '--rm-src', action='store_true',
help='Use --remove-source-files with rsync, i.e. move files.')
group.add_argument('--any-ext', action='store_true',
help='Disable filtering for mp3/ogg files only (by filename extension) and copy any files.')
group = parser.add_argument_group('Filename generation')
group.add_argument('-N', '--raw-names', action='store_true',
help='Do not rename tracks according to --name-format.')
group.add_argument('-f', '--name-format',
metavar='py_format_tpl', default='{title}_[{tracknumber}]_--_{artist}', help=dd('''
Format to rename tracks to when copying to destination.
Same syntax as for python str.format, with keys
from mutagenwrapper (see docs for full list of these).
Will not be used if mutagenwrapper is not available.
Default: %(default)s'''))
group.add_argument('--name-format-check',
metavar='key1[,key2,...]', default='title', help=dd('''
Comma-separated list of keys to check before renaming file according to --name-format.
If any of them is empty or missing, source filename will be kept.
Can be empty. Default: %(default)s'''))
group.add_argument('--name-format-mappings',
metavar='k1=v1[,v2,...][|k2=v3[v4,...]|...]', default=default_name_fmt_tags,
help=dd('''
Fallback keys to use if direct match is empty.
Default: {}''').format(dd(default_name_fmt_tags_repr).rstrip()))
group.add_argument('-p', '--name-field-proc',
action='append', metavar='field=src--dst', help=dd(f'''
Source/dest args for re.sub() on specific -f/--name-format fields, applied in order.
Different fields and regexps can be specified by using the option multiple times.
Empty value disables defaults, which are: {default_name_proc}'''))
group.add_argument('--name-format-raw', action='store_true',
help='Avoid doing any string replacements on filename (to make it more fs-friendly).')
group.add_argument('--name-len-max',
type=int, metavar='number', default=160,
help='Trim name to specified length.'
' No collision-detection afterwards. Default: %(default)s')
group = parser.add_argument_group('Free space cleanup')
group.add_argument('-s', '--df-min', metavar='float [BKMGT]', help=dd('''
Threshold of df on dst path to stop on.
Simple number (float) will be interpreted as MiB, but unit letter
(B, K, M, G, T) can be specified immediately after, e.g. "2.2G" for 2.2 GiB.'''))
group.add_argument('-c', '--df-preclean', metavar='float [BKMGT]', help=dd('''
Before copying stuff, clean up files from
destination path (also see --clean-path) up to specified free-space
threshold (or until theres nothing left). Files to remove are picked at random.
Same value spec format as with --df-min.'''))
group.add_argument('-a', '--clean-as-needed', action='store_true', help=dd('''
Clean up files from destination path
(also see --clean-path) as necessary to copy *all* the source files.
Files to remove are picked at random. Mutually exclusive with --df-min.'''))
group.add_argument('-r', '--clean-path', metavar='path',
help='Clean stuff from the specified path instead of destination.')
group = parser.add_argument_group('Sort/shuffle options')
group.add_argument('-o', '--ordering', action='store_true',
help='Copy tracks in (alphanumeric) order based on'
' their full path. Default is to copy files in random order.')
group = parser.add_argument_group('Debug/misc')
group.add_argument('-i', '--ignore-errors', action='store_true', help=dd('''
Don't abort on any errors from rsync.
These can usually happen due to e.g. fs name restrictions and whatever other io errors.'''))
group.add_argument('--dry-run', action='store_true', help='Dont do the actual cp/rm part.')
group.add_argument('--debug', action='store_true', help='Verbose operation mode.')
opts = parser.parse_args(argv if argv is not None else sys.argv[1:])
logging.basicConfig(
level=logging.DEBUG if opts.debug else logging.INFO,
format='{levelname} :: {message}', style="{" )
if opts.rename:
if opts.dst: opts.src += [opts.dst]; opts.dst = None
elif opts.src and not opts.dst:
opts.src, opts.dst = opts.src[:-1], opts.src[-1]
if not opts.dst: parser.error('No "dst" destination-path specified.')
if not opts.src: parser.error('No "src" source-path(s) specified.')
if opts.clean_as_needed and opts.df_min:
parser.error('Options --df-min and --clean-as-needed do not make sense together')
if opts.rename: dst = None
elif not (dst := pl.Path(opts.dst)).exists():
log.debug('Creating dst path: %s', dst)
if not opts.dry_run: sp.run(['mkdir', '-p', dst], check=True)
if dst_df_min := opts.df_min:
try: dst_df_min = float(dst_df_min) * MiB
except ValueError: dst_df_min = size_human_parse(dst_df_min)
copy = CopyConf( list(map(pl.Path, opts.src)),
dst, dst_df_min, opts.src_list, not opts.any_ext, opts.ordering, opts.rm_src )
if cleanup := opts.df_preclean or opts.clean_as_needed:
cleanup_p = pl.Path(opts.clean_path) if opts.clean_path else copy.dst
cleanup = CleanupConf(cleanup_p, 0, 0, opts.clean_as_needed, opts.ordering)
for k, v in ('df_min', opts.df_min), ('df_pre', opts.df_preclean):
if not v: continue
try: v = float(v) * MiB
except ValueError: v = size_human_parse(v)
setattr(cleanup, k, v)
if ( copy.dst and cleanup.p != copy.dst and
os.statvfs(cleanup.p).f_fsid != os.statvfs(copy.dst).f_fsid ):
parser.error( 'Cleanup-path is located on different'
f' fs from destination dir [ {copy.dst} ]: {cleanup.p}' )
if not opts.name_format or not mutagen:
if not opts.raw_names:
log.warning( 'Auto-setting --raw-names due to missing prequesites for'
' tag-based renaming (empty format: %r, mutagen module available: %r)',
not opts.name_format, bool(mutagen) )
opts.raw_names = True
if not opts.raw_names and not unidecode:
log.warning('FIlename transliteration will be disabled due to missing unidecode module')
if rename := not opts.raw_names:
tags_split = lambda tags: list(t.strip() for t in tags.split(',') if t.strip())
tags_check = tags_split(opts.name_format_check)
tags_map, specs = dict(), opts.name_format_mappings.split('|')
for spec in specs:
if not spec.strip(): continue
k, tags = spec.split('=', 1)
tags_map[k] = tags_split(tags)
if not unidecode:
log.warning('Failed to import unidecode module, name transliteration will be disabled.')
field_proc, field_proc_raw = list(), opts.name_field_proc or default_name_proc
for s in field_proc_raw:
try: k, v = s.split('=', 1); a, b = v.split('--', 1)
except: parser.error(f'Invalid -p/--name-field-proc spec: {s!r}')
field_proc.append((k, a, b))
rename = RenameConf( opts.name_format,
not opts.name_format_raw, opts.name_len_max, tags_check, tags_map, field_proc )
if copy.dst:
return run_copy(copy, cleanup, rename, opts.dry_run)
else:
return run_rename(copy, rename, opts.dry_run)
if __name__ == '__main__': sys.exit(main())