From cab11c0337ef2c14c668bc5c1fd10dce17d7388f Mon Sep 17 00:00:00 2001 From: Panos Date: Mon, 18 Jan 2021 23:26:52 +0000 Subject: [PATCH 01/41] Updated ssh single client to use thread pool. Updated base yield duration --- pssh/clients/ssh/single.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 8b048054..c5f8ee2f 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -17,7 +17,7 @@ import logging -from gevent import sleep, spawn, Timeout as GTimeout, joinall +from gevent import sleep, spawn, Timeout as GTimeout, joinall, get_hub from ssh import options from ssh.session import Session, SSH_READ_PENDING, SSH_WRITE_PENDING from ssh.key import import_privkey_file, import_cert_file, copy_cert_to_privkey @@ -32,6 +32,7 @@ logger = logging.getLogger(__name__) +THREAD_POOL = get_hub().threadpool class SSHClient(BaseSSHClient): @@ -155,7 +156,7 @@ def _init_session(self, retries=1): raise ex def _session_connect(self): - self.session.connect() + THREAD_POOL.apply(self.session.connect) def auth(self): if self.gssapi_auth or (self.gssapi_server_identity or self.gssapi_client_identity): @@ -243,7 +244,7 @@ def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): # Yield event loop to other greenlets if we have no data to # send back, meaning the generator does not yield and can there # for block other generators/greenlets from running. - sleep(.1) + sleep(.001) def wait_finished(self, host_output, timeout=None): """Wait for EOF from channel and close channel. From 4da84f955bcba843def8caae98d58b9f55de243a Mon Sep 17 00:00:00 2001 From: Panos Date: Fri, 22 Jan 2021 20:42:25 +0000 Subject: [PATCH 02/41] WIP --- pssh/clients/base/parallel.py | 2 +- pssh/clients/base/single.py | 4 ++++ pssh/clients/native/parallel.py | 4 ++-- pssh/clients/native/single.py | 19 ++++++++++++++----- pssh/clients/reader.py | 2 +- pssh/clients/ssh/single.py | 4 ++-- 6 files changed, 24 insertions(+), 11 deletions(-) diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index 77905f12..9a24b827 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -272,7 +272,7 @@ def _run_command(self, host_i, host, command, sudo=False, user=None, shell=None, use_pty=False, encoding='utf-8', read_timeout=None): """Make SSHClient if needed, run command on host""" - logger.debug("_run_command with read timeout %s", read_timeout) + # logger.debug("_run_command with read timeout %s", read_timeout) try: _client = self._make_ssh_client(host_i, host) host_out = _client.run_command( diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 1f6b64f6..8234fa06 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -537,9 +537,12 @@ def _eagain_errcode(self, func, eagain, *args, **kwargs): timeout = kwargs.pop('timeout', self.timeout) with GTimeout(seconds=timeout, exception=Timeout): ret = func(*args, **kwargs) + sleep(.0000001) while ret == eagain: self.poll() + sleep(.0000001) ret = func(*args, **kwargs) + sleep(.0000001) return ret def _eagain_write(self, write_func, data, timeout=None): @@ -667,6 +670,7 @@ def _poll_socket(self, events, timeout=None): timeout = timeout * 1000 if timeout is not None else 100 poller = poll() poller.register(self.sock, eventmask=events) + # logger.debug("Polling with timeout %s", timeout) poller.poll(timeout=timeout) def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None): diff --git a/pssh/clients/native/parallel.py b/pssh/clients/native/parallel.py index f27a0fec..f5a57de0 100644 --- a/pssh/clients/native/parallel.py +++ b/pssh/clients/native/parallel.py @@ -230,8 +230,8 @@ def __del__(self): def _make_ssh_client(self, host_i, host): auth_thread_pool = True - logger.debug("Make client request for host %s, (host_i, host) in clients: %s", - host, (host_i, host) in self._host_clients) + logger.debug("Make client request for host %s, %s, (host_i, host) in clients: %s", + host_i, host, (host_i, host) in self._host_clients) if (host_i, host) not in self._host_clients \ or self._host_clients[(host_i, host)] is None: _user, _port, _password, _pkey, proxy_host, proxy_port, proxy_user, \ diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index ceaaae31..e812b962 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -220,10 +220,12 @@ def _agent_auth(self): self.session.agent_auth(self.user) def _pkey_auth(self, pkey_file, password=None): - self.session.userauth_publickey_fromfile( - self.user, - pkey_file, - passphrase=password if password is not None else b'') + passphrase = password if password is not None else b'' + THREAD_POOL.apply( + self.session.userauth_publickey_fromfile, + args=(self.user, pkey_file), + kwds={'passphrase': passphrase}, + ) def _password_auth(self): try: @@ -232,11 +234,13 @@ def _password_auth(self): raise AuthenticationError("Password authentication failed - %s", ex) def _open_session(self): + # chan = THREAD_POOL.apply(self._eagain, args=(self.session.open_session,)) chan = self._eagain(self.session.open_session) return chan def open_session(self): """Open new channel from session""" + logger.debug("Opening session") try: chan = self._open_session() except Exception as ex: @@ -272,7 +276,9 @@ def execute(self, cmd, use_pty=False, channel=None): if use_pty: self._eagain(channel.pty) logger.debug("Executing command '%s'", cmd) - self._eagain(channel.execute, cmd) + sleep() + THREAD_POOL.apply(self._eagain, args=(channel.execute, cmd)) + # self._eagain(channel.execute, cmd) return channel def _read_output_to_buffer(self, read_func, _buffer): @@ -281,6 +287,7 @@ def _read_output_to_buffer(self, read_func, _buffer): size, data = read_func() while size == LIBSSH2_ERROR_EAGAIN: self.poll() + sleep(.000001) size, data = read_func() if size <= 0: break @@ -317,6 +324,8 @@ def close_channel(self, channel): def _eagain(self, func, *args, **kwargs): return self._eagain_errcode(func, LIBSSH2_ERROR_EAGAIN, *args, **kwargs) + # return THREAD_POOL.apply(self._eagain_errcode, + # args=(func, LIBSSH2_ERROR_EAGAIN, *args), kwds=kwargs) def _make_sftp_eagain(self): return self._eagain(self.session.sftp_init) diff --git a/pssh/clients/reader.py b/pssh/clients/reader.py index c3f2d818..a4ca3e8a 100644 --- a/pssh/clients/reader.py +++ b/pssh/clients/reader.py @@ -77,4 +77,4 @@ def __iter__(self): if data: yield data elif self._read_pos == self._write_pos: - sleep(.1) + sleep(.000001) diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index c5f8ee2f..3f95818f 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -179,7 +179,7 @@ def _pkey_auth(self, pkey_file, password=None): if self.cert_file is not None: logger.debug("Certificate file set - trying certificate authentication") self._import_cert_file(pkey) - self.session.userauth_publickey(pkey) + THREAD_POOL.apply(self.session.userauth_publickey, args=(pkey,)) def _import_cert_file(self, pkey): cert_key = import_cert_file(self.cert_file) @@ -191,7 +191,7 @@ def _shell(self, channel): return self._eagain(channel.request_shell) def _open_session(self): - channel = self.session.channel_new() + channel = THREAD_POOL.apply(self.session.channel_new) channel.set_blocking(0) self._eagain(channel.open_session) return channel From 1ac913bc017e609f69e70a3f9084c3bef035b75d Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 01:32:43 +0000 Subject: [PATCH 03/41] Wip --- pssh/clients/base/single.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 8234fa06..68c839f3 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -537,12 +537,12 @@ def _eagain_errcode(self, func, eagain, *args, **kwargs): timeout = kwargs.pop('timeout', self.timeout) with GTimeout(seconds=timeout, exception=Timeout): ret = func(*args, **kwargs) - sleep(.0000001) + sleep() while ret == eagain: self.poll() - sleep(.0000001) ret = func(*args, **kwargs) - sleep(.0000001) + sleep() + sleep() return ret def _eagain_write(self, write_func, data, timeout=None): From a849dfa06b07f425c340ad7592ffc6e431506eca Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 02:03:18 +0000 Subject: [PATCH 04/41] Polling changes --- pssh/clients/base/single.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 68c839f3..e6c67cb6 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -541,7 +541,7 @@ def _eagain_errcode(self, func, eagain, *args, **kwargs): while ret == eagain: self.poll() ret = func(*args, **kwargs) - sleep() + sleep(.0000001) sleep() return ret From dec6bf31acca3422ae9f89eb0e319adb9f8783f2 Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 02:16:29 +0000 Subject: [PATCH 05/41] Polling --- pssh/clients/base/single.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index e6c67cb6..68c839f3 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -541,7 +541,7 @@ def _eagain_errcode(self, func, eagain, *args, **kwargs): while ret == eagain: self.poll() ret = func(*args, **kwargs) - sleep(.0000001) + sleep() sleep() return ret From ef39dd14f914b06df3fdf36188bfe511b59df521 Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 02:35:02 +0000 Subject: [PATCH 06/41] Small poll --- pssh/clients/base/single.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 68c839f3..5d05b4c0 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -541,7 +541,7 @@ def _eagain_errcode(self, func, eagain, *args, **kwargs): while ret == eagain: self.poll() ret = func(*args, **kwargs) - sleep() + sleep(.000001) sleep() return ret From 469a1c2eb9b4d459dffd7c01c9dda2a2e0eb6976 Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 03:33:20 +0000 Subject: [PATCH 07/41] Added threading --- pssh/clients/base/single.py | 3 +-- pssh/clients/native/single.py | 9 ++++----- pssh/clients/ssh/single.py | 6 ++++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 5d05b4c0..812cf454 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -667,10 +667,9 @@ def _poll_socket(self, events, timeout=None): return # gevent.select.poll converts seconds to miliseconds to match python socket # implementation - timeout = timeout * 1000 if timeout is not None else 100 + timeout = timeout * 1000 if timeout is not None else 1 poller = poll() poller.register(self.sock, eventmask=events) - # logger.debug("Polling with timeout %s", timeout) poller.poll(timeout=timeout) def _poll_errcodes(self, directions_func, inbound, outbound, timeout=None): diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index e812b962..f7954d56 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -234,8 +234,8 @@ def _password_auth(self): raise AuthenticationError("Password authentication failed - %s", ex) def _open_session(self): - # chan = THREAD_POOL.apply(self._eagain, args=(self.session.open_session,)) - chan = self._eagain(self.session.open_session) + chan = THREAD_POOL.apply(self._eagain, args=(self.session.open_session,)) + # chan = self._eagain(self.session.open_session) return chan def open_session(self): @@ -276,19 +276,18 @@ def execute(self, cmd, use_pty=False, channel=None): if use_pty: self._eagain(channel.pty) logger.debug("Executing command '%s'", cmd) - sleep() THREAD_POOL.apply(self._eagain, args=(channel.execute, cmd)) - # self._eagain(channel.execute, cmd) return channel def _read_output_to_buffer(self, read_func, _buffer): try: while True: size, data = read_func() + sleep() while size == LIBSSH2_ERROR_EAGAIN: self.poll() - sleep(.000001) size, data = read_func() + sleep(.0000001) if size <= 0: break _buffer.write(data) diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 3f95818f..83cd2821 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -193,7 +193,7 @@ def _shell(self, channel): def _open_session(self): channel = THREAD_POOL.apply(self.session.channel_new) channel.set_blocking(0) - self._eagain(channel.open_session) + THREAD_POOL.apply(self._eagain, args=(channel.open_session,)) return channel def open_session(self): @@ -226,7 +226,9 @@ def execute(self, cmd, use_pty=False, channel=None): if use_pty: self._eagain(channel.request_pty, timeout=self.timeout) logger.debug("Executing command '%s'", cmd) - self._eagain(channel.request_exec, cmd, timeout=self.timeout) + THREAD_POOL.apply(self._eagain, + args=(channel.request_exec, cmd), + kwds={'timeout': self.timeout}) return channel def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): From e1896355f0e7795f8f60c0f48d5d20f50e5925dc Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 03:47:27 +0000 Subject: [PATCH 08/41] Exec thread pool, polling --- pssh/clients/ssh/single.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 83cd2821..4ce27014 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -226,9 +226,10 @@ def execute(self, cmd, use_pty=False, channel=None): if use_pty: self._eagain(channel.request_pty, timeout=self.timeout) logger.debug("Executing command '%s'", cmd) - THREAD_POOL.apply(self._eagain, - args=(channel.request_exec, cmd), - kwds={'timeout': self.timeout}) + self._eagain(channel.request_exec, cmd, timeout=self.timeout) + # THREAD_POOL.apply(self._eagain, + # args=(channel.request_exec, cmd), + # kwds={'timeout': self.timeout}) return channel def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): @@ -238,7 +239,7 @@ def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): size, data = channel.read_nonblocking(is_stderr=is_stderr) except EOF: _buffer.eof.set() - sleep(.1) + sleep(.000001) return if size > 0: _buffer.write(data) From ab5d3e30601628a4cc769b974cf6d73226ed822b Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 04:03:12 +0000 Subject: [PATCH 09/41] Thread pool, polling --- pssh/clients/base/parallel.py | 1 - pssh/clients/ssh/single.py | 14 +++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index 9a24b827..91019c35 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -272,7 +272,6 @@ def _run_command(self, host_i, host, command, sudo=False, user=None, shell=None, use_pty=False, encoding='utf-8', read_timeout=None): """Make SSHClient if needed, run command on host""" - # logger.debug("_run_command with read timeout %s", read_timeout) try: _client = self._make_ssh_client(host_i, host) host_out = _client.run_command( diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 4ce27014..f507c5c3 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -226,10 +226,10 @@ def execute(self, cmd, use_pty=False, channel=None): if use_pty: self._eagain(channel.request_pty, timeout=self.timeout) logger.debug("Executing command '%s'", cmd) - self._eagain(channel.request_exec, cmd, timeout=self.timeout) - # THREAD_POOL.apply(self._eagain, - # args=(channel.request_exec, cmd), - # kwds={'timeout': self.timeout}) + # self._eagain(channel.request_exec, cmd, timeout=self.timeout) + THREAD_POOL.apply(self._eagain, + args=(channel.request_exec, cmd), + kwds={'timeout': self.timeout}) return channel def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): @@ -241,13 +241,9 @@ def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): _buffer.eof.set() sleep(.000001) return + sleep(.000001) if size > 0: _buffer.write(data) - else: - # Yield event loop to other greenlets if we have no data to - # send back, meaning the generator does not yield and can there - # for block other generators/greenlets from running. - sleep(.001) def wait_finished(self, host_output, timeout=None): """Wait for EOF from channel and close channel. From c78a4736bbea5704fb57c685234512a6a73bcb3f Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 04:44:57 +0000 Subject: [PATCH 10/41] Updates --- pssh/clients/native/single.py | 5 ++--- pssh/clients/ssh/single.py | 7 ++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index f7954d56..e31ba19d 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -283,7 +283,6 @@ def _read_output_to_buffer(self, read_func, _buffer): try: while True: size, data = read_func() - sleep() while size == LIBSSH2_ERROR_EAGAIN: self.poll() size, data = read_func() @@ -291,6 +290,7 @@ def _read_output_to_buffer(self, read_func, _buffer): if size <= 0: break _buffer.write(data) + sleep() finally: _buffer.eof.set() @@ -314,12 +314,11 @@ def wait_finished(self, host_output, timeout=None): if channel is None: return self._eagain(channel.wait_eof, timeout=timeout) - # Close channel to indicate no more commands will be sent over it self.close_channel(channel) def close_channel(self, channel): logger.debug("Closing channel") - self._eagain(channel.close) + THREAD_POOL.apply(self._eagain, args=(channel.close,)) def _eagain(self, func, *args, **kwargs): return self._eagain_errcode(func, LIBSSH2_ERROR_EAGAIN, *args, **kwargs) diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index f507c5c3..f07d7bdd 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -226,7 +226,6 @@ def execute(self, cmd, use_pty=False, channel=None): if use_pty: self._eagain(channel.request_pty, timeout=self.timeout) logger.debug("Executing command '%s'", cmd) - # self._eagain(channel.request_exec, cmd, timeout=self.timeout) THREAD_POOL.apply(self._eagain, args=(channel.request_exec, cmd), kwds={'timeout': self.timeout}) @@ -241,9 +240,9 @@ def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): _buffer.eof.set() sleep(.000001) return - sleep(.000001) if size > 0: _buffer.write(data) + sleep(.000001) def wait_finished(self, host_output, timeout=None): """Wait for EOF from channel and close channel. @@ -300,7 +299,9 @@ def close_channel(self, channel): :type channel: :py:class:`ssh.channel.Channel` """ logger.debug("Closing channel") - self._eagain(channel.close, timeout=self.timeout) + THREAD_POOL.apply(self._eagain, + args=(channel.close,), + kwds={'timeout': self.timeout}) def poll(self, timeout=None): """ssh-python based co-operative gevent poll on session socket.""" From ae7d76c8ebd4fc6c862697e1485e60dc142f236b Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 13:49:36 +0000 Subject: [PATCH 11/41] Updated thread pool use --- pssh/clients/native/single.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index e31ba19d..96988be8 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -234,8 +234,7 @@ def _password_auth(self): raise AuthenticationError("Password authentication failed - %s", ex) def _open_session(self): - chan = THREAD_POOL.apply(self._eagain, args=(self.session.open_session,)) - # chan = self._eagain(self.session.open_session) + chan = self._eagain(self.session.open_session) return chan def open_session(self): @@ -276,7 +275,7 @@ def execute(self, cmd, use_pty=False, channel=None): if use_pty: self._eagain(channel.pty) logger.debug("Executing command '%s'", cmd) - THREAD_POOL.apply(self._eagain, args=(channel.execute, cmd)) + self._eagain(channel.execute, cmd) return channel def _read_output_to_buffer(self, read_func, _buffer): @@ -318,12 +317,10 @@ def wait_finished(self, host_output, timeout=None): def close_channel(self, channel): logger.debug("Closing channel") - THREAD_POOL.apply(self._eagain, args=(channel.close,)) + self._eagain(channel.close) def _eagain(self, func, *args, **kwargs): return self._eagain_errcode(func, LIBSSH2_ERROR_EAGAIN, *args, **kwargs) - # return THREAD_POOL.apply(self._eagain_errcode, - # args=(func, LIBSSH2_ERROR_EAGAIN, *args), kwds=kwargs) def _make_sftp_eagain(self): return self._eagain(self.session.sftp_init) From 03391ee0c45f87e6a09fe2ae64d7b9d283ab0682 Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 16:05:42 +0000 Subject: [PATCH 12/41] Updated native, ssh client threadpool use. Updated test --- pssh/clients/native/single.py | 7 +++-- pssh/clients/ssh/single.py | 41 ++++++++++++++++------------ tests/native/test_parallel_client.py | 2 +- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 96988be8..8cf20319 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -21,6 +21,7 @@ from warnings import warn from gevent import sleep, spawn, get_hub +from gevent.lock import RLock from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN from ssh2.exceptions import SFTPHandleError, SFTPProtocolError, \ Timeout as SSH2Timeout @@ -99,6 +100,7 @@ def __init__(self, host, :raises: :py:class:`pssh.exceptions.PKeyFileError` on errors finding provided private key. """ + self._session_lock = RLock() self.forward_ssh_agent = forward_ssh_agent self._forward_requested = False self.keepalive_seconds = keepalive_seconds @@ -234,7 +236,8 @@ def _password_auth(self): raise AuthenticationError("Password authentication failed - %s", ex) def _open_session(self): - chan = self._eagain(self.session.open_session) + with self._session_lock: + chan = THREAD_POOL.apply(self._eagain, args=(self.session.open_session,)) return chan def open_session(self): @@ -289,7 +292,7 @@ def _read_output_to_buffer(self, read_func, _buffer): if size <= 0: break _buffer.write(data) - sleep() + sleep(.0000001) finally: _buffer.eof.set() diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index f07d7bdd..dfe6d307 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -18,6 +18,7 @@ import logging from gevent import sleep, spawn, Timeout as GTimeout, joinall, get_hub +from gevent.lock import RLock from ssh import options from ssh.session import Session, SSH_READ_PENDING, SSH_WRITE_PENDING from ssh.key import import_privkey_file, import_cert_file, copy_cert_to_privkey @@ -102,6 +103,7 @@ def __init__(self, host, :raises: :py:class:`pssh.exceptions.PKeyFileError` on errors finding provided private key. """ + self._session_lock = RLock() self.cert_file = _validate_pkey_path(cert_file, host) self.gssapi_auth = gssapi_auth self.gssapi_server_identity = gssapi_server_identity @@ -190,10 +192,15 @@ def _import_cert_file(self, pkey): def _shell(self, channel): return self._eagain(channel.request_shell) - def _open_session(self): - channel = THREAD_POOL.apply(self.session.channel_new) + def _channel_new(self): + channel = self.session.channel_new() channel.set_blocking(0) - THREAD_POOL.apply(self._eagain, args=(channel.open_session,)) + self._eagain(channel.open_session) + return channel + + def _open_session(self): + with self._session_lock: + channel = THREAD_POOL.apply(self._channel_new) return channel def open_session(self): @@ -226,23 +233,23 @@ def execute(self, cmd, use_pty=False, channel=None): if use_pty: self._eagain(channel.request_pty, timeout=self.timeout) logger.debug("Executing command '%s'", cmd) - THREAD_POOL.apply(self._eagain, - args=(channel.request_exec, cmd), - kwds={'timeout': self.timeout}) + self._eagain(channel.request_exec, cmd) return channel def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): - while True: - self.poll(timeout=self.timeout) - try: - size, data = channel.read_nonblocking(is_stderr=is_stderr) - except EOF: - _buffer.eof.set() - sleep(.000001) - return - if size > 0: - _buffer.write(data) - sleep(.000001) + try: + while True: + self.poll() + try: + size, data = channel.read_nonblocking(is_stderr=is_stderr) + except EOF: + sleep(.0000001) + return + if size > 0: + _buffer.write(data) + sleep(.0000001) + finally: + _buffer.eof.set() def wait_finished(self, host_output, timeout=None): """Wait for EOF from channel and close channel. diff --git a/tests/native/test_parallel_client.py b/tests/native/test_parallel_client.py index 161888da..4e58c9da 100644 --- a/tests/native/test_parallel_client.py +++ b/tests/native/test_parallel_client.py @@ -1424,7 +1424,7 @@ def test_read_timeout(self): def test_partial_read_timeout_close_cmd(self): self.assertTrue(self.client.finished()) - output = self.client.run_command('while true; do echo a line; sleep .1; done', + output = self.client.run_command('while true; do echo a line; sleep .05; done', use_pty=True, timeout=.15) stdout = [] try: From 96524336e794343ec1a6ecb5f0a7f9379b9634fc Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 18:44:05 +0000 Subject: [PATCH 13/41] Updates --- pssh/clients/native/single.py | 4 ++-- pssh/clients/ssh/single.py | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 8cf20319..d027a9c7 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -238,11 +238,11 @@ def _password_auth(self): def _open_session(self): with self._session_lock: chan = THREAD_POOL.apply(self._eagain, args=(self.session.open_session,)) + # chan = self._eagain(self.session.open_session) return chan def open_session(self): """Open new channel from session""" - logger.debug("Opening session") try: chan = self._open_session() except Exception as ex: @@ -288,7 +288,7 @@ def _read_output_to_buffer(self, read_func, _buffer): while size == LIBSSH2_ERROR_EAGAIN: self.poll() size, data = read_func() - sleep(.0000001) + sleep() if size <= 0: break _buffer.write(data) diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index dfe6d307..da47b875 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -306,9 +306,10 @@ def close_channel(self, channel): :type channel: :py:class:`ssh.channel.Channel` """ logger.debug("Closing channel") - THREAD_POOL.apply(self._eagain, - args=(channel.close,), - kwds={'timeout': self.timeout}) + self._eagain(channel.close) + # THREAD_POOL.apply(self._eagain, + # args=(channel.close,), + # kwds={'timeout': self.timeout}) def poll(self, timeout=None): """ssh-python based co-operative gevent poll on session socket.""" From d5a740e9e16ca0c6e545e881d37d65651720683f Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 23 Jan 2021 19:16:32 +0000 Subject: [PATCH 14/41] Updates --- pssh/clients/native/single.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index d027a9c7..13b66d16 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -292,7 +292,7 @@ def _read_output_to_buffer(self, read_func, _buffer): if size <= 0: break _buffer.write(data) - sleep(.0000001) + sleep() finally: _buffer.eof.set() From 4b487859969d9dc9a41fa7ec1334356fbdf444f6 Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 30 Jan 2021 15:55:43 +0000 Subject: [PATCH 15/41] Updated single and parallel clients. Make parallel connect auth use gevent pool. --- pssh/clients/base/parallel.py | 3 ++- pssh/clients/native/single.py | 10 +++------- pssh/clients/ssh/single.py | 22 ++++++---------------- 3 files changed, 11 insertions(+), 24 deletions(-) diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index 91019c35..453c4c76 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -298,7 +298,8 @@ def connect_auth(self): :returns: list of greenlets to ``joinall`` with. :rtype: list(:py:mod:`gevent.greenlet.Greenlet`) """ - cmds = [spawn(self._make_ssh_client, i, host) for i, host in enumerate(self.hosts)] + cmds = [self.pool.spawn(self._make_ssh_client, i, host) + for i, host in enumerate(self.hosts)] return cmds def _consume_output(self, stdout, stderr): diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 13b66d16..ce6f541f 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -21,7 +21,6 @@ from warnings import warn from gevent import sleep, spawn, get_hub -from gevent.lock import RLock from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN from ssh2.exceptions import SFTPHandleError, SFTPProtocolError, \ Timeout as SSH2Timeout @@ -100,7 +99,6 @@ def __init__(self, host, :raises: :py:class:`pssh.exceptions.PKeyFileError` on errors finding provided private key. """ - self._session_lock = RLock() self.forward_ssh_agent = forward_ssh_agent self._forward_requested = False self.keepalive_seconds = keepalive_seconds @@ -236,9 +234,7 @@ def _password_auth(self): raise AuthenticationError("Password authentication failed - %s", ex) def _open_session(self): - with self._session_lock: - chan = THREAD_POOL.apply(self._eagain, args=(self.session.open_session,)) - # chan = self._eagain(self.session.open_session) + chan = self._eagain(self.session.open_session) return chan def open_session(self): @@ -288,11 +284,11 @@ def _read_output_to_buffer(self, read_func, _buffer): while size == LIBSSH2_ERROR_EAGAIN: self.poll() size, data = read_func() - sleep() + sleep(.0000001) if size <= 0: break _buffer.write(data) - sleep() + sleep(.0000001) finally: _buffer.eof.set() diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index da47b875..c90223b1 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -18,7 +18,6 @@ import logging from gevent import sleep, spawn, Timeout as GTimeout, joinall, get_hub -from gevent.lock import RLock from ssh import options from ssh.session import Session, SSH_READ_PENDING, SSH_WRITE_PENDING from ssh.key import import_privkey_file, import_cert_file, copy_cert_to_privkey @@ -103,7 +102,6 @@ def __init__(self, host, :raises: :py:class:`pssh.exceptions.PKeyFileError` on errors finding provided private key. """ - self._session_lock = RLock() self.cert_file = _validate_pkey_path(cert_file, host) self.gssapi_auth = gssapi_auth self.gssapi_server_identity = gssapi_server_identity @@ -189,19 +187,14 @@ def _import_cert_file(self, pkey): copy_cert_to_privkey(cert_key, pkey) logger.debug("Imported certificate file %s for pkey %s", self.cert_file, self.pkey) - def _shell(self, channel): - return self._eagain(channel.request_shell) - - def _channel_new(self): - channel = self.session.channel_new() - channel.set_blocking(0) - self._eagain(channel.open_session) - return channel + def _shell(self, chan): + return self._eagain(chan.request_shell) def _open_session(self): - with self._session_lock: - channel = THREAD_POOL.apply(self._channel_new) - return channel + chan = self.session.channel_new() + chan.set_blocking(0) + self._eagain(chan.open_session) + return chan def open_session(self): """Open new channel from session.""" @@ -307,9 +300,6 @@ def close_channel(self, channel): """ logger.debug("Closing channel") self._eagain(channel.close) - # THREAD_POOL.apply(self._eagain, - # args=(channel.close,), - # kwds={'timeout': self.timeout}) def poll(self, timeout=None): """ssh-python based co-operative gevent poll on session socket.""" From 386d272bf80d15f17d9f07c6eed7f63cb5cfd260 Mon Sep 17 00:00:00 2001 From: Panos Date: Sat, 6 Feb 2021 18:40:48 +0000 Subject: [PATCH 16/41] Updated reader --- pssh/clients/base/single.py | 2 +- pssh/clients/reader.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 812cf454..7c4b805d 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -667,7 +667,7 @@ def _poll_socket(self, events, timeout=None): return # gevent.select.poll converts seconds to miliseconds to match python socket # implementation - timeout = timeout * 1000 if timeout is not None else 1 + timeout = timeout * 1000 if timeout is not None else 100 poller = poll() poller.register(self.sock, eventmask=events) poller.poll(timeout=timeout) diff --git a/pssh/clients/reader.py b/pssh/clients/reader.py index a4ca3e8a..888627c2 100644 --- a/pssh/clients/reader.py +++ b/pssh/clients/reader.py @@ -77,4 +77,4 @@ def __iter__(self): if data: yield data elif self._read_pos == self._write_pos: - sleep(.000001) + sleep(.01) From 8e968e44aa98296aa311ac2b81d800c57ab8c5cf Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 7 Feb 2021 15:31:27 +0000 Subject: [PATCH 17/41] Updates --- pssh/clients/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pssh/clients/reader.py b/pssh/clients/reader.py index 888627c2..c3f2d818 100644 --- a/pssh/clients/reader.py +++ b/pssh/clients/reader.py @@ -77,4 +77,4 @@ def __iter__(self): if data: yield data elif self._read_pos == self._write_pos: - sleep(.01) + sleep(.1) From cc8d8ee93b1d980911801473c0cde6b1c9fa1088 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 21 Feb 2021 18:02:19 +0000 Subject: [PATCH 18/41] Updated changelog --- Changelog.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Changelog.rst b/Changelog.rst index 870eac56..1a9f0bbb 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -7,8 +7,14 @@ Change Log Changes ------- +* Performance and scaling improvements for all clients. Allow ``ssh-python`` (``libssh``) client to use multiple cores for authentication. * ``user`` keyword argument no longer required on Windows - exception is raised if user cannot be identified. +Fixes +----- + +* ``ParallelSSHClient.connect_auth`` would not honour client pool size. + 2.5.4 +++++ From 9c46807ce7823feecf79c479f1c902c84ea78775 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 21 Feb 2021 18:39:24 +0000 Subject: [PATCH 19/41] Updates --- pssh/clients/base/single.py | 4 +--- pssh/clients/native/single.py | 4 ++-- pssh/clients/ssh/single.py | 3 +-- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 06f42e27..705a0829 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -528,12 +528,10 @@ def _eagain_errcode(self, func, eagain, *args, **kwargs): timeout = kwargs.pop('timeout', self.timeout) with GTimeout(seconds=timeout, exception=Timeout): ret = func(*args, **kwargs) - sleep() while ret == eagain: self.poll() ret = func(*args, **kwargs) - sleep(.000001) - sleep() + sleep() return ret def _eagain_write(self, write_func, data, timeout=None): diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 8a25517b..af3e9922 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -281,11 +281,11 @@ def _read_output_to_buffer(self, read_func, _buffer): while size == LIBSSH2_ERROR_EAGAIN: self.poll() size, data = read_func() - sleep(.0000001) + sleep() if size <= 0: break _buffer.write(data) - sleep(.0000001) + sleep() finally: _buffer.eof.set() diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 6832ce8d..fe6d118a 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -233,11 +233,10 @@ def _read_output_to_buffer(self, channel, _buffer, is_stderr=False): try: size, data = channel.read_nonblocking(is_stderr=is_stderr) except EOF: - sleep(.0000001) return if size > 0: _buffer.write(data) - sleep(.0000001) + sleep() finally: _buffer.eof.set() From 5f3235e14947972203dbee62783033933d052f1a Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 21 Feb 2021 18:46:10 +0000 Subject: [PATCH 20/41] Updates --- pssh/clients/native/single.py | 2 +- pssh/clients/ssh/single.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index af3e9922..c51ae079 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -228,7 +228,7 @@ def _pkey_auth(self, pkey_file, password=None): ) def _password_auth(self): - self.session.userauth_password(self.user, self.password) + THREAD_POOL.apply(self.session.userauth_password, args=(self.user, self.password)) def _open_session(self): chan = self._eagain(self.session.open_session) diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index fe6d118a..fd12fefc 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -121,7 +121,7 @@ def disconnect(self): self.sock.close() def _agent_auth(self): - self.session.userauth_agent(self.user) + THREAD_POOL.apply(self.session.userauth_agent, args=(self.user,)) def _keepalive(self): pass @@ -161,7 +161,7 @@ def _session_connect(self): def auth(self): if self.gssapi_auth or (self.gssapi_server_identity or self.gssapi_client_identity): try: - return self.session.userauth_gssapi() + return THREAD_POOL.apply(self.session.userauth_gssapi) except Exception as ex: logger.error( "GSSAPI authentication with server id %s and client id %s failed - %s", @@ -169,13 +169,13 @@ def auth(self): return super(SSHClient, self).auth() def _password_auth(self): - self.session.userauth_password(self.user, self.password) + THREAD_POOL.apply(self.session.userauth_password, args=(self.user, self.password)) def _pkey_auth(self, pkey_file, password=None): pkey = import_privkey_file(pkey_file, passphrase=password if password is not None else '') if self.cert_file is not None: logger.debug("Certificate file set - trying certificate authentication") - self._import_cert_file(pkey) + THREAD_POOL.apply(self._import_cert_file, args=(pkey,)) THREAD_POOL.apply(self.session.userauth_publickey, args=(pkey,)) def _import_cert_file(self, pkey): From 69be1ee3b2e4bd486417af2647ab0fd62e8f6694 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 21 Feb 2021 18:55:56 +0000 Subject: [PATCH 21/41] Added greenlet locking around thread calls --- pssh/clients/native/single.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index c51ae079..f905b290 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -21,6 +21,7 @@ from warnings import warn from gevent import sleep, spawn, get_hub +from gevent.lock import RLock from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN from ssh2.exceptions import SFTPHandleError, SFTPProtocolError, \ Timeout as SSH2Timeout @@ -106,6 +107,7 @@ def __init__(self, host, self._proxy_client = None self.host = host self.port = port if port is not None else 22 + self._lock = RLock() if proxy_host is not None: _port = port if proxy_port is None else proxy_port _pkey = pkey if proxy_pkey is None else proxy_pkey @@ -196,7 +198,8 @@ def _init_session(self, retries=1): self.session.set_timeout(self.timeout * 1000) try: if self._auth_thread_pool: - THREAD_POOL.apply(self.session.handshake, (self.sock,)) + with self._lock: + THREAD_POOL.apply(self.session.handshake, (self.sock,)) else: self.session.handshake(self.sock) except Exception as ex: @@ -221,14 +224,16 @@ def _agent_auth(self): def _pkey_auth(self, pkey_file, password=None): passphrase = password if password is not None else b'' - THREAD_POOL.apply( - self.session.userauth_publickey_fromfile, - args=(self.user, pkey_file), - kwds={'passphrase': passphrase}, - ) + with self._lock: + THREAD_POOL.apply( + self.session.userauth_publickey_fromfile, + args=(self.user, pkey_file), + kwds={'passphrase': passphrase}, + ) def _password_auth(self): - THREAD_POOL.apply(self.session.userauth_password, args=(self.user, self.password)) + with self._lock: + THREAD_POOL.apply(self.session.userauth_password, args=(self.user, self.password)) def _open_session(self): chan = self._eagain(self.session.open_session) From 1e107f690070bc0674542e1b6dfb228467a86149 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 21 Feb 2021 19:05:24 +0000 Subject: [PATCH 22/41] Added locking for ssh client --- pssh/clients/ssh/single.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index fd12fefc..931c1ab5 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -18,6 +18,7 @@ import logging from gevent import sleep, spawn, Timeout as GTimeout, joinall, get_hub +from gevent.lock import RLock from ssh import options from ssh.session import Session, SSH_READ_PENDING, SSH_WRITE_PENDING from ssh.key import import_privkey_file, import_cert_file, copy_cert_to_privkey @@ -107,6 +108,7 @@ def __init__(self, host, self.gssapi_server_identity = gssapi_server_identity self.gssapi_client_identity = gssapi_client_identity self.gssapi_delegate_credentials = gssapi_delegate_credentials + self._lock = RLock() super(SSHClient, self).__init__( host, user=user, password=password, port=port, pkey=pkey, num_retries=num_retries, retry_delay=retry_delay, @@ -121,7 +123,8 @@ def disconnect(self): self.sock.close() def _agent_auth(self): - THREAD_POOL.apply(self.session.userauth_agent, args=(self.user,)) + with self._lock: + THREAD_POOL.apply(self.session.userauth_agent, args=(self.user,)) def _keepalive(self): pass @@ -156,12 +159,14 @@ def _init_session(self, retries=1): raise ex def _session_connect(self): - THREAD_POOL.apply(self.session.connect) + with self._lock: + THREAD_POOL.apply(self.session.connect) def auth(self): if self.gssapi_auth or (self.gssapi_server_identity or self.gssapi_client_identity): try: - return THREAD_POOL.apply(self.session.userauth_gssapi) + with self._lock: + return THREAD_POOL.apply(self.session.userauth_gssapi) except Exception as ex: logger.error( "GSSAPI authentication with server id %s and client id %s failed - %s", @@ -169,18 +174,23 @@ def auth(self): return super(SSHClient, self).auth() def _password_auth(self): - THREAD_POOL.apply(self.session.userauth_password, args=(self.user, self.password)) + with self._lock: + THREAD_POOL.apply(self.session.userauth_password, args=(self.user, self.password)) def _pkey_auth(self, pkey_file, password=None): - pkey = import_privkey_file(pkey_file, passphrase=password if password is not None else '') + passphrase = password if password is not None else '' + pkey = THREAD_POOL.apply( + import_privkey_file, args=(pkey_file,), kwds={'passphrase': passphrase}) if self.cert_file is not None: logger.debug("Certificate file set - trying certificate authentication") THREAD_POOL.apply(self._import_cert_file, args=(pkey,)) - THREAD_POOL.apply(self.session.userauth_publickey, args=(pkey,)) + with self._lock: + THREAD_POOL.apply(self.session.userauth_publickey, args=(pkey,)) def _import_cert_file(self, pkey): cert_key = import_cert_file(self.cert_file) - self.session.userauth_try_publickey(cert_key) + with self._lock: + self.session.userauth_try_publickey(cert_key) copy_cert_to_privkey(cert_key, pkey) logger.debug("Imported certificate file %s for pkey %s", self.cert_file, self.pkey) From fe526513820443c036d1020a637b440b3d723982 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 11:15:00 +0100 Subject: [PATCH 23/41] Load pkey data once for global key in parallel clients. Split stdout/stderr locks. Added threading --- pssh/clients/base/parallel.py | 4 ++-- pssh/clients/base/single.py | 10 +++++----- pssh/clients/native/single.py | 24 +++++++++++++----------- pssh/clients/ssh/single.py | 14 +++++++------- 4 files changed, 27 insertions(+), 25 deletions(-) diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index 16f13335..012646ac 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -65,6 +65,7 @@ def __init__(self, hosts, user=None, password=None, port=None, pkey=None, self.password = password self.port = port self.pkey = pkey + self.__pkey_data = self._load_pkey_data(pkey) if pkey is not None else None self.num_retries = num_retries self.timeout = timeout self._host_clients = {} @@ -555,8 +556,7 @@ def _get_ssh_client(self, host_i, host): if _client is not None: return _client cfg = self._get_host_config(host_i, host) - _pkey = self.pkey if cfg.private_key is None else cfg.private_key - _pkey_data = self._load_pkey_data(_pkey) + _pkey_data = self.__pkey_data if cfg.private_key is None else self._load_pkey_data(cfg.private_key) _client = self._make_ssh_client(host, cfg, _pkey_data) self._host_clients[(host_i, host)] = _client return _client diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index d093778e..724d3bc5 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -22,20 +22,19 @@ from gevent import sleep, socket, Timeout as GTimeout from gevent.hub import Hub +from gevent.lock import RLock from gevent.select import poll, POLLIN, POLLOUT - -from ssh2.utils import find_eol from ssh2.exceptions import AgentConnectionError, AgentListIdentitiesError, \ AgentAuthenticationError, AgentGetIdentityError +from ssh2.utils import find_eol from ..common import _validate_pkey -from ...constants import DEFAULT_RETRIES, RETRY_DELAY from ..reader import ConcurrentRWBuffer +from ...constants import DEFAULT_RETRIES, RETRY_DELAY from ...exceptions import UnknownHostError, AuthenticationError, \ ConnectionError, Timeout, NoIPv6AddressFoundError from ...output import HostOutput, HostOutputBuffers, BufferData - Hub.NOT_ERROR = (Exception,) host_logger = logging.getLogger('pssh.host_logger') logger = logging.getLogger(__name__) @@ -186,6 +185,7 @@ def __init__(self, host, self.identity_auth = identity_auth self._keepalive_greenlet = None self.ipv6_only = ipv6_only + self._sess_lock = RLock() self._init() def _pkey_from_memory(self, pkey_data): @@ -473,7 +473,7 @@ def _read_output_buffer(self, _buffer, timeout=None): finally: timer.close() - def _read_output_to_buffer(self, read_func, _buffer): + def _read_output_to_buffer(self, read_func, _buffer, is_stderr=False): raise NotImplementedError def wait_finished(self, host_output, timeout=None): diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 3211930e..edd99ae2 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -117,7 +117,6 @@ def __init__(self, host, self._proxy_client = None self.host = host self.port = port if port is not None else 22 - self._lock = RLock() if proxy_host is not None: _port = port if proxy_port is None else proxy_port _pkey = pkey if proxy_pkey is None else proxy_pkey @@ -132,7 +131,8 @@ def __init__(self, host, identity_auth=identity_auth, ) proxy_host = '127.0.0.1' - self._chan_lock = RLock() + self._chan_stdout_lock = RLock() + self._chan_stderr_lock = RLock() super(SSHClient, self).__init__( host, user=user, password=password, port=port, pkey=pkey, num_retries=num_retries, retry_delay=retry_delay, @@ -220,7 +220,7 @@ def _init_session(self, retries=1): self.session.set_timeout(self.timeout * 1000) try: if self._auth_thread_pool: - with self._lock: + with self._sess_lock: THREAD_POOL.apply(self.session.handshake, (self.sock,)) else: self.session.handshake(self.sock) @@ -240,11 +240,12 @@ def _keepalive(self): self._keepalive_greenlet = self.spawn_send_keepalive() def _agent_auth(self): - self.session.agent_auth(self.user) + with self._sess_lock: + THREAD_POOL.apply(self.session.agent_auth, args=(self.user,)) def _pkey_file_auth(self, pkey_file, password=None): passphrase = password if password is not None else b'' - with self._lock: + with self._sess_lock: THREAD_POOL.apply( self.session.userauth_publickey_fromfile, args=(self.user, pkey_file), @@ -253,7 +254,7 @@ def _pkey_file_auth(self, pkey_file, password=None): def _pkey_from_memory(self, pkey_data): passphrase = self.password if self.password is not None else b'' - with self._lock: + with self._sess_lock: THREAD_POOL.apply( self.session.userauth_publickey_frommemory, args=(self.user, pkey_data), @@ -261,7 +262,7 @@ def _pkey_from_memory(self, pkey_data): ) def _password_auth(self): - with self._lock: + with self._sess_lock: THREAD_POOL.apply(self.session.userauth_password, args=(self.user, self.password)) def _open_session(self): @@ -308,14 +309,15 @@ def execute(self, cmd, use_pty=False, channel=None): self._eagain(channel.execute, cmd) return channel - def _read_output_to_buffer(self, read_func, _buffer): + def _read_output_to_buffer(self, read_func, _buffer, is_stderr=False): + _lock = self._chan_stderr_lock if is_stderr else self._chan_stdout_lock try: while True: - with self._chan_lock: + with _lock: size, data = read_func() while size == LIBSSH2_ERROR_EAGAIN: self.poll() - with self._chan_lock: + with _lock: size, data = read_func() if size <= 0: break @@ -347,7 +349,7 @@ def wait_finished(self, host_output, timeout=None): self.close_channel(channel) def close_channel(self, channel): - with self._chan_lock: + with self._chan_stdout_lock, self._chan_stderr_lock: logger.debug("Closing channel") self._eagain(channel.close) diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 000b66c2..54fc1ced 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -115,7 +115,6 @@ def __init__(self, host, self.gssapi_server_identity = gssapi_server_identity self.gssapi_client_identity = gssapi_client_identity self.gssapi_delegate_credentials = gssapi_delegate_credentials - self._lock = RLock() super(SSHClient, self).__init__( host, user=user, password=password, port=port, pkey=pkey, num_retries=num_retries, retry_delay=retry_delay, @@ -132,7 +131,7 @@ def disconnect(self): self.sock.close() def _agent_auth(self): - with self._lock: + with self._sess_lock: THREAD_POOL.apply(self.session.userauth_agent, args=(self.user,)) def _keepalive(self): @@ -166,22 +165,23 @@ def _init_session(self, retries=1): raise ex def _session_connect(self): - with self._lock: + with self._sess_lock: THREAD_POOL.apply(self.session.connect) def auth(self): if self.gssapi_auth or (self.gssapi_server_identity or self.gssapi_client_identity): try: - with self._lock: + with self._sess_lock: return THREAD_POOL.apply(self.session.userauth_gssapi) except Exception as ex: logger.error( "GSSAPI authentication with server id %s and client id %s failed - %s", self.gssapi_server_identity, self.gssapi_client_identity, ex) + raise return super(SSHClient, self).auth() def _password_auth(self): - with self._lock: + with self._sess_lock: THREAD_POOL.apply(self.session.userauth_password, args=(self.user, self.password)) def _pkey_file_auth(self, pkey_file, password=None): @@ -194,7 +194,7 @@ def _pkey_obj_auth(self, pkey): if self.cert_file is not None: logger.debug("Certificate file set - trying certificate authentication") THREAD_POOL.apply(self._import_cert_file, args=(pkey,)) - with self._lock: + with self._sess_lock: THREAD_POOL.apply(self.session.userauth_publickey, args=(pkey,)) def _pkey_from_memory(self, pkey_data): @@ -208,7 +208,7 @@ def _pkey_from_memory(self, pkey_data): def _import_cert_file(self, pkey): cert_key = import_cert_file(self.cert_file) - with self._lock: + with self._sess_lock: self.session.userauth_try_publickey(cert_key) copy_cert_to_privkey(cert_key, pkey) logger.debug("Imported certificate file %s for pkey %s", self.cert_file, self.pkey) From 0eb8d0214d9c63c23903759cce6a6beaa62e56e0 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 11:57:25 +0100 Subject: [PATCH 24/41] Added threading to sftp get/put --- pssh/clients/native/single.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index edd99ae2..ff72cec4 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -446,11 +446,14 @@ def sftp_put(self, sftp, local_file, remote_file): with self._sftp_openfh( sftp.open, remote_file, f_flags, mode) as remote_fh: try: - self._sftp_put(remote_fh, local_file) + self._sess_lock.acquire() + THREAD_POOL.apply(self._sftp_put, args=(remote_fh, local_file)) except SFTPProtocolError as ex: msg = "Error writing to remote file %s - %s" logger.error(msg, remote_file, ex) raise SFTPIOError(msg, remote_file, ex) + finally: + self._sess_lock.release() def mkdir(self, sftp, directory): """Make directory via SFTP channel. @@ -626,6 +629,9 @@ def scp_send(self, local_file, remote_file, recurse=False, sftp=None): :type remote_file: str :param recurse: Whether or not to descend into directories recursively. :type recurse: bool + :param sftp: The SFTP channel to use instead of creating a new one. + Only used when ``recurse`` is ``True``. + :type sftp: :py:class:`ssh2.sftp.SFTP` :raises: :py:class:`ValueError` when a directory is supplied to ``local_file`` and ``recurse`` is not set @@ -704,11 +710,14 @@ def sftp_get(self, sftp, remote_file, local_file): sftp.open, remote_file, LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR) as remote_fh: try: - self._sftp_get(remote_fh, local_file) + self._sess_lock.acquire() + THREAD_POOL.apply(self._sftp_get, args=(remote_fh, local_file)) except SFTPProtocolError as ex: msg = "Error reading from remote file %s - %s" logger.error(msg, remote_file, ex) raise SFTPIOError(msg, remote_file, ex) + finally: + self._sess_lock.release() def get_exit_status(self, channel): """Get exit status code for channel or ``None`` if not ready. From dcfd2eae3717a64ca3b958f7a54ac852cd3805e9 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 13:37:04 +0100 Subject: [PATCH 25/41] Cleanup --- pssh/clients/base/parallel.py | 28 ++++++++++++++++++---------- pssh/clients/native/parallel.py | 23 ++++------------------- pssh/clients/native/single.py | 5 ++--- pssh/clients/reader.py | 25 +++++++++++++++++++------ pssh/output.py | 2 +- tests/native/test_parallel_client.py | 14 ++++++++------ tests/test_reader.py | 5 +++-- 7 files changed, 55 insertions(+), 47 deletions(-) diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index 012646ac..620d0f04 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -114,6 +114,20 @@ def hosts(self, _hosts): self._host_clients.pop((i, host), None) self._hosts = _hosts + def __del__(self): + self.disconnect() + + def disconnect(self): + if not hasattr(self, '_host_clients'): + return + for s_client in self._host_clients.values(): + try: + s_client.disconnect() + except Exception as ex: + logger.debug("Client disconnect failed with %s", ex) + pass + del s_client + def _check_host_config(self): if self.host_config is None: return @@ -257,7 +271,7 @@ def get_last_output(self, cmds=None): return self._get_output_from_cmds( cmds, raise_error=False) - def _get_host_config(self, host_i, host): + def _get_host_config(self, host_i): if self.host_config is None: config = HostConfig( user=self.user, port=self.port, password=self.password, private_key=self.pkey, @@ -314,9 +328,9 @@ def connect_auth(self): return cmds def _consume_output(self, stdout, stderr): - for line in stdout: + for _ in stdout: pass - for line in stderr: + for _ in stderr: pass def join(self, output=None, consume_output=False, timeout=None): @@ -543,19 +557,13 @@ def _copy_remote_file(self, host_i, host, remote_file, local_file, recurse, return client.copy_remote_file( remote_file, local_file, recurse=recurse, **kwargs) - def _handle_greenlet_exc(self, func, host, *args, **kwargs): - try: - return func(*args, **kwargs) - except Exception as ex: - raise ex - def _get_ssh_client(self, host_i, host): logger.debug("Make client request for host %s, (host_i, host) in clients: %s", host, (host_i, host) in self._host_clients) _client = self._host_clients.get((host_i, host)) if _client is not None: return _client - cfg = self._get_host_config(host_i, host) + cfg = self._get_host_config(host_i) _pkey_data = self.__pkey_data if cfg.private_key is None else self._load_pkey_data(cfg.private_key) _client = self._make_ssh_client(host, cfg, _pkey_data) self._host_clients[(host_i, host)] = _client diff --git a/pssh/clients/native/parallel.py b/pssh/clients/native/parallel.py index 0b95a51e..f28be0ae 100644 --- a/pssh/clients/native/parallel.py +++ b/pssh/clients/native/parallel.py @@ -216,17 +216,6 @@ def run_command(self, command, sudo=False, user=None, stop_on_errors=True, read_timeout=read_timeout, ) - def __del__(self): - if not hasattr(self, '_host_clients'): - return - for s_client in self._host_clients.values(): - try: - s_client.disconnect() - except Exception as ex: - logger.debug("Client disconnect failed with %s", ex) - pass - del s_client - def _make_ssh_client(self, host, cfg, _pkey_data): _client = SSHClient( host, user=cfg.user or self.user, password=cfg.password or self.password, port=cfg.port or self.port, @@ -370,16 +359,12 @@ def copy_remote_file(self, remote_file, local_file, recurse=False, encoding=encoding) def _scp_send(self, host_i, host, local_file, remote_file, recurse=False): - self._get_ssh_client(host_i, host) - return self._handle_greenlet_exc( - self._host_clients[(host_i, host)].scp_send, host, - local_file, remote_file, recurse=recurse) + _client = self._get_ssh_client(host_i, host) + return _client.scp_send(local_file, remote_file, recurse=recurse) def _scp_recv(self, host_i, host, remote_file, local_file, recurse=False): - self._get_ssh_client(host_i, host) - return self._handle_greenlet_exc( - self._host_clients[(host_i, host)].scp_recv, host, - remote_file, local_file, recurse=recurse) + _client = self._get_ssh_client(host_i, host) + return _client.scp_recv(remote_file, local_file, recurse=recurse) def scp_send(self, local_file, remote_file, recurse=False, copy_args=None): """Copy local file to remote file in parallel via SCP. diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index ff72cec4..54eed38a 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -315,10 +315,9 @@ def _read_output_to_buffer(self, read_func, _buffer, is_stderr=False): while True: with _lock: size, data = read_func() - while size == LIBSSH2_ERROR_EAGAIN: + if size == LIBSSH2_ERROR_EAGAIN: self.poll() - with _lock: - size, data = read_func() + continue if size <= 0: break _buffer.write(data) diff --git a/pssh/clients/reader.py b/pssh/clients/reader.py index f9ab1e9f..fd5cbbe6 100644 --- a/pssh/clients/reader.py +++ b/pssh/clients/reader.py @@ -20,11 +20,20 @@ except ImportError: from cStringIO import StringIO as BytesIO -from gevent import sleep from gevent.event import Event from gevent.lock import RLock +class _Eof(Event): + def __init__(self, unread_data): + self._unread_data = unread_data + Event.__init__(self) + + def set(self): + self._unread_data.set() + Event.set(self) + + class ConcurrentRWBuffer(object): """Concurrent reader/writer of bytes for use from multiple greenlets. @@ -33,18 +42,19 @@ class ConcurrentRWBuffer(object): Iterate on buffer object to read data, yielding greenlet if no data exists until self.eof has been set. - Writers should ``eof.set()`` when finished writing data via ``write``. + Writers should ``ConcurrentRWBuffer.eof.set()`` when finished writing data via ``write``. Readers can use ``read()`` to get any available data or ``None``. """ - __slots__ = ('_buffer', '_read_pos', '_write_pos', 'eof', '_lock') + __slots__ = ('_buffer', '_read_pos', '_write_pos', 'eof', '_lock', '_unread_data') def __init__(self): self._buffer = BytesIO() self._read_pos = 0 self._write_pos = 0 - self.eof = Event() self._lock = RLock() + self._unread_data = Event() + self.eof = _Eof(self._unread_data) def write(self, data): """Write data to buffer. @@ -56,6 +66,8 @@ def write(self, data): if not self._buffer.tell() == self._write_pos: self._buffer.seek(self._write_pos) self._write_pos += self._buffer.write(data) + if not self._unread_data.is_set() and self._read_pos < self._write_pos: + self._unread_data.set() def read(self): """Read available data, or return None @@ -64,6 +76,7 @@ def read(self): """ with self._lock: if self._write_pos == 0 or self._read_pos == self._write_pos: + self._unread_data.clear() return elif not self._buffer.tell() == self._read_pos: self._buffer.seek(self._read_pos) @@ -76,5 +89,5 @@ def __iter__(self): data = self.read() if data: yield data - elif self._read_pos == self._write_pos: - sleep(.1) + else: + self._unread_data.wait() diff --git a/pssh/output.py b/pssh/output.py index c7e9375e..0baa7f76 100644 --- a/pssh/output.py +++ b/pssh/output.py @@ -44,7 +44,7 @@ def __init__(self, reader, rw_buffer): """ :param reader: Greenlet reading data from channel and writing to rw_buffer :type reader: :py:class:`gevent.Greenlet` - :param rw_bufffer: Read/write buffer + :param rw_buffer: Read/write buffer :type rw_buffer: :py:class:`pssh.clients.reader.ConcurrentRWBuffer` """ self.reader = reader diff --git a/tests/native/test_parallel_client.py b/tests/native/test_parallel_client.py index 5b760985..69eb3f3e 100644 --- a/tests/native/test_parallel_client.py +++ b/tests/native/test_parallel_client.py @@ -1270,6 +1270,7 @@ def test_join_timeout_subset_read(self): ] client = ParallelSSHClient(hosts, port=self.port, pkey=self.user_key) + joinall(client.connect_auth(), raise_error=True) output = client.run_command(cmd, host_args=host_args) try: client.join(output, timeout=.2) @@ -1320,6 +1321,7 @@ def test_partial_read_timeout_close_cmd(self): except Timeout: pass self.assertTrue(len(stdout) > 0) + sleep(.15) output[0].client.close_channel(output[0].channel) self.client.join(output) # Should not timeout @@ -1540,21 +1542,20 @@ def test_scp_bad_copy_args(self): def test_scp_send_exc(self): client = ParallelSSHClient([self.host], pkey=self.user_key, num_retries=1) + def _scp_send(*args): raise Exception - def _client_send(*args): - return client._handle_greenlet_exc(_scp_send, 'fake') - client._scp_send = _client_send + client._scp_send = _scp_send cmds = client.scp_send('local_file', 'remote_file') self.assertRaises(Exception, joinall, cmds, raise_error=True) def test_scp_recv_exc(self): client = ParallelSSHClient([self.host], pkey=self.user_key, num_retries=1) + def _scp_recv(*args): raise Exception - def _client_recv(*args): - return client._handle_greenlet_exc(_scp_recv, 'fake') - client._scp_recv = _client_recv + + client._scp_recv = _scp_recv cmds = client.scp_recv('remote_file', 'local_file') self.assertRaises(Exception, joinall, cmds, raise_error=True) @@ -1758,6 +1759,7 @@ def test_client_disconnect(self): client.join(consume_output=True) single_client = list(client._host_clients.values())[0] del client + # client.disconnect() self.assertEqual(single_client.session, None) def test_client_disconnect_error(self): diff --git a/tests/test_reader.py b/tests/test_reader.py index b353f80c..5f16904e 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -55,6 +55,7 @@ def test_multi_write_read(self): def test_concurrent_rw(self): written_data = Queue() + def _writer(_buffer): while True: data = b"".join([ascii_letters[m].encode() for m in [randrange(0, 8) for _ in range(8)]]) @@ -66,13 +67,13 @@ def _writer(_buffer): sleep(0.5) data = self.buffer.read() _data = b"" - while written_data.qsize() !=0 : + while written_data.qsize() != 0: _data += written_data.get() self.assertEqual(data, _data) sleep(0.5) data = self.buffer.read() _data = b"" - while written_data.qsize() !=0 : + while written_data.qsize() != 0: _data += written_data.get() self.assertEqual(data, _data) writer.kill() From d178ceab14498c7e208eff7b8be7ccf123af8c4e Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 14:38:47 +0100 Subject: [PATCH 26/41] Docstrings --- pssh/clients/reader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pssh/clients/reader.py b/pssh/clients/reader.py index fd5cbbe6..6d9ddf4f 100644 --- a/pssh/clients/reader.py +++ b/pssh/clients/reader.py @@ -39,10 +39,10 @@ class ConcurrentRWBuffer(object): Supports both concurrent reading and writing. - Iterate on buffer object to read data, yielding greenlet if no data exists + Iterate on buffer object to read data, yielding event loop if no data exists until self.eof has been set. - Writers should ``ConcurrentRWBuffer.eof.set()`` when finished writing data via ``write``. + Writers should call ``ConcurrentRWBuffer.eof.set()`` when finished writing data via ``write``. Readers can use ``read()`` to get any available data or ``None``. """ @@ -70,7 +70,7 @@ def write(self, data): self._unread_data.set() def read(self): - """Read available data, or return None + """Read available data, or return None. :rtype: bytes """ From 14e65b4e5d3c376c0baa6a37d5be513ffd7aecad Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 21:23:49 +0100 Subject: [PATCH 27/41] Cleanup --- pssh/clients/native/single.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 54eed38a..3f210a12 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -444,8 +444,8 @@ def sftp_put(self, sftp, local_file, remote_file): f_flags = LIBSSH2_FXF_CREAT | LIBSSH2_FXF_WRITE | LIBSSH2_FXF_TRUNC with self._sftp_openfh( sftp.open, remote_file, f_flags, mode) as remote_fh: + self._sess_lock.acquire() try: - self._sess_lock.acquire() THREAD_POOL.apply(self._sftp_put, args=(remote_fh, local_file)) except SFTPProtocolError as ex: msg = "Error writing to remote file %s - %s" @@ -708,8 +708,8 @@ def sftp_get(self, sftp, remote_file, local_file): with self._sftp_openfh( sftp.open, remote_file, LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR) as remote_fh: + self._sess_lock.acquire() try: - self._sess_lock.acquire() THREAD_POOL.apply(self._sftp_get, args=(remote_fh, local_file)) except SFTPProtocolError as ex: msg = "Error reading from remote file %s - %s" From a817a544550f61ef4cacca7996ba72e9e44da0a7 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 21:27:38 +0100 Subject: [PATCH 28/41] Docstrings --- pssh/clients/native/single.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index 3f210a12..c86ef624 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -569,6 +569,9 @@ def scp_recv(self, remote_file, local_file, recurse=False, sftp=None, :type local_file: str :param recurse: Whether or not to recursively copy directories :type recurse: bool + :param sftp: The SFTP channel to use instead of creating a new one. + Only used when ``recurse`` is ``True``. + :type sftp: :py:class:`ssh2.sftp.SFTP` :param encoding: Encoding to use for file paths when recursion is enabled. :type encoding: str From 480ab011d071ab4a5dec0b4cfd1fad0a2036deba Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 21:43:55 +0100 Subject: [PATCH 29/41] Docstrings, cleanup --- pssh/clients/base/single.py | 12 ++++++++++-- pssh/clients/native/parallel.py | 7 ++++++- pssh/clients/native/tunnel.py | 2 +- pssh/clients/ssh/single.py | 14 ++++++-------- pssh/utils.py | 2 +- 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/pssh/clients/base/single.py b/pssh/clients/base/single.py index 724d3bc5..b1ce63ca 100644 --- a/pssh/clients/base/single.py +++ b/pssh/clients/base/single.py @@ -286,7 +286,7 @@ def _connect(self, host, port, retries=1): raise unknown_ex from ex for i, (family, _type, proto, _, sock_addr) in enumerate(addr_info): try: - return self._connect_socket(family, _type, proto, sock_addr, host, port, retries) + return self._connect_socket(family, _type, sock_addr, host, port, retries) except ConnectionRefusedError as ex: if i+1 == len(addr_info): logger.error("No available addresses from %s", [addr[4] for addr in addr_info]) @@ -294,7 +294,7 @@ def _connect(self, host, port, retries=1): raise continue - def _connect_socket(self, family, _type, proto, sock_addr, host, port, retries): + def _connect_socket(self, family, _type, sock_addr, host, port, retries): self.sock = socket.socket(family, _type) if self.timeout: self.sock.settimeout(self.timeout) @@ -427,6 +427,9 @@ def read_stderr(self, stderr_buffer, timeout=None): :param stderr_buffer: Buffer to read from. :type stderr_buffer: :py:class:`pssh.clients.reader.ConcurrentRWBuffer` + :param timeout: Timeout in seconds - defaults to no timeout. + :type timeout: int or float + :rtype: generator """ logger.debug("Reading from stderr buffer, timeout=%s", timeout) @@ -438,6 +441,9 @@ def read_output(self, stdout_buffer, timeout=None): :param stdout_buffer: Buffer to read from. :type stdout_buffer: :py:class:`pssh.clients.reader.ConcurrentRWBuffer` + :param timeout: Timeout in seconds - defaults to no timeout. + :type timeout: int or float + :rtype: generator """ logger.debug("Reading from stdout buffer, timeout=%s", timeout) @@ -495,6 +501,8 @@ def read_output_buffer(self, output_buffer, prefix=None, :type output_buffer: iterator :param prefix: String to prefix log output to ``host_logger`` with :type prefix: str + :param encoding: Output encoding to use for host logger. + :type encoding: str :param callback: Function to call back once buffer is depleted: :type callback: function :param callback_args: Arguments for call back function diff --git a/pssh/clients/native/parallel.py b/pssh/clients/native/parallel.py index f28be0ae..3fab60bf 100644 --- a/pssh/clients/native/parallel.py +++ b/pssh/clients/native/parallel.py @@ -389,6 +389,11 @@ def scp_send(self, local_file, remote_file, recurse=False, copy_args=None): :type local_file: str :param remote_file: Remote filepath on remote host to copy file to :type remote_file: str + :param copy_args: (Optional) format local_file and remote_file strings + with per-host arguments in ``copy_args``. ``copy_args`` length must + equal length of host list - + :py:class:`pssh.exceptions.HostArgumentError` is raised otherwise + :type copy_args: tuple or list :param recurse: Whether or not to descend into directories recursively. :type recurse: bool @@ -400,7 +405,7 @@ def scp_send(self, local_file, remote_file, recurse=False, copy_args=None): """ copy_args = [{'local_file': local_file, 'remote_file': remote_file} - for i, host in enumerate(self.hosts)] \ + for _ in self.hosts] \ if copy_args is None else copy_args local_file = "%(local_file)s" remote_file = "%(remote_file)s" diff --git a/pssh/clients/native/tunnel.py b/pssh/clients/native/tunnel.py index 5748a3c2..57577234 100644 --- a/pssh/clients/native/tunnel.py +++ b/pssh/clients/native/tunnel.py @@ -193,7 +193,7 @@ def _read_forward_sock(self, forward_sock, channel): sleep(.01) continue try: - self._client._eagain_write(channel.write, data) + self._client.eagain_write(channel.write, data) except Exception as ex: logger.error("Error writing data to channel - %s", ex) raise diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 54fc1ced..428cd05d 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -18,20 +18,18 @@ import logging from gevent import sleep, spawn, Timeout as GTimeout, joinall, get_hub -from gevent.lock import RLock from ssh import options -from ssh.session import Session, SSH_READ_PENDING, SSH_WRITE_PENDING -from ssh.key import import_privkey_file, import_cert_file, copy_cert_to_privkey,\ - import_privkey_base64 -from ssh.exceptions import EOF from ssh.error_codes import SSH_AGAIN +from ssh.exceptions import EOF +from ssh.key import import_privkey_file, import_cert_file, copy_cert_to_privkey, \ + import_privkey_base64 +from ssh.session import Session, SSH_READ_PENDING, SSH_WRITE_PENDING from ..base.single import BaseSSHClient from ..common import _validate_pkey_path -from ...output import HostOutput -from ...exceptions import SessionError, Timeout from ...constants import DEFAULT_RETRIES, RETRY_DELAY - +from ...exceptions import SessionError, Timeout +from ...output import HostOutput logger = logging.getLogger(__name__) THREAD_POOL = get_hub().threadpool diff --git a/pssh/utils.py b/pssh/utils.py index 9eca4d23..c9a35e4f 100644 --- a/pssh/utils.py +++ b/pssh/utils.py @@ -46,5 +46,5 @@ def enable_host_logger(): def enable_debug_logger(): - """Enable debug logging for the library to sdout.""" + """Enable debug logging for the library to stdout.""" return enable_logger(logger, level=logging.DEBUG) From b8e4c996efd1616ec326fe81e0f8a2b8c93cd1aa Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 22:01:42 +0100 Subject: [PATCH 30/41] Updated examples --- examples/parallel_commands.py | 2 +- examples/quickstart.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/parallel_commands.py b/examples/parallel_commands.py index 72b96902..c0a48137 100644 --- a/examples/parallel_commands.py +++ b/examples/parallel_commands.py @@ -10,7 +10,7 @@ cmds = ['sleep 5; uname' for _ in range(10)] start = datetime.datetime.now() for cmd in cmds: - output.append(client.run_command(cmd, stop_on_errors=False, return_list=True)) + output.append(client.run_command(cmd, stop_on_errors=False)) end = datetime.datetime.now() print("Started %s 'sleep 5' commands on %s host(s) in %s" % ( len(cmds), len(hosts), end-start,)) diff --git a/examples/quickstart.py b/examples/quickstart.py index a759ae62..412466cb 100644 --- a/examples/quickstart.py +++ b/examples/quickstart.py @@ -10,4 +10,4 @@ for host_out in output: for line in host_out.stdout: print(line) -print("Host %s: exit code %s" % (host_out.host, host_out.exit_code)) + print("Host %s: exit code %s" % (host_out.host, host_out.exit_code)) From e741cc60977cb0735f81c3f02978d81353321425 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 22:06:38 +0100 Subject: [PATCH 31/41] Cleanup --- pssh/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pssh/__init__.py b/pssh/__init__.py index bce2c66a..29bebb72 100644 --- a/pssh/__init__.py +++ b/pssh/__init__.py @@ -26,10 +26,10 @@ See also `pssh.clients.ParallelSSHClient` and pssh.clients.SSHClient` for class documentation. """ - - from logging import getLogger, NullHandler + from ._version import get_versions + __version__ = get_versions()['version'] del get_versions From 15659748bac918e611f5ae5c165f98870ff965b4 Mon Sep 17 00:00:00 2001 From: Panos Date: Sun, 3 Apr 2022 22:16:26 +0100 Subject: [PATCH 32/41] Cleanup --- pssh/clients/base/parallel.py | 16 ++++++++-------- pssh/clients/native/parallel.py | 1 - pssh/clients/ssh/parallel.py | 4 +--- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index 620d0f04..18b8e50d 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -23,7 +23,7 @@ from gevent import joinall, spawn, Timeout as GTimeout from gevent.hub import Hub -from ..common import _validate_pkey_path +from ..common import _validate_pkey_path, _validate_pkey from ...config import HostConfig from ...constants import DEFAULT_RETRIES, RETRY_DELAY from ...exceptions import HostArgumentError, Timeout, ShellError, HostConfigError @@ -64,7 +64,7 @@ def __init__(self, hosts, user=None, password=None, port=None, pkey=None, self.user = user self.password = password self.port = port - self.pkey = pkey + self.pkey = _validate_pkey(pkey) self.__pkey_data = self._load_pkey_data(pkey) if pkey is not None else None self.num_retries = num_retries self.timeout = timeout @@ -570,12 +570,12 @@ def _get_ssh_client(self, host_i, host): return _client def _load_pkey_data(self, _pkey): - if isinstance(_pkey, str): - _validate_pkey_path(_pkey) - with open(_pkey, 'rb') as fh: - _pkey_data = fh.read() - return _pkey_data - return _pkey + if not isinstance(_pkey, str): + return _pkey + _pkey = _validate_pkey_path(_pkey) + with open(_pkey, 'rb') as fh: + _pkey_data = fh.read() + return _pkey_data def _make_ssh_client(self, host, cfg, _pkey_data): raise NotImplementedError diff --git a/pssh/clients/native/parallel.py b/pssh/clients/native/parallel.py index 3fab60bf..d6839f3c 100644 --- a/pssh/clients/native/parallel.py +++ b/pssh/clients/native/parallel.py @@ -127,7 +127,6 @@ def __init__(self, hosts, user=None, password=None, port=22, pkey=None, identity_auth=identity_auth, ipv6_only=ipv6_only, ) - self.pkey = _validate_pkey(pkey) self.proxy_host = proxy_host self.proxy_port = proxy_port self.proxy_pkey = _validate_pkey(proxy_pkey) diff --git a/pssh/clients/ssh/parallel.py b/pssh/clients/ssh/parallel.py index bd7a11a8..a9193a4c 100644 --- a/pssh/clients/ssh/parallel.py +++ b/pssh/clients/ssh/parallel.py @@ -18,11 +18,10 @@ import logging from .single import SSHClient -from ..common import _validate_pkey_path, _validate_pkey from ..base.parallel import BaseParallelSSHClient +from ..common import _validate_pkey_path from ...constants import DEFAULT_RETRIES, RETRY_DELAY - logger = logging.getLogger(__name__) @@ -126,7 +125,6 @@ def __init__(self, hosts, user=None, password=None, port=22, pkey=None, identity_auth=identity_auth, ipv6_only=ipv6_only, ) - self.pkey = _validate_pkey(pkey) self.cert_file = _validate_pkey_path(cert_file) self.forward_ssh_agent = forward_ssh_agent self.gssapi_auth = gssapi_auth From 872ae1189642eaa6a8db29c4727d51ef31e819eb Mon Sep 17 00:00:00 2001 From: Panos Date: Mon, 4 Apr 2022 22:34:58 +0100 Subject: [PATCH 33/41] Updated tests --- tests/native/test_parallel_client.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/native/test_parallel_client.py b/tests/native/test_parallel_client.py index 69eb3f3e..77bb78dd 100644 --- a/tests/native/test_parallel_client.py +++ b/tests/native/test_parallel_client.py @@ -1038,9 +1038,10 @@ def test_per_host_dict_args(self): server.start_server() hosts = [self.host, host2, host3] hosts_gen = (h for h in hosts) - host_args = [dict(zip(('host_arg1', 'host_arg2',), - ('arg1-%s' % (i,), 'arg2-%s' % (i,),))) - for i, _ in enumerate(hosts)] + host_args = [{'host_arg1': 'arg1-%s' % (i,), + 'host_arg2': 'arg2-%s' % (i,), + } + for i in range(len(hosts))] cmd = 'echo %(host_arg1)s %(host_arg2)s' client = ParallelSSHClient(hosts, port=self.port, pkey=self.user_key, @@ -1266,14 +1267,14 @@ def test_join_timeout_subset_read(self): hosts = [self.host, self.host] cmd = 'sleep %(i)s; echo %(i)s' host_args = [{'i': '0.1'}, - {'i': '0.25'}, + {'i': '0.35'}, ] client = ParallelSSHClient(hosts, port=self.port, pkey=self.user_key) joinall(client.connect_auth(), raise_error=True) output = client.run_command(cmd, host_args=host_args) try: - client.join(output, timeout=.2) + client.join(output, timeout=.3) except Timeout as ex: finished_output = ex.args[2] unfinished_output = ex.args[3] @@ -1286,7 +1287,7 @@ def test_join_timeout_subset_read(self): # Should not timeout client.join(unfinished_output, timeout=2) rest_stdout = list(unfinished_output[0].stdout) - self.assertEqual(rest_stdout, ['0.25']) + self.assertEqual(rest_stdout, ['0.35']) def test_join_timeout_set_no_timeout(self): client = ParallelSSHClient([self.host], port=self.port, From 5386007b01c923fb781d2321dbb77001c23d1fbe Mon Sep 17 00:00:00 2001 From: Panos Date: Mon, 4 Apr 2022 23:46:13 +0100 Subject: [PATCH 34/41] Added multiple available v4 and v6 addresses test --- tests/native/test_single_client.py | 55 +++++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/tests/native/test_single_client.py b/tests/native/test_single_client.py index b99a4065..56961083 100644 --- a/tests/native/test_single_client.py +++ b/tests/native/test_single_client.py @@ -22,7 +22,7 @@ from datetime import datetime from hashlib import sha256 from tempfile import NamedTemporaryFile -from unittest.mock import MagicMock, call, patch +from unittest.mock import MagicMock, call, patch, call import pytest from gevent import sleep, spawn, Timeout as GTimeout, socket @@ -101,6 +101,9 @@ def test_ipv6(self, gsocket): host = '::1' addr_info = ('::1', self.port, 0, 0) gsocket.IPPROTO_TCP = socket.IPPROTO_TCP + gsocket.AF_INET6 = socket.AF_INET6 + gsocket.AF_INET = socket.AF_INET + gsocket.SocketKind = socket.SocketKind gsocket.socket = MagicMock() _sock = MagicMock() gsocket.socket.return_value = _sock @@ -112,16 +115,20 @@ def test_ipv6(self, gsocket): getaddrinfo.return_value = [( socket.AF_INET6, socket.SocketKind.SOCK_STREAM, socket.IPPROTO_TCP, '', addr_info)] with raises(ConnectionError): - client = SSHClient(host, port=self.port, pkey=self.user_key, - num_retries=1) + SSHClient(host, port=self.port, pkey=self.user_key, + num_retries=1) getaddrinfo.assert_called_once_with(host, self.port, proto=socket.IPPROTO_TCP) sock_con.assert_called_once_with(addr_info) @patch('pssh.clients.base.single.socket') def test_multiple_available_addr(self, gsocket): - host = '127.0.0.1' - addr_info = (host, self.port) + host = 'localhost' + ipv6_addr_info = ('::1', self.port, 0, 0) + ipv4_addr_info = ('127.0.0.1', self.port) gsocket.IPPROTO_TCP = socket.IPPROTO_TCP + gsocket.AF_INET6 = socket.AF_INET6 + gsocket.AF_INET = socket.AF_INET + gsocket.SocketKind = socket.SocketKind gsocket.socket = MagicMock() _sock = MagicMock() gsocket.socket.return_value = _sock @@ -131,14 +138,44 @@ def test_multiple_available_addr(self, gsocket): getaddrinfo = MagicMock() gsocket.getaddrinfo = getaddrinfo getaddrinfo.return_value = [ - (socket.AF_INET, socket.SocketKind.SOCK_STREAM, socket.IPPROTO_TCP, '', addr_info), - (socket.AF_INET, socket.SocketKind.SOCK_STREAM, socket.IPPROTO_TCP, '', addr_info), + (socket.AF_INET6, socket.SocketKind.SOCK_STREAM, socket.IPPROTO_TCP, '', ipv6_addr_info), + (socket.AF_INET, socket.SocketKind.SOCK_STREAM, socket.IPPROTO_TCP, '', ipv4_addr_info), ] with raises(ConnectionError): - client = SSHClient(host, port=self.port, pkey=self.user_key, - num_retries=1) + SSHClient(host, port=self.port, pkey=self.user_key, + num_retries=1) + expected_calls = [call(ipv6_addr_info), call(ipv4_addr_info)] getaddrinfo.assert_called_with(host, self.port, proto=socket.IPPROTO_TCP) assert sock_con.call_count == len(getaddrinfo.return_value) + assert sock_con.call_args_list == expected_calls + + @patch('pssh.clients.base.single.socket') + def test_multiple_available_addr_ipv6(self, gsocket): + host = 'localhost' + ipv6_addr_info = ('::1', self.port, 0, 0) + ipv4_addr_info = ('127.0.0.1', self.port) + gsocket.IPPROTO_TCP = socket.IPPROTO_TCP + gsocket.AF_INET6 = socket.AF_INET6 + gsocket.AF_INET = socket.AF_INET + gsocket.SocketKind = socket.SocketKind + gsocket.socket = MagicMock() + _sock = MagicMock() + gsocket.socket.return_value = _sock + sock_con = MagicMock() + sock_con.side_effect = ConnectionRefusedError + _sock.connect = sock_con + getaddrinfo = MagicMock() + gsocket.getaddrinfo = getaddrinfo + getaddrinfo.return_value = [ + (socket.AF_INET6, socket.SocketKind.SOCK_STREAM, socket.IPPROTO_TCP, '', ipv6_addr_info), + (socket.AF_INET, socket.SocketKind.SOCK_STREAM, socket.IPPROTO_TCP, '', ipv4_addr_info), + ] + with raises(ConnectionError): + SSHClient(host, port=self.port, pkey=self.user_key, + num_retries=1, + ipv6_only=True) + getaddrinfo.assert_called_once_with(host, self.port, proto=socket.IPPROTO_TCP) + sock_con.assert_called_once_with(ipv6_addr_info) def test_no_ipv6(self): try: From 7ab83408cc4ba8e352b7b1a0cfd6af074e2f0024 Mon Sep 17 00:00:00 2001 From: Panos Date: Mon, 4 Apr 2022 23:54:21 +0100 Subject: [PATCH 35/41] Added type checking to host config check --- pssh/clients/base/parallel.py | 6 +++--- tests/native/test_parallel_client.py | 7 ++++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/pssh/clients/base/parallel.py b/pssh/clients/base/parallel.py index 18b8e50d..2ed327dd 100644 --- a/pssh/clients/base/parallel.py +++ b/pssh/clients/base/parallel.py @@ -131,6 +131,9 @@ def disconnect(self): def _check_host_config(self): if self.host_config is None: return + if not isinstance(self.host_config, list): + raise HostConfigError("Host configuration of type %s is invalid - valid types are list[HostConfig]", + type(self.host_config)) host_len = len(self.hosts) if host_len != len(self.host_config): raise ValueError( @@ -289,9 +292,6 @@ def _get_host_config(self, host_i): gssapi_delegate_credentials=self.gssapi_delegate_credentials, ) return config - elif not isinstance(self.host_config, list): - raise HostConfigError("Host configuration of type %s is invalid - valid types are list[HostConfig]", - type(self.host_config)) config = self.host_config[host_i] return config diff --git a/tests/native/test_parallel_client.py b/tests/native/test_parallel_client.py index 77bb78dd..c168240e 100644 --- a/tests/native/test_parallel_client.py +++ b/tests/native/test_parallel_client.py @@ -36,7 +36,7 @@ AuthenticationException, ConnectionErrorException, \ HostArgumentException, SFTPError, SFTPIOError, Timeout, SCPError, \ PKeyFileError, ShellError, HostArgumentError, NoIPv6AddressFoundError, \ - AuthenticationError + AuthenticationError, HostConfigError from pssh.output import HostOutput from .base_ssh2_case import PKEY_FILENAME, PUB_FILE @@ -963,6 +963,11 @@ def test_host_config_bad_entries(self): self.assertRaises(ValueError, ParallelSSHClient, hosts, host_config=host_config) self.assertRaises(ValueError, ParallelSSHClient, iter(hosts), host_config=host_config) + def test_invalid_host_config(self): + hosts = ['localhost', 'localhost'] + host_config = {'localhost': HostConfig(), 'localhost2': HostConfig()} + self.assertRaises(HostConfigError, ParallelSSHClient, hosts, host_config=host_config) + def test_pssh_client_override_allow_agent_authentication(self): """Test running command with allow_agent set to False""" client = ParallelSSHClient([self.host], From f59846d1892551d9bcdd8e5995c9a31f45d3a161 Mon Sep 17 00:00:00 2001 From: Panos Date: Tue, 5 Apr 2022 00:02:02 +0100 Subject: [PATCH 36/41] Updated tests --- tests/native/test_parallel_client.py | 2 ++ tests/test_reader.py | 12 ++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/native/test_parallel_client.py b/tests/native/test_parallel_client.py index c168240e..0c7a16fb 100644 --- a/tests/native/test_parallel_client.py +++ b/tests/native/test_parallel_client.py @@ -44,6 +44,8 @@ class ParallelSSHClientTest(unittest.TestCase): + server = None + client = None @classmethod def setUpClass(cls): diff --git a/tests/test_reader.py b/tests/test_reader.py index 5f16904e..8e4c643a 100644 --- a/tests/test_reader.py +++ b/tests/test_reader.py @@ -16,12 +16,12 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import unittest - -from random import random, randint, randrange +from random import randrange from string import ascii_letters -from gevent.queue import Queue from gevent import spawn, sleep +from gevent.queue import Queue + from pssh.clients.reader import ConcurrentRWBuffer @@ -61,16 +61,16 @@ def _writer(_buffer): data = b"".join([ascii_letters[m].encode() for m in [randrange(0, 8) for _ in range(8)]]) _buffer.write(data) written_data.put(data) - sleep(0.2) + sleep(0.05) writer = spawn(_writer, self.buffer) writer.start() - sleep(0.5) + sleep(0.1) data = self.buffer.read() _data = b"" while written_data.qsize() != 0: _data += written_data.get() self.assertEqual(data, _data) - sleep(0.5) + sleep(0.1) data = self.buffer.read() _data = b"" while written_data.qsize() != 0: From c46e81d3e22917dbc979c3b36761799a78e8a402 Mon Sep 17 00:00:00 2001 From: Panos Date: Tue, 5 Apr 2022 00:06:35 +0100 Subject: [PATCH 37/41] Cleanup imports --- pssh/clients/native/tunnel.py | 7 +------ pssh/clients/reader.py | 5 +---- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/pssh/clients/native/tunnel.py b/pssh/clients/native/tunnel.py index 57577234..ed937d8a 100644 --- a/pssh/clients/native/tunnel.py +++ b/pssh/clients/native/tunnel.py @@ -16,12 +16,8 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import logging - +from queue import Queue from threading import Thread, Event -try: - from queue import Queue -except ImportError: - from Queue import Queue from gevent import spawn, joinall, get_hub, sleep from gevent.server import StreamServer @@ -29,7 +25,6 @@ from ...constants import DEFAULT_RETRIES - logger = logging.getLogger(__name__) diff --git a/pssh/clients/reader.py b/pssh/clients/reader.py index 6d9ddf4f..c6b69b2a 100644 --- a/pssh/clients/reader.py +++ b/pssh/clients/reader.py @@ -15,10 +15,7 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -try: - from io import BytesIO -except ImportError: - from cStringIO import StringIO as BytesIO +from io import BytesIO from gevent.event import Event from gevent.lock import RLock From 6e7ab26a4774497981f71900aedc738607475c97 Mon Sep 17 00:00:00 2001 From: Panos Date: Tue, 5 Apr 2022 00:37:45 +0100 Subject: [PATCH 38/41] Cleanup socket on failure to connect. Updated test. --- pssh/clients/native/single.py | 2 ++ pssh/clients/ssh/single.py | 2 ++ tests/native/test_tunnel.py | 7 +++---- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pssh/clients/native/single.py b/pssh/clients/native/single.py index c86ef624..3eca83b6 100644 --- a/pssh/clients/native/single.py +++ b/pssh/clients/native/single.py @@ -230,6 +230,8 @@ def _init_session(self, retries=1): return self._connect_init_session_retry(retries=retries+1) msg = "Error connecting to host %s:%s - %s" logger.error(msg, self.host, self.port, ex) + if not self.sock.closed: + self.sock.close() if isinstance(ex, SSH2Timeout): raise Timeout(msg, self.host, self.port, ex) raise diff --git a/pssh/clients/ssh/single.py b/pssh/clients/ssh/single.py index 428cd05d..89d95e2d 100644 --- a/pssh/clients/ssh/single.py +++ b/pssh/clients/ssh/single.py @@ -160,6 +160,8 @@ def _init_session(self, retries=1): return self._connect_init_session_retry(retries=retries+1) msg = "Error connecting to host %s:%s - %s" logger.error(msg, self.host, self.port, ex) + if not self.sock.closed: + self.sock.close() raise ex def _session_connect(self): diff --git a/tests/native/test_tunnel.py b/tests/native/test_tunnel.py index 8137ddd9..da4c7d4a 100644 --- a/tests/native/test_tunnel.py +++ b/tests/native/test_tunnel.py @@ -15,14 +15,13 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -import gc import os -import time import unittest from datetime import datetime from getpass import getuser -from sys import version_info +import gc +import time from gevent import sleep, spawn, Timeout as GTimeout from ssh2.exceptions import SocketSendError, SocketRecvError @@ -39,7 +38,7 @@ class TunnelTest(unittest.TestCase): @classmethod def setUpClass(cls): - _mask = int('0600') if version_info <= (2,) else 0o600 + _mask = 0o600 os.chmod(PKEY_FILENAME, _mask) cls.port = 2225 cls.cmd = 'echo me' From 7a4a90f9308e64dfea155fcdd7e0a0ee34f8defc Mon Sep 17 00:00:00 2001 From: Panos Date: Tue, 5 Apr 2022 00:48:30 +0100 Subject: [PATCH 39/41] Updated tests --- tests/ssh/test_single_client.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/ssh/test_single_client.py b/tests/ssh/test_single_client.py index b109ba1f..9dda6006 100644 --- a/tests/ssh/test_single_client.py +++ b/tests/ssh/test_single_client.py @@ -51,6 +51,7 @@ def test_execute(self): stderr = list(host_out.stderr) expected = [self.resp] self.assertEqual(expected, output) + self.assertEqual(len(stderr), 0) exit_code = host_out.channel.get_exit_status() self.assertEqual(exit_code, 0) @@ -186,11 +187,11 @@ def test_identity_auth_failure(self): def test_password_auth_failure(self): try: - client = SSHClient(self.host, port=self.port, num_retries=1, - allow_agent=False, - identity_auth=False, - password='blah blah blah', - ) + SSHClient(self.host, port=self.port, num_retries=1, + allow_agent=False, + identity_auth=False, + password='blah blah blah', + ) except AuthenticationException as ex: self.assertIsInstance(ex.args[3], AuthenticationDenied) else: @@ -229,7 +230,7 @@ def test_open_session_timeout(self): num_retries=2, timeout=.1) - def _session(timeout=None): + def _session(_=None): sleep(.2) client.open_session = _session self.assertRaises(GTimeout, client.run_command, self.cmd) @@ -244,6 +245,7 @@ def test_client_read_timeout(self): def test_open_session_exc(self): class Error(Exception): pass + def _session(): raise Error client = SSHClient(self.host, port=self.port, @@ -255,6 +257,7 @@ def _session(): def test_session_connect_exc(self): class Error(Exception): pass + def _con(): raise Error client = SSHClient(self.host, port=self.port, @@ -282,8 +285,10 @@ def test_no_auth(self): def test_agent_auth_failure(self): class UnknownError(Exception): pass + def _agent_auth_unk(): raise UnknownError + def _agent_auth_agent_err(): raise AuthenticationDenied client = SSHClient(self.host, port=self.port, @@ -316,6 +321,7 @@ def _agent_auth(): def test_disconnect_exc(self): class DiscError(Exception): pass + def _disc(): raise DiscError client = SSHClient(self.host, port=self.port, From 648c7fcc5ce7e5019fe877233b203192c611f1c8 Mon Sep 17 00:00:00 2001 From: Panos Date: Tue, 5 Apr 2022 00:53:02 +0100 Subject: [PATCH 40/41] Updated tests --- tests/ssh/test_parallel_client.py | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/tests/ssh/test_parallel_client.py b/tests/ssh/test_parallel_client.py index 401fd791..b922dee3 100644 --- a/tests/ssh/test_parallel_client.py +++ b/tests/ssh/test_parallel_client.py @@ -33,6 +33,8 @@ class LibSSHParallelTest(unittest.TestCase): + server = None + client = None @classmethod def setUpClass(cls): @@ -237,12 +239,7 @@ def test_pssh_client_hosts_list_part_failure(self): self.assertTrue(output[1].exception is not None) self.assertEqual(output[1].host, hosts[1]) self.assertEqual(output[1].exception.args[-2], hosts[1]) - try: - raise output[1].exception - except ConnectionErrorException: - pass - else: - raise Exception("Expected ConnectionError, got %s instead" % (output[1].exception,)) + self.assertIsInstance(output[1].exception, ConnectionErrorException) def test_pssh_client_timeout(self): # 1ms timeout @@ -316,13 +313,12 @@ def test_connection_error_exception(self): client.join(output) self.assertIsInstance(output[0].exception, ConnectionErrorException) self.assertEqual(output[0].host, host) + self.assertIsInstance(output[0].exception, ConnectionErrorException) try: raise output[0].exception except ConnectionErrorException as ex: self.assertEqual(ex.args[-2], host) self.assertEqual(ex.args[-1], port) - else: - raise Exception("Expected ConnectionErrorException") def test_bad_pkey_path(self): self.assertRaises(PKeyFileError, ParallelSSHClient, [self.host], port=self.port, @@ -334,12 +330,9 @@ def test_multiple_single_quotes_in_cmd(self): output = self.client.run_command("echo 'me' 'and me'") stdout = list(output[0].stdout) expected = 'me and me' - self.assertTrue(len(stdout)==1, - msg="Got incorrect number of lines in output - %s" % (stdout,)) + self.assertTrue(len(stdout) == 1) self.assertEqual(output[0].exit_code, 0) - self.assertEqual(expected, stdout[0], - msg="Got unexpected output. Expected %s, got %s" % ( - expected, stdout[0],)) + self.assertEqual(expected, stdout[0]) def test_backtics_in_cmd(self): """Test running command with backtics in it""" @@ -353,9 +346,7 @@ def test_multiple_shell_commands(self): stdout = list(output[0].stdout) expected = ["me", "and", "me"] self.assertEqual(output[0].exit_code, 0) - self.assertEqual(expected, stdout, - msg="Got unexpected output. Expected %s, got %s" % ( - expected, stdout,)) + self.assertEqual(expected, stdout) def test_escaped_quotes(self): """Test escaped quotes in shell variable are handled correctly""" @@ -363,9 +354,7 @@ def test_escaped_quotes(self): stdout = list(output[0].stdout) expected = ['--flags="this"'] self.assertEqual(output[0].exit_code, 0) - self.assertEqual(expected, stdout, - msg="Got unexpected output. Expected %s, got %s" % ( - expected, stdout,)) + self.assertEqual(expected, stdout) def test_read_timeout(self): client = ParallelSSHClient([self.host], port=self.port, @@ -392,7 +381,7 @@ def test_timeout_file_read(self): self.assertRaises(Timeout, self.client.join, output, timeout=.1) for host_out in output: try: - for line in host_out.stdout: + for _ in host_out.stdout: pass except Timeout: pass From 541f133674627a4433f7238ee53d200cb5f430fa Mon Sep 17 00:00:00 2001 From: Panos Date: Tue, 5 Apr 2022 01:15:29 +0100 Subject: [PATCH 41/41] Cleanup tests --- tests/native/test_parallel_client.py | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/tests/native/test_parallel_client.py b/tests/native/test_parallel_client.py index 0c7a16fb..38bac397 100644 --- a/tests/native/test_parallel_client.py +++ b/tests/native/test_parallel_client.py @@ -148,12 +148,7 @@ def test_client_shells_join_timeout(self): """ self.client.run_shell_commands(shells, cmd) self.assertRaises(Timeout, self.client.join_shells, shells, timeout=.1) - try: - self.client.join_shells(shells, timeout=.1) - except Timeout: - pass - else: - raise AssertionError + self.assertRaises(Timeout, self.client.join_shells, shells, timeout=.1) self.client.join_shells(shells, timeout=1) stdout = list(shells[0].stdout) self.assertListEqual(stdout, [self.resp, self.resp]) @@ -1124,7 +1119,7 @@ def test_pty(self): expected_stdout = [] # With a PTY, stdout and stderr are combined into stdout self.assertEqual(expected_stderr, stdout) - self.assertEqual([], stderr) + self.assertEqual(expected_stdout, stderr) self.assertTrue(exit_code == 0) def test_output_attributes(self): @@ -1175,7 +1170,7 @@ def test_retries(self): client = ParallelSSHClient(['127.0.0.100'], port=self.port, num_retries=2, retry_delay=.1) self.assertRaises(ConnectionErrorException, client.run_command, self.cmd) - host = ''.join([random.choice(string.ascii_letters) for n in range(8)]) + host = ''.join([random.choice(string.ascii_letters) for _ in range(8)]) client.hosts = [host] self.assertRaises(UnknownHostException, client.run_command, self.cmd) @@ -1242,7 +1237,7 @@ def test_setting_hosts(self): def test_unknown_host_failure(self): """Test connection error failure case - ConnectionErrorException""" - host = ''.join([random.choice(string.ascii_letters) for n in range(8)]) + host = ''.join([random.choice(string.ascii_letters) for _ in range(8)]) client = ParallelSSHClient([host], port=self.port, num_retries=1) self.assertRaises(UnknownHostException, client.run_command, self.cmd) @@ -1312,7 +1307,7 @@ def test_read_timeout(self): self.assertFalse(client.finished(output)) client.join(output) # import ipdb; ipdb.set_trace() - for host_out in output: + for _ in output: stdout = list(output[0].stdout) self.assertEqual(len(stdout), 3) self.assertTrue(client.finished(output)) @@ -1399,7 +1394,7 @@ def test_timeout_file_read(self): self.assertRaises(Timeout, self.client.join, output, timeout=.1) for host_out in output: try: - for line in host_out.stdout: + for _ in host_out.stdout: pass except Timeout: pass @@ -1435,7 +1430,6 @@ def test_scp_send_dir(self): with open(local_filename, 'w') as file_h: file_h.writelines([test_file_data + os.linesep]) remote_filename = os.path.sep.join([remote_test_dir, remote_filepath]) - remote_file_abspath = os.path.expanduser('~/' + remote_filename) remote_test_dir_abspath = os.path.expanduser('~/' + remote_test_dir) try: cmds = self.client.scp_send(local_filename, remote_filename) @@ -1551,7 +1545,7 @@ def test_scp_bad_copy_args(self): def test_scp_send_exc(self): client = ParallelSSHClient([self.host], pkey=self.user_key, num_retries=1) - def _scp_send(*args): + def _scp_send(*_): raise Exception client._scp_send = _scp_send cmds = client.scp_send('local_file', 'remote_file') @@ -1560,7 +1554,7 @@ def _scp_send(*args): def test_scp_recv_exc(self): client = ParallelSSHClient([self.host], pkey=self.user_key, num_retries=1) - def _scp_recv(*args): + def _scp_recv(*_): raise Exception client._scp_recv = _scp_recv @@ -1814,7 +1808,7 @@ def read_stream_dt(self, host_out, stream, read_timeout): now = datetime.now() timed_out = False try: - for line in stream: + for _ in stream: pass except Timeout: timed_out = True @@ -1885,7 +1879,6 @@ def test_read_stdout_timeout_stderr_no_timeout(self): self.assertTrue(dt.total_seconds() < read_timeout) def test_read_multi_same_hosts(self): - hosts = [self.host, self.host] outputs = [ self.client.run_command(self.cmd), self.client.run_command(self.cmd),