-
-
Notifications
You must be signed in to change notification settings - Fork 952
Add ExecuteAsync, Fix CancelAsync Deadlock #1343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
using System.Runtime.ExceptionServices; | ||
using System.Text; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
using Renci.SshNet.Abstractions; | ||
using Renci.SshNet.Channels; | ||
|
@@ -26,6 +27,7 @@ public class SshCommand : IDisposable | |
private CommandAsyncResult _asyncResult; | ||
private AsyncCallback _callback; | ||
private EventWaitHandle _sessionErrorOccuredWaitHandle; | ||
private EventWaitHandle _commmandCancelledWaitHandle; | ||
private Exception _exception; | ||
private StringBuilder _result; | ||
private StringBuilder _error; | ||
|
@@ -105,56 +107,72 @@ public Stream CreateInputStream() | |
/// <summary> | ||
/// Gets the command execution result. | ||
/// </summary> | ||
#pragma warning disable S1133 // Deprecated code should be removed | ||
[Obsolete("Please read the result from the OutputStream. I.e. new StreamReader(shell.OutputStream).ReadToEnd().")] | ||
#pragma warning disable S1133 // Deprecated code should be removed | ||
public string Result | ||
{ | ||
get | ||
{ | ||
_result ??= new StringBuilder(); | ||
return GetResult(); | ||
} | ||
} | ||
|
||
if (OutputStream != null && OutputStream.Length > 0) | ||
internal string GetResult() | ||
{ | ||
_result ??= new StringBuilder(); | ||
|
||
if (OutputStream != null && OutputStream.Length > 0) | ||
{ | ||
using (var sr = new StreamReader(OutputStream, | ||
_encoding, | ||
detectEncodingFromByteOrderMarks: true, | ||
bufferSize: 1024, | ||
leaveOpen: true)) | ||
{ | ||
using (var sr = new StreamReader(OutputStream, | ||
_encoding, | ||
detectEncodingFromByteOrderMarks: true, | ||
bufferSize: 1024, | ||
leaveOpen: true)) | ||
{ | ||
_ = _result.Append(sr.ReadToEnd()); | ||
} | ||
_ = _result.Append(sr.ReadToEnd()); | ||
} | ||
|
||
return _result.ToString(); | ||
} | ||
|
||
return _result.ToString(); | ||
} | ||
|
||
/// <summary> | ||
/// Gets the command execution error. | ||
/// </summary> | ||
#pragma warning disable S1133 // Deprecated code should be removed | ||
[Obsolete("Please read the error result from the ExtendedOutputStream. I.e. new StreamReader(shell.ExtendedOutputStream).ReadToEnd().")] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why should we make it obsolete? |
||
#pragma warning disable S1133 // Deprecated code should be removed | ||
public string Error | ||
{ | ||
get | ||
{ | ||
if (_hasError) | ||
{ | ||
_error ??= new StringBuilder(); | ||
return GetError(); | ||
} | ||
} | ||
|
||
if (ExtendedOutputStream != null && ExtendedOutputStream.Length > 0) | ||
internal string GetError() | ||
{ | ||
if (_hasError) | ||
{ | ||
_error ??= new StringBuilder(); | ||
|
||
if (ExtendedOutputStream != null && ExtendedOutputStream.Length > 0) | ||
{ | ||
using (var sr = new StreamReader(ExtendedOutputStream, | ||
_encoding, | ||
detectEncodingFromByteOrderMarks: true, | ||
bufferSize: 1024, | ||
leaveOpen: true)) | ||
{ | ||
using (var sr = new StreamReader(ExtendedOutputStream, | ||
_encoding, | ||
detectEncodingFromByteOrderMarks: true, | ||
bufferSize: 1024, | ||
leaveOpen: true)) | ||
{ | ||
_ = _error.Append(sr.ReadToEnd()); | ||
} | ||
_ = _error.Append(sr.ReadToEnd()); | ||
} | ||
|
||
return _error.ToString(); | ||
} | ||
|
||
return string.Empty; | ||
return _error.ToString(); | ||
} | ||
|
||
return string.Empty; | ||
} | ||
|
||
/// <summary> | ||
|
@@ -186,6 +204,7 @@ internal SshCommand(ISession session, string commandText, Encoding encoding) | |
_encoding = encoding; | ||
CommandTimeout = Session.InfiniteTimeSpan; | ||
_sessionErrorOccuredWaitHandle = new AutoResetEvent(initialState: false); | ||
_commmandCancelledWaitHandle = new AutoResetEvent(initialState: false); | ||
|
||
_session.Disconnected += Session_Disconnected; | ||
_session.ErrorOccured += Session_ErrorOccured; | ||
|
@@ -348,21 +367,109 @@ public string EndExecute(IAsyncResult asyncResult) | |
_channel = null; | ||
|
||
commandAsyncResult.EndCalled = true; | ||
|
||
#pragma warning disable CS0618 | ||
return Result; | ||
#pragma warning disable CS0618 | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Cancels command execution in asynchronous scenarios. | ||
/// Waits for the pending asynchronous command execution to complete. | ||
/// </summary> | ||
public void CancelAsync() | ||
/// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param> | ||
/// <returns>Command execution exit status.</returns> | ||
/// <example> | ||
/// <code source="..\..\src\Renci.SshNet.Tests\Classes\SshCommandTest.cs" region="Example SshCommand CreateCommand BeginExecute IsCompleted EndExecute" language="C#" title="Asynchronous Command Execution" /> | ||
/// </example> | ||
/// <exception cref="ArgumentException">Either the IAsyncResult object did not come from the corresponding async method on this type, or EndExecute was called multiple times with the same IAsyncResult.</exception> | ||
/// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is <c>null</c>.</exception> | ||
public int EndExecuteWithStatus(IAsyncResult asyncResult) | ||
{ | ||
if (_channel is not null && _channel.IsOpen && _asyncResult is not null) | ||
if (asyncResult == null) | ||
{ | ||
// TODO: check with Oleg if we shouldn't dispose the channel and uninitialize it ? | ||
_channel.Dispose(); | ||
throw new ArgumentNullException(nameof(asyncResult)); | ||
} | ||
|
||
var commandAsyncResult = asyncResult switch | ||
{ | ||
CommandAsyncResult result when result == _asyncResult => result, | ||
_ => throw new ArgumentException( | ||
$"The {nameof(IAsyncResult)} object was not returned from the corresponding asynchronous method on this class.") | ||
}; | ||
|
||
lock (_endExecuteLock) | ||
{ | ||
if (commandAsyncResult.EndCalled) | ||
{ | ||
throw new ArgumentException("EndExecute can only be called once for each asynchronous operation."); | ||
} | ||
|
||
// wait for operation to complete (or time out) | ||
WaitOnHandle(_asyncResult.AsyncWaitHandle); | ||
UnsubscribeFromEventsAndDisposeChannel(_channel); | ||
_channel = null; | ||
|
||
commandAsyncResult.EndCalled = true; | ||
|
||
return ExitStatus; | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Executes the the command asynchronously. | ||
/// </summary> | ||
/// <returns>Exit status of the operation.</returns> | ||
public Task<int> ExecuteAsync() | ||
{ | ||
return ExecuteAsync(forceKill: false, default); | ||
} | ||
|
||
/// <summary> | ||
/// Executes the the command asynchronously. | ||
/// </summary> | ||
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to observe.</param> | ||
/// <returns>Exit status of the operation.</returns> | ||
public Task<int> ExecuteAsync(CancellationToken cancellationToken) | ||
{ | ||
return ExecuteAsync(forceKill: false, cancellationToken); | ||
} | ||
|
||
/// <summary> | ||
/// Executes the the command asynchronously. | ||
/// </summary> | ||
/// <param name="forceKill">if true send SIGKILL instead of SIGTERM to cancel the command.</param> | ||
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to observe.</param> | ||
/// <returns>Exit status of the operation.</returns> | ||
public async Task<int> ExecuteAsync(bool forceKill, CancellationToken cancellationToken) | ||
{ | ||
#if NET || NETSTANDARD2_1_OR_GREATER | ||
await using var ctr = cancellationToken.Register(() => CancelAsync(forceKill), useSynchronizationContext: false).ConfigureAwait(continueOnCapturedContext: false); | ||
#else | ||
using var ctr = cancellationToken.Register(() => CancelAsync(forceKill), useSynchronizationContext: false); | ||
#endif // NET || NETSTANDARD2_1_OR_GREATER | ||
|
||
try | ||
{ | ||
var status = await Task<int>.Factory.FromAsync(BeginExecute(), EndExecuteWithStatus).ConfigureAwait(false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to add I know that your approach is easier to implement, but this is similar to nuget: https://www.nuget.org/packages/Renci.SshNet.Async#versions-body-tab You can find code here: https://github.com/JohnTheGr8/Renci.SshNet.Async/blob/master/Renci.SshNet.Async/SshNetExtensions.cs |
||
cancellationToken.ThrowIfCancellationRequested(); | ||
|
||
return status; | ||
} | ||
catch (Exception) when (cancellationToken.IsCancellationRequested) | ||
{ | ||
throw new OperationCanceledException("Command execution has been cancelled.", cancellationToken); | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Cancels command execution in asynchronous scenarios. | ||
/// </summary> | ||
/// <param name="forceKill">if true send SIGKILL instead of SIGTERM.</param> | ||
public void CancelAsync(bool forceKill = false) | ||
{ | ||
var signal = forceKill ? "KILL" : "TERM"; | ||
_ = _channel?.SendExitSignalRequest(signal, coreDumped: false, "Command execution has been cancelled.", "en"); | ||
_ = _commmandCancelledWaitHandle.Set(); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -506,6 +613,7 @@ private void WaitOnHandle(WaitHandle waitHandle) | |
var waitHandles = new[] | ||
{ | ||
_sessionErrorOccuredWaitHandle, | ||
_commmandCancelledWaitHandle, | ||
waitHandle | ||
}; | ||
|
||
|
@@ -515,7 +623,8 @@ private void WaitOnHandle(WaitHandle waitHandle) | |
case 0: | ||
ExceptionDispatchInfo.Capture(_exception).Throw(); | ||
break; | ||
case 1: | ||
case 1: // Command cancelled | ||
case 2: | ||
// Specified waithandle was signaled | ||
break; | ||
case WaitHandle.WaitTimeout: | ||
|
@@ -620,6 +729,9 @@ protected virtual void Dispose(bool disposing) | |
_sessionErrorOccuredWaitHandle = null; | ||
} | ||
|
||
_commmandCancelledWaitHandle?.Dispose(); | ||
_commmandCancelledWaitHandle = null; | ||
|
||
_isDisposed = true; | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should we make it obsolete?