Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d384023
fix reconnect - introduce faulted state
Dmitry-Bryliuk Dec 2, 2016
497f446
phys connection and resp reader into logical conn
Dmitry-Bryliuk Dec 2, 2016
8252cf0
requests queue and faulted state inside resp reader
Dmitry-Bryliuk Dec 2, 2016
a6efd0b
more distributed arcvitecture
Dmitry-Bryliuk Dec 5, 2016
3e9ae2b
check resp reader fault state for connection alive
Dmitry-Bryliuk Dec 5, 2016
2ba2d78
more log records
Dmitry-Bryliuk Dec 5, 2016
5ad8b6b
some fixes
Dmitry-Bryliuk Dec 5, 2016
e98e31f
some fixes
Dmitry-Bryliuk Dec 5, 2016
a2a8b42
disposed/state redesign
Dmitry-Bryliuk Dec 8, 2016
4be0d2c
task status check fixed
Dmitry-Bryliuk Dec 8, 2016
5c398f7
requested fixes
Dmitry-Bryliuk Dec 16, 2016
2df6fa1
ping tarantool instead of socket ping, tbd
Dmitry-Bryliuk Dec 19, 2016
d64e990
ping tarantool, done
Dmitry-Bryliuk Dec 20, 2016
e2f4d32
fix reconnect - introduce faulted state
Dmitry-Bryliuk Dec 2, 2016
05f7d20
phys connection and resp reader into logical conn
Dmitry-Bryliuk Dec 2, 2016
e3d9fa7
requests queue and faulted state inside resp reader
Dmitry-Bryliuk Dec 2, 2016
8593122
more distributed arcvitecture
Dmitry-Bryliuk Dec 5, 2016
a2c4c9a
check resp reader fault state for connection alive
Dmitry-Bryliuk Dec 5, 2016
8651c7e
more log records
Dmitry-Bryliuk Dec 5, 2016
f919ed5
some fixes
Dmitry-Bryliuk Dec 5, 2016
1818693
some fixes
Dmitry-Bryliuk Dec 5, 2016
e327efb
disposed/state redesign
Dmitry-Bryliuk Dec 8, 2016
92554bd
task status check fixed
Dmitry-Bryliuk Dec 8, 2016
09901fe
requested fixes
Dmitry-Bryliuk Dec 16, 2016
20cac5c
ping tarantool instead of socket ping, tbd
Dmitry-Bryliuk Dec 19, 2016
1e10433
ping tarantool, done
Dmitry-Bryliuk Dec 20, 2016
6ea4b2a
Merge branch 'fix/reconnect' of github.com:progaudi/tarantool-csharp …
Dmitry-Bryliuk Dec 20, 2016
0497cfa
cosmetic fix
Dmitry-Bryliuk Dec 20, 2016
968beb8
slim locks and non-concurrent dict for requests
Dmitry-Bryliuk Dec 21, 2016
9cd0659
ping without timeout, alive interval for ping
Dmitry-Bryliuk Dec 22, 2016
81d7857
ping check interval in ConnectionOptions
Dmitry-Bryliuk Dec 22, 2016
6e40a4d
minor fixes
Dmitry-Bryliuk Dec 23, 2016
2159793
not working version with ReadWriteLock, tbd
Dmitry-Bryliuk Dec 23, 2016
ddd59f4
always reconnect with events
Dmitry-Bryliuk Dec 26, 2016
0750eee
remove unnecessary flag, use async call for ping
Dmitry-Bryliuk Dec 27, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 4 additions & 47 deletions src/tarantool.client/Box.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks;

using ProGaudi.Tarantool.Client.Model;
using ProGaudi.Tarantool.Client.Model.Requests;
using ProGaudi.Tarantool.Client.Model.Responses;
using ProGaudi.Tarantool.Client.Utils;

namespace ProGaudi.Tarantool.Client
{
Expand All @@ -15,40 +12,17 @@ public class Box : IBox

private readonly ILogicalConnection _logicalConnection;

private readonly IResponseReader _responseReader;

private readonly INetworkStreamPhysicalConnection _physicalConnection;

public Box(ClientOptions options)
{
_clientOptions = options;
TarantoolConvertersRegistrator.Register(options.MsgPackContext);

_physicalConnection = new NetworkStreamPhysicalConnection();
_logicalConnection = new LogicalConnection(options, _physicalConnection);
_responseReader = new ResponseReader(_logicalConnection, options, _physicalConnection);
_logicalConnection = new LogicalConnectionManager(options);
}

public async Task Connect()
{
await _physicalConnection.Connect(_clientOptions);

var greetingsResponseBytes = new byte[128];
var readCount = await _physicalConnection.ReadAsync(greetingsResponseBytes, 0, greetingsResponseBytes.Length);
if (readCount != greetingsResponseBytes.Length)
{
throw ExceptionHelper.UnexpectedGreetingBytesCount(readCount);
}

var greetings = new GreetingsResponse(greetingsResponseBytes);

_clientOptions.LogWriter?.WriteLine($"Greetings received, salt is {Convert.ToBase64String(greetings.Salt)} .");

_responseReader.BeginReading();

_clientOptions.LogWriter?.WriteLine("Server responses reading started.");

await LoginIfNotGuest(greetings);
await _logicalConnection.Connect();
}

public static async Task<Box> Connect(string replicationSource)
Expand All @@ -72,8 +46,7 @@ public void Dispose()
{
_clientOptions.LogWriter?.WriteLine("Box is disposing...");
_clientOptions.LogWriter?.Flush();
_responseReader.Dispose();
_physicalConnection.Dispose();
_logicalConnection.Dispose();
}

public ISchema GetSchema()
Expand Down Expand Up @@ -141,21 +114,5 @@ public Task<DataResponse<TResponse[]>> Eval<TResponse>(string expression)
{
return Eval<TarantoolTuple, TResponse>(expression, TarantoolTuple.Empty);
}

private async Task LoginIfNotGuest(GreetingsResponse greetings)
{
var singleNode = _clientOptions.ConnectionOptions.Nodes.Single();

if (string.IsNullOrEmpty(singleNode.Uri.UserName))
{
_clientOptions.LogWriter?.WriteLine("Guest mode, no authentication attempt.");
return;
}

var authenticateRequest = AuthenticationRequest.Create(greetings, singleNode.Uri);

await _logicalConnection.SendRequestWithEmptyResponse(authenticateRequest);
_clientOptions.LogWriter?.WriteLine($"Authentication request send: {authenticateRequest}");
}
}
}
24 changes: 24 additions & 0 deletions src/tarantool.client/Converters/PingPacketConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;

using ProGaudi.MsgPack.Light;

using ProGaudi.Tarantool.Client.Model.Requests;

namespace ProGaudi.Tarantool.Client.Converters
{
internal class PingPacketConverter : IMsgPackConverter<PingRequest>
{
public void Initialize(MsgPackContext context)
{
}

public void Write(PingRequest value, IMsgPackWriter writer)
{
}

public PingRequest Read(IMsgPackReader reader)
{
throw new NotImplementedException();
}
}
}
14 changes: 6 additions & 8 deletions src/tarantool.client/ILogicalConnection.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
using System.Collections.Generic;
using System.IO;
using System;
using System.Threading.Tasks;

using ProGaudi.Tarantool.Client.Model;
using ProGaudi.Tarantool.Client.Model.Requests;
using ProGaudi.Tarantool.Client.Model.Responses;

namespace ProGaudi.Tarantool.Client
{
public interface ILogicalConnection
public interface ILogicalConnection : IDisposable
{
Task Connect();

bool IsConnected();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be state, not only boolean flag.


Task SendRequestWithEmptyResponse<TRequest>(TRequest request)
where TRequest : IRequest;

Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TRequest request)
where TRequest : IRequest;

TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream);

IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources();
}
}
1 change: 1 addition & 0 deletions src/tarantool.client/INetworkStreamPhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public interface INetworkStreamPhysicalConnection : IDisposable
{
Task Connect(ClientOptions options);
Task Flush();
bool IsConnected();
Task<int> ReadAsync(byte[] buffer, int offset, int count);
void Write(byte[] buffer, int offset, int count);
}
Expand Down
9 changes: 9 additions & 0 deletions src/tarantool.client/IResponseReader.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
using System;
using System.IO;
using System.Threading.Tasks;

using ProGaudi.Tarantool.Client.Model;

namespace ProGaudi.Tarantool.Client
{

public interface IResponseReader : IDisposable
{
void BeginReading();

Task<MemoryStream> GetResponseTask(RequestId requestId);

bool IsConnected();
}
}
146 changes: 102 additions & 44 deletions src/tarantool.client/LogicalConnection.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
Expand All @@ -20,20 +18,95 @@ internal class LogicalConnection : ILogicalConnection
{
private readonly MsgPackContext _msgPackContext;

private readonly ClientOptions _clientOptions;

private readonly RequestIdCounter _requestIdCounter;

private readonly INetworkStreamPhysicalConnection _physicalConnection;

private long _currentRequestId;
private readonly ReaderWriterLockSlim _physicalConnectionLock = new ReaderWriterLockSlim();

private readonly ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>> _pendingRequests =
new ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>>();
private readonly IResponseReader _responseReader;

private readonly ILog _logWriter;

public LogicalConnection(ClientOptions options, INetworkStreamPhysicalConnection physicalConnection)
private bool _disposed;

public LogicalConnection(ClientOptions options, RequestIdCounter requestIdCounter)
{
_clientOptions = options;
_requestIdCounter = requestIdCounter;
_msgPackContext = options.MsgPackContext;
_logWriter = options.LogWriter;
_physicalConnection = physicalConnection;

_physicalConnection = new NetworkStreamPhysicalConnection();
_responseReader = new ResponseReader(_clientOptions, _physicalConnection);
}

public void Dispose()
{
if (_disposed)
{
return;
}

_disposed = true;

_responseReader.Dispose();
_physicalConnection.Dispose();
}

public async Task Connect()
{
await _physicalConnection.Connect(_clientOptions);

var greetingsResponseBytes = new byte[128];
var readCount = await _physicalConnection.ReadAsync(greetingsResponseBytes, 0, greetingsResponseBytes.Length);
if (readCount != greetingsResponseBytes.Length)
{
throw ExceptionHelper.UnexpectedGreetingBytesCount(readCount);
}

var greetings = new GreetingsResponse(greetingsResponseBytes);

_clientOptions.LogWriter?.WriteLine($"Greetings received, salt is {Convert.ToBase64String(greetings.Salt)} .");

_responseReader.BeginReading();

_clientOptions.LogWriter?.WriteLine("Server responses reading started.");

await LoginIfNotGuest(greetings);
}

public bool IsConnected()
{
if (_disposed)
{
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we should throw exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

if (!_responseReader.IsConnected() || !_physicalConnection.IsConnected())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsConnected should not change state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

{
return false;
}

return true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lock is necessary. Otherwise you'll end up with two threads mixing requests and broken pipeline

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}

private async Task LoginIfNotGuest(GreetingsResponse greetings)
{
var singleNode = _clientOptions.ConnectionOptions.Nodes.Single();

if (string.IsNullOrEmpty(singleNode.Uri.UserName))
{
_clientOptions.LogWriter?.WriteLine("Guest mode, no authentication attempt.");
return;
}

var authenticateRequest = AuthenticationRequest.Create(greetings, singleNode.Uri);

await SendRequestWithEmptyResponse(authenticateRequest);
_clientOptions.LogWriter?.WriteLine($"Authentication request send: {authenticateRequest}");
}

public async Task SendRequestWithEmptyResponse<TRequest>(TRequest request)
Expand All @@ -48,15 +121,6 @@ public async Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TR
return await SendRequestImpl<TRequest, DataResponse<TResponse[]>>(request);
}

public TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream)
{
TaskCompletionSource<MemoryStream> request;

return _pendingRequests.TryRemove(requestId, out request)
? request
: null;
}

public static byte[] ReadFully(Stream input)
{
input.Position = 0;
Expand All @@ -72,32 +136,43 @@ public static byte[] ReadFully(Stream input)
}
}

public IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources()
{
var result = _pendingRequests.Values.ToArray();
_pendingRequests.Clear();
return result;
}

private async Task<TResponse> SendRequestImpl<TRequest, TResponse>(TRequest request)
where TRequest : IRequest
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(LogicalConnection));
}

var bodyBuffer = MsgPackSerializer.Serialize(request, _msgPackContext);

var requestId = GetRequestId();
var responseTask = GetResponseTask(requestId);
var requestId = _requestIdCounter.GetRequestId();
var responseTask = _responseReader.GetResponseTask(requestId);

long headerLength;
var headerBuffer = CreateAndSerializeBuffer(request, requestId, bodyBuffer, out headerLength);

lock (_physicalConnection)
try
{
_physicalConnectionLock.EnterWriteLock();

_logWriter?.WriteLine($"Begin sending request header buffer, requestId: {requestId}, code: {request.Code}, length: {headerBuffer.Length}");
_physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int) headerLength);
_physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int)headerLength);

_logWriter?.WriteLine($"Begin sending request body buffer, length: {bodyBuffer.Length}");
_physicalConnection.Write(bodyBuffer, 0, bodyBuffer.Length);
}
catch (Exception ex)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge this into one try-catch-finally, please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

{
_logWriter?.WriteLine(
$"Request with requestId {requestId} failed, header:\n{ToReadableString(headerBuffer)} \n body: \n{ToReadableString(bodyBuffer)}");
Dispose();
throw;
}
finally
{
_physicalConnectionLock.ExitWriteLock();
}

try
{
Expand Down Expand Up @@ -137,22 +212,5 @@ private byte[] CreateAndSerializeBuffer<TRequest>(
MsgPackSerializer.Serialize(packetLength, stream, _msgPackContext);
return packetSizeBuffer;
}

private RequestId GetRequestId()
{
var requestId = Interlocked.Increment(ref _currentRequestId);
return (RequestId) (ulong) requestId;
}

private Task<MemoryStream> GetResponseTask(RequestId requestId)
{
var tcs = new TaskCompletionSource<MemoryStream>();
if (!_pendingRequests.TryAdd(requestId, tcs))
{
throw ExceptionHelper.RequestWithSuchIdAlreadySent(requestId);
}

return tcs.Task;
}
}
}
Loading