Description
Background and Motivation
Currently, calling .FlushAsync() on a PipeWriter provided by Pipe marks the write operation as complete. This makes the writer's current block eligible to be released once the PipeReader advances to the writer's position, which frees the memory of an idle pipe.
However, the writer may have more to write and may only be flushing so that too much backlog and latency do not build up. Because the write flag has been released, if the writer immediately wants to write more, the reader and writer race each other and contend for the lock. As a result, the writer's next .GetMemory() call has to go through a slower, lock-protected path.
This additional option would tell .FlushAsync not to release the write flag. Calling GetMemory() after the flush would then not need to go through contended locks. It would also avoid re-renting memory from the MemoryPool that has just been returned to the pool because the PipeReader won the race.
In fact, the PipeWriter would not need to call GetMemory() at all if it still had enough space left in its current block, because that block would not have been invalidated by .FlushAsync. (Releasing the write flag also invalidates the current memory, so the full process has to be repeated to acquire new memory to write to.)
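To make the cost concrete, the following toy model counts how often a writer has to obtain a fresh block in two cases. In the first, each flush releases the current block. In the second, the block is retained because more data is coming. ToyWriter is a hypothetical class, not the System.IO.Pipelines implementation. It deliberately ignores the reader, locking and segment chaining.

```csharp
using System;

// Hypothetical toy model, not the System.IO.Pipelines implementation. It ignores
// the reader, locking and segment chaining, and only counts how often the writer
// has to obtain a fresh block.
public sealed class ToyWriter
{
    private readonly int _blockSize;
    private byte[]? _block; // block currently owned by the writer, if any
    private int _written;   // bytes already advanced past in _block

    public ToyWriter(int blockSize) => _blockSize = blockSize;

    // Number of fresh blocks obtained so far (stand-in for renting from a MemoryPool).
    public int Rents { get; private set; }

    public Memory<byte> GetMemory(int sizeHint)
    {
        // Reuse the unwritten tail of the current block if it is still owned and large enough.
        if (_block is null || _block.Length - _written < sizeHint)
        {
            _block = new byte[Math.Max(sizeHint, _blockSize)];
            _written = 0;
            Rents++;
        }
        return _block.AsMemory(_written);
    }

    public void Advance(int count) => _written += count;

    // isMoreData == false models the current behaviour: the writer gives up its block.
    // isMoreData == true models the proposal: the writer keeps the block for further writes.
    public void Flush(bool isMoreData)
    {
        if (!isMoreData)
        {
            _block = null;
            _written = 0;
        }
    }
}
```

Suppose the writer makes four 512-byte writes into 4096-byte blocks. If it releases the block on every flush, it rents four blocks. If it keeps the block, it rents only one. In the real Pipe, each of those extra GetMemory() calls would also go through the lock-protected path.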
Additionally, for PipeWriters that are not backed by a Pipe, such as a socket-based one, this flag could flow through to the socket send. On Linux, it would pass the MSG_MORE flag to send(2). That flag acts as TCP_CORK for that send only, pushing out only full packets, but without the extra syscall that TCP_CORK needs:
MSG_MORE (Since Linux 2.4.4)
The caller has more data to send. This flag is used with TCP sockets to obtain the same effect as the TCP_CORK socket option (see tcp(7)), with the difference that this flag can be set on a per-call basis.
On Windows, it would pass the RIO_MSG_DEFER flag to RIOSend:
RIO_MSG_DEFER
The request does not need to be executed immediately. This will insert the request into the request queue, but it may or may not trigger the execution of the request.
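The following sketch shows how a socket-backed PipeWriter might forward the hint. The hypothetical MoreDataFlags helper ORs the native per-call flag into the send flags only when isMoreData is true. The constant values come from the Linux and Windows headers (MSG_MORE = 0x8000, RIO_MSG_DEFER = 0x2). The sketch omits how a real writer would issue the native call.

```csharp
using System;

// Hypothetical helper: maps the proposed isMoreData hint onto the per-call
// native flags a socket-backed PipeWriter could pass through.
public static class MoreDataFlags
{
    // Linux <sys/socket.h>: MSG_MORE (since Linux 2.4.4).
    public const int MsgMore = 0x8000;

    // Windows <mswsock.h>: RIO_MSG_DEFER.
    public const int RioMsgDefer = 0x2;

    // Flags for send(2): add MSG_MORE only while more data is expected.
    public static int ForLinuxSend(bool isMoreData, int baseFlags = 0) =>
        isMoreData ? baseFlags | MsgMore : baseFlags;

    // Flags for RIOSend: add RIO_MSG_DEFER only while more data is expected.
    public static int ForRioSend(bool isMoreData, int baseFlags = 0) =>
        isMoreData ? baseFlags | RioMsgDefer : baseFlags;
}
```

The last send of a burst would be made with isMoreData set to false, so that the held-back data is actually pushed out.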
Proposed API
```csharp
public abstract partial class PipeWriter
{
    /// <summary>
    /// Makes bytes written available to <see cref="PipeReader"/> and runs <see cref="PipeReader.ReadAsync"/> continuation.
    /// </summary>
    /// <param name="isMoreData"><see cref="bool"/> indicating that more data is to be written imminently.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
    public virtual ValueTask<FlushResult> FlushAsync(bool isMoreData,
        CancellationToken cancellationToken = default)
        => FlushAsync(cancellationToken);
}
```
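The proposed overload is virtual, and its default body ignores the hint, so existing PipeWriter subclasses keep working unchanged. The sketch below mirrors that shape on SketchWriter, a hypothetical stand-in that returns bool instead of FlushResult so that it compiles without System.IO.Pipelines. A writer that never overrides the new overload falls back to a plain flush. A writer that opts in receives the hint.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for PipeWriter (not the real type).
public abstract class SketchWriter
{
    public abstract ValueTask<bool> FlushAsync(CancellationToken cancellationToken = default);

    // Mirrors the proposed default: drop the hint and perform a normal flush.
    public virtual ValueTask<bool> FlushAsync(bool isMoreData,
        CancellationToken cancellationToken = default)
        => FlushAsync(cancellationToken);
}

// An existing writer written before the overload existed.
public sealed class LegacyWriter : SketchWriter
{
    public int Flushes { get; private set; }

    public override ValueTask<bool> FlushAsync(CancellationToken cancellationToken = default)
    {
        Flushes++;
        return new ValueTask<bool>(true);
    }
}

// A writer that opts in; it only records the hint here.
public sealed class HintAwareWriter : SketchWriter
{
    public bool LastHint { get; private set; }

    // A plain flush means "no more data is coming".
    public override ValueTask<bool> FlushAsync(CancellationToken cancellationToken = default)
        => FlushAsync(false, cancellationToken);

    public override ValueTask<bool> FlushAsync(bool isMoreData,
        CancellationToken cancellationToken = default)
    {
        LastHint = isMoreData;
        return new ValueTask<bool>(true);
    }
}
```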
Usage Examples
Current
```csharp
input.Advance(bytesReceived);
var flushTask = input.FlushAsync();
if (!_waitForData)
{
    // The previous buffer has been invalidated by FlushAsync, so we need to ask
    // for a completely new one, which may contend with the reader reawoken by FlushAsync.
    buffer = input.GetMemory(MinAllocBufferSize);
}
var result = await flushTask;
```
After
```csharp
input.Advance(bytesReceived);
var flushTask = input.FlushAsync(isMoreData: !_waitForData);
if (!_waitForData)
{
    if (buffer.Length - bytesReceived >= MinAllocBufferSize)
    {
        // We have enough memory; we only need to slice the existing buffer
        // by the same amount that was passed to input.Advance.
        buffer = buffer.Slice(bytesReceived);
    }
    else
    {
        // Not enough space left in the current block; ask for new memory.
        buffer = input.GetMemory(MinAllocBufferSize);
    }
}
var result = await flushTask;
```
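The decision in the After example can be factored into a small helper. BufferReuse.NextBuffer is hypothetical, and it takes the writer's GetMemory as a delegate so that it runs without a Pipe. It is only valid when the preceding flush was made with isMoreData: true. Otherwise, the old buffer has been invalidated and must not be sliced.

```csharp
using System;

public static class BufferReuse
{
    // Hypothetical helper extracted from the "After" example. After advancing by
    // bytesReceived, reuse the unwritten tail of the current buffer when it still
    // holds at least minSize bytes; otherwise ask the writer for new memory.
    // Only valid if the flush used isMoreData: true, so the buffer is still owned.
    public static Memory<byte> NextBuffer(
        Memory<byte> buffer, int bytesReceived, int minSize, Func<int, Memory<byte>> getMemory)
    {
        if (buffer.Length - bytesReceived >= minSize)
        {
            // Skip the bytes that Advance(bytesReceived) already committed.
            return buffer.Slice(bytesReceived);
        }
        return getMemory(minSize);
    }
}
```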
Alternative Designs
A PipeOptions-level switch to never release the block, even when no write is in progress. This has the disadvantage that the block cannot be released when the pipe has truly reached an idle point. It also doesn't let other PipeWriters, such as a socket-based one, use MSG_MORE or RIO_MSG_DEFER.
/cc @davidfowl @halter73
FYI @stephentoub @adamsitnik as discussed at meeting.