Skip to content

Commit 89c91c6

Browse files
authored
Merge pull request #147 from karlovnv/master
PR implements batching
2 parents 7101e2f + 6a9eeec commit 89c91c6

File tree

6 files changed

+110
-40
lines changed

6 files changed

+110
-40
lines changed

src/progaudi.tarantool/IRequestWriter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ internal interface IRequestWriter : IDisposable
99

1010
bool IsConnected { get; }
1111

12-
void Write(ArraySegment<byte> header, ArraySegment<byte> body);
12+
void Write(ArraySegment<byte> request);
1313
}
1414
}

src/progaudi.tarantool/LogicalConnection.cs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ internal class LogicalConnection : ILogicalConnection
2929
private readonly IRequestWriter _requestWriter;
3030

3131
private readonly ILog _logWriter;
32-
3332
private bool _disposed;
3433

3534
public LogicalConnection(ClientOptions options, RequestIdCounter requestIdCounter)
@@ -152,15 +151,26 @@ private async Task<MemoryStream> SendRequestImpl<TRequest>(TRequest request, Tim
152151
throw new ObjectDisposedException(nameof(LogicalConnection));
153152
}
154153

155-
var bodyBuffer = MsgPackSerializer.Serialize(request, _msgPackContext);
156-
154+
157155
var requestId = _requestIdCounter.GetRequestId();
158156
var responseTask = _responseReader.GetResponseTask(requestId);
159157

160-
var headerBuffer = CreateAndSerializeHeader(request, requestId, bodyBuffer);
158+
var stream = CreateAndSerializeHeader(request, requestId);
159+
MsgPackSerializer.Serialize(request, stream, _msgPackContext);
160+
var totalLength = stream.Position - Constants.PacketSizeBufferSize;
161+
var packetLength = new PacketSize((uint)(totalLength));
162+
AddPacketSize(stream, packetLength);
163+
164+
ArraySegment<byte> buffer;
165+
if(!stream.TryGetBuffer(out buffer))
166+
{
167+
throw new InvalidOperationException("broken buffer");
168+
}
169+
170+
//keep API for the sake of backward comp.
161171
_requestWriter.Write(
162-
headerBuffer,
163-
new ArraySegment<byte>(bodyBuffer, 0, bodyBuffer.Length));
172+
// merged header and body
173+
buffer);
164174

165175
try
166176
{
@@ -177,7 +187,7 @@ private async Task<MemoryStream> SendRequestImpl<TRequest>(TRequest request, Tim
177187
}
178188
catch (ArgumentException)
179189
{
180-
_logWriter?.WriteLine($"Response with requestId {requestId} failed, header:\n{headerBuffer.ToReadableString()} \n body: \n{bodyBuffer.ToReadableString()}");
190+
_logWriter?.WriteLine($"Response with requestId {requestId} failed, content:\n{buffer.ToReadableString()} ");
181191
throw;
182192
}
183193
catch (TimeoutException)
@@ -187,24 +197,23 @@ private async Task<MemoryStream> SendRequestImpl<TRequest>(TRequest request, Tim
187197
}
188198
}
189199

190-
private ArraySegment<byte> CreateAndSerializeHeader<TRequest>(
200+
private MemoryStream CreateAndSerializeHeader<TRequest>(
191201
TRequest request,
192-
RequestId requestId,
193-
byte[] serializedRequest) where TRequest : IRequest
202+
RequestId requestId) where TRequest : IRequest
194203
{
195-
var packetSizeBuffer = new byte[Constants.PacketSizeBufferSize + Constants.MaxHeaderLength];
196-
var stream = new MemoryStream(packetSizeBuffer);
197-
204+
var stream = new MemoryStream();
205+
198206
var requestHeader = new RequestHeader(request.Code, requestId);
199207
stream.Seek(Constants.PacketSizeBufferSize, SeekOrigin.Begin);
200208
MsgPackSerializer.Serialize(requestHeader, stream, _msgPackContext);
201209

202-
var lengthAndHeaderLengthByteCount = (int)stream.Position;
203-
var headerLength = lengthAndHeaderLengthByteCount - Constants.PacketSizeBufferSize;
204-
var packetLength = new PacketSize((uint) (headerLength + serializedRequest.Length));
210+
return stream;
211+
}
212+
213+
private void AddPacketSize(MemoryStream stream, PacketSize packetLength)
214+
{
205215
stream.Seek(0, SeekOrigin.Begin);
206216
MsgPackSerializer.Serialize(packetLength, stream, _msgPackContext);
207-
return new ArraySegment<byte>(packetSizeBuffer, 0, lengthAndHeaderLengthByteCount);
208217
}
209218
}
210219
}

src/progaudi.tarantool/Model/ClientOptions.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using ProGaudi.MsgPack.Light;
1+
using System;
2+
using System.Net.Sockets;
3+
using ProGaudi.MsgPack.Light;
24

35
namespace ProGaudi.Tarantool.Client.Model
46
{
@@ -24,6 +26,8 @@ private ClientOptions(ConnectionOptions options, ILog log, MsgPackContext contex
2426
}
2527
}
2628

29+
public Action<Socket> ConfigureSocket { get; set; }
30+
2731
public ILog LogWriter { get; }
2832

2933
public MsgPackContext MsgPackContext { get; }

src/progaudi.tarantool/Model/ConnectionOptions.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,21 @@ private void Parse(string replicationSource, ILog log)
3131
}
3232
}
3333
}
34+
public int WriteStreamBufferSize { get; set; } = 8192 * 2;
3435

35-
public int ReadStreamBufferSize { get; set; } = 4096;
36+
/// <summary>
37+
/// If number of pending requests more than the value, Throttle does not apply
38+
/// </summary>
39+
public int MinRequestsWithThrottle { get; set; } = 16;
40+
41+
/// <summary>
42+
/// 0 - unlimited
43+
/// </summary>
44+
public int MaxRequestsInBatch { get; set; } = 0;
45+
46+
public int WriteThrottlePeriodInMs { get; set; } = 10;
47+
48+
public int ReadStreamBufferSize { get; set; } = 8192;
3649

3750
public int WriteNetworkTimeout { get; set; } = -1;
3851

src/progaudi.tarantool/NetworkStreamPhysicalConnection.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ public async Task Connect(ClientOptions options)
4646
{
4747
NoDelay = true
4848
};
49+
50+
if(options.ConfigureSocket != null)
51+
{
52+
options.ConfigureSocket(_socket);
53+
}
54+
4955
await ConnectAsync(_socket, singleNode.Uri.Host, singleNode.Uri.Port).ConfigureAwait(false);;
5056

5157
_stream = new NetworkStream(_socket, true);

src/progaudi.tarantool/RequestWriter.cs

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.IO;
35
using System.Threading;
6+
using System.Threading.Tasks;
47
using ProGaudi.Tarantool.Client.Model;
58

69
namespace ProGaudi.Tarantool.Client
@@ -9,25 +12,28 @@ internal class RequestWriter : IRequestWriter
912
{
1013
private readonly ClientOptions _clientOptions;
1114
private readonly IPhysicalConnection _physicalConnection;
12-
private readonly Queue<Tuple<ArraySegment<byte>, ArraySegment<byte>>> _buffer;
15+
private readonly Queue<ArraySegment<byte>> _buffer;
1316
private readonly object _lock = new object();
1417
private readonly Thread _thread;
1518
private readonly ManualResetEventSlim _exitEvent;
1619
private readonly ManualResetEventSlim _newRequestsAvailable;
20+
private readonly ConnectionOptions _connectionOptions;
1721
private bool _disposed;
22+
private long _remaining;
1823

1924
public RequestWriter(ClientOptions clientOptions, IPhysicalConnection physicalConnection)
2025
{
2126
_clientOptions = clientOptions;
2227
_physicalConnection = physicalConnection;
23-
_buffer = new Queue<Tuple<ArraySegment<byte>, ArraySegment<byte>>>();
28+
_buffer = new Queue<ArraySegment<byte>>();
2429
_thread = new Thread(WriteFunction)
2530
{
2631
IsBackground = true,
2732
Name = $"{clientOptions.Name} :: Write"
2833
};
2934
_exitEvent = new ManualResetEventSlim();
3035
_newRequestsAvailable = new ManualResetEventSlim();
36+
_connectionOptions = _clientOptions.ConnectionOptions;
3137
}
3238

3339
public void BeginWriting()
@@ -43,18 +49,18 @@ public void BeginWriting()
4349

4450
public bool IsConnected => !_disposed;
4551

46-
public void Write(ArraySegment<byte> header, ArraySegment<byte> body)
52+
public void Write(ArraySegment<byte> request)
4753
{
4854
if (_disposed)
4955
{
5056
throw new ObjectDisposedException(nameof(ResponseReader));
5157
}
5258

53-
_clientOptions?.LogWriter?.WriteLine($"Enqueuing request: headers {header.Count} bytes, body {body.Count} bytes.");
59+
_clientOptions?.LogWriter?.WriteLine($"Enqueuing request: {request.Count} bytes.");
5460
bool shouldSignal;
5561
lock (_lock)
5662
{
57-
_buffer.Enqueue(Tuple.Create(header, body));
63+
_buffer.Enqueue(request);
5864
shouldSignal = _buffer.Count == 1;
5965
}
6066

@@ -79,56 +85,88 @@ public void Dispose()
7985
private void WriteFunction()
8086
{
8187
var handles = new[] { _exitEvent.WaitHandle, _newRequestsAvailable.WaitHandle };
82-
88+
var throttle = _connectionOptions.WriteThrottlePeriodInMs;
89+
long remaining;
8390
while (true)
8491
{
8592
switch (WaitHandle.WaitAny(handles))
8693
{
8794
case 0:
8895
return;
8996
case 1:
90-
WriteRequests(200);
97+
WriteRequests(_connectionOptions.WriteStreamBufferSize,
98+
_connectionOptions.MaxRequestsInBatch);
99+
100+
remaining = Interlocked.Read(ref _remaining);
101+
102+
// Thread.Sleep will be called only if the number of pending bytes less than
103+
// MinRequestsWithThrottle
104+
105+
if (throttle > 0 && remaining < _connectionOptions.MinRequestsWithThrottle)
106+
Thread.Sleep(throttle);
107+
91108
break;
92109
default:
93110
throw new ArgumentOutOfRangeException();
94111
}
95112
}
96113
}
97114

98-
private void WriteRequests(int limit)
115+
private void WriteRequests(int bufferLength, int limit)
99116
{
100117
void WriteBuffer(ArraySegment<byte> buffer)
101118
{
102119
_physicalConnection.Write(buffer.Array, buffer.Offset, buffer.Count);
103120
}
104121

105-
Tuple<ArraySegment<byte>, ArraySegment<byte>> GetRequest()
122+
bool GetRequest(out ArraySegment<byte> result)
106123
{
107124
lock (_lock)
108125
{
109126
if (_buffer.Count > 0)
110-
return _buffer.Dequeue();
127+
{
128+
_remaining = _buffer.Count + 1;
129+
result = _buffer.Dequeue();
130+
return true;
131+
}
111132
}
112133

113-
return null;
114-
}
134+
result = default(ArraySegment<byte>);
135+
return false;
136+
}
115137

116-
Tuple<ArraySegment<byte>, ArraySegment<byte>> request;
138+
ArraySegment<byte> request;
117139
var count = 0;
118-
while ((request = GetRequest()) != null)
140+
UInt64 length = 0;
141+
var list = new List<ArraySegment<byte>>();
142+
while (GetRequest(out request))
119143
{
120-
_clientOptions?.LogWriter?.WriteLine($"Writing request: headers {request.Item1.Count} bytes, body {request.Item2.Count} bytes.");
121-
122-
WriteBuffer(request.Item1);
123-
WriteBuffer(request.Item2);
144+
_clientOptions?.LogWriter?.WriteLine($"Writing request: {request.Count} bytes.");
145+
length += (uint)request.Count;
124146

125-
_clientOptions?.LogWriter?.WriteLine($"Wrote request: headers {request.Item1.Count} bytes, body {request.Item2.Count} bytes.");
147+
list.Add(request);
148+
_clientOptions?.LogWriter?.WriteLine($"Wrote request: {request.Count} bytes.");
126149

127150
count++;
128-
if (limit > 0 && count > limit)
151+
if ((limit > 0 && count > limit) || length > (ulong)bufferLength)
129152
{
130153
break;
131154
}
155+
156+
}
157+
158+
if (list.Count > 0)
159+
{
160+
// merge requests into one buffer
161+
var result = new byte[length];
162+
int position = 0;
163+
foreach (var r in list)
164+
{
165+
Buffer.BlockCopy(r.Array, r.Offset, result, position, r.Count);
166+
position += r.Count;
167+
}
168+
169+
WriteBuffer(new ArraySegment<byte>(result));
132170
}
133171

134172
lock (_lock)

0 commit comments

Comments
 (0)