Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,10 @@ public static void logThreadPoolStatus() {
List<ThreadPoolStatusMonitor> monitors = threadPoolWrapper.getStatusPrinters();
for (ThreadPoolStatusMonitor monitor : monitors) {
double value = monitor.value(threadPoolWrapper.getThreadPoolExecutor());
String nameFormatted = String.format("%-40s", threadPoolWrapper.getName());
String descFormatted = String.format("%-12s", monitor.describe());
waterMarkLogger.info("{}{}{}", nameFormatted, descFormatted, value);
waterMarkLogger.info("\t{}\t{}\t{}", threadPoolWrapper.getName(),
monitor.describe(),
value);

if (enablePrintJstack) {
if (monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) &&
System.currentTimeMillis() - jstackTime > jstackPeriodTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ public class ProxyConfig implements ConfigFile {
private boolean enableGrpcEpoll = false;
private int grpcThreadPoolNums = 16 + PROCESSOR_NUMBER * 2;
private int grpcThreadPoolQueueCapacity = 100000;

/**
* Maximum number of concurrent gRPC calls allowed per client connection.
* <p>
* A single client issuing excessively high concurrent requests may skew the validation load balancing
* and overload a single proxy instance (hotspot), potentially bringing it down. Limiting
* {@code grpcMaxConcurrentCallsPerConnection} helps mitigate this per-connection hotspot risk.
* <p>
* Note: Setting this limit too low may cause send/consume failures (e.g., backpressure or rejected calls).
*/
private int grpcMaxConcurrentCallsPerConnection = Integer.MAX_VALUE;
private String brokerConfigPath = ConfigurationManager.getProxyHome() + "/conf/broker.conf";
/**
* gRPC max message size
Expand Down Expand Up @@ -1572,4 +1583,12 @@ public int getReturnHandleGroupThreadPoolNums() {
public void setReturnHandleGroupThreadPoolNums(int returnHandleGroupThreadPoolNums) {
this.returnHandleGroupThreadPoolNums = returnHandleGroupThreadPoolNums;
}

public int getGrpcMaxConcurrentCallsPerConnection() {
return grpcMaxConcurrentCallsPerConnection;
}

public void setGrpcMaxConcurrentCallsPerConnection(int grpcMaxConcurrentCallsPerConnection) {
this.grpcMaxConcurrentCallsPerConnection = grpcMaxConcurrentCallsPerConnection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;

public class GrpcServerBuilder {
Expand All @@ -52,18 +52,20 @@ public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port
}

protected GrpcServerBuilder(ThreadPoolExecutor executor, int port, TlsCertificateManager tlsCertificateManager) {
ProxyConfig config = ConfigurationManager.getProxyConfig();
this.tlsCertificateManager = tlsCertificateManager;
serverBuilder = NettyServerBuilder.forPort(port);
serverBuilder = NettyServerBuilder.forPort(port)
.maxConcurrentCallsPerConnection(config.getGrpcMaxConcurrentCallsPerConnection());

serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());

// build server
int bossLoopNum = ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
int workerLoopNum = ConfigurationManager.getProxyConfig().getGrpcWorkerLoopNum();
int maxInboundMessageSize = ConfigurationManager.getProxyConfig().getGrpcMaxInboundMessageSize();
long idleTimeMills = ConfigurationManager.getProxyConfig().getGrpcClientIdleTimeMills();
int bossLoopNum = config.getGrpcBossLoopNum();
int workerLoopNum = config.getGrpcWorkerLoopNum();
int maxInboundMessageSize = config.getGrpcMaxInboundMessageSize();
long idleTimeMills = config.getGrpcClientIdleTimeMills();

if (ConfigurationManager.getProxyConfig().isEnableGrpcEpoll()) {
if (config.isEnableGrpcEpoll()) {
serverBuilder.bossEventLoopGroup(new EpollEventLoopGroup(bossLoopNum))
.workerEventLoopGroup(new EpollEventLoopGroup(workerLoopNum))
.channelType(EpollServerSocketChannel.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.Channel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
Expand Down Expand Up @@ -59,12 +64,6 @@
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

Expand Down Expand Up @@ -192,7 +191,7 @@ public RemotingProtocolServer(MessagingProcessor messagingProcessor, TlsCertific
this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build()
);
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS);
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 100, 100, TimeUnit.MILLISECONDS);

this.registerRemotingServer(this.defaultRemotingServer);
}
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/main/resources/rmq.proxy.logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<maxFileSize>128MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %m%n</pattern>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8}%m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
Expand Down
Loading