-
Notifications
You must be signed in to change notification settings - Fork 34
/
secret-token-backup
executable file
·420 lines (363 loc) · 18.1 KB
/
secret-token-backup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
#!/usr/bin/env python
import contextlib as cl, subprocess as sp, datetime as dt, pathlib as pl
import os, sys, time, stat, errno, tempfile, json, base64
def p_err(*a, **kw):
    'Print an "ERROR: ..." message to stderr (flushed) and return 1 for use as an exit code'
    print('ERROR:', *a, **kw, file=sys.stderr, flush=True)
    return 1
@cl.contextmanager
def safe_replacement(path, *open_args, mode=None, xattrs=None, **open_kws):
    """Context to atomically create/replace file-path in-place unless errors are raised.

    Yields a temporary file created in the same directory as path.
    If the with-block finishes without an exception, that file is flushed,
    synced and renamed over path; otherwise it is removed and path is untouched.

    Args:
        path: Destination file path (anything accepted by str()).
        *open_args: Positional arguments passed to tempfile.NamedTemporaryFile.
        mode: Permission bits for the new file. Defaults to the
            permissions of an existing path, if there is one.
        xattrs: Mapping of extended attributes to set on the new file.
            Defaults to the attributes of an existing path, where supported.
        **open_kws: Keyword arguments passed to tempfile.NamedTemporaryFile,
            "delete", "dir" and "prefix" are always overridden here.
    """
    path = str(path)
    if mode is None:
        try: mode = stat.S_IMODE(os.lstat(path).st_mode)
        except FileNotFoundError: pass
    if xattrs is None and getattr(os, 'getxattr', None): # missing on e.g. MacOS
        try: xattrs = dict((k, os.getxattr(path, k)) for k in os.listxattr(path))
        except FileNotFoundError: pass
        except OSError as err:
            if err.errno != errno.ENOTSUP: raise
    # Temp file must be in the same dir for os.rename() to be an atomic replace
    open_kws.update( delete=False,
        dir=os.path.dirname(path), prefix=os.path.basename(path)+'.' )
    if not open_args: open_kws.setdefault('mode', 'w')
    with tempfile.NamedTemporaryFile(*open_args, **open_kws) as tmp:
        try:
            if mode is not None: os.fchmod(tmp.fileno(), mode)
            if xattrs:
                # Attributes go on the replacement file, not the old destination
                for k, v in xattrs.items(): os.setxattr(tmp.fileno(), k, v)
            yield tmp
            if not tmp.closed: # caller is allowed to close the file early
                tmp.flush()
                try: os.fdatasync(tmp.fileno())
                except AttributeError: pass # MacOS
            os.rename(tmp.name, path)
        finally:
            # No-op after a successful rename, cleanup after any failure
            try: os.unlink(tmp.name)
            except FileNotFoundError: pass
class TokenDB:
    """SQLite-backed storage for encrypted tokens, addressed by a lookup string.

    A single "tokens" table holds (lookup, ts, ct) rows. The row with an empty
    lookup string is reserved: init() stores the recipient keys text in its "ct"
    column, so regular tokens can never be stored under an empty lookup-key.
    Connection is opened lazily on first use, and the instance can be used
    as a context manager to have it closed on exit.
    Calling the instance returns a context manager that yields a cursor,
    with the sqlite connection committing on success or rolling back on error.
    """

    # Schema entries are applied in order, with the count
    #  of already-applied ones tracked in sqlite "pragma user_version"
    _db, _db_schema = None, ['''
        create table if not exists tokens (
            lookup not null primary key on conflict replace,
            ts real not null, ct text not null );''']

    def __init__(self, path, lock_timeout=60):
        'Store connection parameters for db file at path, without opening it yet'
        import sqlite3
        self._sqlite, self._db_kws = sqlite3, dict(
            database=path, isolation_level='IMMEDIATE', timeout=lock_timeout )

    def _db_init(self):
        'Open db connection and apply any schema entries missing from it'
        self._db = self._sqlite.connect(**self._db_kws)
        try:
            with self() as c:
                c.execute('pragma journal_mode=wal')
                c.execute('pragma user_version')
                if (sv := c.fetchall()[0][0]) == (sv_chk := len(self._db_schema)): return
                elif sv > sv_chk:
                    raise RuntimeError(f'DB schema [{sv}] newer than the script [{sv_chk}]')
                for sv, sql in enumerate(self._db_schema[sv:], sv+1):
                    for st in sql.split(';'): c.execute(st)
                    c.execute(f'pragma user_version = {sv}')
        except Exception:
            # Do not keep a connection that failed init cached for later calls
            self.close()
            raise

    def close(self):
        'Close db connection, if it was opened'
        if not self._db: return
        self._db = self._db.close()

    def __enter__(self): return self
    def __exit__(self, *err): self.close()

    @cl.contextmanager
    def __call__(self):
        'Context manager that yields a cursor, inside connection commit/rollback context'
        if not self._db: self._db_init()
        with self._db as conn, cl.closing(conn.cursor()) as c: yield c

    def init(self, keys_str):
        'Store recipient keys text in the reserved empty-lookup row'
        with self() as c:
            c.execute( 'insert into tokens (lookup, ts, ct)'
                ' values (?, ?, ?)', ('', time.time(), keys_str) )

    def info(self):
        'Return (keys_str, token_count, ts_init, ts_max), ts_max is None without any tokens'
        with self() as c:
            c.execute('select ct, ts from tokens where lookup = ?', ('',))
            keys_str, ts_init = c.fetchone()
            c.execute('select count(*), max(ts) from tokens where lookup != ?', ('',))
            if row := c.fetchone(): token_count, ts_max = row
            else: token_count = ts_max = 0
        return keys_str, token_count, ts_init, ts_max

    def keys(self):
        'Return recipient keys text, as stored by init()'
        with self() as c:
            c.execute('select ct from tokens where lookup = ?', ('',))
            return c.fetchone()[0]

    def set(self, lookup, ct, ts=None):
        '''Store ciphertext under stripped lookup-key, replacing any existing value.
            Current time is used if ts is not specified.
            Raises KeyError for empty lookup, as that row is reserved for recipient keys.'''
        if not (lookup := lookup.strip()): raise KeyError(lookup)
        with self() as c:
            c.execute( 'replace into tokens (lookup, ct, ts)'
                ' values (?, ?, ?)', (lookup, ct, ts or time.time()) )

    def get(self, lookup):
        '''Return (ct, ts) tuple for stripped lookup-key, or None if it is not in db.
            Raises KeyError for empty lookup.'''
        if not (lookup := lookup.strip()): raise KeyError(lookup)
        with self() as c:
            c.execute('select ct, ts from tokens where lookup = ?', (lookup,))
            if row := c.fetchone(): ct, ts = row; return ct, ts

    def get_random(self):
        'Return (lookup, ct, ts) for a random stored token, or None if there are none'
        with self() as c:
            c.execute( 'select lookup, ct, ts from tokens'
                ' where lookup != ? order by random() limit 1', ('',) )
            if row := c.fetchone(): lookup, ct, ts = row; return lookup, ct, ts

    def export_tokens(self):
        'Iterate over (lookup, ts, ct) rows for all stored tokens'
        with self() as c:
            c.execute('select lookup, ts, ct from tokens where lookup != ?', ('',))
            while row := c.fetchone():
                lookup, ts, ct = row; yield row

    @cl.contextmanager
    def import_tokens(self):
        '''Context manager for storing many tokens within one transaction.
            Yields add(lookup, ct, ts) function, with same semantics as set().'''
        with self() as c:
            def add(lookup, ct, ts):
                if not (lookup := lookup.strip()): raise KeyError(lookup)
                return c.execute(
                    'replace into tokens (lookup, ct, ts) values'
                    ' (?, ?, ?)', (lookup, ct, ts or time.time()) )
            yield add
def encrypt(keys_str, token, comment):
    """Encrypt token and comment to the recipient keys, using "age" tool.

    Token and comment are stripped and serialized as a two-element JSON list,
    which is then piped to "age -R <file>", where file is a temporary
    recipients-file with keys_str contents.
    Exits the script with an error message if age is missing or fails.

    Returns:
        Ciphertext bytes from age stdout.
    """
    pt = json.dumps([token.strip(), comment.strip()])
    with tempfile.NamedTemporaryFile(prefix='.stb.age-rcpt-file.') as tmp:
        tmp.write(keys_str.encode()); tmp.flush()
        try: age = sp.run( ['age', '-R', tmp.name],
            check=True, input=pt.encode(), stdout=sp.PIPE )
        except FileNotFoundError: # raised when executable cannot be found
            sys.exit(p_err('age tool not found, check that it is installed and in PATH'))
        except sp.CalledProcessError: sys.exit(p_err('age encryption failed'))
    return age.stdout
def decrypt(identity, ct):
    """Decrypt ciphertext produced by encrypt(), using "age" tool.

    Runs "age --decrypt -i <identity>" with ct passed to its stdin,
    and parses its output as a JSON list of two elements.
    Exits the script with an error message if age is missing or fails.

    Args:
        identity: Path to identity file, passed to age -i option.
        ct: Ciphertext bytes.

    Returns:
        (token, comment) tuple.
    """
    try: age = sp.run(
        ['age', '--decrypt', '-i', identity], check=True, input=ct, stdout=sp.PIPE )
    except FileNotFoundError: # raised when executable cannot be found
        sys.exit(p_err('age tool not found, check that it is installed and in PATH'))
    except sp.CalledProcessError: sys.exit(p_err('age decryption failed'))
    token, comment = json.loads(age.stdout)
    return token, comment
def main(argv=None):
    """Command-line entry point - parse arguments and run the requested command.

    Args:
        argv: List of command-line arguments, with sys.argv used if None.

    Returns:
        None on success or 1 after a reported error, for use with sys.exit().
        Argument errors exit directly via argparse.
    """
    import argparse, re, textwrap

    # Removes common indentation from multiline help-text and replaces tabs in it
    dd = lambda text: re.sub( r' \t+', ' ',
        textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')

    parser = argparse.ArgumentParser(
        usage='%(prog)s [opts]',
        formatter_class=argparse.RawTextHelpFormatter, description=dd(r'''
            Tool to store secret tokens, encrypting them to an arbitrary number
            of "recipient" public keys, and to easily retrieve/decrypt or re-key them later.
            Uses sqlite db file for storing encrypted data and "age" tool for encryption.
            Allows using age plugins for e.g. PIV yubikey-tap to decrypt each token.
            Stored keys/tokens/comments are always stripped of leading/trailing whitespace.
            "Comment" string options support escape sequences like \n \t \0 and such.'''),
        epilog=dd(r'''
            Usage examples:
            % age-keygen -o stb.key
            Public key: age1...
            % ./secret-token-backup init -c 'Test Storage\nNothing of value here' age1...
            % ./secret-token-backup set -c 'my token\n\twith a comment' test1 <<< secret1
            % ./secret-token-backup info --list
            ...
            ### List of all stored tokens [1]:
            #
            # 1 [ test1 ] - 2024-02-20 02:58:23 - 243 B
            % ./secret-token-backup get test1
            secret1
            % ./secret-token-backup get test1 -ct
            ## my token
            ## with a comment
            ## Last updated: 2024-02-20 02:58:23
            secret1
            ### To change public key list in the db, re-encrypt tokens via export/import
            % ./secret-token-backup export -o tokens.json && rm stb.key stb.db
            % ./secret-token-backup init @new-pubkey-list.txt
            % ./secret-token-backup import -f tokens.json && rm tokens.json
            ### A look at db internals and simple backup
            % sqlite3 stb.db .dump''') + ' ')

    parser.add_argument('-d', '--db', metavar='file', default='stb.db', help=dd('''
        SQLite database file used for encrypted data. Default: %(default)s
        Has a fixed immutable set of public keys as encryption targets stored in it,
        which can only be changed by exporting data into plaintext and re-encrypting.
        Must be an existing file, except for the "init" command that creates it.'''))
    parser.add_argument('-i', '--identity', metavar='file', default='stb.key', help=dd('''
        Identity file with decryption key information, from age or age-plugin-yubikey,
        only used/required for token decryption operations - "get" and "export",
        and not checked or needed to be present otherwise. Default: %(default)s
        Passed through to age command's -i/--identity option, can be fd with %% prefix.'''))

    cmds = parser.add_subparsers(title='Commands',
        description='Use -h/--help with these to list command-specific options.', dest='call')

    cmd = cmds.add_parser('init',
        formatter_class=argparse.RawTextHelpFormatter,
        help='Create new database with a set of age tool recipient keys.')
    cmd.add_argument('key', nargs='+', help=dd('''
        List of recipient keys, or key filenames prefixed by @ character
        (e.g. @myfile.txt), or fd numbers with optional %% prefix (e.g. 3 or %%3).
        This list cannot be changed later, except via creating new db and importing
        a list of plaintext/decrypted secrets into it (e.g. created via "export" command).
        Recipient files can have #-comments, "-" can be used to read one from stdin.
        Examples: age1ql3z7hjy54pw..., ssh-ed25519 AAAA..., 5, %%5, @mykeys.txt,
        @~/.ssh/id_ed25519.pub, @~bob/.ssh/authorized_keys, @/srv/age/keyfile.txt'''))
    cmd.add_argument('-c', '--comment', metavar='text', default='', help=dd('''
        Optional arbitrary comment string to store along with the recipient keys.
        Will be printed before keys when using "info" command.'''))

    cmd = cmds.add_parser('info',
        formatter_class=argparse.RawTextHelpFormatter,
        help='Print all recipient keys from the db file, and basic stats on its contents.')
    cmd.add_argument('-l', '--list', action='store_true', help=dd('''
        List all stored lookup-keys/timestamps in db, and encrypted token sizes.'''))

    cmd = cmds.add_parser('set',
        formatter_class=argparse.RawTextHelpFormatter,
        help='Encrypt and store token from stdin under a specified lookup-key.')
    cmd.add_argument('lookup', nargs='?', help=dd('''
        Lookup-key to store encrypted secret token under, for later identification/retrieval.
        Any existing token/comment under this key will be replaced.
        Will be read from stdin, if not specified.'''))
    cmd.add_argument('-c', '--comment', metavar='text', default='', help=dd('''
        Optional arbitrary comment string to assign to a token, encrypted alongside it.
        Can be optionally queried via "get" command or exported later.'''))
    cmd.add_argument('-f', '--input-file', metavar='file', help=dd('''
        Use input file instead of stdin stream for reading value, or fd with %% prefix.'''))

    cmd = cmds.add_parser('get',
        formatter_class=argparse.RawTextHelpFormatter,
        help='Recover and decrypt/print secret token for the lookup-key.')
    cmd.add_argument('lookup', nargs='?', help=dd('''
        Lookup-key to find encrypted token. Missing key in db will raise an error.
        Will be read from stdin, if not specified.'''))
    cmd.add_argument('-p', '--print-lookup', action='store_true', help=dd('''
        Print lookup key on the first line, prefixed by # characters.'''))
    cmd.add_argument('-c', '--comment', action='store_true', help=dd('''
        Query and print comment string before decrypted token, prefixed by # characters.'''))
    cmd.add_argument('-C', '--comment-value', action='store_true', help=dd('''
        Only print comment field for the lookup-key as-is, without anything else.'''))
    cmd.add_argument('-t', '--timestamp', action='store_true', help=dd('''
        Print last update timestamp before decrypted token.'''))
    cmd.add_argument('-T', '--timestamp-value', action='store_true', help=dd('''
        Same as -t/--timestamp, but only print raw posix timestamp value to stdout.
        Does not do any kind of decryption, as times are stored
        in db unencrypted, not using "identity" option in any way.'''))
    cmd.add_argument('-o', '--output-file', metavar='file', help=dd('''
        Use specified output file instead of stdout stream, or fd with %% prefix.'''))
    cmd.add_argument('-r', '--random', action='store_true', help=dd('''
        Pick random lookup key from db. Any specified key won't be used. For quick testing.'''))

    cmd = cmds.add_parser('wrap',
        formatter_class=argparse.RawTextHelpFormatter,
        help='Run encryption command and backup raw secret under wrapped key.',
        description=dd('''
            Secret-wrapping/encryption command and its arguments is passed on the
            command line here (use "--" to make sure args don't get misread as options).
            Input in the form of "<b64-salt> <b64-secret>" line is expected on stdin,
            of both encryption command and this wrapper, result gets printed to stdout.
            Command result is also used to build a lookup key to separately
            encrypt/store the input secret under in the db, for pre-configured recipients.
            Intended to wrap fido2-hmac-desalinate (fhd) command and have its semantics.'''))
    cmd.add_argument('cmd', nargs='*', help=dd('''
        Command and any number of optional arguments, to run with passed-through
        "<b64-salt> <b64-secret>" stdin, and read wrapped/encrypted secret from
        its stdout. Use "--" to make sure command's opts aren't used by this script.'''))
    cmd.add_argument('-c', '--comment', metavar='text', default='', help=dd('''
        Optional comment string to assign to stored secret, encrypted alongside it.
        Can be optionally queried via "get" command or exported later.'''))
    cmd.add_argument('-k', '--lookup-key-fmt',
        metavar='tpl', default='fhd.{salt}.{ct}', help=dd('''
            Format string (python str.format) for resulting lookup key
            to encrypt/store source secret under. Default: %(default)s
            Following template keys can be used: salt, ct'''))
    cmd.add_argument('-r', '--reverse-lookup', action='store_true', help=dd('''
        Run reverse of normal operation - format lookup key from stdin,
        and decrypt/return backed-up secret, instead of storing it in the db.
        Doesn't need or use wrapped-command or -c/--comment option, if any.'''))

    cmd = cmds.add_parser('export',
        formatter_class=argparse.RawTextHelpFormatter,
        help='Decrypt and dump all plaintext tokens/data in JSON format.',
        description='Format should be reasonably self-descriptive, similar to db structure.')
    cmd.add_argument('-o', '--output-file', metavar='file', help=dd('''
        Use specified output file instead of stdout stream, or fd with %% prefix.'''))

    cmd = cmds.add_parser('import',
        formatter_class=argparse.RawTextHelpFormatter,
        help='Populate db using JSON data from stdin or -f/--input-file.',
        description='Input should have same structure as "export" command produces.')
    cmd.add_argument('-f', '--input-file', metavar='file', help=dd('''
        Use specified input file instead of stdin stream, or fd with %% prefix.'''))

    opts = parser.parse_args(argv)
    cmd, db = opts.call, TokenDB(opts.db)

    # Strips the string and expands backslash-escapes listed in the regexp
    str_comment = lambda s, _map=dict(zip( r'\0abtnvfre',
        '\\\0\a\b\t\n\v\f\r\x1b' )): re.sub(r'\\[\\0abtnvfre]', lambda m: _map[m[0][1]], s.strip())
    ts_repr = lambda ts: ( '' if not ts else
        dt.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') )
    b64_dec = lambda s: base64.standard_b64decode(s).decode()
    b64_enc = lambda s: base64.standard_b64encode(s).decode()

    @cl.contextmanager
    def out_func(path):
        'Yields print-like func(s, end="") for stdout, %-prefixed fd or atomically-replaced file'
        if not path or path == '-': return (yield lambda s,end='': print(s, end=end))
        if path[0] == '%': dst_file = open(int(path[1:]), 'w')
        else: dst_file = safe_replacement(path)
        with dst_file as dst: yield lambda s,end='': print(s, file=dst, end=end)

    @cl.contextmanager
    def in_file(path):
        'Yields readable text file for stdin, %-prefixed fd or a file path'
        if not path or path == '-': return (yield sys.stdin)
        if path[0] == '%': path = int(path[1:])
        with open(path) as src: yield src

    @cl.contextmanager
    def ident_path(path):
        'Yields path of a temporary copy of identity file/fd contents, to pass to age'
        with in_file(path) as src: ident = src.read()
        with tempfile.NamedTemporaryFile(prefix='.stb.age-ident.') as tmp:
            tmp.write(ident.encode()); tmp.flush(); yield tmp.name

    if cmd == 'init':
        keys = list()
        if opts.comment:
            keys.extend( f'## {line}' for line in
                str_comment(opts.comment).splitlines() )
            keys.append('')
        for key in filter(None, opts.key):
            src = None # reset for each key, stays None for key strings and stdin
            if key[0] == '@': src = pl.Path(key[1:]).expanduser()
            elif key[0] == '%': src = int(key[1:])
            elif key.isdigit(): src = int(key)
            elif key == '-': key = sys.stdin.read().strip()
            if src is not None: # fd 0 is a valid source, so not a truthiness check
                with open(src) as src_file: key = src_file.read().strip()
                keys.extend([key, ''])
            else: keys.append(key)
        keys.append('')
        return db.init('\n'.join(keys).strip() + '\n')

    if not pl.Path(opts.db).exists():
        parser.error(f'Database file must first be created using "init" command: {opts.db}')

    if cmd == 'info':
        keys, token_count, ts_init, ts_max = db.info()
        print(f'{keys.strip()}\n' + '\n## [info] '.join([
            '', f'Stored tokens: {token_count:,}',
            f'DB created: {ts_repr(ts_init)}', f'Last update: {ts_repr(ts_max)}' ]))
        if opts.list:
            def _size_str(sz, _units=list(reversed(
                    list((u, 2 ** (i * 10)) for i, u in enumerate('BKMGT'))) )):
                for u, u1 in _units:
                    if sz >= u1: break # >= so that exactly 1024 is "1 K"
                return f'{sz / u1:.1f}'.removesuffix('.0') + f' {u}'
            print(f'\n### List of all stored tokens [{token_count:,}]:\n#')
            for n, (lookup, ts, ct) in enumerate(db.export_tokens(), 1):
                print(f'# {n:>3d} [ {lookup} ] - {ts_repr(ts)} - {_size_str(len(ct))}')

    elif cmd == 'set':
        with in_file(opts.input_file) as src: value = src.read()
        # Empty lookup is the db row with recipient keys, which must not be replaced.
        # It can easily end up here if stdin was already used up for the value above.
        if not (lookup := (opts.lookup or sys.stdin.read()).strip()):
            return p_err('Lookup-key must be specified and cannot be empty')
        db.set(lookup, encrypt(db.keys(), value, str_comment(opts.comment)))

    elif cmd == 'get':
        if opts.random:
            if not (data := db.get_random()): return p_err('Database is empty')
            lookup, ct, ts = data
        elif not (data := db.get(lookup := opts.lookup or sys.stdin.read())):
            return p_err(f'Lookup-key not found in db: {lookup}')
        else: ct, ts = data
        with out_func(opts.output_file) as out:
            # Timestamps are not encrypted, so identity file is not opened for these
            if opts.timestamp_value: return out(ts)
            with ident_path(opts.identity) as ident:
                token, comment = decrypt(ident, ct)
            if opts.comment_value: return out(comment)
            if opts.print_lookup: out(f'### {lookup}\n')
            if opts.comment:
                for line in comment.splitlines(): out(f'## {line}\n')
            if opts.timestamp: out(f'## Last updated: {ts_repr(ts)}\n')
            out(token)

    elif cmd == 'wrap':
        salt, token_b64 = sys.stdin.read().strip().split(None, 1)
        if not opts.reverse_lookup:
            if not opts.cmd: parser.error('Wrapped command must be specified')
            try: wrap = sp.run( opts.cmd, check=True,
                input=f'{salt} {token_b64}'.encode(), stdout=sp.PIPE )
            except sp.CalledProcessError: return p_err('secret-wrapper cmd failed')
            lookup = opts.lookup_key_fmt.format(salt=salt, ct=b64_enc(wrap.stdout))
            db.set(lookup, encrypt(db.keys(), b64_dec(token_b64), str_comment(opts.comment)))
            # Command output is passed through as raw bytes, after any buffered text
            sys.stdout.flush(); (out := sys.stdout.buffer).write(wrap.stdout); out.flush()
        else:
            lookup = opts.lookup_key_fmt.format(salt=salt, ct=token_b64)
            if not (data := db.get(lookup)): return p_err(f'Lookup-key not found in db: {lookup}')
            with ident_path(opts.identity) as ident: print(decrypt(ident, data[0])[0])

    elif cmd == 'export':
        with out_func(opts.output_file) as out, ident_path(opts.identity) as ident:
            for lookup, ts, ct in db.export_tokens():
                token, comment = decrypt(ident, ct)
                out(json.dumps(dict(lookup=lookup, ts=ts, token=token, comment=comment)), end='\n')

    elif cmd == 'import':
        # Keys are read once and before import transaction starts, as running
        #  another db query inside of it would commit partial import results
        keys = db.keys()
        with in_file(opts.input_file) as src, db.import_tokens() as add:
            for line in src:
                if not (line := line.strip()): continue
                line = json.loads(line)
                ct = encrypt(keys, line['token'], line['comment'])
                add(line['lookup'], ct, line['ts'])

    else: parser.error(f'Unrecognized command: {cmd}')
# Script entry point - value returned by main() is used as the process exit status
if __name__ == '__main__': sys.exit(main())