Skip to content

Commit

Permalink
Merge branch 'apache-3.2' into apache-3.3
Browse files Browse the repository at this point in the history
# Conflicts:
#	dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
#	dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
#	dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
#	dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
#	dubbo-serialization/dubbo-serialization-fastjson2/src/main/java/org/apache/dubbo/common/serialize/fastjson2/Fastjson2ScopeModelInitializer.java
  • Loading branch information
AlbumenJ committed Feb 13, 2025
2 parents c58a699 + 3627fdb commit 6312f14
Show file tree
Hide file tree
Showing 19 changed files with 126 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public CertManager(FrameworkModel frameworkModel) {

public ProviderCert getProviderConnectionConfig(URL localAddress, SocketAddress remoteAddress) {
for (CertProvider certProvider : certProviders) {
if (certProvider.isSupport(localAddress)) {
ProviderCert cert = certProvider.getProviderConnectionConfig(localAddress);
if (certProvider.isSupport(localAddress, remoteAddress)) {
ProviderCert cert = certProvider.getProviderConnectionConfig(localAddress, remoteAddress);
if (cert != null) {
return cert;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,21 @@
import org.apache.dubbo.common.extension.ExtensionScope;
import org.apache.dubbo.common.extension.SPI;

import java.net.SocketAddress;

@SPI(scope = ExtensionScope.FRAMEWORK)
public interface CertProvider {
boolean isSupport(URL address);

default boolean isSupport(URL address, SocketAddress remoteAddress) {
return isSupport(address);
}

ProviderCert getProviderConnectionConfig(URL localAddress);

default ProviderCert getProviderConnectionConfig(URL localAddress, SocketAddress remoteAddress) {
return getProviderConnectionConfig(localAddress);
}

Cert getConsumerConnectionConfig(URL remoteAddress);
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ public void unsubscribe(String serviceName, String group, EventListener eventLis
}
}

public List<Instance> getAllInstances(String serviceName, String group) throws NacosException {
return apply(
() -> nacosConnectionManager.getNamingService().getAllInstances(handleInnerSymbol(serviceName), group));
public List<Instance> getAllInstancesWithoutSubscription(String serviceName, String group) throws NacosException {
return apply(() -> nacosConnectionManager
.getNamingService()
.getAllInstances(handleInnerSymbol(serviceName), group, false));
}

public void registerInstance(String serviceName, String group, Instance instance) throws NacosException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ public List<URL> lookup(final URL url) {
List<URL> urls = new LinkedList<>();
Set<String> serviceNames = getServiceNames(url, null);
for (String serviceName : serviceNames) {
List<Instance> instances =
namingService.getAllInstances(serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP));
List<Instance> instances = namingService.getAllInstancesWithoutSubscription(
serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP));
urls.addAll(buildURLs(url, instances));
}
return urls;
Expand Down Expand Up @@ -270,8 +270,8 @@ private void doSubscribe(final URL url, final NacosAggregateListener listener, f
* in https://github.com/apache/dubbo/issues/5978
*/
for (String serviceName : serviceNames) {
List<Instance> instances =
namingService.getAllInstances(serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP));
List<Instance> instances = namingService.getAllInstancesWithoutSubscription(
serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP));
notifySubscriber(url, serviceName, listener, instances);
}
for (String serviceName : serviceNames) {
Expand All @@ -280,8 +280,8 @@ private void doSubscribe(final URL url, final NacosAggregateListener listener, f
} else {
for (String serviceName : serviceNames) {
List<Instance> instances = new LinkedList<>();
instances.addAll(
namingService.getAllInstances(serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP)));
instances.addAll(namingService.getAllInstancesWithoutSubscription(
serviceName, getUrl().getGroup(Constants.DEFAULT_GROUP)));
String serviceInterface = serviceName;
String[] segments = serviceName.split(SERVICE_NAME_SEPARATOR, -1);
if (segments.length == 4) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public List<Instance> getAllInstances(String serviceName, boolean subscribe) thr
}

@Override
public List<Instance> getAllInstances(String serviceName, String groupName, boolean subscribe) {
public List<Instance> getAllInstances(String serviceName, String groupName, boolean subscribe)
throws NacosException {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ void testSuccess() {
public void registerInstance(String serviceName, String groupName, Instance instance) {}

@Override
public List<Instance> getAllInstances(String serviceName, String groupName) {
public List<Instance> getAllInstances(String serviceName, String groupName, boolean subscribe) {
return null;
}
};
Expand All @@ -674,7 +674,7 @@ public List<Instance> getAllInstances(String serviceName, String groupName) {
Assertions.fail(e);
}
try {
nacosNamingServiceWrapper.getAllInstances("Test", "Test");
nacosNamingServiceWrapper.getAllInstancesWithoutSubscription("Test", "Test");
} catch (NacosException e) {
Assertions.fail(e);
}
Expand All @@ -690,7 +690,8 @@ public void registerInstance(String serviceName, String groupName, Instance inst
}

@Override
public List<Instance> getAllInstances(String serviceName, String groupName) throws NacosException {
public List<Instance> getAllInstances(String serviceName, String groupName, boolean subscribe)
throws NacosException {
throw new NacosException();
}
};
Expand All @@ -699,7 +700,9 @@ public List<Instance> getAllInstances(String serviceName, String groupName) thro
new NacosNamingServiceWrapper(new NacosConnectionManager(namingService), 0, 0);
Assertions.assertThrows(
NacosException.class, () -> nacosNamingServiceWrapper.registerInstance("Test", "Test", null));
Assertions.assertThrows(NacosException.class, () -> nacosNamingServiceWrapper.getAllInstances("Test", "Test"));
Assertions.assertThrows(
NacosException.class,
() -> nacosNamingServiceWrapper.getAllInstancesWithoutSubscription("Test", "Test"));
}

@Test
Expand All @@ -717,7 +720,8 @@ public void registerInstance(String serviceName, String groupName, Instance inst
}

@Override
public List<Instance> getAllInstances(String serviceName, String groupName) throws NacosException {
public List<Instance> getAllInstances(String serviceName, String groupName, boolean subscribe)
throws NacosException {
if (count2.incrementAndGet() < 10) {
throw new NacosException();
}
Expand All @@ -735,9 +739,11 @@ public List<Instance> getAllInstances(String serviceName, String groupName) thro
Assertions.fail(e);
}

Assertions.assertThrows(NacosException.class, () -> nacosNamingServiceWrapper.getAllInstances("Test", "Test"));
Assertions.assertThrows(
NacosException.class,
() -> nacosNamingServiceWrapper.getAllInstancesWithoutSubscription("Test", "Test"));
try {
nacosNamingServiceWrapper.getAllInstances("Test", "Test");
nacosNamingServiceWrapper.getAllInstancesWithoutSubscription("Test", "Test");
} catch (NacosException e) {
Assertions.fail(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public interface Constants {

List<String> REST_SERVER = Arrays.asList("jetty", "tomcat", "netty");
String CONTENT_LENGTH_KEY = "content-length";
String SSL_SESSION_KEY = "ssl-session";

String CONNECTION_HANDLER_NAME = "connectionHandler";
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,12 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR;
import static org.apache.dubbo.common.constants.CommonConstants.EXT_PROTOCOL;

public abstract class AbstractPortUnificationServer extends AbstractServer {

/**
* extension name -> activate WireProtocol
*/
private final Map<String, WireProtocol> protocols;
private volatile Map<String, WireProtocol> protocols;

/*
protocol name --> URL object
Expand All @@ -55,23 +52,24 @@ public abstract class AbstractPortUnificationServer extends AbstractServer {

public AbstractPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
ExtensionLoader<WireProtocol> loader = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class);
Map<String, WireProtocol> protocols = loader.getActivateExtension(url, new String[0]).stream()
.collect(Collectors.toConcurrentMap(loader::getExtensionName, Function.identity()));
// load extra protocols
String extraProtocols = url.getParameter(EXT_PROTOCOL);
if (StringUtils.isNotEmpty(extraProtocols)) {
Arrays.stream(extraProtocols.split(COMMA_SEPARATOR)).forEach(p -> {
protocols.put(p, loader.getExtension(p));
});
}
this.protocols = protocols;
}

public Map<String, WireProtocol> getProtocols() {
return protocols;
}

@Override
protected final void doOpen() {
ExtensionLoader<WireProtocol> extensionLoader =
getUrl().getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class);
this.protocols = extensionLoader.getActivateExtension(getUrl(), new String[0]).stream()
.collect(Collectors.toConcurrentMap(extensionLoader::getExtensionName, Function.identity()));

doOpen0();
}

protected abstract void doOpen0();

/*
This method registers URL object and corresponding channel handler to pu server.
In PuServerExchanger.bind, this method is called with ConcurrentHashMap.computeIfPresent to register messages to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,7 @@ public void connected(Channel ch) throws RemotingException {
@Override
public void disconnected(Channel ch) throws RemotingException {
if (getChannelsSize() == 0) {
logger.warn(
INTERNAL_ERROR,
"unknown error in remoting module",
"",
logger.info(
"All clients has disconnected from " + ch.getLocalAddress() + ". You can graceful shutdown now.");
}
super.disconnected(ch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void bind() {
}

@Override
protected void doOpen() {
protected void doOpen0() {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory(EVENT_LOOP_BOSS_POOL_NAME, true));
ExecutorService worker =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void bind() throws Throwable {
}

@Override
public void doOpen() throws Throwable {
public void doOpen0() {
bootstrap = new ServerBootstrap();

bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dubbo.common.ssl.CertManager;
import org.apache.dubbo.common.ssl.ProviderCert;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.api.ProtocolDetector;
import org.apache.dubbo.remoting.api.WireProtocol;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
Expand All @@ -44,6 +45,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.AttributeKey;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;

Expand All @@ -57,6 +59,7 @@ public class NettyPortUnificationServerHandler extends ByteToMessageDecoder {
private final Map<String, WireProtocol> protocols;
private final Map<String, URL> urlMapper;
private final Map<String, ChannelHandler> handlerMapper;
private static final AttributeKey<SSLSession> SSL_SESSION_KEY = AttributeKey.valueOf(Constants.SSL_SESSION_KEY);

public NettyPortUnificationServerHandler(
URL url,
Expand Down Expand Up @@ -91,6 +94,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
SSLSession session =
ctx.pipeline().get(SslHandler.class).engine().getSession();
LOGGER.info("TLS negotiation succeed with session: " + session);
ctx.channel().attr(SSL_SESSION_KEY).set(session);
} else {
LOGGER.error(
INTERNAL_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;

import javax.net.ssl.SSLSession;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;

/**
* NettyServerHandler.
Expand All @@ -43,6 +48,8 @@ public class NettyServerHandler extends ChannelDuplexHandler {
*/
private final Map<String, Channel> channels = new ConcurrentHashMap<>();

private static final AttributeKey<SSLSession> SSL_SESSION_KEY = AttributeKey.valueOf(Constants.SSL_SESSION_KEY);

private final URL url;

private final ChannelHandler handler;
Expand Down Expand Up @@ -128,6 +135,15 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}
}
super.userEventTriggered(ctx, evt);
if (evt instanceof SslHandshakeCompletionEvent) {
SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
if (handshakeEvent.isSuccess()) {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
channel.setAttribute(
Constants.SSL_SESSION_KEY,
ctx.channel().attr(SSL_SESSION_KEY).get());
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.Constants;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
Expand All @@ -28,13 +29,14 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.AttributeKey;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;

public class SslClientTlsHandler extends ChannelInboundHandlerAdapter {

private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(SslClientTlsHandler.class);

private static final AttributeKey<SSLSession> SSL_SESSION_KEY = AttributeKey.valueOf(Constants.SSL_SESSION_KEY);
private final SslContext sslContext;

public SslClientTlsHandler(URL url) {
Expand All @@ -60,6 +62,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
ctx.pipeline().get(SslHandler.class).engine().getSession();
logger.info("TLS negotiation succeed with: " + session.getPeerHost());
ctx.pipeline().remove(this);
ctx.channel().attr(SSL_SESSION_KEY).set(session);
} else {
logger.error(
INTERNAL_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dubbo.common.ssl.AuthPolicy;
import org.apache.dubbo.common.ssl.CertManager;
import org.apache.dubbo.common.ssl.ProviderCert;
import org.apache.dubbo.remoting.Constants;

import javax.net.ssl.SSLSession;

Expand All @@ -34,6 +35,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.AttributeKey;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;

Expand All @@ -43,6 +45,7 @@ public class SslServerTlsHandler extends ByteToMessageDecoder {
private final URL url;

private final boolean sslDetected;
private static final AttributeKey<SSLSession> SSL_SESSION_KEY = AttributeKey.valueOf(Constants.SSL_SESSION_KEY);

public SslServerTlsHandler(URL url) {
this.url = url;
Expand Down Expand Up @@ -74,6 +77,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
logger.info("TLS negotiation succeed with: " + session.getPeerHost());
// Remove after handshake success.
ctx.pipeline().remove(this);
ctx.channel().attr(SSL_SESSION_KEY).set(session);
} else {
logger.error(
INTERNAL_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public void encode(Channel channel, OutputStream output, Object message) throws
public Object decode(Channel channel, InputStream input) throws IOException {
int contentLength = input.available();
getAttributes().put(Constants.CONTENT_LENGTH_KEY, contentLength);
Object sslSession = channel.getAttribute(Constants.SSL_SESSION_KEY);
if (null != sslSession) {
put(Constants.SSL_SESSION_KEY, sslSession);
}

ObjectInput in = CodecSupport.getSerialization(serializationType).deserialize(channel.getUrl(), input);
this.put(SERIALIZATION_ID_KEY, serializationType);
Expand Down
Loading

0 comments on commit 6312f14

Please sign in to comment.