-
Notifications
You must be signed in to change notification settings - Fork 17
fix lost connection and reconnect automatically #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d384023
497f446
8252cf0
a6efd0b
3e9ae2b
2ba2d78
5ad8b6b
e98e31f
a2a8b42
4be0d2c
5c398f7
2df6fa1
d64e990
e2f4d32
05f7d20
e3d9fa7
8593122
a2c4c9a
8651c7e
f919ed5
1818693
e327efb
92554bd
09901fe
20cac5c
1e10433
6ea4b2a
0497cfa
968beb8
9cd0659
81d7857
6e40a4d
2159793
ddd59f4
0750eee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| using System; | ||
|
|
||
| using ProGaudi.MsgPack.Light; | ||
|
|
||
| using ProGaudi.Tarantool.Client.Model.Requests; | ||
|
|
||
| namespace ProGaudi.Tarantool.Client.Converters | ||
| { | ||
| internal class PingPacketConverter : IMsgPackConverter<PingRequest> | ||
| { | ||
| public void Initialize(MsgPackContext context) | ||
| { | ||
| } | ||
|
|
||
| public void Write(PingRequest value, IMsgPackWriter writer) | ||
| { | ||
| } | ||
|
|
||
| public PingRequest Read(IMsgPackReader reader) | ||
| { | ||
| throw new NotImplementedException(); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1,21 @@ | ||
| using System.Collections.Generic; | ||
| using System.IO; | ||
| using System; | ||
| using System.Threading.Tasks; | ||
|
|
||
| using ProGaudi.Tarantool.Client.Model; | ||
| using ProGaudi.Tarantool.Client.Model.Requests; | ||
| using ProGaudi.Tarantool.Client.Model.Responses; | ||
|
|
||
| namespace ProGaudi.Tarantool.Client | ||
| { | ||
| public interface ILogicalConnection | ||
| public interface ILogicalConnection : IDisposable | ||
| { | ||
| Task Connect(); | ||
|
|
||
| bool IsConnected(); | ||
|
|
||
| Task SendRequestWithEmptyResponse<TRequest>(TRequest request) | ||
| where TRequest : IRequest; | ||
|
|
||
| Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TRequest request) | ||
| where TRequest : IRequest; | ||
|
|
||
| TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream); | ||
|
|
||
| IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,18 @@ | ||
| using System; | ||
| using System.IO; | ||
| using System.Threading.Tasks; | ||
|
|
||
| using ProGaudi.Tarantool.Client.Model; | ||
|
|
||
| namespace ProGaudi.Tarantool.Client | ||
| { | ||
|
|
||
| public interface IResponseReader : IDisposable | ||
| { | ||
| void BeginReading(); | ||
|
|
||
| Task<MemoryStream> GetResponseTask(RequestId requestId); | ||
|
|
||
| bool IsConnected(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,4 @@ | ||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Collections.Generic; | ||
| using System.IO; | ||
| using System.Linq; | ||
| using System.Threading; | ||
|
|
@@ -20,20 +18,95 @@ internal class LogicalConnection : ILogicalConnection | |
| { | ||
| private readonly MsgPackContext _msgPackContext; | ||
|
|
||
| private readonly ClientOptions _clientOptions; | ||
|
|
||
| private readonly RequestIdCounter _requestIdCounter; | ||
|
|
||
| private readonly INetworkStreamPhysicalConnection _physicalConnection; | ||
|
|
||
| private long _currentRequestId; | ||
| private readonly ReaderWriterLockSlim _physicalConnectionLock = new ReaderWriterLockSlim(); | ||
|
|
||
| private readonly ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>> _pendingRequests = | ||
| new ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>>(); | ||
| private readonly IResponseReader _responseReader; | ||
|
|
||
| private readonly ILog _logWriter; | ||
|
|
||
| public LogicalConnection(ClientOptions options, INetworkStreamPhysicalConnection physicalConnection) | ||
| private bool _disposed; | ||
|
|
||
| public LogicalConnection(ClientOptions options, RequestIdCounter requestIdCounter) | ||
| { | ||
| _clientOptions = options; | ||
| _requestIdCounter = requestIdCounter; | ||
| _msgPackContext = options.MsgPackContext; | ||
| _logWriter = options.LogWriter; | ||
| _physicalConnection = physicalConnection; | ||
|
|
||
| _physicalConnection = new NetworkStreamPhysicalConnection(); | ||
| _responseReader = new ResponseReader(_clientOptions, _physicalConnection); | ||
| } | ||
|
|
||
| public void Dispose() | ||
| { | ||
| if (_disposed) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| _disposed = true; | ||
|
|
||
| _responseReader.Dispose(); | ||
| _physicalConnection.Dispose(); | ||
| } | ||
|
|
||
| public async Task Connect() | ||
| { | ||
| await _physicalConnection.Connect(_clientOptions); | ||
|
|
||
| var greetingsResponseBytes = new byte[128]; | ||
| var readCount = await _physicalConnection.ReadAsync(greetingsResponseBytes, 0, greetingsResponseBytes.Length); | ||
| if (readCount != greetingsResponseBytes.Length) | ||
| { | ||
| throw ExceptionHelper.UnexpectedGreetingBytesCount(readCount); | ||
| } | ||
|
|
||
| var greetings = new GreetingsResponse(greetingsResponseBytes); | ||
|
|
||
| _clientOptions.LogWriter?.WriteLine($"Greetings received, salt is {Convert.ToBase64String(greetings.Salt)} ."); | ||
|
|
||
| _responseReader.BeginReading(); | ||
|
|
||
| _clientOptions.LogWriter?.WriteLine("Server responses reading started."); | ||
|
|
||
| await LoginIfNotGuest(greetings); | ||
| } | ||
|
|
||
| public bool IsConnected() | ||
| { | ||
| if (_disposed) | ||
| { | ||
| return false; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or we should throw exception?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. then we need catch exceptions here instead of check: https://github.com/progaudi/tarantool-csharp/pull/83/files/4be0d2ca63c07b90d6c7d9449e3da7ebb0624b65#diff-8b31d61323fab83274f6898980abbf6bR59 |
||
| } | ||
|
|
||
| if (!_responseReader.IsConnected() || !_physicalConnection.IsConnected()) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IsConnected should not change state.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| { | ||
| return false; | ||
| } | ||
|
|
||
| return true; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This lock is necessary. Otherwise you'll end up with two threads mixing requests and broken pipeline
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| } | ||
|
|
||
| private async Task LoginIfNotGuest(GreetingsResponse greetings) | ||
| { | ||
| var singleNode = _clientOptions.ConnectionOptions.Nodes.Single(); | ||
|
|
||
| if (string.IsNullOrEmpty(singleNode.Uri.UserName)) | ||
| { | ||
| _clientOptions.LogWriter?.WriteLine("Guest mode, no authentication attempt."); | ||
| return; | ||
| } | ||
|
|
||
| var authenticateRequest = AuthenticationRequest.Create(greetings, singleNode.Uri); | ||
|
|
||
| await SendRequestWithEmptyResponse(authenticateRequest); | ||
| _clientOptions.LogWriter?.WriteLine($"Authentication request send: {authenticateRequest}"); | ||
| } | ||
|
|
||
| public async Task SendRequestWithEmptyResponse<TRequest>(TRequest request) | ||
|
|
@@ -48,15 +121,6 @@ public async Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TR | |
| return await SendRequestImpl<TRequest, DataResponse<TResponse[]>>(request); | ||
| } | ||
|
|
||
| public TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream) | ||
| { | ||
| TaskCompletionSource<MemoryStream> request; | ||
|
|
||
| return _pendingRequests.TryRemove(requestId, out request) | ||
| ? request | ||
| : null; | ||
| } | ||
|
|
||
| public static byte[] ReadFully(Stream input) | ||
| { | ||
| input.Position = 0; | ||
|
|
@@ -72,32 +136,43 @@ public static byte[] ReadFully(Stream input) | |
| } | ||
| } | ||
|
|
||
| public IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources() | ||
| { | ||
| var result = _pendingRequests.Values.ToArray(); | ||
| _pendingRequests.Clear(); | ||
| return result; | ||
| } | ||
|
|
||
| private async Task<TResponse> SendRequestImpl<TRequest, TResponse>(TRequest request) | ||
| where TRequest : IRequest | ||
| { | ||
| if (_disposed) | ||
| { | ||
| throw new ObjectDisposedException(nameof(LogicalConnection)); | ||
| } | ||
|
|
||
| var bodyBuffer = MsgPackSerializer.Serialize(request, _msgPackContext); | ||
|
|
||
| var requestId = GetRequestId(); | ||
| var responseTask = GetResponseTask(requestId); | ||
| var requestId = _requestIdCounter.GetRequestId(); | ||
| var responseTask = _responseReader.GetResponseTask(requestId); | ||
|
|
||
| long headerLength; | ||
| var headerBuffer = CreateAndSerializeBuffer(request, requestId, bodyBuffer, out headerLength); | ||
|
|
||
| lock (_physicalConnection) | ||
| try | ||
| { | ||
| _physicalConnectionLock.EnterWriteLock(); | ||
|
|
||
| _logWriter?.WriteLine($"Begin sending request header buffer, requestId: {requestId}, code: {request.Code}, length: {headerBuffer.Length}"); | ||
| _physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int) headerLength); | ||
| _physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int)headerLength); | ||
|
|
||
| _logWriter?.WriteLine($"Begin sending request body buffer, length: {bodyBuffer.Length}"); | ||
| _physicalConnection.Write(bodyBuffer, 0, bodyBuffer.Length); | ||
| } | ||
| catch (Exception ex) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Merge this into one try-catch-finally, please.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| { | ||
| _logWriter?.WriteLine( | ||
| $"Request with requestId {requestId} failed, header:\n{ToReadableString(headerBuffer)} \n body: \n{ToReadableString(bodyBuffer)}"); | ||
| Dispose(); | ||
| throw; | ||
| } | ||
| finally | ||
| { | ||
| _physicalConnectionLock.ExitWriteLock(); | ||
| } | ||
|
|
||
| try | ||
| { | ||
|
|
@@ -137,22 +212,5 @@ private byte[] CreateAndSerializeBuffer<TRequest>( | |
| MsgPackSerializer.Serialize(packetLength, stream, _msgPackContext); | ||
| return packetSizeBuffer; | ||
| } | ||
|
|
||
| private RequestId GetRequestId() | ||
| { | ||
| var requestId = Interlocked.Increment(ref _currentRequestId); | ||
| return (RequestId) (ulong) requestId; | ||
| } | ||
|
|
||
| private Task<MemoryStream> GetResponseTask(RequestId requestId) | ||
| { | ||
| var tcs = new TaskCompletionSource<MemoryStream>(); | ||
| if (!_pendingRequests.TryAdd(requestId, tcs)) | ||
| { | ||
| throw ExceptionHelper.RequestWithSuchIdAlreadySent(requestId); | ||
| } | ||
|
|
||
| return tcs.Task; | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be state, not only boolean flag.