|
46 | 46 | using System.Net.Sockets; |
47 | 47 | using System.Runtime.InteropServices; |
48 | 48 | using System.Text; |
| 49 | +using System.Threading.Channels; |
49 | 50 | using System.Threading.Tasks; |
50 | 51 |
|
51 | 52 | using RabbitMQ.Client.Exceptions; |
@@ -79,7 +80,8 @@ class SocketFrameHandler : IFrameHandler |
79 | 80 | private readonly ITcpClient _socket; |
80 | 81 | private readonly Stream _writer; |
81 | 82 | private readonly object _semaphore = new object(); |
82 | | - private readonly object _streamLock = new object(); |
| 83 | + private readonly Channel<OutboundFrame> _frameChannel = Channel.CreateUnbounded<OutboundFrame>(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false }); |
| 84 | + private Task _frameWriter; |
83 | 85 | private bool _closed; |
84 | 86 | public SocketFrameHandler(AmqpTcpEndpoint endpoint, |
85 | 87 | Func<AddressFamily, ITcpClient> socketFactory, |
@@ -124,6 +126,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, |
124 | 126 | _writer = new BufferedStream(netstream, _socket.Client.SendBufferSize); |
125 | 127 |
|
126 | 128 | WriteTimeout = writeTimeout; |
| 129 | + _frameWriter = Task.Run(WriteFrameImpl); |
127 | 130 | } |
128 | 131 | public AmqpTcpEndpoint Endpoint { get; set; } |
129 | 132 |
|
@@ -181,6 +184,15 @@ public void Close() |
181 | 184 | { |
182 | 185 | if (!_closed) |
183 | 186 | { |
| 187 | + try |
| 188 | + { |
| 189 | + _frameChannel.Writer.Complete(); |
| 190 | + _frameWriter.Wait(); |
| 191 | + } |
| 192 | + catch(Exception) |
| 193 | + { |
| 194 | + } |
| 195 | + |
184 | 196 | try |
185 | 197 | { |
186 | 198 | _socket.Close(); |
@@ -222,46 +234,31 @@ public void SendHeader() |
222 | 234 | headerBytes[7] = (byte)Endpoint.Protocol.MinorVersion; |
223 | 235 | } |
224 | 236 |
|
225 | | - Write(new ArraySegment<byte>(headerBytes), true); |
| 237 | + _writer.Write(headerBytes, 0, 8); |
| 238 | + _writer.Flush(); |
226 | 239 | } |
227 | 240 |
|
228 | | - public void WriteFrame(OutboundFrame frame, bool flush = true) |
| 241 | + public void WriteFrame(OutboundFrame frame) |
229 | 242 | { |
230 | | - int bufferSize = frame.GetMinimumBufferSize(); |
231 | | - byte[] memoryArray = ArrayPool<byte>.Shared.Rent(bufferSize); |
232 | | - Memory<byte> slice = new Memory<byte>(memoryArray, 0, bufferSize); |
233 | | - frame.WriteTo(slice); |
234 | | - _socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite); |
235 | | - Write(slice.Slice(0, frame.ByteCount), flush); |
236 | | - ArrayPool<byte>.Shared.Return(memoryArray); |
237 | | - } |
238 | | - |
239 | | - public void WriteFrameSet(IList<OutboundFrame> frames) |
240 | | - { |
241 | | - for (int i = 0; i < frames.Count; i++) |
242 | | - { |
243 | | - WriteFrame(frames[i], false); |
244 | | - } |
245 | | - |
246 | | - lock (_streamLock) |
247 | | - { |
248 | | - _writer.Flush(); |
249 | | - } |
| 243 | + _frameChannel.Writer.TryWrite(frame); |
250 | 244 | } |
251 | 245 |
|
252 | | - private void Write(ReadOnlyMemory<byte> buffer, bool flush) |
| 246 | + public async Task WriteFrameImpl() |
253 | 247 | { |
254 | | - lock (_streamLock) |
| 248 | + while (await _frameChannel.Reader.WaitToReadAsync().ConfigureAwait(false)) |
255 | 249 | { |
256 | | - if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment)) |
| 250 | + _socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite); |
| 251 | + while (_frameChannel.Reader.TryRead(out OutboundFrame frame)) |
257 | 252 | { |
258 | | - _writer.Write(segment.Array, segment.Offset, segment.Count); |
259 | | - |
260 | | - if (flush) |
261 | | - { |
262 | | - _writer.Flush(); |
263 | | - } |
| 253 | + int bufferSize = frame.GetMinimumBufferSize(); |
| 254 | + byte[] memoryArray = ArrayPool<byte>.Shared.Rent(bufferSize); |
| 255 | + Memory<byte> slice = new Memory<byte>(memoryArray, 0, bufferSize); |
| 256 | + frame.WriteTo(slice); |
| 257 | + _writer.Write(memoryArray, 0, bufferSize); |
| 258 | + ArrayPool<byte>.Shared.Return(memoryArray); |
264 | 259 | } |
| 260 | + |
| 261 | + _writer.Flush(); |
265 | 262 | } |
266 | 263 | } |
267 | 264 |
|
|
0 commit comments