Skip to content

Create structured constants for validation dicts used for openid auth. #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions mig/shared/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,31 @@ def _is_unicode(val):
return (type(val) == _TYPE_UNICODE)


def encode_ascii_string(unicode_string):
"""Given a supplied input which can be either a string or bytes
return a representation that is gauranteed to be an ASCII string
in which any unicode characters are escaped. Output is suitable
for use in situations in Python 2 that are required to be ASCII
but we want to preserve any contained unicode characters e.g.
exception messages.
"""

assert _is_unicode(unicode_string)

if PY2:
prepared_string = bytearray(unicode_string, 'utf8')
else:
prepared_string = unicode_string

encoded_latin1 = codecs.decode(prepared_string, 'raw_unicode_escape')
encoded_ascii = list(encoded_latin1)
for index, character in enumerate(encoded_latin1):
character_ord = ord(character)
if character_ord > 127:
encoded_ascii[index] = "\%s" % hex(character_ord)[1:]
return ''.join(encoded_ascii)


def ensure_native_string(string_or_bytes):
"""Given a supplied input which can be either a string or bytes
return a representation providing string operations while ensuring that
Expand Down
9 changes: 9 additions & 0 deletions mig/shared/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import os
import sys

from mig.shared.localtypes import AsciiEnum

MIG_BASE = os.path.realpath(os.path.join(os.path.dirname(__file__), '../..'))
MIG_ENV = os.getenv('MIG_ENV', 'default')

Expand Down Expand Up @@ -80,6 +82,13 @@
AUTH_NONE, AUTH_GENERIC, AUTH_CERTIFICATE = "None", "Generic", "X.509 Certificate"
AUTH_OPENID_CONNECT, AUTH_OPENID_V2 = "OpenID Connect", "OpenID 2.0"

class AUTH(AsciiEnum):
NONE = AUTH_NONE
GENERIC = AUTH_GENERIC
CERTIFICATE = AUTH_CERTIFICATE
OPENID_CONNECT = AUTH_OPENID_CONNECT
OPENID_V2 = AUTH_OPENID_V2

AUTH_MIG_OID = "Site %s" % AUTH_OPENID_V2
AUTH_EXT_OID = "Ext %s" % AUTH_OPENID_V2
AUTH_MIG_OIDC = "Site %s" % AUTH_OPENID_CONNECT
Expand Down
174 changes: 90 additions & 84 deletions mig/shared/functionality/autocreate.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
from mig.shared.base import client_id_dir, canonical_user, mask_creds, \
fill_user, distinguished_name_to_user, fill_distinguished_name, \
get_site_base_url, requested_page
from mig.shared.defaults import AUTH_CERTIFICATE, AUTH_OPENID_V2, \
AUTH_OPENID_CONNECT, AUTH_MIG_CERT, AUTH_EXT_CERT, AUTH_MIG_OID, \
from mig.shared.defaults import \
AUTH, AUTH_MIG_CERT, AUTH_EXT_CERT, AUTH_MIG_OID, \
AUTH_EXT_OID, AUTH_MIG_OIDC, AUTH_EXT_OIDC, keyword_auto
from mig.shared.fileio import write_file
from mig.shared.functional import validate_input, REJECT_UNSET
Expand All @@ -74,41 +74,76 @@
pass


AUTOCREATE_AUTH_DEFAULTS = {
AUTH.OPENID_V2: {
# NOTE: that we only get sreg.required here if user is
# already logged in at OpenID provider when signing up so
# that we do not get the required attributes
'openid.ns.sreg': [''],
'openid.sreg.nickname': [''],
'openid.sreg.fullname': [''],
'openid.sreg.o': [''],
'openid.sreg.ou': [''],
'openid.sreg.timezone': [''],
'openid.sreg.short_id': [''],
'openid.sreg.full_name': [''],
'openid.sreg.organization': [''],
'openid.sreg.organizational_unit': [''],
'openid.sreg.email': [''],
'openid.sreg.country': ['DK'],
'openid.sreg.state': [''],
'openid.sreg.locality': [''],
'openid.sreg.role': [''],
'openid.sreg.roles': [''],
'openid.sreg.association': [''],
'openid.sreg.required': [''],
'openid.ns': [''],
'password': [''],
'comment': [''],
'accept_terms': [''],
'proxy_upload': [''],
'proxy_uploadfilename': [''],
'authsig': ['']
},
AUTH.OPENID_CONNECT: {
# IMPORTANT: consistently lowercase to avoid case sensitive validation
# NOTE: at least one of sub, oid or upn should be set - check later
'oidc.claim.sub': [''],
'oidc.claim.oid': [''],
'oidc.claim.upn': [''],
'oidc.claim.iss': REJECT_UNSET,
'oidc.claim.aud': REJECT_UNSET,
'oidc.claim.nickname': [''],
'oidc.claim.fullname': [''],
'oidc.claim.name': [''],
'oidc.claim.o': [''],
'oidc.claim.ou': [''],
'oidc.claim.organization': [''],
'oidc.claim.organizational_unit': [''],
'oidc.claim.timezone': [''],
'oidc.claim.email': [''],
'oidc.claim.country': [''],
'oidc.claim.state': [''],
'oidc.claim.locality': [''],
'oidc.claim.role': [''],
'oidc.claim.roles': [''],
'oidc.claim.association': [''],
'comment': [''],
'accept_terms': [''],
'proxy_upload': [''],
'proxy_uploadfilename': [''],
},
}


def signature(configuration, auth_type):
"""Signature of the main function"""

if auth_type == AUTH_OPENID_V2:
# Please note that we only get sreg.required here if user is
# already logged in at OpenID provider when signing up so
# that we do not get the required attributes
defaults = {
'openid.ns.sreg': [''],
'openid.sreg.nickname': [''],
'openid.sreg.fullname': [''],
'openid.sreg.o': [''],
'openid.sreg.ou': [''],
'openid.sreg.timezone': [''],
'openid.sreg.short_id': [''],
'openid.sreg.full_name': [''],
'openid.sreg.organization': [''],
'openid.sreg.organizational_unit': [''],
'openid.sreg.email': [''],
'openid.sreg.country': ['DK'],
'openid.sreg.state': [''],
'openid.sreg.locality': [''],
'openid.sreg.role': [''],
'openid.sreg.roles': [''],
'openid.sreg.association': [''],
'openid.sreg.required': [''],
'openid.ns': [''],
'password': [''],
'comment': [''],
'accept_terms': [''],
'proxy_upload': [''],
'proxy_uploadfilename': [''],
'authsig': ['']
}
elif auth_type == AUTH_CERTIFICATE:
if auth_type == AUTH.OPENID_V2:
defaults = AUTOCREATE_AUTH_DEFAULTS[AUTH.OPENID_V2]
elif auth_type == AUTH.OPENID_CONNECT:
defaults = AUTOCREATE_AUTH_DEFAULTS[AUTH.OPENID_CONNECT]
elif auth_type == AUTH.CERTIFICATE:
# We end up here from extcert if conf allows auto creation
# TODO: switch to add fields from cert_field_order in shared.defaults
defaults = {
Expand All @@ -133,35 +168,6 @@ def signature(configuration, auth_type):
peers_default = ['']
for field_name in configuration.site_peers_explicit_fields:
defaults['peers_%s' % field_name] = peers_default
elif auth_type == AUTH_OPENID_CONNECT:
# IMPORTANT: consistently lowercase to avoid case sensitive validation
# NOTE: at least one of sub, oid or upn should be set - check later
defaults = {
'oidc.claim.sub': [''],
'oidc.claim.oid': [''],
'oidc.claim.upn': [''],
'oidc.claim.iss': REJECT_UNSET,
'oidc.claim.aud': REJECT_UNSET,
'oidc.claim.nickname': [''],
'oidc.claim.fullname': [''],
'oidc.claim.name': [''],
'oidc.claim.o': [''],
'oidc.claim.ou': [''],
'oidc.claim.organization': [''],
'oidc.claim.organizational_unit': [''],
'oidc.claim.timezone': [''],
'oidc.claim.email': [''],
'oidc.claim.country': [''],
'oidc.claim.state': [''],
'oidc.claim.locality': [''],
'oidc.claim.role': [''],
'oidc.claim.roles': [''],
'oidc.claim.association': [''],
'comment': [''],
'accept_terms': [''],
'proxy_upload': [''],
'proxy_uploadfilename': [''],
}
else:
raise ValueError('no such auth_type: %s' % auth_type)
return ['text', defaults]
Expand Down Expand Up @@ -263,14 +269,14 @@ def populate_prefilters(configuration, prefilter_map, auth_type):
illegal_handler = lookup_filter_illegal_handler(filter_name)
_logger.debug("populate prefilters found filter illegal char handler %s" %
illegal_handler)
if auth_type == AUTH_OPENID_V2:
if auth_type == AUTH.OPENID_V2:
if filter_name and 'full_name' in configuration.auto_add_filter_fields:
def _filter_helper(x):
return filter_commonname(x, illegal_handler)
# NOTE: KUIT OpenID 2.0 provides full name as 'fullname'
for name in ('openid.sreg.fullname', 'openid.sreg.full_name'):
prefilter_map[name] = _filter_helper
elif auth_type == AUTH_OPENID_CONNECT:
elif auth_type == AUTH.OPENID_CONNECT:
if configuration.auto_add_filter_method and \
'full_name' in configuration.auto_add_filter_fields:
def _filter_helper(x):
Expand All @@ -279,7 +285,7 @@ def _filter_helper(x):
for name in ('oidc.claim.fullname', 'oidc.claim.full_name',
'oidc.claim.name'):
prefilter_map[name] = _filter_helper
elif auth_type == AUTH_CERTIFICATE:
elif auth_type == AUTH.CERTIFICATE:
if configuration.auto_add_filter_method and \
'full_name' in configuration.auto_add_filter_fields:
def _filter_helper(x):
Expand Down Expand Up @@ -309,7 +315,7 @@ def main(client_id, user_arguments_dict, environ=None):
% configuration.short_title})
(auth_type, auth_flavor) = detect_client_auth(configuration, environ)
identity = extract_client_id(configuration, environ, lookup_dn=False)
if client_id and auth_type == AUTH_CERTIFICATE:
if client_id and auth_type == AUTH.CERTIFICATE:
if auth_flavor == AUTH_MIG_CERT:
base_url = configuration.migserver_https_mig_cert_url
elif auth_flavor == AUTH_EXT_CERT:
Expand All @@ -321,7 +327,7 @@ def main(client_id, user_arguments_dict, environ=None):
return (output_objects, returnvalues.SYSTEM_ERROR)
# NOTE: simple filters to handle unsupported chars e.g. in full name
populate_prefilters(configuration, prefilter_map, auth_type)
elif identity and auth_type == AUTH_OPENID_V2:
elif identity and auth_type == AUTH.OPENID_V2:
if auth_flavor == AUTH_MIG_OID:
base_url = configuration.migserver_https_mig_oid_url
elif auth_flavor == AUTH_EXT_OID:
Expand All @@ -333,7 +339,7 @@ def main(client_id, user_arguments_dict, environ=None):
return (output_objects, returnvalues.SYSTEM_ERROR)
# NOTE: simple filters to handle unsupported chars e.g. in full name
populate_prefilters(configuration, prefilter_map, auth_type)
elif identity and auth_type == AUTH_OPENID_CONNECT:
elif identity and auth_type == AUTH.OPENID_CONNECT:
if auth_flavor == AUTH_MIG_OIDC:
base_url = configuration.migserver_https_mig_oidc_url
elif auth_flavor == AUTH_EXT_OIDC:
Expand All @@ -343,7 +349,7 @@ def main(client_id, user_arguments_dict, environ=None):
output_objects.append({'object_type': 'error_text', 'text':
'%s sign up not supported' % auth_flavor})
return (output_objects, returnvalues.SYSTEM_ERROR)
oidc_keys = list(signature(configuration, AUTH_OPENID_CONNECT)[1])
oidc_keys = list(signature(configuration, AUTH.OPENID_CONNECT)[1])
# NOTE: again we lowercase to avoid case sensitivity in validation
for key in environ:
low_key = key.replace('OIDC_CLAIM_', 'oidc.claim.').lower()
Expand Down Expand Up @@ -382,7 +388,7 @@ def main(client_id, user_arguments_dict, environ=None):

# Extract raw values

if auth_type == AUTH_CERTIFICATE:
if auth_type == AUTH.CERTIFICATE:
main_id = accepted['cert_id'][-1].strip()
# TODO: consider switching short_id to email?
short_id = main_id
Expand Down Expand Up @@ -411,7 +417,7 @@ def main(client_id, user_arguments_dict, environ=None):
peers_extras['peers_full_name'] = peers_full_name
peers_extras['peers_email'] = peers_email
peer_pattern = keyword_auto
elif auth_type == AUTH_OPENID_V2:
elif auth_type == AUTH.OPENID_V2:
# No guaranteed unique ID from OpenID 2.0 - mirror main and short
main_id = accepted['openid.sreg.nickname'][-1].strip() \
or accepted['openid.sreg.short_id'][-1].strip()
Expand All @@ -435,7 +441,7 @@ def main(client_id, user_arguments_dict, environ=None):
locality = accepted['openid.sreg.locality'][-1].strip()
timezone = accepted['openid.sreg.timezone'][-1].strip()
email = accepted['openid.sreg.email'][-1].strip()
elif auth_type == AUTH_OPENID_CONNECT:
elif auth_type == AUTH.OPENID_CONNECT:
# OpenID Connect identity advice recommends sub or oid as unique
main_id = accepted['oidc.claim.sub'][-1].strip() \
or accepted['oidc.claim.oid'][-1].strip()
Expand Down Expand Up @@ -476,7 +482,7 @@ def main(client_id, user_arguments_dict, environ=None):
accept_terms = (accepted['accept_terms'][-1].strip().lower() in
('1', 'o', 'y', 't', 'on', 'yes', 'true'))

if auth_type in (AUTH_OPENID_V2, AUTH_OPENID_CONNECT):
if auth_type in (AUTH.OPENID_V2, AUTH.OPENID_CONNECT):

# KU OpenID sign up does not deliver accept_terms so we implicitly
# let it imply acceptance for now
Expand Down Expand Up @@ -514,14 +520,14 @@ def main(client_id, user_arguments_dict, environ=None):
base_url = base_url.replace('autocreate.py', backend)

raw_login = ''
if auth_type == AUTH_OPENID_V2:
if auth_type == AUTH.OPENID_V2:
# OpenID 2.0 provides user ID on URL format - only add plain ID
for oid_provider in configuration.user_openid_providers:
openid_prefix = oid_provider.rstrip('/') + '/'
if identity.startswith(openid_prefix):
raw_login = identity.replace(openid_prefix, '')
break
elif auth_type == AUTH_OPENID_CONNECT:
elif auth_type == AUTH.OPENID_CONNECT:
raw_login = identity

if raw_login and not raw_login in openid_names:
Expand Down Expand Up @@ -575,7 +581,7 @@ def main(client_id, user_arguments_dict, environ=None):
# already logged in situation and must autologout first

if not short_id and not email:
if auth_type == AUTH_OPENID_V2 and identity and \
if auth_type == AUTH.OPENID_V2 and identity and \
accepted.get('openid.sreg.required', ''):
logger.warning('autocreate forcing autologut for %s' % client_id)
output_objects.append({'object_type': 'html_form',
Expand Down Expand Up @@ -646,7 +652,7 @@ def main(client_id, user_arguments_dict, environ=None):
personal_page_url = configuration.migserver_https_ext_cert_url
# TODO: consider limiting expire to real cert expire if before default?
user_dict['expire'] = default_account_expire(configuration,
AUTH_CERTIFICATE)
AUTH.CERTIFICATE)
try:
distinguished_name_to_user(main_id)
user_dict['distinguished_name'] = main_id
Expand All @@ -663,13 +669,13 @@ def main(client_id, user_arguments_dict, environ=None):
ext_login_title = "%s login" % configuration.user_ext_oid_title
personal_page_url = configuration.migserver_https_ext_oid_url
user_dict['expire'] = default_account_expire(configuration,
AUTH_OPENID_V2)
AUTH.OPENID_V2)
fill_distinguished_name(user_dict)
elif auth_flavor == AUTH_EXT_OIDC:
ext_login_title = "%s login" % configuration.user_ext_oidc_title
personal_page_url = configuration.migserver_https_ext_oidc_url
user_dict['expire'] = default_account_expire(configuration,
AUTH_OPENID_CONNECT)
AUTH.OPENID_CONNECT)
fill_distinguished_name(user_dict)
else:
# Reject the migX sign up methods through this handler
Expand All @@ -684,7 +690,7 @@ def main(client_id, user_arguments_dict, environ=None):
user_id = user_dict['distinguished_name']

# IMPORTANT: do NOT let a user create with ID different from client_id
if auth_type == AUTH_CERTIFICATE and client_id != user_id:
if auth_type == AUTH.CERTIFICATE and client_id != user_id:
# IMPORTANT: do NOT log credentials
logger.error('refusing autocreate invalid user for %s: %s' %
(client_id, mask_creds(user_dict)))
Expand Down Expand Up @@ -719,10 +725,10 @@ def main(client_id, user_arguments_dict, environ=None):
# If server allows automatic addition of users with a CA validated cert
# we create the user immediately and skip mail

if auth_type == AUTH_CERTIFICATE and configuration.auto_add_cert_user \
or auth_type == AUTH_OPENID_V2 and \
if auth_type == AUTH.CERTIFICATE and configuration.auto_add_cert_user \
or auth_type == AUTH.OPENID_V2 and \
configuration.auto_add_oid_user \
or auth_type == AUTH_OPENID_CONNECT and \
or auth_type == AUTH.OPENID_CONNECT and \
configuration.auto_add_oidc_user:
fill_user(user_dict)

Expand Down
Loading
Loading