diff --git a/conf/proxy.conf b/conf/proxy.conf index a119095218a94..fbb009878e315 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -230,6 +230,15 @@ webSocketServiceEnabled=false # Name of the cluster to which this broker belongs to clusterName= + +### --- Proxy Extensions + +# List of proxy extensions to load, which is a list of extension names +#proxyExtensions= + +# The directory to locate extensions +#proxyExtensionsDirectory= + ### --- Deprecated config variables --- ### # Deprecated. Use configurationStoreServers diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java new file mode 100644 index 0000000000000..844c7ca85abc0 --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ExtensionsDefinitions.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import lombok.Data; +import lombok.experimental.Accessors; + +import java.util.Map; +import java.util.TreeMap; + +/** + * The collection of Proxy Extensions. + */ +@Data +@Accessors(fluent = true) +class ExtensionsDefinitions { + + private final Map extensions = new TreeMap<>(); + +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java new file mode 100644 index 0000000000000..b973e10128a5c --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtension.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; +import org.apache.pulsar.proxy.server.ProxyConfiguration; +import org.apache.pulsar.proxy.server.ProxyService; + +import java.net.InetSocketAddress; +import java.util.Map; + +/** + * The extension interface for support additional extensions on Pulsar Proxy. + */ +@InterfaceAudience.LimitedPrivate +@InterfaceStability.Evolving +public interface ProxyExtension extends AutoCloseable { + + /** + * Returns the unique extension name. For example, `kafka-v2` for extension for Kafka v2 protocol. + */ + String extensionName(); + + /** + * Verify if the extension can handle the given extension name. + * + * @param extension the extension to verify + * @return true if the extension can handle the given extension name, otherwise false. + */ + boolean accept(String extension); + + /** + * Initialize the extension when the extension is constructed from reflection. + * + *

The initialize should initialize all the resources required for serving the extension + * but don't start those resources until {@link #start(ProxyService)} is called. + * + * @param conf proxy service configuration + * @throws Exception when fail to initialize the extension. + */ + void initialize(ProxyConfiguration conf) throws Exception; + + /** + * Start the extension with the provided proxy service. + * + *

The proxy service provides the accesses to the Pulsar Proxy components. + * + * @param service the broker service to start with. + */ + void start(ProxyService service); + + /** + * Create the list of channel initializers for the ports that this extension + * will listen on. + * + *

NOTE: this method is called after {@link #start(ProxyService)}. + * + * @return the list of channel initializers for the ports that this extension listens on. + */ + Map> newChannelInitializers(); + + @Override + void close(); +} + diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionDefinition.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionDefinition.java new file mode 100644 index 0000000000000..65560260c2478 --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionDefinition.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Metadata information about a Proxy Extension. + */ +@Data +@NoArgsConstructor +public class ProxyExtensionDefinition { + + /** + * The name of the extension. + */ + private String name; + + /** + * The description of the extension to be used for user help. + */ + private String description; + + /** + * The class name for the extension. + */ + private String extensionClass; + +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java new file mode 100644 index 0000000000000..632c841e5afea --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionMetadata.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.nio.file.Path; + +/** + * The metadata of Proxy Extension. + */ +@Data +@NoArgsConstructor +class ProxyExtensionMetadata { + + /** + * The definition of the extension. + */ + private ProxyExtensionDefinition definition; + + /** + * The path to the extension package. + */ + private Path archivePath; + +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java new file mode 100644 index 0000000000000..1f6924a747166 --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoader.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.proxy.server.ProxyConfiguration; +import org.apache.pulsar.proxy.server.ProxyService; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; + +/** + * A extension with its classloader. + */ +@Slf4j +@Data +@RequiredArgsConstructor +class ProxyExtensionWithClassLoader implements ProxyExtension { + + private final ProxyExtension extension; + private final NarClassLoader classLoader; + + @Override + public String extensionName() { + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + return extension.extensionName(); + } + } + + @Override + public boolean accept(String extensionName) { + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + return extension.accept(extensionName); + } + } + + @Override + public void initialize(ProxyConfiguration conf) throws Exception { + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + extension.initialize(conf); + } + } + + @Override + public void start(ProxyService service) { + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + extension.start(service); + } + } + + @Override + public Map> newChannelInitializers() { + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + return extension.newChannelInitializers(); + } + } + + @Override + public void close() { + try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) { + extension.close(); + } + + try { + classLoader.close(); + } catch (IOException e) { + log.warn("Failed to close the extension class loader", e); + } + } + + /** + * Help to switch the class loader of current thread to the NarClassLoader, and change it back when it's done. + * With the help of try-with-resources statement, the code would be cleaner than using try finally every time. + */ + private static class ClassLoaderSwitcher implements AutoCloseable { + private final ClassLoader prevClassLoader; + + ClassLoaderSwitcher(ClassLoader classLoader) { + prevClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(classLoader); + } + + @Override + public void close() { + Thread.currentThread().setContextClassLoader(prevClassLoader); + } + } +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java new file mode 100644 index 0000000000000..8f58a0938a58c --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensions.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.proxy.server.ProxyConfiguration; +import org.apache.pulsar.proxy.server.ProxyService; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Set; + +/** + * A collection of loaded extensions. + */ +@Slf4j +public class ProxyExtensions implements AutoCloseable { + + /** + * Load the extensions for the given extensions list. + * + * @param conf the pulsar broker service configuration + * @return the collection of extensions + */ + public static ProxyExtensions load(ProxyConfiguration conf) throws IOException { + ExtensionsDefinitions definitions = + ProxyExtensionsUtils.searchForExtensions( + conf.getProxyExtensionsDirectory(), conf.getNarExtractionDirectory()); + + ImmutableMap.Builder extensionsBuilder = ImmutableMap.builder(); + + conf.getProxyExtensions().forEach(extensionName -> { + + ProxyExtensionMetadata definition = definitions.extensions().get(extensionName); + if (null == definition) { + throw new RuntimeException("No extension is found for extension name `" + extensionName + + "`. Available extensions are : " + definitions.extensions()); + } + + ProxyExtensionWithClassLoader extension; + try { + extension = ProxyExtensionsUtils.load(definition, conf.getNarExtractionDirectory()); + } catch (IOException e) { + log.error("Failed to load the extension for extension `" + extensionName + "`", e); + throw new RuntimeException("Failed to load the extension for extension name `" + extensionName + "`"); + } + + if (!extension.accept(extensionName)) { + extension.close(); + log.error("Malformed extension found for extensionName `" + extensionName + "`"); + throw new RuntimeException("Malformed extension found for extension name `" + extensionName + "`"); + } + + extensionsBuilder.put(extensionName, extension); + log.info("Successfully loaded extension for extension name `{}`", extensionName); + }); + + return new ProxyExtensions(extensionsBuilder.build()); + } + + private final Map extensions; + + ProxyExtensions(Map extensions) { + this.extensions = extensions; + } + + /** + * Return the handler for the provided extension. + * + * @param extension the extension to use + * @return the extension to handle the provided extension + */ + public ProxyExtension extension(String extension) { + ProxyExtensionWithClassLoader h = extensions.get(extension); + if (null == h) { + return null; + } else { + return h.getExtension(); + } + } + + public void initialize(ProxyConfiguration conf) throws Exception { + for (ProxyExtension extension : extensions.values()) { + extension.initialize(conf); + } + } + + public Map>> newChannelInitializers() { + Map>> channelInitializers = Maps.newHashMap(); + Set addresses = Sets.newHashSet(); + + for (Map.Entry extension : extensions.entrySet()) { + Map> initializers = + extension.getValue().newChannelInitializers(); + initializers.forEach((address, initializer) -> { + if (!addresses.add(address)) { + log.error("extension for `{}` attempts to use {} for its listening port." + + " But it is already occupied by other extensions.", + extension.getKey(), address); + throw new RuntimeException("extension for `" + extension.getKey() + + "` attempts to use " + address + " for its listening port. But it is" + + " already occupied by other messaging extensions"); + } + channelInitializers.put(extension.getKey(), initializers); + }); + } + + return channelInitializers; + } + + public void start(ProxyService service) { + extensions.values().forEach(extension -> extension.start(service)); + } + + @Override + public void close() { + extensions.values().forEach(ProxyExtension::close); + } +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java new file mode 100644 index 0000000000000..2f02827519c11 --- /dev/null +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsUtils.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Util class to search and load {@link ProxyExtension}s. + */ +@UtilityClass +@Slf4j +class ProxyExtensionsUtils { + + static final String PROXY_EXTENSION_DEFINITION_FILE = "pulsar-proxy-extension.yml"; + + /** + * Retrieve the extension definition from the provided handler nar package. + * + * @param narPath the path to the extension NAR package + * @return the extension definition + * @throws IOException when fail to load the extension or get the definition + */ + public static ProxyExtensionDefinition getProxyExtensionDefinition(String narPath, String narExtractionDirectory) + throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet(), + narExtractionDirectory)) { + return getProxyExtensionDefinition(ncl); + } + } + + private static ProxyExtensionDefinition getProxyExtensionDefinition(NarClassLoader ncl) throws IOException { + String configStr = ncl.getServiceDefinition(PROXY_EXTENSION_DEFINITION_FILE); + + return ObjectMapperFactory.getThreadLocalYaml().readValue( + configStr, ProxyExtensionDefinition.class + ); + } + + /** + * Search and load the available extensions. + * + * @param extensionsDirectory the directory where all the extensions are stored + * @return a collection of extensions + * @throws IOException when fail to load the available extensions from the provided directory. + */ + public static ExtensionsDefinitions searchForExtensions(String extensionsDirectory, + String narExtractionDirectory) throws IOException { + Path path = Paths.get(extensionsDirectory).toAbsolutePath(); + log.info("Searching for extensions in {}", path); + + ExtensionsDefinitions extensions = new ExtensionsDefinitions(); + if (!path.toFile().exists()) { + log.warn("extension directory not found"); + return extensions; + } + + try (DirectoryStream stream = Files.newDirectoryStream(path, "*.nar")) { + for (Path archive : stream) { + try { + ProxyExtensionDefinition phDef = + ProxyExtensionsUtils.getProxyExtensionDefinition(archive.toString(), narExtractionDirectory); + log.info("Found extension from {} : {}", archive, phDef); + + checkArgument(StringUtils.isNotBlank(phDef.getName())); + checkArgument(StringUtils.isNotBlank(phDef.getExtensionClass())); + + ProxyExtensionMetadata metadata = new ProxyExtensionMetadata(); + metadata.setDefinition(phDef); + metadata.setArchivePath(archive); + + extensions.extensions().put(phDef.getName(), metadata); + } catch (Throwable t) { + log.warn("Failed to load connector from {}." + + " It is OK however if you want to use this extension," + + " please make sure you put the correct extension NAR" + + " package in the extensions directory.", archive, t); + } + } + } + + return extensions; + } + + /** + * Load the extension according to the handler definition. + * + * @param metadata the extension definition. + * @return + */ + static ProxyExtensionWithClassLoader load(ProxyExtensionMetadata metadata, + String narExtractionDirectory) throws IOException { + NarClassLoader ncl = NarClassLoader.getFromArchive( + metadata.getArchivePath().toAbsolutePath().toFile(), + Collections.emptySet(), + ProxyExtension.class.getClassLoader(), narExtractionDirectory); + + ProxyExtensionDefinition phDef = getProxyExtensionDefinition(ncl); + if (StringUtils.isBlank(phDef.getExtensionClass())) { + throw new IOException("extension `" + phDef.getName() + "` does NOT provide a protocol" + + " handler implementation"); + } + + try { + Class extensionClass = ncl.loadClass(phDef.getExtensionClass()); + Object extension = extensionClass.newInstance(); + if (!(extension instanceof ProxyExtension)) { + throw new IOException("Class " + phDef.getExtensionClass() + + " does not implement extension interface"); + } + ProxyExtension ph = (ProxyExtension) extension; + return new ProxyExtensionWithClassLoader(ph, ncl); + } catch (Throwable t) { + rethrowIOException(t); + return null; + } + } + + private static void rethrowIOException(Throwable cause) + throws IOException { + if (cause instanceof IOException) { + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else { + throw new IOException(cause.getMessage(), cause); + } + } + +} diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java index 5ec977a1dfc6e..f192276367161 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java @@ -28,16 +28,19 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.resources.MetadataStoreCacheLoader; import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; +import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +51,10 @@ /** * Maintains available active broker list and returns next active broker in round-robin for discovery service. - * + * This is an API used by Proxy Extensions. */ +@InterfaceStability.Evolving +@InterfaceAudience.LimitedPrivate public class BrokerDiscoveryProvider implements Closeable { final MetadataStoreCacheLoader metadataStoreCacheLoader; @@ -76,6 +81,16 @@ public BrokerDiscoveryProvider(ProxyConfiguration config, PulsarResources pulsar } } + /** + * Access the list of available brokers. + * Used by Protocol Handlers + * @return the list of available brokers + * @throws PulsarServerException + */ + public List getAvailableBrokers() throws PulsarServerException { + return metadataStoreCacheLoader.getAvailableBrokers(); + } + /** * Find next broker {@link LoadManagerReport} in round-robin fashion. * diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 5ce8968835100..a81da5af783da 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -560,6 +560,19 @@ public class ProxyConfiguration implements PulsarConfiguration { } ) + /***** --- Protocol Handlers --- ****/ + @FieldContext( + category = CATEGORY_PLUGIN, + doc = "The directory to locate proxy extensions" + ) + private String proxyExtensionsDirectory = "./proxyextensions"; + + @FieldContext( + category = CATEGORY_PLUGIN, + doc = "List of messaging protocols to load, which is a list of extension names" + ) + private Set proxyExtensions = Sets.newTreeSet(); + /***** --- WebSocket --- ****/ @FieldContext( category = CATEGORY_WEBSOCKET, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index af83c5d51f80d..fe6362df0b013 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -25,16 +25,19 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; -import io.netty.util.concurrent.DefaultThreadFactory; import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -55,6 +58,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.proxy.extensions.ProxyExtensions; import org.apache.pulsar.proxy.stats.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +77,7 @@ public class ProxyService implements Closeable { private MetadataStoreExtended localMetadataStore; private MetadataStoreExtended configMetadataStore; private PulsarResources pulsarResources; + private ProxyExtensions proxyExtensions = null; private final EventLoopGroup acceptorGroup; private final EventLoopGroup workerGroup; @@ -121,7 +126,7 @@ public class ProxyService implements Closeable { private AdditionalServlets proxyAdditionalServlets; public ProxyService(ProxyConfiguration proxyConfig, - AuthenticationService authenticationService) throws IOException { + AuthenticationService authenticationService) throws Exception { checkNotNull(proxyConfig); this.proxyConfig = proxyConfig; this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); @@ -140,6 +145,10 @@ public ProxyService(ProxyConfiguration proxyConfig, this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, workersThreadFactory); this.authenticationService = authenticationService; + // Initialize the message protocol handlers + proxyExtensions = ProxyExtensions.load(proxyConfig); + proxyExtensions.initialize(proxyConfig); + statsExecutor = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("proxy-stats-executor")); statsExecutor.schedule(()->{ @@ -213,6 +222,44 @@ public void start() throws Exception { } else { this.serviceUrlTls = null; } + + // Initialize the message protocol handlers. + // start the protocol handlers only after the broker is ready, + // so that the protocol handlers can access broker service properly. + this.proxyExtensions.start(this); + Map>> protocolHandlerChannelInitializers = + this.proxyExtensions.newChannelInitializers(); + startProxyExtensions(protocolHandlerChannelInitializers, bootstrap); + } + + // This call is used for starting additional protocol handlers + public void startProxyExtensions( + Map>> protocolHandlers, ServerBootstrap serverBootstrap) { + + protocolHandlers.forEach((extensionName, initializers) -> { + initializers.forEach((address, initializer) -> { + try { + startProxyExtension(extensionName, address, initializer, serverBootstrap); + } catch (IOException e) { + LOG.error("{}", e.getMessage(), e.getCause()); + throw new RuntimeException(e.getMessage(), e.getCause()); + } + }); + }); + } + + private void startProxyExtension(String extensionName, + SocketAddress address, + ChannelInitializer initializer, + ServerBootstrap serverBootstrap) throws IOException { + ServerBootstrap bootstrap = serverBootstrap.clone(); + bootstrap.childHandler(initializer); + try { + bootstrap.bind(address).sync(); + } catch (Exception e) { + throw new IOException("Failed to bind extension `" + extensionName + "` on " + address, e); + } + LOG.info("Successfully bound extension `{}` on {}", extensionName, address); } public BrokerDiscoveryProvider getDiscoveryProvider() { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/MockProxyExtension.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/MockProxyExtension.java new file mode 100644 index 0000000000000..94dd3b55e2c62 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/MockProxyExtension.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import org.apache.pulsar.proxy.server.ProxyConfiguration; +import org.apache.pulsar.proxy.server.ProxyService; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Map; + +class MockProxyExtension implements ProxyExtension { + + public static final String NAME = "mock"; + + @Override + public String extensionName() { + return NAME; + } + + @Override + public boolean accept(String protocol) { + return NAME.equals(protocol); + } + + @Override + public void initialize(ProxyConfiguration conf) throws Exception { + // no-op + } + + @Override + public void start(ProxyService service) { + // no-op + } + + @Override + public Map> newChannelInitializers() { + return Collections.emptyMap(); + } + + @Override + public void close() { + // no-op + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionUtilsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionUtilsTest.java new file mode 100644 index 0000000000000..86ae7fffa775e --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionUtilsTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.testng.IObjectFactory; +import org.testng.annotations.ObjectFactory; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Set; + +import static org.apache.pulsar.proxy.extensions.ProxyExtensionsUtils.PROXY_EXTENSION_DEFINITION_FILE; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.AssertJUnit.assertSame; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; + +@PrepareForTest({ + ProxyExtensionsUtils.class, NarClassLoader.class +}) +@PowerMockIgnore({"org.apache.logging.log4j.*"}) +@Test(groups = "broker") +public class ProxyExtensionUtilsTest { + + // Necessary to make PowerMockito.mockStatic work with TestNG. + @ObjectFactory + public IObjectFactory getObjectFactory() { + return new org.powermock.modules.testng.PowerMockObjectFactory(); + } + + @Test + public void testLoadProtocolHandler() throws Exception { + ProxyExtensionDefinition def = new ProxyExtensionDefinition(); + def.setExtensionClass(MockProxyExtension.class.getName()); + def.setDescription("test-ext"); + + String archivePath = "/path/to/ext/nar"; + + ProxyExtensionMetadata metadata = new ProxyExtensionMetadata(); + metadata.setDefinition(def); + metadata.setArchivePath(Paths.get(archivePath)); + + NarClassLoader mockLoader = mock(NarClassLoader.class); + when(mockLoader.getServiceDefinition(eq(PROXY_EXTENSION_DEFINITION_FILE))) + .thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def)); + Class handlerClass = MockProxyExtension.class; + when(mockLoader.loadClass(eq(MockProxyExtension.class.getName()))) + .thenReturn(handlerClass); + + PowerMockito.mockStatic(NarClassLoader.class); + PowerMockito.when(NarClassLoader.getFromArchive( + any(File.class), + any(Set.class), + any(ClassLoader.class), + any(String.class) + )).thenReturn(mockLoader); + + ProxyExtensionWithClassLoader returnedPhWithCL = ProxyExtensionsUtils.load(metadata, ""); + ProxyExtension returnedPh = returnedPhWithCL.getExtension(); + + assertSame(mockLoader, returnedPhWithCL.getClassLoader()); + assertTrue(returnedPh instanceof MockProxyExtension); + } + + @Test + public void testLoadProtocolHandlerBlankHandlerClass() throws Exception { + ProxyExtensionDefinition def = new ProxyExtensionDefinition(); + def.setDescription("test-ext"); + + String archivePath = "/path/to/ext/nar"; + + ProxyExtensionMetadata metadata = new ProxyExtensionMetadata(); + metadata.setDefinition(def); + metadata.setArchivePath(Paths.get(archivePath)); + + NarClassLoader mockLoader = mock(NarClassLoader.class); + when(mockLoader.getServiceDefinition(eq(PROXY_EXTENSION_DEFINITION_FILE))) + .thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def)); + Class handlerClass = MockProxyExtension.class; + when(mockLoader.loadClass(eq(MockProxyExtension.class.getName()))) + .thenReturn(handlerClass); + + PowerMockito.mockStatic(NarClassLoader.class); + PowerMockito.when(NarClassLoader.getFromArchive( + any(File.class), + any(Set.class), + any(ClassLoader.class), + any(String.class) + )).thenReturn(mockLoader); + + try { + ProxyExtensionsUtils.load(metadata, ""); + fail("Should not reach here"); + } catch (IOException ioe) { + // expected + } + } + + @Test + public void testLoadProtocolHandlerWrongHandlerClass() throws Exception { + ProxyExtensionDefinition def = new ProxyExtensionDefinition(); + def.setExtensionClass(Runnable.class.getName()); + def.setDescription("test-ext"); + + String archivePath = "/path/to/ext/nar"; + + ProxyExtensionMetadata metadata = new ProxyExtensionMetadata(); + metadata.setDefinition(def); + metadata.setArchivePath(Paths.get(archivePath)); + + NarClassLoader mockLoader = mock(NarClassLoader.class); + when(mockLoader.getServiceDefinition(eq(PROXY_EXTENSION_DEFINITION_FILE))) + .thenReturn(ObjectMapperFactory.getThreadLocalYaml().writeValueAsString(def)); + Class handlerClass = Runnable.class; + when(mockLoader.loadClass(eq(Runnable.class.getName()))) + .thenReturn(handlerClass); + + PowerMockito.mockStatic(NarClassLoader.class); + PowerMockito.when(NarClassLoader.getFromArchive( + any(File.class), + any(Set.class), + any(ClassLoader.class), + any(String.class) + )).thenReturn(mockLoader); + + try { + ProxyExtensionsUtils.load(metadata, ""); + fail("Should not reach here"); + } catch (IOException ioe) { + // expected + } + } + +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java new file mode 100644 index 0000000000000..b43eb22ab8952 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionWithClassLoaderTest.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.proxy.server.ProxyConfiguration; +import org.apache.pulsar.proxy.server.ProxyService; +import org.testng.annotations.Test; + +import java.net.InetSocketAddress; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; + +/** + * Unit test {@link ProxyExtensionWithClassLoader}. + */ +@Test(groups = "broker") +public class ProxyExtensionWithClassLoaderTest { + + @Test + public void testWrapper() throws Exception { + ProxyExtension h = mock(ProxyExtension.class); + NarClassLoader loader = mock(NarClassLoader.class); + ProxyExtensionWithClassLoader wrapper = new ProxyExtensionWithClassLoader(h, loader); + + String protocol = "kafka"; + + when(h.extensionName()).thenReturn(protocol); + assertEquals(protocol, wrapper.extensionName()); + verify(h, times(1)).extensionName(); + + when(h.accept(eq(protocol))).thenReturn(true); + assertTrue(wrapper.accept(protocol)); + verify(h, times(1)).accept(same(protocol)); + + ProxyConfiguration conf = new ProxyConfiguration(); + wrapper.initialize(conf); + verify(h, times(1)).initialize(same(conf)); + + ProxyService service = mock(ProxyService.class); + wrapper.start(service); + verify(h, times(1)).start(service); + } + + public void testClassLoaderSwitcher() throws Exception { + NarClassLoader loader = mock(NarClassLoader.class); + + String protocol = "test-protocol"; + + ProxyExtension h = new ProxyExtension() { + @Override + public String extensionName() { + assertEquals(Thread.currentThread().getContextClassLoader(), loader); + return protocol; + } + + @Override + public boolean accept(String protocol) { + assertEquals(Thread.currentThread().getContextClassLoader(), loader); + return true; + } + + @Override + public void initialize(ProxyConfiguration conf) throws Exception { + assertEquals(Thread.currentThread().getContextClassLoader(), loader); + throw new Exception("test exception"); + } + + @Override + public void start(ProxyService service) { + assertEquals(Thread.currentThread().getContextClassLoader(), loader); + } + + @Override + public Map> newChannelInitializers() { + assertEquals(Thread.currentThread().getContextClassLoader(), loader); + return null; + } + + @Override + public void close() { + assertEquals(Thread.currentThread().getContextClassLoader(), loader); + } + }; + ProxyExtensionWithClassLoader wrapper = new ProxyExtensionWithClassLoader(h, loader); + + ClassLoader curClassLoader = Thread.currentThread().getContextClassLoader(); + + assertEquals(wrapper.extensionName(), protocol); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + + assertTrue(wrapper.accept(protocol)); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + + + ProxyConfiguration conf = new ProxyConfiguration(); + expectThrows(Exception.class, () -> wrapper.initialize(conf)); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + + ProxyService service = mock(ProxyService.class); + wrapper.start(service); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + + + assertNull(wrapper.newChannelInitializers()); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + + wrapper.close(); + assertEquals(Thread.currentThread().getContextClassLoader(), curClassLoader); + } +} diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsTest.java new file mode 100644 index 0000000000000..00fb3be50c3ac --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/ProxyExtensionsTest.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.extensions; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.proxy.server.ProxyConfiguration; +import org.apache.pulsar.proxy.server.ProxyService; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; + +/** + * Unit test {@link ProxyExtensions}. + */ +@Test(groups = "proxy") +public class ProxyExtensionsTest { + + private static final String protocol1 = "protocol1"; + private ProxyExtension extension1; + private NarClassLoader ncl1; + private static final String protocol2 = "protocol2"; + private ProxyExtension extension2; + private NarClassLoader ncl2; + private static final String protocol3 = "protocol3"; + + private Map extensionsMap; + private ProxyExtensions extensions; + + @BeforeMethod + public void setup() { + this.extension1 = mock(ProxyExtension.class); + this.ncl1 = mock(NarClassLoader.class); + this.extension2 = mock(ProxyExtension.class); + this.ncl2 = mock(NarClassLoader.class); + + this.extensionsMap = new HashMap<>(); + this.extensionsMap.put( + protocol1, + new ProxyExtensionWithClassLoader(extension1, ncl1)); + this.extensionsMap.put( + protocol2, + new ProxyExtensionWithClassLoader(extension2, ncl2)); + this.extensions = new ProxyExtensions(this.extensionsMap); + } + + @AfterMethod(alwaysRun = true) + public void teardown() throws Exception { + this.extensions.close(); + + verify(extension1, times(1)).close(); + verify(extension2, times(1)).close(); + verify(ncl1, times(1)).close(); + verify(ncl2, times(1)).close(); + } + + @Test + public void testGetProtocol() { + assertSame(extension1, extensions.extension(protocol1)); + assertSame(extension2, extensions.extension(protocol2)); + assertNull(extensions.extension(protocol3)); + } + + @Test + public void testInitialize() throws Exception { + ProxyConfiguration conf = new ProxyConfiguration(); + extensions.initialize(conf); + verify(extension1, times(1)).initialize(same(conf)); + verify(extension2, times(1)).initialize(same(conf)); + } + + @Test + public void testStart() { + ProxyService service = mock(ProxyService.class); + extensions.start(service); + verify(extension1, times(1)).start(same(service)); + verify(extension2, times(1)).start(same(service)); + } + + @Test + public void testNewChannelInitializersSuccess() { + ChannelInitializer i1 = mock(ChannelInitializer.class); + ChannelInitializer i2 = mock(ChannelInitializer.class); + Map> p1Initializers = new HashMap<>(); + p1Initializers.put(new InetSocketAddress("127.0.0.1", 6650), i1); + p1Initializers.put(new InetSocketAddress("127.0.0.2", 6651), i2); + + ChannelInitializer i3 = mock(ChannelInitializer.class); + ChannelInitializer i4 = mock(ChannelInitializer.class); + Map> p2Initializers = new HashMap<>(); + p2Initializers.put(new InetSocketAddress("127.0.0.3", 6650), i3); + p2Initializers.put(new InetSocketAddress("127.0.0.4", 6651), i4); + + when(extension1.newChannelInitializers()).thenReturn(p1Initializers); + when(extension2.newChannelInitializers()).thenReturn(p2Initializers); + + Map>> initializers = + extensions.newChannelInitializers(); + + assertEquals(2, initializers.size()); + assertSame(p1Initializers, initializers.get(protocol1)); + assertSame(p2Initializers, initializers.get(protocol2)); + } + + @Test(expectedExceptions = RuntimeException.class) + public void testNewChannelInitializersOverlapped() { + ChannelInitializer i1 = mock(ChannelInitializer.class); + ChannelInitializer i2 = mock(ChannelInitializer.class); + Map> p1Initializers = new HashMap<>(); + p1Initializers.put(new InetSocketAddress("127.0.0.1", 6650), i1); + p1Initializers.put(new InetSocketAddress("127.0.0.2", 6651), i2); + + ChannelInitializer i3 = mock(ChannelInitializer.class); + ChannelInitializer i4 = mock(ChannelInitializer.class); + Map> p2Initializers = new HashMap<>(); + p2Initializers.put(new InetSocketAddress("127.0.0.1", 6650), i3); + p2Initializers.put(new InetSocketAddress("127.0.0.4", 6651), i4); + + when(extension1.newChannelInitializers()).thenReturn(p1Initializers); + when(extension2.newChannelInitializers()).thenReturn(p2Initializers); + + extensions.newChannelInitializers(); + } + +}