diff --git a/.travis.yml b/.travis.yml
index 23191e46..44592f42 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -48,8 +48,12 @@ matrix:
env: ZOOKEEPER_VERSION=3.3.6 TOX_VENV=py36
- python: '3.6'
env: ZOOKEEPER_VERSION=3.4.13 TOX_VENV=py36
+ - python: '3.6'
+ env: ZOOKEEPER_VERSION=3.4.13 TOX_VENV=py36-sasl
- python: '3.6'
env: ZOOKEEPER_VERSION=3.5.4-beta TOX_VENV=py36
+ - python: '3.6'
+ env: ZOOKEEPER_VERSION=3.5.4-beta TOX_VENV=py36-sasl
- python: pypy
env: ZOOKEEPER_VERSION=3.3.6 TOX_VENV=pypy
- python: pypy
diff --git a/CHANGES.md b/CHANGES.md
index d066a127..55bb353f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,14 @@
+
+## 2.7.0 (next)
+
+
+#### Features
+
+* **core:**
+ * Add `GSSAPI` support to which enables kazoo to authenticate against a
+ kerberized Zookeeper using `SASL GSSAPI`.
+
+
## 2.6.0 (2018-11-14)
diff --git a/ensure-zookeeper-env.sh b/ensure-zookeeper-env.sh
index c75c90f3..1a98c3b7 100755
--- a/ensure-zookeeper-env.sh
+++ b/ensure-zookeeper-env.sh
@@ -29,5 +29,4 @@ cd $HERE
# Yield execution to venv command
-$*
-
+exec $*
diff --git a/kazoo/client.py b/kazoo/client.py
index afff0dd0..bdbc6781 100644
--- a/kazoo/client.py
+++ b/kazoo/client.py
@@ -102,8 +102,8 @@ class KazooClient(object):
"""
def __init__(self, hosts='127.0.0.1:2181',
timeout=10.0, client_id=None, handler=None,
- default_acl=None, auth_data=None, read_only=None,
- randomize_hosts=True, connection_retry=None,
+ default_acl=None, auth_data=None, sasl_options=None,
+ read_only=None, randomize_hosts=True, connection_retry=None,
command_retry=None, logger=None, keyfile=None,
keyfile_password=None, certfile=None, ca=None,
use_ssl=False, verify_certs=True, **kwargs):
@@ -123,6 +123,9 @@ def __init__(self, hosts='127.0.0.1:2181',
A list of authentication credentials to use for the
connection. Should be a list of (scheme, credential)
tuples as :meth:`add_auth` takes.
+ :param sasl_options:
+ SASL options for the connection. Should be a dict of SASL options
+ passed to the underlying mechs if SASL support is to be used.
:param read_only: Allow connections to read only servers.
:param randomize_hosts: By default randomize host selection.
:param connection_retry:
@@ -275,7 +278,8 @@ def __init__(self, hosts='127.0.0.1:2181',
self._conn_retry.interrupt = lambda: self._stopped.is_set()
self._connection = ConnectionHandler(
- self, self._conn_retry.copy(), logger=self.logger)
+ self, self._conn_retry.copy(), logger=self.logger,
+ sasl_options=sasl_options)
# Every retry call should have its own copy of the retry helper
# to avoid shared retry counts
@@ -303,15 +307,6 @@ def _retry(*args, **kwargs):
self.Semaphore = partial(Semaphore, self)
self.ShallowParty = partial(ShallowParty, self)
- # Managing SASL client
- self.use_sasl = False
- for scheme, auth in self.auth_data:
- if scheme == "sasl":
- self.use_sasl = True
- # Could be used later for GSSAPI implementation
- self.sasl_server_principal = "zk-sasl-md5"
- break
-
# If we got any unhandled keywords, complain like Python would
if kwargs:
raise TypeError('__init__() got unexpected keyword arguments: %s'
@@ -737,12 +732,8 @@ def add_auth(self, scheme, credential):
"""Send credentials to server.
:param scheme: authentication scheme (default supported:
- "digest", "sasl"). Note that "sasl" scheme is
- requiring "pure-sasl" library to be
- installed.
+ "digest").
:param credential: the credential -- value depends on scheme.
- "digest": user:password
- "sasl": user:password
:returns: True if it was successful.
:rtype: bool
diff --git a/kazoo/exceptions.py b/kazoo/exceptions.py
index eeefe495..69f959e2 100644
--- a/kazoo/exceptions.py
+++ b/kazoo/exceptions.py
@@ -43,6 +43,13 @@ class WriterNotClosedException(KazooException):
"""
+class SASLException(KazooException):
+ """Raised if SASL encountered a (local) error.
+
+ .. versionadded:: 2.7.0
+ """
+
+
def _invalid_error_code():
raise RuntimeError('Invalid error code')
diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py
index 67d57899..b4320f24 100644
--- a/kazoo/protocol/connection.py
+++ b/kazoo/protocol/connection.py
@@ -9,12 +9,15 @@
import sys
import time
+import six
+
from kazoo.exceptions import (
AuthFailedError,
ConnectionDropped,
EXCEPTIONS,
SessionExpiredError,
- NoNodeError
+ NoNodeError,
+ SASLException,
)
from kazoo.loggingsupport import BLATHER
from kazoo.protocol.serialization import (
@@ -30,7 +33,7 @@
SASL,
Transaction,
Watch,
- int_struct
+ int_struct,
)
from kazoo.protocol.states import (
Callback,
@@ -40,10 +43,12 @@
)
from kazoo.retry import (
ForceRetryError,
- RetryFailedError
+ RetryFailedError,
)
+
try:
- from puresasl.client import SASLClient
+ import puresasl
+ import puresasl.client
PURESASL_AVAILABLE = True
except ImportError:
PURESASL_AVAILABLE = False
@@ -139,7 +144,7 @@ class RWServerAvailable(Exception):
class ConnectionHandler(object):
"""Zookeeper connection handler"""
- def __init__(self, client, retry_sleeper, logger=None):
+ def __init__(self, client, retry_sleeper, logger=None, sasl_options=None):
self.client = client
self.handler = client.handler
self.retry_sleeper = retry_sleeper
@@ -159,10 +164,10 @@ def __init__(self, client, retry_sleeper, logger=None):
self._xid = None
self._rw_server = None
self._ro_mode = False
- self._ro = False
self._connection_routine = None
+ self.sasl_options = sasl_options
self.sasl_cli = None
# This is instance specific to avoid odd thread bug issues in Python
@@ -232,8 +237,8 @@ def _read(self, length, timeout):
# have anything to select, but the wrapped object may still
# have something to read as it has previously gotten enough
# data from the underlying socket.
- if (hasattr(self._socket, "pending")
- and self._socket.pending() > 0):
+ if (hasattr(self._socket, "pending") and
+ self._socket.pending() > 0):
pass
else:
s = self.handler.select([self._socket], [], [], timeout)[0]
@@ -427,24 +432,6 @@ def _read_socket(self, read_timeout):
async_object.set(True)
elif header.xid == WATCH_XID:
self._read_watch_event(buffer, offset)
- elif self.sasl_cli and not self.sasl_cli.complete:
- # SASL authentication is not yet finished, this can only
- # be a SASL packet
- self.logger.log(BLATHER, 'Received SASL')
- try:
- challenge, _ = SASL.deserialize(buffer, offset)
- except Exception:
- raise ConnectionDropped('error while SASL authentication.')
- response = self.sasl_cli.process(challenge)
- if response:
- # authentication not yet finished, answering the challenge
- self._send_sasl_request(challenge=response,
- timeout=client._session_timeout)
- else:
- # authentication is ok, state is CONNECTED or CONNECTED_RO
- # remove sensible information from the object
- self._set_connected_ro_or_rw(client)
- self.sasl_cli.dispose()
else:
self.logger.log(BLATHER, 'Reading for header %r', header)
@@ -522,12 +509,13 @@ def _expand_client_hosts(self):
host_ports = []
for host, port in self.client.hosts:
try:
- for rhost in socket.getaddrinfo(host.strip(), port, 0, 0,
+ host = host.strip()
+ for rhost in socket.getaddrinfo(host, port, 0, 0,
socket.IPPROTO_TCP):
- host_ports.append((rhost[4][0], rhost[4][1]))
+ host_ports.append((host, rhost[4][0], rhost[4][1]))
except socket.gaierror as e:
# Skip hosts that don't resolve
- self.logger.warning("Cannot resolve %s: %s", host.strip(), e)
+ self.logger.warning("Cannot resolve %s: %s", host, e)
pass
if self.client.randomize_hosts:
random.shuffle(host_ports)
@@ -542,11 +530,11 @@ def _connect_loop(self, retry):
if len(host_ports) == 0:
return STOP_CONNECTING
- for host, port in host_ports:
+ for host, hostip, port in host_ports:
if self.client._stopped.is_set():
status = STOP_CONNECTING
break
- status = self._connect_attempt(host, port, retry)
+ status = self._connect_attempt(host, hostip, port, retry)
if status is STOP_CONNECTING:
break
@@ -555,7 +543,7 @@ def _connect_loop(self, retry):
else:
raise ForceRetryError('Reconnecting')
- def _connect_attempt(self, host, port, retry):
+ def _connect_attempt(self, host, hostip, port, retry):
client = self.client
KazooTimeoutError = self.handler.timeout_exception
close_connection = False
@@ -574,7 +562,7 @@ def _connect_attempt(self, host, port, retry):
try:
self._xid = 0
- read_timeout, connect_timeout = self._connect(host, port)
+ read_timeout, connect_timeout = self._connect(host, hostip, port)
read_timeout = read_timeout / 1000.0
connect_timeout = connect_timeout / 1000.0
retry.reset()
@@ -611,9 +599,9 @@ def _connect_attempt(self, host, port, retry):
if client._state != KeeperState.CONNECTING:
self.logger.warning("Transition to CONNECTING")
client._session_callback(KeeperState.CONNECTING)
- except AuthFailedError:
+ except AuthFailedError as err:
retry.reset()
- self.logger.warning('AUTH_FAILED closing')
+ self.logger.warning('AUTH_FAILED closing: %s', err)
client._session_callback(KeeperState.AUTH_FAILED)
return STOP_CONNECTING
except SessionExpiredError:
@@ -631,10 +619,10 @@ def _connect_attempt(self, host, port, retry):
if self._socket is not None:
self._socket.close()
- def _connect(self, host, port):
+ def _connect(self, host, hostip, port):
client = self.client
- self.logger.info('Connecting to %s:%s, use_ssl: %r',
- host, port, self.client.use_ssl)
+ self.logger.info('Connecting to %s(%s):%s, use_ssl: %r',
+ host, hostip, port, self.client.use_ssl)
self.logger.log(BLATHER,
' Using session_id: %r session_passwd: %s',
@@ -643,7 +631,7 @@ def _connect(self, host, port):
with self._socket_error_handling():
self._socket = self.handler.create_connection(
- address=(host, port),
+ address=(hostip, port),
timeout=client._session_timeout / 1000.0,
use_ssl=self.client.use_ssl,
keyfile=self.client.keyfile,
@@ -686,68 +674,112 @@ def _connect(self, host, port):
read_timeout)
if connect_result.read_only:
- self._ro = True
+ client._session_callback(KeeperState.CONNECTED_RO)
+ self._ro_mode = iter(self._server_pinger())
+ else:
+ client._session_callback(KeeperState.CONNECTED)
+ self._ro_mode = None
+
+ if self.sasl_options is not None:
+ self._authenticate_with_sasl(host, connect_timeout / 1000.0)
# Get a copy of the auth data before iterating, in case it is
# changed.
client_auth_data_copy = copy.copy(client.auth_data)
- if client.use_sasl and self.sasl_cli is None:
- if PURESASL_AVAILABLE:
- for scheme, auth in client_auth_data_copy:
- if scheme == 'sasl':
- username, password = auth.split(":")
- self.sasl_cli = SASLClient(
- host=client.sasl_server_principal,
- service='zookeeper',
- mechanism='DIGEST-MD5',
- username=username,
- password=password
- )
- break
-
- # As described in rfc
- # https://tools.ietf.org/html/rfc2831#section-2.1
- # sending empty challenge
- self._send_sasl_request(challenge=b'',
- timeout=connect_timeout)
- else:
- self.logger.warn('Pure-sasl library is missing while sasl'
- ' authentification is configured. Please'
- ' install pure-sasl library to connect '
- 'using sasl. Now falling back '
- 'connecting WITHOUT any '
- 'authentification.')
- client.use_sasl = False
- self._set_connected_ro_or_rw(client)
- else:
- self._set_connected_ro_or_rw(client)
- for scheme, auth in client_auth_data_copy:
- if scheme == "digest":
- ap = Auth(0, scheme, auth)
- zxid = self._invoke(
- connect_timeout / 1000.0,
- ap,
- xid=AUTH_XID
- )
- if zxid:
- client.last_zxid = zxid
+ for scheme, auth in client_auth_data_copy:
+ ap = Auth(0, scheme, auth)
+ zxid = self._invoke(connect_timeout / 1000.0, ap, xid=AUTH_XID)
+ if zxid:
+ client.last_zxid = zxid
return read_timeout, connect_timeout
- def _send_sasl_request(self, challenge, timeout):
- """ Called when sending a SASL request, xid needs be to incremented """
- sasl_request = SASL(challenge)
- self._xid = (self._xid % 2147483647) + 1
- xid = self._xid
- self._submit(sasl_request, timeout / 1000.0, xid)
-
- def _set_connected_ro_or_rw(self, client):
- """ Called to decide whether to set the KeeperState to CONNECTED_RO
- or CONNECTED"""
- if self._ro:
- client._session_callback(KeeperState.CONNECTED_RO)
- self._ro_mode = iter(self._server_pinger())
- else:
- client._session_callback(KeeperState.CONNECTED)
- self._ro_mode = None
+ def _authenticate_with_sasl(self, host, timeout):
+ """Establish a SASL authenticated connection to the server.
+ """
+ if not PURESASL_AVAILABLE:
+ raise SASLException('Missing SASL support')
+
+ if 'service' not in self.sasl_options:
+ self.sasl_options['service'] = 'zookeeper'
+
+ # NOTE: Zookeeper hardcoded the domain for Digest authentication
+ # instead of using the hostname. See
+ # zookeeper/util/SecurityUtils.java#L74 and Server/Client
+ # initializations.
+ if self.sasl_options['mechanism'] == 'DIGEST-MD5':
+ host = 'zk-sasl-md5'
+
+ sasl_cli = self.client.sasl_cli = puresasl.client.SASLClient(
+ host=host,
+ **self.sasl_options
+ )
+
+ # Inititalize the process with an empty challenge token
+ challenge = None
+ xid = 0
+
+ while True:
+ if sasl_cli.complete:
+ break
+
+ try:
+ response = sasl_cli.process(challenge=challenge)
+ except puresasl.SASLError as err:
+ six.reraise(
+ SASLException,
+ SASLException('library error: %s' % err.message),
+ sys.exc_info()[2]
+ )
+ except puresasl.SASLProtocolException as err:
+ six.reraise(
+ AuthFailedError,
+ AuthFailedError('protocol error: %s' % err.message),
+ sys.exc_info()[2]
+ )
+ except Exception as err:
+ six.reraise(
+ AuthFailedError,
+ AuthFailedError('Unknown error: %s' % err),
+ sys.exc_info()[2]
+ )
+
+ if sasl_cli.complete and not response:
+ break
+ elif response is None:
+ response = b''
+
+ xid = (xid % 2147483647) + 1
+
+ request = SASL(response)
+ self._submit(request, timeout, xid)
+
+ try:
+ header, buffer, offset = self._read_header(timeout)
+ except ConnectionDropped:
+ # Zookeeper simply drops connections with failed authentication
+ six.reraise(
+ AuthFailedError,
+ AuthFailedError('Connection dropped in SASL'),
+ sys.exc_info()[2]
+ )
+
+ if header.xid != xid:
+ raise RuntimeError('xids do not match, expected %r '
+ 'received %r', xid, header.xid)
+
+ if header.zxid > 0:
+ self.client.last_zxid = header.zxid
+
+ if header.err:
+ callback_exception = EXCEPTIONS[header.err]()
+ self.logger.debug(
+ 'Received error(xid=%s) %r', xid, callback_exception)
+ raise callback_exception
+
+ challenge, _ = SASL.deserialize(buffer, offset)
+
+ # If we made it here, authentication is ok, and we are connected.
+ # Remove sensible information from the object.
+ sasl_cli.dispose()
diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py
index 75c6abe4..fa5c67a3 100644
--- a/kazoo/protocol/serialization.py
+++ b/kazoo/protocol/serialization.py
@@ -391,6 +391,7 @@ def deserialize(cls, bytes, offset):
challenge, offset = read_buffer(bytes, offset)
return challenge, offset
+
class Watch(namedtuple('Watch', 'type state path')):
@classmethod
def deserialize(cls, bytes, offset):
diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py
index e22261de..9eb7c22b 100644
--- a/kazoo/tests/test_client.py
+++ b/kazoo/tests/test_client.py
@@ -188,14 +188,21 @@ def test_connect_sasl_auth(self):
version = self.client.server_version()
if not version or version < (3, 4):
raise SkipTest("Must use Zookeeper 3.4 or above")
+ try:
+ import puresasl # NOQA
+ except ImportError:
+ raise SkipTest('PureSASL not available.')
username = "jaasuser"
password = "jaas_password"
- sasl_auth = "%s:%s" % (username, password)
acl = make_acl('sasl', credential=username, all=True)
- client = self._get_client(auth_data=[('sasl', sasl_auth)])
+ client = self._get_client(
+ sasl_options={'mechanism': 'DIGEST-MD5',
+ 'username': username,
+ 'password': password}
+ )
client.start()
try:
client.create('/1', acl=(acl,))
@@ -252,8 +259,16 @@ def test_invalid_sasl_auth(self):
version = self.client.server_version()
if not version or version < (3, 4):
raise SkipTest("Must use Zookeeper 3.4 or above")
- client = self._get_client(auth_data=[('sasl', 'baduser:badpassword')])
- self.assertRaises(ConnectionLoss, client.start)
+ try:
+ import puresasl # NOQA
+ except ImportError:
+ raise SkipTest('PureSASL not available.')
+ client = self._get_client(
+ sasl_options={'mechanism': 'DIGEST-MD5',
+ 'username': 'baduser',
+ 'password': 'badpassword'}
+ )
+ self.assertRaises(AuthFailedError, client.start)
def test_async_auth(self):
client = self._get_client()
diff --git a/requirements.txt b/requirements.txt
index 2d5c0c68..f3854685 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,5 @@
coverage==3.7.1
mock==1.0.1
nose==1.3.3
-pure-sasl==0.5.1
flake8==2.3.0
objgraph==3.4.0
diff --git a/requirements_sasl.txt b/requirements_sasl.txt
new file mode 100644
index 00000000..3fb3416f
--- /dev/null
+++ b/requirements_sasl.txt
@@ -0,0 +1 @@
+pure_sasl==0.5.1
diff --git a/setup.py b/setup.py
index ee055d53..1d7c64b9 100644
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,6 @@
'mock',
'nose',
'flake8',
- 'pure-sasl',
'objgraph',
]
@@ -39,6 +38,7 @@
install_requires += [
'gevent>=1.2',
'eventlet>=0.17.1',
+ 'pure-sasl',
]
setup(
@@ -77,6 +77,7 @@
tests_require=tests_require,
extras_require={
'test': tests_require,
+ 'sasl': ['pure-sasl'],
},
long_description_content_type="text/markdown",
)
diff --git a/tox.ini b/tox.ini
index cd4ff27d..0d9783b5 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,11 +4,13 @@ skipsdist = True
envlist =
pep8,
py27,
- py27-gevent,
- py27-eventlet,
+ py27-{gevent,eventlet,sasl},
py34,
+ py34-sasl,
py35,
+ py35-sasl,
py36,
+ py36-{sasl,docs},
pypy
[testenv:pep8]
@@ -21,17 +23,12 @@ setenv =
VIRTUAL_ENV={envdir}
ZOOKEEPER_VERSION={env:ZOOKEEPER_VERSION:}
deps = -r{toxinidir}/requirements.txt
- -r{toxinidir}/requirements_sphinx.txt
+ docs: -r{toxinidir}/requirements_sphinx.txt
+ gevent: -r{toxinidir}/requirements_gevent.txt
+ sasl: -r{toxinidir}/requirements_sasl.txt
+ eventlet: -r{toxinidir}/requirements_eventlet.txt
commands = {toxinidir}/ensure-zookeeper-env.sh nosetests {posargs: -d -v --with-coverage kazoo.tests}
-[testenv:py27-gevent]
-deps = {[testenv]deps}
- -r{toxinidir}/requirements_gevent.txt
-
-[testenv:py27-eventlet]
-deps = {[testenv]deps}
- -r{toxinidir}/requirements_eventlet.txt
-
[flake8]
builtins = _
exclude = .venv,.tox,dist,doc,*egg,.git,build,tools,local,docs,zookeeper