Skip to content
This repository was archived by the owner on Oct 2, 2023. It is now read-only.

Feature/oath otp validator support #38

Open
wants to merge 3 commits into
base: lang-library-refactor
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 153 additions & 1 deletion toopher/toopher_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import uuid
import requests_oauthlib
import sys
import binascii
import csv
import json

DEFAULT_BASE_URL = 'https://api.toopher.com/v1'
VERSION = '2.0.0'
Expand Down Expand Up @@ -67,6 +70,7 @@ def __init__(self, key, secret, api_url, api):
self.authentication_requests = AuthenticationRequests(api)
self.users = Users(api)
self.user_terminals = UserTerminals(api)
self.oath_otp_validators = OathOtpValidators(api)


class ApiRawRequester(object):
Expand Down Expand Up @@ -166,7 +170,7 @@ def get_by_id(self, user_id):

def get_by_name(self, username):
url = '/users'
users = self.api.advanced.raw.get(url, user_name=username)
users = self.api.advanced.raw.get(url, name=username)

if len(users) > 1:
raise ToopherApiError('Multiple users with name = %s' % username)
Expand Down Expand Up @@ -313,6 +317,29 @@ def reset(self):
params = {'name': self.name}
self.api.advanced.raw.post(url, **params)

def associate_oath_otp_validator(self, oath_otp_validator):
url = '/users/{0}/oath_otp_validators/associate'.format(self.id)
params = {}
if oath_otp_validator.id:
# identifying a previously-provisioned validator by ID
params['id'] = oath_otp_validator.id
elif oath_otp_validator.requester_specified_id and not oath_otp_validator.secret:
# identifying a previously-provisioned validator by requester_specified_id
params['requester_specified_id'] = oath_otp_validator.requester_specified_id
elif oath_otp_validator.secret:
# create and return a new validator
params.update(oath_otp_validator._creation_dict())

result = self.api.advanced.raw.post(url, **params)
return OathOtpValidator(result, self.api)

def dissociate_oath_otp_validator(self, oath_otp_validator):
url = '/users/{0}/oath_otp_validators/associate'.format(self.id)
params = { 'id' : oath_otp_validator.id }

result = self.api.advanced.raw.post(url, **params)
return OathOtpValidator(result, self.api)

def _update(self, json_response):
self.raw_response = json_response
try:
Expand Down Expand Up @@ -363,3 +390,128 @@ def _update(self, json_response):
self.name = json_response['name']
except Exception as e:
raise ToopherApiError('Could not parse action from response: %s' % e.args)


class OathOtpValidators(object):
class ImportFormat(object):
SAFENET = {
'csv_dialect' : 'whitespace',
'secret_format' : 'hex',
'header_map' : {
'Serial' : 'requester_specified_id',
'Seed' : 'secret'
},
'defaults' : {
'otp_type' : 'hotp',
'otp_digits' : 6,
'algorithm' : 'sha1'
}
}

def __init__(self, api):
self.api = api

def provision(self, secret, requester_specified_id=None, otp_type='totp', otp_digits=8, algorithm='sha1', totp_step_size=30, **kwargs):
url = '/oath_otp_validators/provision'
params = {
'secret': secret,
'requester_specified_id': requester_specified_id,
'otp_type': otp_type,
'otp_digits': otp_digits,
'algorithm': algorithm,
'totp_step_size': totp_step_size
}
params.update(kwargs)
result = self.api.advanced.raw.post(url, **params)
return OathOtpValidator(result, self.api)

def bulk_provision_from_csv(self, file_or_filename, style):
def decode_validator_secret(encoded_secret, encoding):
if encoding == 'hex':
return encoded_secret
elif encoding == 'ascii':
return binascii.hexlify(bytearray(encoded_secret, 'utf-8'))
else:
raise ValueError('Unsupport secret encoding {0}'.format(encoding))

def read_csv(f):
csv.register_dialect('whitespace', delimiter=' ', skipinitialspace=True)
dialect = style['csv_dialect']
f.seek(0)
reader = csv.DictReader(f, dialect=dialect)
validators = []
for row in reader:
validator = style['defaults'].copy()
for csv_field_name, api_field_name in style['header_map'].iteritems():
validator[api_field_name] = row[csv_field_name]
if 'secret_format' in style:
validator['secret'] = decode_validator_secret(validator['secret'], style['secret_format'])
validators.append(validator)
return validators

if isinstance(file_or_filename, basestring):
with open(file_or_filename) as f:
validators = read_csv(f)
else:
validators = read_csv(file_or_filename)

url = '/oath_otp_validators/bulk_provision'
params = {
'oath_otp_validators' : json.dumps(validators)
}
response = self.api.advanced.raw.post(url, **params)
validators_json_list = response['oath_otp_validators']
validators_list = [OathOtpValidator(json_validator, self.api) for json_validator in validators_json_list]
return validators_list

def get_by_id(self, validator_id):
url = '/oath_otp_validators/{0}'.format(validator_id)
return OathOtpValidator(self.api.advanced.raw.get(url), api)

def get_by_requester_specified_id(self, validator_requester_specified_id):
url = '/oath_otp_validators'
params = { 'requester_specified_id' : validator_requester_specified_id }
return OathOtpValidator(self.api.advanced.raw.get(url, **params), api)


class OathOtpValidator(ToopherBase):
def __init__(self, json_response, api ):
self.api = api
self._update(json_response)

def associate_with_user(self, user):
url = '/users/{0}/oath_otp_validators/associate'.format(user.id)
params = { 'id' : self.id }
result = self.api.advanced.raw.post(url, **params)
self._update(result)

def dissociate_from_user(self, user):
url = '/users/{0}/oath_otp_validators/dissociate'.format(user.id)
params = { 'id' : self.id }
result = self.api.advanced.raw.post(url, **params)

def resynchronize(self, otp_sequence):
url = '/oath_otp_validators/{0}/resync'.format(self.id)
params = { 'otp_sequence' : ','.join(otp_sequence) }
self.api.advanced.raw.post(url, **params)

def deactivate(self):
url = '/oath_otp_validators/{0}/deactivate'.format(self.id)
self.api.advanced.raw.post(url)

def _update(self, json_response):
try:
self.id = json_response['id']
self.requester_specified_id = json_response['requester_specified_id']
self.otp_type = json_response['otp_type']
self.otp_digits = json_response['otp_digits']
self.algorithm = json_response['algorithm']
self.totp_step_size = json_response['totp_step_size']
self.hotp_event_counter = json_response['hotp_event_counter']
except Exception:
import traceback
print traceback.format_exc()
raise ToopherApiError("Could not parse oath_otp_validator from response")

self._raw_data = json_response