Skip to content

Commit

Permalink
PIP-99 - Pulsar Proxy Estensions (#11838)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Sep 30, 2021
1 parent 0c22e0f commit 91ca173
Show file tree
Hide file tree
Showing 15 changed files with 1,225 additions and 3 deletions.
9 changes: 9 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,15 @@ webSocketServiceEnabled=false
# Name of the cluster to which this broker belongs to
clusterName=


### --- Proxy Extensions

# List of proxy extensions to load, which is a list of extension names
#proxyExtensions=

# The directory to locate extensions
#proxyExtensionsDirectory=

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.extensions;

import lombok.Data;
import lombok.experimental.Accessors;

import java.util.Map;
import java.util.TreeMap;

/**
* The collection of Proxy Extensions.
*/
@Data
@Accessors(fluent = true)
class ExtensionsDefinitions {

private final Map<String, ProxyExtensionMetadata> extensions = new TreeMap<>();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.extensions;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;

import java.net.InetSocketAddress;
import java.util.Map;

/**
* The extension interface for support additional extensions on Pulsar Proxy.
*/
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Evolving
public interface ProxyExtension extends AutoCloseable {

/**
* Returns the unique extension name. For example, `kafka-v2` for extension for Kafka v2 protocol.
*/
String extensionName();

/**
* Verify if the extension can handle the given <tt>extension name</tt>.
*
* @param extension the extension to verify
* @return true if the extension can handle the given extension name, otherwise false.
*/
boolean accept(String extension);

/**
* Initialize the extension when the extension is constructed from reflection.
*
* <p>The initialize should initialize all the resources required for serving the extension
* but don't start those resources until {@link #start(ProxyService)} is called.
*
* @param conf proxy service configuration
* @throws Exception when fail to initialize the extension.
*/
void initialize(ProxyConfiguration conf) throws Exception;

/**
* Start the extension with the provided proxy service.
*
* <p>The proxy service provides the accesses to the Pulsar Proxy components.
*
* @param service the broker service to start with.
*/
void start(ProxyService service);

/**
* Create the list of channel initializers for the ports that this extension
* will listen on.
*
* <p>NOTE: this method is called after {@link #start(ProxyService)}.
*
* @return the list of channel initializers for the ports that this extension listens on.
*/
Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers();

@Override
void close();
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.extensions;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Metadata information about a Proxy Extension.
*/
@Data
@NoArgsConstructor
public class ProxyExtensionDefinition {

/**
* The name of the extension.
*/
private String name;

/**
* The description of the extension to be used for user help.
*/
private String description;

/**
* The class name for the extension.
*/
private String extensionClass;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.extensions;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.nio.file.Path;

/**
* The metadata of Proxy Extension.
*/
@Data
@NoArgsConstructor
class ProxyExtensionMetadata {

/**
* The definition of the extension.
*/
private ProxyExtensionDefinition definition;

/**
* The path to the extension package.
*/
private Path archivePath;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.extensions;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;

/**
* A extension with its classloader.
*/
@Slf4j
@Data
@RequiredArgsConstructor
class ProxyExtensionWithClassLoader implements ProxyExtension {

private final ProxyExtension extension;
private final NarClassLoader classLoader;

@Override
public String extensionName() {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
return extension.extensionName();
}
}

@Override
public boolean accept(String extensionName) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
return extension.accept(extensionName);
}
}

@Override
public void initialize(ProxyConfiguration conf) throws Exception {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
extension.initialize(conf);
}
}

@Override
public void start(ProxyService service) {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
extension.start(service);
}
}

@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
return extension.newChannelInitializers();
}
}

@Override
public void close() {
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
extension.close();
}

try {
classLoader.close();
} catch (IOException e) {
log.warn("Failed to close the extension class loader", e);
}
}

/**
* Help to switch the class loader of current thread to the NarClassLoader, and change it back when it's done.
* With the help of try-with-resources statement, the code would be cleaner than using try finally every time.
*/
private static class ClassLoaderSwitcher implements AutoCloseable {
private final ClassLoader prevClassLoader;

ClassLoaderSwitcher(ClassLoader classLoader) {
prevClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
}

@Override
public void close() {
Thread.currentThread().setContextClassLoader(prevClassLoader);
}
}
}
Loading

0 comments on commit 91ca173

Please sign in to comment.