Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connect with sockaddrs to support non-zero ipv6 scope ids #1433

Merged
merged 1 commit into from
Mar 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,8 @@ def __init__(self, host, port, afi, **configs):
self.host = host
self.port = port
self.afi = afi
self._sock_ip = host
self._sock_port = port
self._sock_afi = afi
self._sock_addr = None
self.in_flight_requests = collections.deque()
self._api_versions = None

Expand Down Expand Up @@ -279,13 +278,12 @@ def _dns_lookup(self):
return False
return True

def _next_afi_host_port(self):
def _next_afi_sockaddr(self):
if not self._gai:
if not self._dns_lookup():
return
afi, _, __, ___, sockaddr = self._gai.pop(0)
host, port = sockaddr[:2]
return (afi, host, port)
return (afi, sockaddr)

def connect_blocking(self, timeout=float('inf')):
if self.connected():
Expand Down Expand Up @@ -327,13 +325,13 @@ def connect(self):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
self.last_attempt = time.time()
next_lookup = self._next_afi_host_port()
next_lookup = self._next_afi_sockaddr()
if not next_lookup:
self.close(Errors.ConnectionError('DNS failure'))
return
else:
log.debug('%s: creating new socket', self)
self._sock_afi, self._sock_ip, self._sock_port = next_lookup
self._sock_afi, self._sock_addr = next_lookup
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)

for option in self.config['socket_options']:
Expand All @@ -348,17 +346,16 @@ def connect(self):
# so we need to double check that we are still connecting before
if self.connecting():
self.config['state_change_callback'](self)
log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host,
self.port, self._sock_ip, self._sock_port,
AFI_NAMES[self._sock_afi])
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])

if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
# to check connection status
request_timeout = self.config['request_timeout_ms'] / 1000.0
ret = None
try:
ret = self._sock.connect_ex((self._sock_ip, self._sock_port))
ret = self._sock.connect_ex(self._sock_addr)
except socket.error as err:
ret = err.errno

Expand Down Expand Up @@ -1009,9 +1006,9 @@ def filter(self, record):
return version

def __str__(self):
return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % (
self.node_id, self.host, self.port, self.state,
self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi])
AFI_NAMES[self._sock_afi], self._sock_addr)


class BrokerConnectionMetrics(object):
Expand Down
27 changes: 12 additions & 15 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,33 +258,31 @@ def test_lookup_on_connect():
assert conn.host == hostname
assert conn.port == port
assert conn.afi == socket.AF_UNSPEC
ip1 = '127.0.0.1'
afi1 = socket.AF_INET
sockaddr1 = ('127.0.0.1', 9092)
mock_return1 = [
(afi1, socket.SOCK_STREAM, 6, '', (ip1, 9092)),
(afi1, socket.SOCK_STREAM, 6, '', sockaddr1),
]
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
assert conn._sock_ip == ip1
assert conn._sock_port == 9092
assert conn._sock_afi == afi1
assert conn._sock_addr == sockaddr1
conn.close()

ip2 = '::1'
afi2 = socket.AF_INET6
sockaddr2 = ('::1', 9092, 0, 0)
mock_return2 = [
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
(afi2, socket.SOCK_STREAM, 6, '', sockaddr2),
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
assert conn._sock_ip == ip2
assert conn._sock_port == 9092
assert conn._sock_afi == afi2
assert conn._sock_addr == sockaddr2
conn.close()


def test_relookup_on_failure():
Expand All @@ -300,17 +298,16 @@ def test_relookup_on_failure():
assert conn.disconnected()
assert conn.last_attempt > last_attempt

ip2 = '127.0.0.2'
afi2 = socket.AF_INET
sockaddr2 = ('127.0.0.2', 9092)
mock_return2 = [
(afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
(afi2, socket.SOCK_STREAM, 6, '', sockaddr2),
]

with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
assert conn._sock_ip == ip2
assert conn._sock_port == 9092
assert conn._sock_afi == afi2
assert conn._sock_addr == sockaddr2
conn.close()