|
1 | | -"""Time-based One-Time Passwords. |
2 | | -
|
3 | | -Inspired by https://github.com/susam/mintotp |
4 | | -""" |
| 1 | +"""Time-based One-Time Passwords.""" |
5 | 2 | import hmac |
6 | 3 | import time |
7 | 4 | from dataclasses import dataclass |
8 | 5 |
|
9 | 6 |
|
10 | 7 | @dataclass |
11 | 8 | class OTPGenerator: |
12 | | - """ |
| 9 | + """Time-based One-Time Password generator. |
| 10 | +
|
| 11 | + Inspired by https://github.com/susam/mintotp |
| 12 | +
|
13 | 13 | See https://datatracker.ietf.org/doc/html/rfc2104 |
14 | 14 | See https://datatracker.ietf.org/doc/html/rfc4226 |
15 | 15 | See https://datatracker.ietf.org/doc/html/rfc6238 |
| 16 | +
|
| 17 | + >>> totp = OTPGenerator(b'') |
| 18 | + >>> totp(0) |
| 19 | + '328482' |
| 20 | + >>> totp(29) |
| 21 | + '328482' |
| 22 | + >>> totp(30) |
| 23 | + '812658' |
| 24 | + >>> totp(31) |
| 25 | + '812658' |
| 26 | +
|
| 27 | + >>> OTPGenerator(b'', time_step=60).totp(60) |
| 28 | + '812658' |
| 29 | +
|
| 30 | + >>> OTPGenerator(b'', digits=20).totp(30) |
| 31 | + '00000000001230812658' |
| 32 | +
|
| 33 | + >>> OTPGenerator(b'', digest='sha256').totp(30) |
| 34 | + '007993' |
16 | 35 | """ |
17 | 36 | key: bytes |
18 | 37 | time_step: int = 30 |
@@ -70,18 +89,74 @@ def _reduce(self, code: int) -> str: |
70 | 89 | return str_code[-self.digits:] |
71 | 90 |
|
72 | 91 | def counter(self, time: float) -> int: |
| 92 | + assert time >= 0, time |
73 | 93 | return int(time / self.time_step) |
74 | 94 |
|
75 | 95 | def totp(self, time: float) -> str: |
76 | 96 | """Time-based One-Time Password. |
77 | 97 |
|
78 | 98 | See https://datatracker.ietf.org/doc/html/rfc6238#section-4 |
| 99 | +
|
| 100 | + >>> from math import nextafter |
| 101 | + >>> just_started = nextafter(0, 1) |
| 102 | + >>> almost_there = nextafter(30, 0) |
| 103 | +
|
| 104 | + >>> totp = OTPGenerator(b'') |
| 105 | + >>> assert totp(0) == totp(just_started) == totp(almost_there) |
| 106 | + >>> assert totp(almost_there) != totp(30) |
| 107 | +
|
| 108 | + >>> totp2 = OTPGenerator(b'', time_step=60) |
| 109 | + >>> assert totp(0) == totp2(0) == totp2(30) |
| 110 | + >>> assert totp(30) == totp2(60) |
79 | 111 | """ |
80 | 112 | return self.hotp(self.counter(time)) |
81 | 113 |
|
82 | 114 | def current(self, *, clock=time.time) -> str: |
83 | 115 | return self.totp(clock()) |
84 | 116 |
|
| 117 | + __call__ = totp |
| 118 | + |
| 119 | + def _check_boundaries(self, step=0): |
| 120 | + """ |
| 121 | + >>> from itertools import product |
| 122 | +
|
| 123 | + >>> keys = {b'', b'ayy', b'lmao'} |
| 124 | + >>> time_steps = {1, 30, int(1e10)} |
| 125 | + >>> digitses = {0, 6, 10_000} |
| 126 | + >>> digests = {'sha1', 'sha256', 'md5'} |
| 127 | + >>> steps = range(1, 100) |
| 128 | +
|
| 129 | + >>> param_lists = product(keys, time_steps, digitses, digests, steps) |
| 130 | + >>> for *params, step in param_lists: |
| 131 | + ... totp = OTPGenerator(*params) |
| 132 | + ... try: |
| 133 | + ... totp._check_boundaries(step) |
| 134 | + ... except AssertionError as exc: |
| 135 | + ... print(step, totp, exc) |
| 136 | +
|
| 137 | + """ |
| 138 | + assert isinstance(step, int) |
| 139 | + from math import nextafter |
| 140 | + |
| 141 | + t1 = self.time_step * step |
| 142 | + t2 = t1 + self.time_step |
| 143 | + |
| 144 | + before_t1 = self(nextafter(t1, float('-inf'))) |
| 145 | + at_t1 = self(t1) |
| 146 | + after_t1 = self(nextafter(t1, float('inf'))) |
| 147 | + |
| 148 | + before_t2 = self(nextafter(t2, float('-inf'))) |
| 149 | + at_t2 = self(t2) |
| 150 | + after_t2 = self(nextafter(t2, float('inf'))) |
| 151 | + |
| 152 | + assert at_t1 == after_t1, 'at_t1 != after_t1' |
| 153 | + assert after_t1 == before_t2, 'after_t1 != before_t2' |
| 154 | + assert at_t2 == after_t2, 'at_t2 != after_t2' |
| 155 | + |
| 156 | + assert before_t1 != at_t1, 'before_t1 == at_t1' |
| 157 | + assert before_t2 != at_t2, 'before_t2 == at_t2' |
| 158 | + assert before_t1 != after_t2, 'before_t1 == after_t2' |
| 159 | + |
85 | 160 |
|
86 | 161 | def totp(key_b32: str): |
87 | 162 | return OTPGenerator.from_b32(key_b32).current() |
|
0 commit comments