Skip to content

Commit

Permalink
THRIFT-4673 IAsyncResult not supported by layered transports (buffere…
Browse files Browse the repository at this point in the history
…d/framed)

Client: C#
Patch: Jens Geyer

This closes apache#1634
  • Loading branch information
Jens-G committed Nov 24, 2018
1 parent 7abb7d5 commit fc52c3c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
27 changes: 24 additions & 3 deletions lib/csharp/src/Transport/TBufferedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public override int Read(byte[] buf, int off, int len)
inputBuffer.Capacity = bufSize;

while (true)
{
{
int got = inputBuffer.Read(buf, off, len);
if (got > 0)
return got;
Expand Down Expand Up @@ -129,19 +129,40 @@ public override void Write(byte[] buf, int off, int len)
}
}

public override void Flush()
private void InternalFlush()
{
CheckNotDisposed();
if (!IsOpen)
throw new TTransportException(TTransportException.ExceptionType.NotOpen);
if (outputBuffer.Length > 0)
{
transport.Write(outputBuffer.GetBuffer(), 0, (int)outputBuffer.Length);
outputBuffer.SetLength(0);
}
}

public override void Flush()
{
CheckNotDisposed();
InternalFlush();

transport.Flush();
}

public override IAsyncResult BeginFlush(AsyncCallback callback, object state)
{
CheckNotDisposed();
InternalFlush();

return transport.BeginFlush( callback, state);
}

public override void EndFlush(IAsyncResult asyncResult)
{
transport.EndFlush( asyncResult);
}



protected void CheckNotDisposed()
{
if (_IsDisposed)
Expand Down
21 changes: 20 additions & 1 deletion lib/csharp/src/Transport/TFramedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public override void Write(byte[] buf, int off, int len)
writeBuffer.Write(buf, off, len);
}

public override void Flush()
private void InternalFlush()
{
CheckNotDisposed();
if (!IsOpen)
Expand All @@ -126,10 +126,29 @@ public override void Flush()
transport.Write(buf, 0, len);

InitWriteBuffer();
}

public override void Flush()
{
CheckNotDisposed();
InternalFlush();

transport.Flush();
}

public override IAsyncResult BeginFlush(AsyncCallback callback, object state)
{
CheckNotDisposed();
InternalFlush();

return transport.BeginFlush( callback, state);
}

public override void EndFlush(IAsyncResult asyncResult)
{
transport.EndFlush( asyncResult);
}

private void InitWriteBuffer()
{
// Reserve space for message header to be put right before sending it out
Expand Down

0 comments on commit fc52c3c

Please sign in to comment.