-
Notifications
You must be signed in to change notification settings - Fork 17
fix lost connection and reconnect automatically #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
d384023
497f446
8252cf0
a6efd0b
3e9ae2b
2ba2d78
5ad8b6b
e98e31f
a2a8b42
4be0d2c
5c398f7
2df6fa1
d64e990
e2f4d32
05f7d20
e3d9fa7
8593122
a2c4c9a
8651c7e
f919ed5
1818693
e327efb
92554bd
09901fe
20cac5c
1e10433
6ea4b2a
0497cfa
968beb8
9cd0659
81d7857
6e40a4d
2159793
ddd59f4
0750eee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,16 +8,18 @@ | |
|
|
||
| namespace ProGaudi.Tarantool.Client | ||
| { | ||
| public interface ILogicalConnection | ||
| using System; | ||
|
|
||
| public interface ILogicalConnection : IDisposable | ||
| { | ||
| Task Connect(); | ||
|
|
||
| bool IsConnected(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be state, not only boolean flag. |
||
|
|
||
| Task SendRequestWithEmptyResponse<TRequest>(TRequest request) | ||
| where TRequest : IRequest; | ||
|
|
||
| Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TRequest request) | ||
| where TRequest : IRequest; | ||
|
|
||
| TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream); | ||
|
|
||
| IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,8 +2,19 @@ | |
|
|
||
| namespace ProGaudi.Tarantool.Client | ||
| { | ||
| using System.IO; | ||
| using System.Threading.Tasks; | ||
|
|
||
| using ProGaudi.Tarantool.Client.Model; | ||
|
|
||
| public interface IResponseReader : IDisposable | ||
| { | ||
| void BeginReading(); | ||
|
|
||
| Task<MemoryStream> GetResponseTask(RequestId requestId); | ||
|
|
||
| void SetFaultedState(); | ||
|
|
||
| bool IsFaultedState { get; } | ||
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,20 +20,73 @@ internal class LogicalConnection : ILogicalConnection | |
| { | ||
| private readonly MsgPackContext _msgPackContext; | ||
|
|
||
| private readonly INetworkStreamPhysicalConnection _physicalConnection; | ||
| private readonly ClientOptions _clientOptions; | ||
|
|
||
| private long _currentRequestId; | ||
| private readonly RequestIdCounter _requestIdCounter; | ||
|
|
||
| private readonly ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>> _pendingRequests = | ||
| new ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>>(); | ||
| private INetworkStreamPhysicalConnection _physicalConnection; | ||
|
|
||
| private IResponseReader _responseReader; | ||
|
|
||
| private readonly ILog _logWriter; | ||
|
|
||
| public LogicalConnection(ClientOptions options, INetworkStreamPhysicalConnection physicalConnection) | ||
| public LogicalConnection(ClientOptions options, RequestIdCounter requestIdCounter) | ||
| { | ||
| _clientOptions = options; | ||
| _requestIdCounter = requestIdCounter; | ||
| _msgPackContext = options.MsgPackContext; | ||
| _logWriter = options.LogWriter; | ||
| _physicalConnection = physicalConnection; | ||
| } | ||
|
|
||
| public void Dispose() | ||
| { | ||
| Interlocked.Exchange(ref _responseReader, null)?.Dispose(); | ||
| Interlocked.Exchange(ref _physicalConnection, null)?.Dispose(); | ||
| } | ||
|
|
||
| public async Task Connect() | ||
| { | ||
| _physicalConnection = new NetworkStreamPhysicalConnection(); | ||
| await _physicalConnection.Connect(_clientOptions); | ||
|
|
||
| var greetingsResponseBytes = new byte[128]; | ||
| var readCount = await _physicalConnection.ReadAsync(greetingsResponseBytes, 0, greetingsResponseBytes.Length); | ||
| if (readCount != greetingsResponseBytes.Length) | ||
| { | ||
| throw ExceptionHelper.UnexpectedGreetingBytesCount(readCount); | ||
| } | ||
|
|
||
| var greetings = new GreetingsResponse(greetingsResponseBytes); | ||
|
|
||
| _clientOptions.LogWriter?.WriteLine($"Greetings received, salt is {Convert.ToBase64String(greetings.Salt)} ."); | ||
|
|
||
| _responseReader = new ResponseReader(_clientOptions, _physicalConnection); | ||
| _responseReader.BeginReading(); | ||
|
|
||
| _clientOptions.LogWriter?.WriteLine("Server responses reading started."); | ||
|
|
||
| await LoginIfNotGuest(greetings); | ||
| } | ||
|
|
||
| public bool IsConnected() | ||
| { | ||
| return !(_responseReader?.IsFaultedState ?? true) && (_physicalConnection?.IsConnected() ?? false); | ||
|
||
| } | ||
|
|
||
| private async Task LoginIfNotGuest(GreetingsResponse greetings) | ||
| { | ||
| var singleNode = _clientOptions.ConnectionOptions.Nodes.Single(); | ||
|
|
||
| if (string.IsNullOrEmpty(singleNode.Uri.UserName)) | ||
| { | ||
| _clientOptions.LogWriter?.WriteLine("Guest mode, no authentication attempt."); | ||
| return; | ||
| } | ||
|
|
||
| var authenticateRequest = AuthenticationRequest.Create(greetings, singleNode.Uri); | ||
|
|
||
| await SendRequestWithEmptyResponse(authenticateRequest); | ||
| _clientOptions.LogWriter?.WriteLine($"Authentication request send: {authenticateRequest}"); | ||
| } | ||
|
|
||
| public async Task SendRequestWithEmptyResponse<TRequest>(TRequest request) | ||
|
|
@@ -48,15 +101,6 @@ public async Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TR | |
| return await SendRequestImpl<TRequest, DataResponse<TResponse[]>>(request); | ||
| } | ||
|
|
||
| public TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream) | ||
| { | ||
| TaskCompletionSource<MemoryStream> request; | ||
|
|
||
| return _pendingRequests.TryRemove(requestId, out request) | ||
| ? request | ||
| : null; | ||
| } | ||
|
|
||
| public static byte[] ReadFully(Stream input) | ||
| { | ||
| input.Position = 0; | ||
|
|
@@ -72,31 +116,33 @@ public static byte[] ReadFully(Stream input) | |
| } | ||
| } | ||
|
|
||
| public IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources() | ||
| { | ||
| var result = _pendingRequests.Values.ToArray(); | ||
| _pendingRequests.Clear(); | ||
| return result; | ||
| } | ||
|
|
||
| private async Task<TResponse> SendRequestImpl<TRequest, TResponse>(TRequest request) | ||
| where TRequest : IRequest | ||
| { | ||
| var bodyBuffer = MsgPackSerializer.Serialize(request, _msgPackContext); | ||
|
|
||
| var requestId = GetRequestId(); | ||
| var responseTask = GetResponseTask(requestId); | ||
| var requestId = _requestIdCounter.GetRequestId(); | ||
| var responseTask = _responseReader.GetResponseTask(requestId); | ||
|
|
||
| long headerLength; | ||
| var headerBuffer = CreateAndSerializeBuffer(request, requestId, bodyBuffer, out headerLength); | ||
|
|
||
| lock (_physicalConnection) | ||
| try | ||
| { | ||
| _logWriter?.WriteLine($"Begin sending request header buffer, requestId: {requestId}, code: {request.Code}, length: {headerBuffer.Length}"); | ||
| _physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int) headerLength); | ||
| lock (_physicalConnection) | ||
| { | ||
| _logWriter?.WriteLine($"Begin sending request header buffer, requestId: {requestId}, code: {request.Code}, length: {headerBuffer.Length}"); | ||
| _physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int)headerLength); | ||
|
|
||
| _logWriter?.WriteLine($"Begin sending request body buffer, length: {bodyBuffer.Length}"); | ||
| _physicalConnection.Write(bodyBuffer, 0, bodyBuffer.Length); | ||
| _logWriter?.WriteLine($"Begin sending request body buffer, length: {bodyBuffer.Length}"); | ||
| _physicalConnection.Write(bodyBuffer, 0, bodyBuffer.Length); | ||
| } | ||
| } | ||
| catch (Exception ex) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Merge this into one try-catch-finally, please.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| { | ||
| _responseReader.SetFaultedState(); | ||
| _logWriter?.WriteLine($"Request with requestId {requestId} failed, header:\n{ToReadableString(headerBuffer)} \n body: \n{ToReadableString(bodyBuffer)}"); | ||
| throw; | ||
| } | ||
|
|
||
| try | ||
|
|
@@ -137,22 +183,5 @@ private byte[] CreateAndSerializeBuffer<TRequest>( | |
| MsgPackSerializer.Serialize(packetLength, stream, _msgPackContext); | ||
| return packetSizeBuffer; | ||
| } | ||
|
|
||
| private RequestId GetRequestId() | ||
| { | ||
| var requestId = Interlocked.Increment(ref _currentRequestId); | ||
| return (RequestId) (ulong) requestId; | ||
| } | ||
|
|
||
| private Task<MemoryStream> GetResponseTask(RequestId requestId) | ||
| { | ||
| var tcs = new TaskCompletionSource<MemoryStream>(); | ||
| if (!_pendingRequests.TryAdd(requestId, tcs)) | ||
| { | ||
| throw ExceptionHelper.RequestWithSuchIdAlreadySent(requestId); | ||
| } | ||
|
|
||
| return tcs.Task; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, respect our coding guidelines: usings should be outside of namespace.