-
Notifications
You must be signed in to change notification settings - Fork 178
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement wallet RPC's JWT token authority
- Loading branch information
Showing
7 changed files
with
431 additions
and
103 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import datetime | ||
import os | ||
from typing import Optional | ||
|
||
import jwt | ||
|
||
from jmbase.support import bintohex | ||
|
||
|
||
class InvalidScopeError(Exception): | ||
pass | ||
|
||
|
||
class JMSessionValidity: | ||
access = datetime.timedelta(minutes=30) | ||
refresh = datetime.timedelta(hours=4) | ||
|
||
|
||
class JMTokenSignatureKey: | ||
algorithm = "HS256" | ||
|
||
def __init__(self): | ||
self.access = self.get_random_key() | ||
self.refresh = self.get_random_key() | ||
|
||
@staticmethod | ||
def get_random_key(size: int = 16) -> str: | ||
"""Create a random key has an hexadecimal string.""" | ||
return bintohex(os.urandom(size)) | ||
|
||
def reset(self, refresh_token_only: bool): | ||
"""Invalidate previously issued token(s) by creating new signature key(s).""" | ||
self.refresh = self.get_random_key() | ||
if not refresh_token_only: | ||
self.access = self.get_random_key() | ||
|
||
|
||
class JMTokenAuthority: | ||
"""Manage authorization tokens.""" | ||
|
||
validity = JMSessionValidity() | ||
|
||
def __init__(self, wallet_name: Optional[str] = None): | ||
self.signature = JMTokenSignatureKey() | ||
self.wallet_name = wallet_name | ||
|
||
def verify( | ||
self, | ||
token: str, | ||
scope: str = "walletrpc", | ||
*, | ||
is_refresh: bool = False, | ||
verify_exp: bool = True, | ||
): | ||
"""Verify JWT token. | ||
Token must have a valid signature and its scope must contain both scopes in | ||
arguments and wallet_name property. | ||
""" | ||
token_type = "refresh" if is_refresh else "access" | ||
claims = jwt.decode( | ||
token, | ||
getattr(self.signature, token_type), | ||
algorithms=self.signature.algorithm, | ||
options={"verify_exp": verify_exp}, | ||
leeway=10, | ||
) | ||
token_claims = set(claims.get("scope", []).split()) | ||
if not set(scope.split()) | {self.wallet_name} <= token_claims: | ||
raise InvalidScopeError | ||
|
||
def _issue(self, scope: str, token_type: str) -> str: | ||
return jwt.encode( | ||
{ | ||
"exp": datetime.datetime.utcnow() + getattr(self.validity, token_type), | ||
"scope": f"{scope} {self.wallet_name}", | ||
}, | ||
getattr(self.signature, token_type), | ||
algorithm=self.signature.algorithm, | ||
) | ||
|
||
def issue(self, scope: str = "walletrpc") -> dict: | ||
"""Issue a new access and refresh token for said scope. | ||
Previously issued refresh token is invalidated. | ||
""" | ||
self.signature.reset(refresh_token_only=True) | ||
return { | ||
"token": self._issue(scope, "access"), | ||
"refresh_token": self._issue(scope, "refresh"), | ||
} | ||
|
||
def reset(self): | ||
"""Invalidate all previously issued tokens by creating new signature keys.""" | ||
self.signature.reset(refresh_token_only=False) |
Oops, something went wrong.