Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 79 additions & 62 deletions SoftLayer/API.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,38 @@
))


def _build_transport(url, proxy, timeout, user_agent, verify):
"""Construct the appropriate transport based on the endpoint URL.

Selects RestTransport when the URL contains '/rest', otherwise falls back
to XmlRpcTransport. Extracted to avoid duplicating this logic across
``create_client_from_env``, ``employee_client``, and ``BaseClient``.

:param str url: The API endpoint URL.
:param str proxy: Optional proxy URL.
:param timeout: Request timeout in seconds (``None`` means no timeout).
:param str user_agent: Optional User-Agent string override.
:param verify: SSL verification — ``True``, ``False``, or a path to a CA bundle.
:returns: A :class:`~SoftLayer.transports.RestTransport` or
:class:`~SoftLayer.transports.XmlRpcTransport` instance.
"""
if url is not None and '/rest' in url:
return transports.RestTransport(
endpoint_url=url,
proxy=proxy,
timeout=timeout,
user_agent=user_agent,
verify=verify,
)
return transports.XmlRpcTransport(
endpoint_url=url,
proxy=proxy,
timeout=timeout,
user_agent=user_agent,
verify=verify,
)


def create_client_from_env(username=None,
api_key=None,
endpoint_url=None,
Expand All @@ -62,7 +94,7 @@ def create_client_from_env(username=None,
verify=True):
"""Creates a SoftLayer API client using your environment.

Settings are loaded via keyword arguments, environemtal variables and
Settings are loaded via keyword arguments, environmental variables and
config file.

:param username: an optional API username if you wish to bypass the
Expand Down Expand Up @@ -104,25 +136,13 @@ def create_client_from_env(username=None,
config_file=config_file)

if transport is None:
url = settings.get('endpoint_url')
if url is not None and '/rest' in url:
# If this looks like a rest endpoint, use the rest transport
transport = transports.RestTransport(
endpoint_url=settings.get('endpoint_url'),
proxy=settings.get('proxy'),
timeout=settings.get('timeout'),
user_agent=user_agent,
verify=verify,
)
else:
# Default the transport to use XMLRPC
transport = transports.XmlRpcTransport(
endpoint_url=settings.get('endpoint_url'),
proxy=settings.get('proxy'),
timeout=settings.get('timeout'),
user_agent=user_agent,
verify=verify,
)
transport = _build_transport(
url=settings.get('endpoint_url'),
proxy=settings.get('proxy'),
timeout=settings.get('timeout'),
user_agent=user_agent,
verify=verify,
)

# If we have enough information to make an auth driver, let's do it
if auth is None and settings.get('username') and settings.get('api_key'):
Expand Down Expand Up @@ -157,13 +177,13 @@ def employee_client(username=None,
verify=True):
"""Creates an INTERNAL SoftLayer API client using your environment.

Settings are loaded via keyword arguments, environemtal variables and config file.
Settings are loaded via keyword arguments, environmental variables and config file.

:param username: your user ID
:param access_token: hash from SoftLayer_User_Employee::performExternalAuthentication(username, password, token)
:param password: password to use for employee authentication
:param access_token: hash from SoftLayer_User_Employee::performExternalAuthentication
:param endpoint_url: the API endpoint base URL you wish to connect to.
Set this to API_PRIVATE_ENDPOINT to connect via SoftLayer's private network.
Must contain 'internal'. Set this to API_PRIVATE_ENDPOINT to connect
via SoftLayer's private network.
:param proxy: proxy to be used to make API calls
:param integer timeout: timeout for API requests
:param auth: an object which responds to get_headers() to be inserted into the xml-rpc headers.
Expand All @@ -173,56 +193,54 @@ def employee_client(username=None,
calls if you wish to bypass the packages built in User Agent string
:param transport: An object that's callable with this signature: transport(SoftLayer.transports.Request)
:param bool verify: decide to verify the server's SSL/TLS cert.
DO NOT SET TO FALSE WITHOUT UNDERSTANDING THE IMPLICATIONS.
"""
# Pass caller-supplied verify so it is not silently discarded; the config
# file value will take precedence if present (via get_client_settings).
settings = config.get_client_settings(username=username,
api_key=None,
endpoint_url=endpoint_url,
timeout=timeout,
proxy=proxy,
verify=None,
verify=verify,
config_file=config_file)

url = settings.get('endpoint_url', '')
verify = settings.get('verify', True)
# Honour the config-file value; fall back to the caller-supplied default.
verify = settings.get('verify', verify)

if 'internal' not in url:
raise exceptions.SoftLayerError(f"{url} does not look like an Internal Employee url.")

# url is guaranteed non-empty here (the guard above ensures it contains
# 'internal'), so no additional None-check is needed.
if transport is None:
if url is not None and '/rest' in url:
# If this looks like a rest endpoint, use the rest transport
transport = transports.RestTransport(
endpoint_url=url,
proxy=settings.get('proxy'),
timeout=settings.get('timeout'),
user_agent=user_agent,
verify=verify,
)
else:
# Default the transport to use XMLRPC
transport = transports.XmlRpcTransport(
endpoint_url=url,
proxy=settings.get('proxy'),
timeout=settings.get('timeout'),
user_agent=user_agent,
verify=verify,
)

transport = _build_transport(
url=url,
proxy=settings.get('proxy'),
timeout=settings.get('timeout'),
user_agent=user_agent,
verify=verify,
)

# Resolve all settings-derived credentials together before auth selection.
if access_token is None:
access_token = settings.get('access_token')

user_id = settings.get('userid')
# Assume access_token is valid for now, user has logged in before at least.
if settings.get('auth_cert', False):
auth = slauth.X509Authentication(settings.get('auth_cert'), verify)
return EmployeeClient(auth=auth, transport=transport, config_file=config_file)
elif access_token and user_id:
auth = slauth.EmployeeAuthentication(user_id, access_token)
return EmployeeClient(auth=auth, transport=transport, config_file=config_file)
else:
# This is for logging in mostly.
LOGGER.info("No access_token or userid found in settings, creating a No Auth client for now.")
return EmployeeClient(auth=None, transport=transport, config_file=config_file)

# Select the appropriate auth driver only when the caller has not already
# supplied one. A single return keeps construction separate from selection.
if auth is None:
if settings.get('auth_cert'):
auth = slauth.X509Authentication(settings.get('auth_cert'), verify)
elif access_token and user_id:
auth = slauth.EmployeeAuthentication(user_id, access_token)
else:
# No credentials available — caller must authenticate explicitly
# (e.g. via EmployeeClient.authenticate_with_internal).
LOGGER.info("No access_token or userid found in settings, creating a No Auth client for now.")

return EmployeeClient(auth=auth, transport=transport, config_file=config_file)


def Client(**kwargs):
Expand All @@ -237,7 +255,7 @@ class BaseClient(object):
:param transport: An object that's callable with this signature: transport(SoftLayer.transports.Request)
"""
_prefix = "SoftLayer_"
auth: slauth.AuthenticationBase
auth: slauth.AuthenticationBase | None

def __init__(self, auth=None, transport=None, config_file=None):
if config_file is None:
Expand All @@ -247,7 +265,7 @@ def __init__(self, auth=None, transport=None, config_file=None):
self.__setAuth(auth)
self.__setTransport(transport)

def __setAuth(self, auth=None):
def __setAuth(self, auth: slauth.AuthenticationBase | None = None):
"""Prepares the authentication property"""
self.auth = auth

Expand Down Expand Up @@ -751,9 +769,7 @@ def refresh_token(self, userId, auth_token):

def call(self, service, method, *args, **kwargs):
"""Handles refreshing Employee tokens in case of a HTTP 401 error"""
if (service == 'SoftLayer_Account' or service == 'Account') and not kwargs.get('id'):
if not self.account_id:
raise exceptions.SoftLayerError("SoftLayer_Account service requires an ID")
if self.account_id and not kwargs.get('id', False):
kwargs['id'] = self.account_id

try:
Expand All @@ -763,6 +779,7 @@ def call(self, service, method, *args, **kwargs):
userId = self.settings['softlayer'].get('userid')
access_token = self.settings['softlayer'].get('access_token')
LOGGER.warning("Token has expired, trying to refresh. %s", ex.faultString)
print("Token has expired, trying to refresh. %s", ex.faultString)
self.refresh_token(userId, access_token)
# Try the Call again this time....
return BaseClient.call(self, service, method, *args, **kwargs)
Expand Down
45 changes: 44 additions & 1 deletion SoftLayer/CLI/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,58 @@ def censor_password(value):


@click.command(cls=SLCommand)
@click.option('--session-token',
default=None,
help='An existing employee session token (hash). Click the "Copy Session Token" in the internal portal to get this value.'
'Can also be set via the SLCLI_SESSION_TOKEN environment variable.',
envvar='SLCLI_SESSION_TOKEN')
@click.option('--user-id',
default=None,
type=int,
help='Employee IMS user ID. This is the number in the url when you click your username in the internal portal, under "user information". '
'Can also be set via the SLCLI_USER_ID environment variable. Or read from the configuration file.',
envvar='SLCLI_USER_ID')
@click.option('--legacy',
default=False,
type=bool,
is_flag=True,
help='Login with username, password, yubi key combination. Only valid if ISV is not required. If using ISV, use your session token.')
@environment.pass_env
def cli(env):
def cli(env, session_token: str | None, user_id: int | None, legacy: bool):
"""Logs you into the internal SoftLayer Network.

username: Set this in either the softlayer config, or SL_USER ENV variable
password: Set this in SL_PASSWORD env variable. You will be prompted for them otherwise.

To log in with an existing session token instead of username/password/2FA:

slcli login --session-token <token> --user-id <id>

Or via environment variables:

SLCLI_SESSION_TOKEN=<token> SLCLI_USER_ID=<id> slcli login
"""
config_settings = config.get_config(config_file=env.config_file)
settings = config_settings['softlayer']

if not user_id:
user_id = int(settings.get('userid', 0)) or int(os.environ.get('SLCLI_USER_ID', 0))
# --session-token supplied on the CLI (or via SLCLI_SESSION_TOKEN env var):
# authenticate directly, persist to config, and return immediately.
if not legacy:
if not user_id:
user_id = int(input("User ID (number): "))
if not session_token:
session_token = os.environ.get('SLCLI_SESSION_TOKEN', '') or input("Session Token: ")
env.client.authenticate_with_hash(user_id, session_token)
settings['access_token'] = session_token
settings['userid'] = str(user_id)
config_settings['softlayer'] = settings
config.write_config(config_settings, env.config_file)
click.echo(f"Logged in with session token for user ID {user_id}.")
return


username = settings.get('username') or os.environ.get('SLCLI_USER', None)
password = os.environ.get('SLCLI_PASSWORD', '')
yubi = None
Expand Down
5 changes: 1 addition & 4 deletions tests/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,7 @@ def test_expired_token_is_really_expired(self, api_response):
@mock.patch('SoftLayer.API.BaseClient.call')
def test_account_check(self, _call):
self.client.transport = self.mocks
exception = self.assertRaises(
exceptions.SoftLayerError,
self.client.call, "SoftLayer_Account", "getObject")
self.assertEqual(str(exception), "SoftLayer_Account service requires an ID")

self.client.account_id = 1234
self.client.call("SoftLayer_Account", "getObject")
self.client.call("SoftLayer_Account", "getObject1", id=9999)
Expand Down
Loading