Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ public abstract class AbstractPortUnificationServer extends AbstractServer {
protocol name --> URL object
wire protocol will get url object to config server pipeline for channel
*/
private final Map<String, URL> supportedUrls = new ConcurrentHashMap<>();
private Map<String, URL> supportedUrls;

/*
protocol name --> ChannelHandler object
wire protocol will get handler to config server pipeline for channel
(for triple protocol, it's a default handler that do nothing)
*/
private final Map<String, ChannelHandler> supportedHandlers = new ConcurrentHashMap<>();
private Map<String, ChannelHandler> supportedHandlers;

public AbstractPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
Expand All @@ -63,6 +63,10 @@ public Map<String, WireProtocol> getProtocols() {

@Override
protected final void doOpen() {
// initialize supportedUrls and supportedHandlers before potential usage to avoid NPE.
supportedUrls = new ConcurrentHashMap<>();
supportedHandlers = new ConcurrentHashMap<>();

ExtensionLoader<WireProtocol> loader =
getUrl().getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class);
Map<String, WireProtocol> protocols = loader.getActivateExtension(getUrl(), new String[0]).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class AbstractTimerTask implements TimerTask {
this.channelProvider = channelProvider;
this.hashedWheelTimer = hashedWheelTimer;
this.tick = tick;
start();
// do not start here because inheritor should set additional timeout parameters before doing task.
Copy link

Copilot AI Aug 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The word 'inheritor' should be 'subclass' or 'subclasses' for better clarity and standard terminology.

Suggested change
// do not start here because inheritor should set additional timeout parameters before doing task.
// do not start here because subclass should set additional timeout parameters before doing task.

Copilot uses AI. Check for mistakes.
}

static Long lastRead(Channel channel) {
Expand All @@ -57,7 +57,7 @@ static Long now() {
return System.currentTimeMillis();
}

private void start() {
protected void start() {
this.timeout = hashedWheelTimer.newTimeout(this, tick, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public CloseTimerTask(
ChannelProvider channelProvider, HashedWheelTimer hashedWheelTimer, Long tick, int closeTimeout) {
super(channelProvider, hashedWheelTimer, tick);
this.closeTimeout = closeTimeout;
start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class HeartbeatTimerTask extends AbstractTimerTask {
ChannelProvider channelProvider, HashedWheelTimer hashedWheelTimer, Long heartbeatTick, int heartbeat) {
super(channelProvider, hashedWheelTimer, heartbeatTick);
this.heartbeat = heartbeat;
start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public ReconnectTimerTask(
int idleTimeout) {
super(channelProvider, hashedWheelTimer, heartbeatTimeoutTick);
this.idleTimeout = idleTimeout;
start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

public abstract class AbstractClient extends AbstractEndpoint implements Client {

private final Lock connectLock = new ReentrantLock();
private Lock connectLock;

private final boolean needReconnect;

Expand All @@ -64,6 +64,10 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);

// initialize connectLock before calling connect()
connectLock = new ReentrantLock();

// set default needReconnect true when channel is not connected
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public MultiMessageHandler(ChannelHandler handler) {
super(handler);
}

@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,23 @@ public class NettyHttp3Server extends AbstractServer {
private EventLoopGroup bossGroup;
private io.netty.channel.Channel channel;

private final Consumer<ChannelPipeline> pipelineConfigurator;
private final int serverShutdownTimeoutMills;
private Consumer<ChannelPipeline> pipelineConfigurator;
private int serverShutdownTimeoutMills;

@SuppressWarnings("unchecked")
public NettyHttp3Server(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, url));
pipelineConfigurator = (Consumer<ChannelPipeline>) getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY);
Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator should be set");
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
}

@Override
@SuppressWarnings("unchecked")
protected void doOpen() throws Throwable {
bootstrap = new Bootstrap();

// initialize pipelineConfigurator and serverShutdownTimeoutMills before potential usage to avoid NPE.
pipelineConfigurator = (Consumer<ChannelPipeline>) getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY);
Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator should be set");
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());

bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -56,7 +55,8 @@
*/
public class NettyPortUnificationServer extends AbstractPortUnificationServer {

private Map<String, Channel> dubboChannels = new ConcurrentHashMap<>(); // <ip:port, channel>
// <ip:port, channel>
private Map<String, Channel> dubboChannels;

private ServerBootstrap bootstrap;

Expand Down Expand Up @@ -95,6 +95,7 @@ protected void doOpen0() {
bootstrap = new ServerBootstrap(channelFactory);

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
// set dubboChannels
dubboChannels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class NettyServer extends AbstractServer implements RemotingServer {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyServer.class);

private Map<String, Channel> channels; // <ip:port, channel>
// <ip:port, channel>
private Map<String, Channel> channels;

private ServerBootstrap bootstrap;

Expand All @@ -75,6 +76,7 @@ protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap(channelFactory);

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
// set channels
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
*/
public class NettyPortUnificationServer extends AbstractPortUnificationServer {

private final int serverShutdownTimeoutMills;
private int serverShutdownTimeoutMills;
/**
* netty server bootstrap.
*/
Expand All @@ -69,16 +69,10 @@ public class NettyPortUnificationServer extends AbstractPortUnificationServer {

private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private final Map<String, Channel> dubboChannels = new ConcurrentHashMap<>();
private Map<String, Channel> dubboChannels;

public NettyPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, url));

// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in
// CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
// read config before destroy
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
}

@Override
Expand All @@ -103,6 +97,15 @@ public void bind() throws Throwable {
public void doOpen0() {
bootstrap = new ServerBootstrap();

// initialize dubboChannels and serverShutdownTimeoutMills before potential usage to avoid NPE.
dubboChannels = new ConcurrentHashMap<>();

// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in
// CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
// read config before destroy
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());

bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,13 @@ public class NettyServer extends AbstractServer {

private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private final int serverShutdownTimeoutMills;
private int serverShutdownTimeoutMills;

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREAD_POOL_KEY in
// CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, url));

// read config before destroy
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
}

/**
Expand All @@ -99,6 +96,10 @@ public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();

// initialize serverShutdownTimeoutMills before potential usage to avoid NPE.
// read config before destroy
serverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());

bossGroup = createBossGroup();
workerGroup = createWorkerGroup();

Expand Down
Loading