Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release ByteBuf when handle onData failed #13102

Merged
merged 8 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,7 @@ void handleH2TransportError(TriRpcStatus status) {

void finishProcess(TriRpcStatus status, Http2Headers trailers, boolean isReturnTriException) {
final Map<String, String> reserved = filterReservedHeaders(trailers);
final Map<String, Object> attachments = headersToMap(trailers, () -> {
return reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader());
});
final Map<String, Object> attachments = headersToMap(trailers, () -> reserved.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()));
final TriRpcStatus detailStatus;
final TriRpcStatus statusFromTrailers = getStatusFromTrailers(reserved);
if (statusFromTrailers != null) {
Expand Down Expand Up @@ -449,23 +447,33 @@ public void onHeader(Http2Headers headers, boolean endStream) {

@Override
public void onData(ByteBuf data, boolean endStream) {
executor.execute(() -> {
if (transportError != null) {
transportError.appendDescription(
"Data:" + data.toString(StandardCharsets.UTF_8));
ReferenceCountUtil.release(data);
if (transportError.description.length() > 512 || endStream) {
handleH2TransportError(transportError);
}
return;
}
if (!headerReceived) {
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
"headers not received before payload"));
return;
try {
executor.execute(() -> doOnData(data, endStream));
} catch (Throwable t) {
// Tasks will be rejected when the thread pool is closed or full,
// ByteBuf needs to be released to avoid out of heap memory leakage.
// For example, ThreadLessExecutor will be shutdown when request timeout {@link AsyncRpcResult}
ReferenceCountUtil.release(data);
LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", "submit onData task failed", t);
}
}

private void doOnData(ByteBuf data, boolean endStream) {
if (transportError != null) {
transportError.appendDescription(
"Data:" + data.toString(StandardCharsets.UTF_8));
ReferenceCountUtil.release(data);
if (transportError.description.length() > 512 || endStream) {
handleH2TransportError(transportError);
}
deframer.deframe(data);
});
return;
}
if (!headerReceived) {
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
"headers not received before payload"));
return;
}
deframer.deframe(data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.dubbo.rpc.protocol.tri.stream;

import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.HeaderFilter;
Expand Down Expand Up @@ -69,10 +70,11 @@
import java.util.concurrent.Executor;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;

public class TripleServerStream extends AbstractStream implements ServerStream {

private static final Logger LOGGER = LoggerFactory.getLogger(TripleServerStream.class);
private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(TripleServerStream.class);
public final ServerTransportObserver transportObserver = new ServerTransportObserver();
private final TripleWriteQueue writeQueue;
private final PathResolver pathResolver;
Expand Down Expand Up @@ -408,11 +410,10 @@ private void processHeader(Http2Headers headers, boolean endStream) {
}
}

Map<String, Object> requestMetadata = headersToMap(headers, () -> {
return Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
.map(CharSequence::toString)
.orElse(null);
});
Map<String, Object> requestMetadata = headersToMap(headers, () ->
Optional.ofNullable(headers.get(TripleHeaderEnum.TRI_HEADER_CONVERT.getHeader()))
.map(CharSequence::toString)
.orElse(null));
boolean hasStub = pathResolver.hasNativeStub(path);
if (hasStub) {
listener = new StubAbstractServerCall(invoker, TripleServerStream.this,
Expand All @@ -431,7 +432,15 @@ private void processHeader(Http2Headers headers, boolean endStream) {

@Override
public void onData(ByteBuf data, boolean endStream) {
executor.execute(() -> doOnData(data, endStream));
try {
executor.execute(() -> doOnData(data, endStream));
} catch (Throwable t) {
// Tasks will be rejected when the thread pool is closed or full,
// ByteBuf needs to be released to avoid out of heap memory leakage.
// For example, ThreadLessExecutor will be shutdown when request timeout {@link AsyncRpcResult}
ReferenceCountUtil.release(data);
LOGGER.error(PROTOCOL_FAILED_REQUEST, "", "", "submit onData task failed", t);
}
}

private void doOnData(ByteBuf data, boolean endStream) {
Expand All @@ -454,10 +463,8 @@ public void cancelByRemote(long errorCode) {
if (listener == null) {
return;
}
executor.execute(() -> {
listener.onCancelByRemote(TriRpcStatus.CANCELLED
.withDescription("Canceled by client ,errorCode=" + errorCode));
});
executor.execute(() -> listener.onCancelByRemote(TriRpcStatus.CANCELLED
.withDescription("Canceled by client ,errorCode=" + errorCode)));
}
}

Expand Down
Loading