Skip to content

Commit

Permalink
Experiment with IteratingCallback (#12040)
Browse files Browse the repository at this point in the history
The previous semantic of `onCompleteFailure` has been renamed to `onFailure(Throwable)`, which is called immediately (but serialized) on either an abort or a failure.   A new `onCompleteFailure(Throwable)` method has been added that is called only after a `failed(throwable)` or a `abort(Throwable)` followed by `succeeded()` or `failed(Throwable)``

No usage has yet been made of the new `onCompleteFailure`, but the ICB implementation has been completely replaced by the one developed in #11876

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
3 people authored Aug 26, 2024
1 parent b9bcb58 commit 7d7eeb3
Show file tree
Hide file tree
Showing 55 changed files with 1,583 additions and 545 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,28 +329,29 @@ protected Action process() throws Throwable
// Read a chunk.
chunk = source.read();

// No chunk, demand to be called back when there will be more chunks.
// If no chunk, schedule a demand callback when there are more chunks.
if (chunk == null)
{
source.demand(this::iterate);
return Action.IDLE;
source.demand(this::succeeded);
return Action.SCHEDULED;
}

// The read failed, re-throw the failure
// causing onCompleteFailure() to be invoked.
if (Content.Chunk.isFailure(chunk))
throw chunk.getFailure();

// Copy the chunk.
// Copy the chunk by scheduling an asynchronous write.
sink.write(chunk.isLast(), chunk.getByteBuffer(), this);
return Action.SCHEDULED;
}

@Override
protected void onSuccess()
{
// After every successful write, release the chunk.
chunk.release();
// After every successful write, release the chunk
// and reset to the next chunk
chunk = Content.Chunk.releaseAndNext(chunk);
}

@Override
Expand All @@ -361,14 +362,20 @@ protected void onCompleteSuccess()
}

@Override
protected void onCompleteFailure(Throwable failure)
protected void onFailure(Throwable cause)
{
// In case of a failure, either on the
// read or on the write, release the chunk.
chunk.release();

// The copy is failed, fail the callback.
callback.failed(failure);
// This method is invoked before a write() has completed, so
// the chunk is not released here, but in onCompleteFailure().
callback.failed(cause);
}

@Override
protected void onCompleteFailure(Throwable failure)
{
// In case of a failure, this method is invoked when the write()
// is completed, and it is now possible to release the chunk.
chunk = Content.Chunk.releaseAndNext(chunk);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
Expand Down Expand Up @@ -225,11 +228,13 @@ public void echoCorrect()
// tag::echo-correct[]
class EchoConnection extends AbstractConnection
{
private final ByteBufferPool.Sized pool;
private final IteratingCallback callback = new EchoIteratingCallback();

public EchoConnection(EndPoint endp, Executor executor)
public EchoConnection(EndPoint endp, ByteBufferPool.Sized pool, Executor executor)
{
super(endp, executor);
this.pool = pool;
}

@Override
Expand All @@ -250,20 +255,20 @@ public void onFillable()

class EchoIteratingCallback extends IteratingCallback
{
private ByteBuffer buffer;
private RetainableByteBuffer buffer;

@Override
protected Action process() throws Throwable
{
// Obtain a buffer if we don't already have one.
if (buffer == null)
buffer = BufferUtil.allocate(1024);
buffer = pool.acquire();

int filled = getEndPoint().fill(buffer);
int filled = getEndPoint().fill(buffer.getByteBuffer());
if (filled > 0)
{
// We have filled some bytes, echo them back.
getEndPoint().write(this, buffer);
getEndPoint().write(this, buffer.getByteBuffer());

// Signal that the iteration should resume
// when the write() operation is completed.
Expand All @@ -273,14 +278,15 @@ else if (filled == 0)
{
// We don't need the buffer anymore, so
// don't keep it around while we are idle.
buffer = null;
buffer = Retainable.release(buffer);

// No more bytes to read, declare
// again interest for fill events.
fillInterested();
fillInterested(this);

// Signal that the iteration is now IDLE.
return Action.IDLE;
// Signal that the iteration is now SCHEDULED
// for a fillable callback.
return Action.SCHEDULED;
}
else
{
Expand All @@ -291,17 +297,11 @@ else if (filled == 0)
}

@Override
protected void onCompleteSuccess()
{
// The iteration completed successfully.
getEndPoint().close();
}

@Override
protected void onCompleteFailure(Throwable cause)
protected void onCompleted(Throwable cause)
{
// The iteration completed with a failure.
// The iteration completed.
getEndPoint().close(cause);
buffer = Retainable.release(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,14 +523,16 @@ protected Action process() throws Throwable // <2>
@Override
public void succeed()
{
// When the send succeeds, succeed this IteratingCallback.
// Map the o.e.j.websocket.api.Callback to o.e.j.util.Callback.
// When the send() succeeds, succeed this IteratingCallback.
succeeded();
}

@Override
public void fail(Throwable x)
{
// When the send fails, fail this IteratingCallback.
// Map the o.e.j.websocket.api.Callback to o.e.j.util.Callback.
// When the send() fails, fail this IteratingCallback.
failed(x);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void onOpen()
@Override
public void onFillable()
{
// Called from fillInterested() in onOpen() to start iteration.
callback.iterate();
}

Expand Down Expand Up @@ -206,24 +207,20 @@ protected Action process() throws Throwable
// the application completed the request processing.
return Action.SCHEDULED;
}
else
{
// Did not receive enough JSON bytes,
// loop around to try to read more.
}
// Did not receive enough JSON bytes to complete the
// JSON parsing, loop around to try to read more bytes.
}
else if (filled == 0)
{
// We don't need the buffer anymore, so
// don't keep it around while we are idle.
buffer = null;

// No more bytes to read, declare
// again interest for fill events.
fillInterested();
// No more bytes to read, declare again interest for fill events.
fillInterested(this);

// Signal that the iteration is now IDLE.
return Action.IDLE;
// Signal that the iteration is now SCHEDULED for fill interest callback.
return Action.SCHEDULED;
}
else
{
Expand Down
15 changes: 10 additions & 5 deletions documentation/jetty/modules/programming-guide/pages/arch/io.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ In turn, this calls `IteratingCallback.process()`, an abstract method that must
Method `process()` must return:

* `Action.SCHEDULED`, to indicate whether the loop has performed a non-blocking, possibly asynchronous, operation
* `Action.IDLE`, to indicate that the loop should temporarily be suspended to be resumed later
* `Action.IDLE`, to indicate that the loop should temporarily be suspended to be resumed later with another call to iterate
* `Action.SUCCEEDED` to indicate that the loop exited successfully

Any exception thrown within `process()` exits the loops with a failure.
Expand All @@ -209,13 +209,18 @@ If this was the only active network connection, the system would now be idle, wi

Eventually, the Jetty I/O system will notify that the `write()` completed; this notifies the `IteratingCallback` that can now resume the loop and call `process()` again.

When `process()` is called, it is possible that zero bytes are read from the network; in this case, you want to deallocate the buffer since the other peer may never send more bytes for the `Connection` to read, or it may send them after a long pause -- in both cases we do not want to retain the memory allocated by the buffer; next, you want to call `fillInterested()` to declare again interest for read events, and return `Action.IDLE` since there is nothing to write back and therefore the loop may be suspended.
When more bytes are again available to be read from the network, `onFillable()` will be called again and that will start the iteration again.
When `process()` is called, it is possible that zero bytes are read from the network; in this case, you want to deallocate the buffer since the other peer may never send more bytes for the `Connection` to read, or it may send them after a long pause -- in both cases we do not want to retain the memory allocated by the buffer; next, you want to call `fillInterested(this)` to declare again interest for read events, and return `Action.SCHEDULED` since a callback is scheduled to occur once filling is possible.

Another possibility is that during `process()` the read returns `-1` indicating that the other peer has closed the connection; this means that there will not be more bytes to read and the loop can be exited, so you return `Action.SUCCEEDED`; `IteratingCallback` will then call `onCompleteSuccess()` where you can close the `EndPoint`.

The last case is that during `process()` an exception is thrown, for example by `EndPoint.fill(ByteBuffer)` or, in more advanced implementations, by code that parses the bytes that have been read and finds them unacceptable; any exception thrown within `process()` will be caught by `IteratingCallback` that will exit the loop with a failure and call `onCompleteFailure(Throwable)` with the exception that has been thrown, where you can close the `EndPoint`, passing the exception that is the reason for closing prematurely the `EndPoint`.

Note that some failures may occur whilst a scheduled operation is in progress.
Such failures are notified immediately via the `onFailure(Throwable)` method, but care must be taken to not release any resources that may still be in use by the scheduled operation.
The `onCompleteFailure(Throwable)` method is called when both a failure has occurred and any scheduled operation has completed.
An example of this issue is that a buffer used for a write operation cannot be returned to a pool in `onFailure(Throwable)` as the write may still be progressing.
Either the buffer must be removed from the pool in `onFailure(Throwable)` or the release of the buffer deferred until `onCompleteFailure(Throwable)` is called.

[IMPORTANT]
====
Asynchronous programming is hard.
Expand Down Expand Up @@ -356,9 +361,9 @@ You must initiate a second write only when the first is finished, for example:
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java[tags=sinkMany]
----

When you need to perform an unknown number of writes, you must use an `IteratingCallback`, explained in <<echo,this section>>, to avoid ``StackOverFlowError``s.
When you need to perform an unknown number of writes, you may use an `IteratingCallback`, explained in <<echo,this section>>, to avoid ``StackOverFlowError``s.

For example, to copy from a `Content.Source` to a `Content.Sink` you should use the convenience method `Content.copy(Content.Source, Content.Sink, Callback)`.
For example, to copy from a `Content.Source` to a `Content.Sink` you could use the convenience method `Content.copy(Content.Source, Content.Sink, Callback)`.
For illustrative purposes, below you can find the implementation of `copy(Content.Source, Content.Sink, Callback)` that uses an `IteratingCallback`:

[,java,indent=0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,14 +617,8 @@ protected void onSuccess()
}

@Override
protected void onCompleteFailure(Throwable x)
protected void onFailure(Throwable x)
{
if (chunk != null)
{
chunk.release();
chunk = Content.Chunk.next(chunk);
}

failRequest(x);
internalAbort(exchange, x);

Expand All @@ -633,6 +627,14 @@ protected void onCompleteFailure(Throwable x)
promise.succeeded(true);
}

@Override
protected void onCompleteFailure(Throwable x)
{
if (chunk != null)
chunk.release();
chunk = Content.Chunk.next(chunk);
}

@Override
public InvocationType getInvocationType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
Expand Down Expand Up @@ -237,7 +238,9 @@ protected Action process() throws Exception
@Override
protected void onSuccess()
{
release();
headerBuffer = Retainable.release(headerBuffer);
chunkBuffer = Retainable.release(chunkBuffer);
contentByteBuffer = null;
}

@Override
Expand All @@ -248,21 +251,16 @@ protected void onCompleteSuccess()
}

@Override
protected void onCompleteFailure(Throwable cause)
protected void onFailure(Throwable cause)
{
super.onCompleteFailure(cause);
release();
callback.failed(cause);
}

private void release()
@Override
protected void onCompleteFailure(Throwable cause)
{
if (headerBuffer != null)
headerBuffer.release();
headerBuffer = null;
if (chunkBuffer != null)
chunkBuffer.release();
chunkBuffer = null;
headerBuffer = Retainable.release(headerBuffer);
chunkBuffer = Retainable.release(chunkBuffer);
contentByteBuffer = null;
}
}
Expand Down Expand Up @@ -334,11 +332,16 @@ protected Action process() throws Exception
}
}

@Override
protected void onFailure(Throwable cause)
{
callback.failed(cause);
}

@Override
protected void onCompleteFailure(Throwable cause)
{
release();
callback.failed(cause);
}

private void release()
Expand Down
Loading

0 comments on commit 7d7eeb3

Please sign in to comment.