Skip to content

Use Throttling Netty Write Handler on HTTP Path #84751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/84751.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84751
summary: Use Throttling Netty Write Handler on HTTP Path
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler;
import org.elasticsearch.transport.netty4.NettyAllocator;
import org.elasticsearch.transport.netty4.NettyByteBufSizer;
import org.elasticsearch.transport.netty4.SharedGroupFactory;
Expand Down Expand Up @@ -303,6 +304,7 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht
protected void initChannel(Channel ch) throws Exception {
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("chunked_writer", new Netty4WriteThrottlingHandler());
ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
final HttpRequestDecoder decoder = new HttpRequestDecoder(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.transport.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.Transports;

/**
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
* to the relevant action.
*/
public class Netty4MessageInboundHandler extends ChannelInboundHandlerAdapter {

private final Netty4Transport transport;

private final InboundPipeline pipeline;

public Netty4MessageInboundHandler(Netty4Transport transport, Recycler<BytesRef> recycler) {
this.transport = transport;
final ThreadPool threadPool = transport.getThreadPool();
final Transport.RequestHandlers requestHandlers = transport.getRequestHandlers();
this.pipeline = new InboundPipeline(
transport.getVersion(),
transport.getStatsTracker(),
recycler,
threadPool::relativeTimeInMillis,
transport.getInflightBreaker(),
requestHandlers::getHandler,
transport::inboundMessage,
transport.ignoreDeserializationErrors()
);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
assert Transports.assertTransportThread();
assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();

final ByteBuf buffer = (ByteBuf) msg;
Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
final BytesReference wrapped = Netty4Utils.toBytesReference(buffer);
try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, new ByteBufRefCounted(buffer))) {
pipeline.handleBytes(channel, reference);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
ExceptionsHelper.maybeDieOnAnotherThread(cause);
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable newCause = unwrapped != null ? unwrapped : cause;
Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
if (newCause instanceof Error) {
transport.onException(tcpChannel, new Exception(newCause));
} else {
transport.onException(tcpChannel, (Exception) newCause);
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Releasables.closeExpectNoException(pipeline);
super.channelInactive(ctx);
}

private record ByteBufRefCounted(ByteBuf buffer) implements RefCounted {

@Override
public void incRef() {
buffer.retain();
}

@Override
public boolean tryIncRef() {
if (hasReferences() == false) {
return false;
}
try {
buffer.retain();
} catch (RuntimeException e) {
assert hasReferences() == false;
return false;
}
return true;
}

@Override
public boolean decRef() {
return buffer.release();
}

@Override
public boolean hasReferences() {
return buffer.refCnt() > 0;
}
}
}
Loading