Skip to content

feat(sdk): remove warrant dependency to remove native dependencies #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
black
isort
jsonschema
pyjwt
nose2
prance
pycodestyle
pylint
pytest
pytest-cov
responses
pyjwt==1.7.1
boto3
aws_requests_auth
openapi-spec-validator
2 changes: 0 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
"prance",
"requests",
"aws_requests_auth",
"warrant",
"pyjwt",
"openapi-spec-validator",
]

Expand Down
2 changes: 1 addition & 1 deletion staxapp/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from botocore import UNSIGNED
from botocore.client import Config as BotoConfig
from botocore.exceptions import ClientError
from warrant import AWSSRP

from staxapp.aws_srp import AWSSRP
from staxapp.config import Config as StaxConfig
from staxapp.exceptions import InvalidCredentialsException

Expand Down
293 changes: 293 additions & 0 deletions staxapp/aws_srp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
"""
Copyright 2021 Brian Jinwright <opensource@capless.io>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Imported from https://github.com/capless/warrant to reduce external dependencies required by this library and just
use the SRP functions.
"""

import base64
import binascii
import datetime
import hashlib
import hmac
import os
import re

import boto3
import six


class WarrantException(Exception):
"""Base class for all Warrant exceptions"""


class ForceChangePasswordException(WarrantException):
"""Raised when the user is forced to change their password"""


# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L22
n_hex = (
"FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1"
+ "29024E088A67CC74020BBEA63B139B22514A08798E3404DD"
+ "EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245"
+ "E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED"
+ "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D"
+ "C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F"
+ "83655D23DCA3AD961C62F356208552BB9ED529077096966D"
+ "670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B"
+ "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9"
+ "DE2BCBF6955817183995497CEA956AE515D2261898FA0510"
+ "15728E5A8AAAC42DAD33170D04507A33A85521ABDF1CBA64"
+ "ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7"
+ "ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6B"
+ "F12FFA06D98A0864D87602733EC86A64521F2B18177B200C"
+ "BBE117577A615D6C770988C0BAD946E208E24FA074E5AB31"
+ "43DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF"
)
# https://github.com/aws/amazon-cognito-identity-js/blob/master/src/AuthenticationHelper.js#L49
g_hex = "2"
info_bits = bytearray("Caldera Derived Key", "utf-8")


def hash_sha256(buf):
"""AuthenticationHelper.hash"""
a = hashlib.sha256(buf).hexdigest()
return (64 - len(a)) * "0" + a


def hex_hash(hex_string):
return hash_sha256(bytearray.fromhex(hex_string))


def hex_to_long(hex_string):
return int(hex_string, 16)


def long_to_hex(long_num):
return "%x" % long_num


def get_random(nbytes):
random_hex = binascii.hexlify(os.urandom(nbytes))
return hex_to_long(random_hex)


def pad_hex(long_int):
"""
Converts a Long integer (or hex string) to hex format padded with zeroes for hashing
:param {Long integer|String} long_int Number or string to pad.
:return {String} Padded hex string.
"""
if not isinstance(long_int, six.string_types):
hash_str = long_to_hex(long_int)
else:
hash_str = long_int
if len(hash_str) % 2 == 1:
hash_str = "0%s" % hash_str
elif hash_str[0] in "89ABCDEFabcdef":
hash_str = "00%s" % hash_str
return hash_str


def compute_hkdf(ikm, salt):
"""
Standard hkdf algorithm
:param {Buffer} ikm Input key material.
:param {Buffer} salt Salt value.
:return {Buffer} Strong key material.
@private
"""
prk = hmac.new(salt, ikm, hashlib.sha256).digest()
info_bits_update = info_bits + bytearray(chr(1), "utf-8")
hmac_hash = hmac.new(prk, info_bits_update, hashlib.sha256).digest()
return hmac_hash[:16]


def calculate_u(big_a, big_b):
"""
Calculate the client's value U which is the hash of A and B
:param {Long integer} big_a Large A value.
:param {Long integer} big_b Server B value.
:return {Long integer} Computed U value.
"""
u_hex_hash = hex_hash(pad_hex(big_a) + pad_hex(big_b))
return hex_to_long(u_hex_hash)


class AWSSRP(object):

NEW_PASSWORD_REQUIRED_CHALLENGE = "NEW_PASSWORD_REQUIRED"
PASSWORD_VERIFIER_CHALLENGE = "PASSWORD_VERIFIER"

def __init__(
self,
username,
password,
pool_id,
client_id,
pool_region=None,
client=None,
client_secret=None,
):
if pool_region is not None and client is not None:
raise ValueError(
"pool_region and client should not both be specified "
"(region should be passed to the boto3 client instead)"
)

self.username = username
self.password = password
self.pool_id = pool_id
self.client_id = client_id
self.client_secret = client_secret
self.client = (
client if client else boto3.client("cognito-idp", region_name=pool_region)
)
self.big_n = hex_to_long(n_hex)
self.g = hex_to_long(g_hex)
self.k = hex_to_long(hex_hash("00" + n_hex + "0" + g_hex))
self.small_a_value = self.generate_random_small_a()
self.large_a_value = self.calculate_a()

def generate_random_small_a(self):
"""
helper function to generate a random big integer
:return {Long integer} a random value.
"""
random_long_int = get_random(128)
return random_long_int % self.big_n

def calculate_a(self):
"""
Calculate the client's public value A = g^a%N
with the generated random number a
:param {Long integer} a Randomly generated small A.
:return {Long integer} Computed large A.
"""
big_a = pow(self.g, self.small_a_value, self.big_n)
# safety check
if (big_a % self.big_n) == 0:
raise ValueError("Safety check for A failed")
return big_a

def get_password_authentication_key(self, username, password, server_b_value, salt):
"""
Calculates the final hkdf based on computed S value, and computed U value and the key
:param {String} username Username.
:param {String} password Password.
:param {Long integer} server_b_value Server B value.
:param {Long integer} salt Generated salt.
:return {Buffer} Computed HKDF value.
"""
u_value = calculate_u(self.large_a_value, server_b_value)
username_password = "%s%s:%s" % (self.pool_id.split("_")[1], username, password)
username_password_hash = hash_sha256(username_password.encode("utf-8"))

x_value = hex_to_long(hex_hash(pad_hex(salt) + username_password_hash))
g_mod_pow_xn = pow(self.g, x_value, self.big_n)
int_value2 = server_b_value - self.k * g_mod_pow_xn
s_value = pow(int_value2, self.small_a_value + u_value * x_value, self.big_n)
hkdf = compute_hkdf(
bytearray.fromhex(pad_hex(s_value)),
bytearray.fromhex(pad_hex(long_to_hex(u_value))),
)
return hkdf

def get_auth_params(self):
auth_params = {
"USERNAME": self.username,
"SRP_A": long_to_hex(self.large_a_value),
}
if self.client_secret is not None:
auth_params.update(
{
"SECRET_HASH": self.get_secret_hash(
self.username, self.client_id, self.client_secret
)
}
)
return auth_params

@staticmethod
def get_secret_hash(username, client_id, client_secret):
message = bytearray(username + client_id, "utf-8")
hmac_obj = hmac.new(bytearray(client_secret, "utf-8"), message, hashlib.sha256)
return base64.standard_b64encode(hmac_obj.digest()).decode("utf-8")

def process_challenge(self, challenge_parameters):
user_id_for_srp = challenge_parameters["USER_ID_FOR_SRP"]
salt_hex = challenge_parameters["SALT"]
srp_b_hex = challenge_parameters["SRP_B"]
secret_block_b64 = challenge_parameters["SECRET_BLOCK"]
# re strips leading zero from a day number (required by AWS Cognito)
timestamp = re.sub(
r" 0(\d) ",
r" \1 ",
datetime.datetime.utcnow().strftime("%a %b %d %H:%M:%S UTC %Y"),
)
hkdf = self.get_password_authentication_key(
user_id_for_srp, self.password, hex_to_long(srp_b_hex), salt_hex
)
secret_block_bytes = base64.standard_b64decode(secret_block_b64)
msg = (
bytearray(self.pool_id.split("_")[1], "utf-8")
+ bytearray(user_id_for_srp, "utf-8")
+ bytearray(secret_block_bytes)
+ bytearray(timestamp, "utf-8")
)
hmac_obj = hmac.new(hkdf, msg, digestmod=hashlib.sha256)
signature_string = base64.standard_b64encode(hmac_obj.digest())
response = {
"TIMESTAMP": timestamp,
"USERNAME": user_id_for_srp,
"PASSWORD_CLAIM_SECRET_BLOCK": secret_block_b64,
"PASSWORD_CLAIM_SIGNATURE": signature_string.decode("utf-8"),
}
if self.client_secret is not None:
response.update(
{
"SECRET_HASH": self.get_secret_hash(
self.username, self.client_id, self.client_secret
)
}
)
return response

def authenticate_user(self, client=None):
boto_client = self.client or client
auth_params = self.get_auth_params()
response = boto_client.initiate_auth(
AuthFlow="USER_SRP_AUTH",
AuthParameters=auth_params,
ClientId=self.client_id,
)
if response["ChallengeName"] == self.PASSWORD_VERIFIER_CHALLENGE:
challenge_response = self.process_challenge(response["ChallengeParameters"])
tokens = boto_client.respond_to_auth_challenge(
ClientId=self.client_id,
ChallengeName=self.PASSWORD_VERIFIER_CHALLENGE,
ChallengeResponses=challenge_response,
)

if tokens.get("ChallengeName") == self.NEW_PASSWORD_REQUIRED_CHALLENGE:
raise ForceChangePasswordException(
"Change password before authenticating"
)

return tokens
else:
raise NotImplementedError(
"The %s challenge is not supported" % response["ChallengeName"]
)
Loading