-
-
Notifications
You must be signed in to change notification settings - Fork 952
Significantly improve performance of ShellStream's Expect methods #1207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b90440b
09032b7
51b7067
bbfc0b8
3d810b4
68d4c29
4420a09
947f1b0
78c5cc6
6b70eff
b9bf1f4
25526b6
24b1684
3cb7f54
7fa8061
03aee34
e9f65c2
7513247
acc22f9
47ded04
3a97875
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,8 @@ public class ShellStream : Stream | |
private readonly Encoding _encoding; | ||
private readonly int _bufferSize; | ||
private readonly Queue<byte> _incoming; | ||
private readonly int _expectSize; | ||
private readonly Queue<byte> _expect; | ||
private readonly Queue<byte> _outgoing; | ||
private IChannelSession _channel; | ||
private AutoResetEvent _dataReceived = new AutoResetEvent(initialState: false); | ||
|
@@ -76,15 +78,28 @@ internal int BufferSize | |
/// <param name="height">The terminal height in pixels.</param> | ||
/// <param name="terminalModeValues">The terminal mode values.</param> | ||
/// <param name="bufferSize">The size of the buffer.</param> | ||
/// <param name="expectSize">The size of the expect buffer.</param> | ||
/// <exception cref="SshException">The channel could not be opened.</exception> | ||
/// <exception cref="SshException">The pseudo-terminal request was not accepted by the server.</exception> | ||
/// <exception cref="SshException">The request to start a shell was not accepted by the server.</exception> | ||
internal ShellStream(ISession session, string terminalName, uint columns, uint rows, uint width, uint height, IDictionary<TerminalModes, uint> terminalModeValues, int bufferSize) | ||
internal ShellStream(ISession session, string terminalName, uint columns, uint rows, uint width, uint height, IDictionary<TerminalModes, uint> terminalModeValues, int bufferSize, int expectSize) | ||
{ | ||
if (bufferSize <= 0) | ||
{ | ||
throw new ArgumentException($"{nameof(bufferSize)} must be between 1 and {int.MaxValue}."); | ||
} | ||
|
||
if (expectSize <= 0) | ||
{ | ||
throw new ArgumentException($"{nameof(expectSize)} must be between 1 and {int.MaxValue}."); | ||
} | ||
|
||
_encoding = session.ConnectionInfo.Encoding; | ||
_session = session; | ||
_bufferSize = bufferSize; | ||
_incoming = new Queue<byte>(); | ||
_expectSize = expectSize; | ||
_expect = new Queue<byte>(_expectSize); | ||
_outgoing = new Queue<byte>(); | ||
|
||
_channel = _session.CreateChannelSession(); | ||
|
@@ -248,35 +263,40 @@ public void Expect(params ExpectAction[] expectActions) | |
public void Expect(TimeSpan timeout, params ExpectAction[] expectActions) | ||
{ | ||
var expectedFound = false; | ||
var text = string.Empty; | ||
var matchText = string.Empty; | ||
|
||
do | ||
{ | ||
lock (_incoming) | ||
{ | ||
if (_incoming.Count > 0) | ||
if (_expect.Count > 0) | ||
{ | ||
text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); | ||
jscarle marked this conversation as resolved.
Show resolved
Hide resolved
|
||
matchText = _encoding.GetString(_expect.ToArray(), 0, _expect.Count); | ||
} | ||
|
||
if (text.Length > 0) | ||
if (matchText.Length > 0) | ||
{ | ||
foreach (var expectAction in expectActions) | ||
{ | ||
var match = expectAction.Expect.Match(text); | ||
var match = expectAction.Expect.Match(matchText); | ||
|
||
if (match.Success) | ||
{ | ||
var result = text.Substring(0, match.Index + match.Length); | ||
var charCount = _encoding.GetByteCount(result); | ||
var returnText = matchText.Substring(0, match.Index + match.Length); | ||
var returnLength = _encoding.GetByteCount(returnText); | ||
|
||
for (var i = 0; i < charCount && _incoming.Count > 0; i++) | ||
// Remove processed items from the queue | ||
for (var i = 0; i < returnLength && _incoming.Count > 0; i++) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this actually going to remove what it should from both If Then we are going to dequeue
Have I got that right? Is that expected? (it feels wrong) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm going to add a test to try to replicate this and make sure its accounted for. I have a feeling that the dequeueing may be slightly off since for things to happen as you've mentioned, data would have to accumulate and be read in different ways within the same workflow. |
||
{ | ||
// Remove processed items from the queue | ||
if (_expect.Count == _incoming.Count) | ||
{ | ||
_ = _expect.Dequeue(); | ||
} | ||
|
||
_ = _incoming.Dequeue(); | ||
} | ||
|
||
expectAction.Action(result); | ||
expectAction.Action(returnText); | ||
expectedFound = true; | ||
} | ||
} | ||
|
@@ -349,27 +369,33 @@ public string Expect(Regex regex) | |
/// </returns> | ||
public string Expect(Regex regex, TimeSpan timeout) | ||
{ | ||
var result = string.Empty; | ||
var matchText = string.Empty; | ||
string returnText; | ||
|
||
while (true) | ||
{ | ||
lock (_incoming) | ||
{ | ||
if (_incoming.Count > 0) | ||
if (_expect.Count > 0) | ||
{ | ||
result = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); | ||
matchText = _encoding.GetString(_expect.ToArray(), 0, _expect.Count); | ||
} | ||
|
||
var match = regex.Match(result); | ||
var match = regex.Match(matchText); | ||
|
||
if (match.Success) | ||
{ | ||
result = result.Substring(0, match.Index + match.Length); | ||
var charCount = _encoding.GetByteCount(result); | ||
returnText = matchText.Substring(0, match.Index + match.Length); | ||
var returnLength = _encoding.GetByteCount(returnText); | ||
|
||
// Remove processed items from the queue | ||
for (var i = 0; i < charCount && _incoming.Count > 0; i++) | ||
for (var i = 0; i < returnLength && _incoming.Count > 0; i++) | ||
{ | ||
if (_expect.Count == _incoming.Count) | ||
{ | ||
_ = _expect.Dequeue(); | ||
} | ||
|
||
_ = _incoming.Dequeue(); | ||
} | ||
|
||
|
@@ -390,7 +416,7 @@ public string Expect(Regex regex, TimeSpan timeout) | |
} | ||
} | ||
|
||
return result; | ||
return returnText; | ||
} | ||
|
||
/// <summary> | ||
|
@@ -446,7 +472,8 @@ public IAsyncResult BeginExpect(AsyncCallback callback, object state, params Exp | |
public IAsyncResult BeginExpect(TimeSpan timeout, AsyncCallback callback, object state, params ExpectAction[] expectActions) | ||
#pragma warning restore CA1859 // Use concrete types when possible for improved performance | ||
{ | ||
var text = string.Empty; | ||
var matchText = string.Empty; | ||
string returnText; | ||
|
||
// Create new AsyncResult object | ||
var asyncResult = new ExpectAsyncResult(callback, state); | ||
|
@@ -461,31 +488,36 @@ public IAsyncResult BeginExpect(TimeSpan timeout, AsyncCallback callback, object | |
{ | ||
lock (_incoming) | ||
{ | ||
if (_incoming.Count > 0) | ||
if (_expect.Count > 0) | ||
{ | ||
text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); | ||
matchText = _encoding.GetString(_expect.ToArray(), 0, _expect.Count); | ||
} | ||
|
||
if (text.Length > 0) | ||
if (matchText.Length > 0) | ||
{ | ||
foreach (var expectAction in expectActions) | ||
{ | ||
var match = expectAction.Expect.Match(text); | ||
var match = expectAction.Expect.Match(matchText); | ||
|
||
if (match.Success) | ||
{ | ||
var result = text.Substring(0, match.Index + match.Length); | ||
var charCount = _encoding.GetByteCount(result); | ||
returnText = matchText.Substring(0, match.Index + match.Length); | ||
var returnLength = _encoding.GetByteCount(returnText); | ||
|
||
for (var i = 0; i < match.Index + match.Length && _incoming.Count > 0; i++) | ||
// Remove processed items from the queue | ||
for (var i = 0; i < returnLength && _incoming.Count > 0; i++) | ||
{ | ||
// Remove processed items from the queue | ||
if (_expect.Count == _incoming.Count) | ||
{ | ||
_ = _expect.Dequeue(); | ||
} | ||
|
||
_ = _incoming.Dequeue(); | ||
} | ||
|
||
expectAction.Action(result); | ||
expectAction.Action(returnText); | ||
callback?.Invoke(asyncResult); | ||
expectActionResult = result; | ||
expectActionResult = returnText; | ||
} | ||
} | ||
} | ||
|
@@ -584,6 +616,11 @@ public string ReadLine(TimeSpan timeout) | |
// remove processed bytes from the queue | ||
for (var i = 0; i < bytesProcessed; i++) | ||
{ | ||
if (_expect.Count == _incoming.Count) | ||
{ | ||
_ = _expect.Dequeue(); | ||
} | ||
|
||
_ = _incoming.Dequeue(); | ||
} | ||
|
||
|
@@ -620,6 +657,7 @@ public string Read() | |
lock (_incoming) | ||
{ | ||
text = _encoding.GetString(_incoming.ToArray(), 0, _incoming.Count); | ||
_expect.Clear(); | ||
_incoming.Clear(); | ||
} | ||
|
||
|
@@ -649,6 +687,11 @@ public override int Read(byte[] buffer, int offset, int count) | |
{ | ||
for (; i < count && _incoming.Count > 0; i++) | ||
{ | ||
if (_expect.Count == _incoming.Count) | ||
{ | ||
_ = _expect.Dequeue(); | ||
} | ||
|
||
buffer[offset + i] = _incoming.Dequeue(); | ||
} | ||
} | ||
|
@@ -800,6 +843,12 @@ private void Channel_DataReceived(object sender, ChannelDataEventArgs e) | |
foreach (var b in e.Data) | ||
{ | ||
_incoming.Enqueue(b); | ||
if (_expect.Count == _expectSize) | ||
{ | ||
_ = _expect.Dequeue(); | ||
} | ||
|
||
_expect.Enqueue(b); | ||
} | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.