diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index df4f4947a2783..e0665b1179100 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -46,6 +46,18 @@
${project.version}
+
+ ${project.groupId}
+ pulsar-functions-proto-shaded
+ ${project.version}
+
+
+ *
+ *
+
+
+
+
org.glassfish.jersey.core
jersey-client
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
similarity index 96%
rename from pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java
rename to pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 55ba0892249fb..3808a488d8135 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -21,8 +21,8 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
-import org.apache.pulsar.functions.proto.Function.FunctionConfig;
-import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.shaded.proto.Function.FunctionConfig;
+import org.apache.pulsar.functions.shaded.proto.InstanceCommunication.FunctionStatusList;
import java.util.List;
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 7bb23b97c1a96..a1a67a2486672 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -33,6 +33,7 @@
import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
import org.apache.pulsar.client.admin.internal.BrokersImpl;
import org.apache.pulsar.client.admin.internal.ClustersImpl;
+import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
import org.apache.pulsar.client.admin.internal.LookupImpl;
import org.apache.pulsar.client.admin.internal.NamespacesImpl;
@@ -76,6 +77,7 @@ public class PulsarAdmin implements Closeable {
private final Client client;
private final String serviceUrl;
private final Lookup lookups;
+ private final Functions functions;
protected final WebTarget root;
protected final Authentication auth;
@@ -170,6 +172,7 @@ public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData)
this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth);
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
+ this.functions = new FunctionsImpl(root, auth);
}
/**
@@ -303,6 +306,14 @@ public Lookup lookups() {
return lookups;
}
+ /**
+ *
+ * @return the functions management object
+ */
+ public Functions functions() {
+ return functions;
+ }
+
/**
* @return the broker statics
*/
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
similarity index 87%
rename from pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
rename to pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 73948230aa219..175e4f31c73bf 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -23,13 +23,16 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.policies.data.*;
-import org.apache.pulsar.functions.proto.Function.FunctionConfig;
-import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.shaded.proto.Function.FunctionConfig;
+import org.apache.pulsar.functions.shaded.proto.InstanceCommunication.FunctionStatusList;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
+import org.apache.pulsar.functions.shaded.com.google.protobuf.MessageOrBuilder;
+import org.apache.pulsar.functions.shaded.com.google.protobuf.AbstractMessage.Builder;
+import org.apache.pulsar.functions.shaded.com.google.protobuf.util.JsonFormat;
+
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
@@ -37,6 +40,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
+import java.io.IOException;
import java.util.List;
@Slf4j
@@ -72,7 +76,7 @@ public FunctionConfig getFunction(String tenant, String namespace, String functi
}
String jsonResponse = response.readEntity(String.class);
FunctionConfig.Builder functionConfigBuilder = FunctionConfig.newBuilder();
- Utils.mergeJson(jsonResponse, functionConfigBuilder);
+ mergeJson(jsonResponse, functionConfigBuilder);
return functionConfigBuilder.build();
} catch (Exception e) {
throw getApiException(e);
@@ -89,7 +93,7 @@ public FunctionStatusList getFunctionStatus(
}
String jsonResponse = response.readEntity(String.class);
FunctionStatusList.Builder functionStatusBuilder = FunctionStatusList.newBuilder();
- Utils.mergeJson(jsonResponse, functionStatusBuilder);
+ mergeJson(jsonResponse, functionStatusBuilder);
return functionStatusBuilder.build();
} catch (Exception e) {
throw getApiException(e);
@@ -104,7 +108,7 @@ public void createFunction(FunctionConfig functionConfig, String fileName) throw
mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
mp.bodyPart(new FormDataBodyPart("functionConfig",
- Utils.printJson(functionConfig),
+ printJson(functionConfig),
MediaType.APPLICATION_JSON_TYPE));
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
@@ -131,7 +135,7 @@ public void updateFunction(FunctionConfig functionConfig, String fileName) throw
mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
}
mp.bodyPart(new FormDataBodyPart("functionConfig",
- Utils.printJson(functionConfig),
+ printJson(functionConfig),
MediaType.APPLICATION_JSON_TYPE));
request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
.put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
@@ -159,4 +163,12 @@ public String triggerFunction(String tenant, String namespace, String functionNa
throw getApiException(e);
}
}
+
+ public static void mergeJson(String json, Builder builder) throws IOException {
+ JsonFormat.parser().merge(json, builder);
+ }
+
+ public static String printJson(MessageOrBuilder msg) throws IOException {
+ return JsonFormat.printer().print(msg);
+ }
}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 80425f1a5bef1..7bcb1090c52a8 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -31,20 +31,16 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
-import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
-import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
import java.io.File;
-import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.gson.Gson;
import org.apache.pulsar.admin.cli.CmdFunctions.CreateFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
@@ -52,12 +48,14 @@
import org.apache.pulsar.admin.cli.CmdFunctions.LocalRunner;
import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
import org.apache.pulsar.client.admin.Functions;
-import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.proto.Function.FunctionConfig;
+import org.apache.pulsar.functions.shaded.proto.Function.FunctionConfig;
+import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.functions.utils.Reflections;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -67,6 +65,8 @@
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
+import com.google.gson.Gson;
+
/**
* Unit test of {@link CmdFunctions}.
*/
@@ -81,7 +81,7 @@ public IObjectFactory getObjectFactory() {
private static final String TEST_NAME = "test_name";
- private PulsarAdminWithFunctions admin;
+ private PulsarAdmin admin;
private Functions functions;
private CmdFunctions cmd;
@@ -100,7 +100,7 @@ private String generateCustomSerdeInputs(String topic, String serde) {
@BeforeMethod
public void setup() throws Exception {
- this.admin = mock(PulsarAdminWithFunctions.class);
+ this.admin = mock(PulsarAdmin.class);
this.functions = mock(Functions.class);
when(admin.functions()).thenReturn(functions);
when(admin.getServiceUrl()).thenReturn("http://localhost:1234");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index cfaa4ec5dbb20..e3951abdcbaaf 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -22,21 +22,15 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.beust.jcommander.converters.StringConverter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParser;
-import com.google.gson.reflect.TypeToken;
-import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
-import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
-import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Type;
import java.net.MalformedURLException;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -44,32 +38,40 @@
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
+import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
-import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
+import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
+import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
+import org.apache.pulsar.functions.shaded.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.Reflections;
-
-import java.io.File;
-import java.lang.reflect.Type;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
import org.apache.pulsar.functions.utils.Utils;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.converters.StringConverter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+
@Slf4j
@Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
public class CmdFunctions extends CmdBase {
- private final PulsarAdminWithFunctions fnAdmin;
private final LocalRunner localRunner;
private final CreateFunction creater;
private final DeleteFunction deleter;
@@ -169,7 +171,7 @@ void processArguments() throws Exception {
FunctionConfig.Builder functionConfigBuilder;
if (null != fnConfigFile) {
- functionConfigBuilder = FunctionConfigUtils.loadConfig(new File(fnConfigFile));
+ functionConfigBuilder = loadConfig(new File(fnConfigFile));
} else {
functionConfigBuilder = FunctionConfig.newBuilder();
}
@@ -453,7 +455,7 @@ class LocalRunner extends FunctionConfigCommand {
@Override
void runCmd() throws Exception {
- if (!FunctionConfigUtils.areAllRequiredFieldsPresent(functionConfig)) {
+ if (!areAllRequiredFieldsPresent(functionConfig)) {
throw new RuntimeException("Missing arguments");
}
@@ -469,7 +471,7 @@ void runCmd() throws Exception {
List spawners = new LinkedList<>();
for (int i = 0; i < functionConfig.getParallelism(); ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
- instanceConfig.setFunctionConfig(functionConfig);
+ instanceConfig.setFunctionConfig(convert(functionConfig));
// TODO: correctly implement function version and id
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setFunctionId(UUID.randomUUID().toString());
@@ -498,16 +500,17 @@ public void run() {
}
}
+
}
@Parameters(commandDescription = "Create a Pulsar Function in cluster mode (i.e. deploy it on a Pulsar cluster)")
class CreateFunction extends FunctionConfigCommand {
@Override
void runCmd() throws Exception {
- if (!FunctionConfigUtils.areAllRequiredFieldsPresent(functionConfig)) {
+ if (!areAllRequiredFieldsPresent(functionConfig)) {
throw new RuntimeException("Missing arguments");
}
- fnAdmin.functions().createFunction(functionConfig, userCodeFile);
+ admin.functions().createFunction(functionConfig, userCodeFile);
print("Created successfully");
}
}
@@ -516,7 +519,7 @@ void runCmd() throws Exception {
class GetFunction extends FunctionCommand {
@Override
void runCmd() throws Exception {
- String json = Utils.printJson(fnAdmin.functions().getFunction(tenant, namespace, functionName));
+ String json = Utils.printJson(admin.functions().getFunction(tenant, namespace, functionName));
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(new JsonParser().parse(json)));
}
@@ -526,7 +529,7 @@ void runCmd() throws Exception {
class GetFunctionStatus extends FunctionCommand {
@Override
void runCmd() throws Exception {
- String json = Utils.printJson(fnAdmin.functions().getFunctionStatus(tenant, namespace, functionName));
+ String json = Utils.printJson(admin.functions().getFunctionStatus(tenant, namespace, functionName));
Gson gson = new GsonBuilder().setPrettyPrinting().create();
System.out.println(gson.toJson(new JsonParser().parse(json)));
}
@@ -536,7 +539,7 @@ void runCmd() throws Exception {
class DeleteFunction extends FunctionCommand {
@Override
void runCmd() throws Exception {
- fnAdmin.functions().deleteFunction(tenant, namespace, functionName);
+ admin.functions().deleteFunction(tenant, namespace, functionName);
print("Deleted successfully");
}
}
@@ -545,10 +548,10 @@ void runCmd() throws Exception {
class UpdateFunction extends FunctionConfigCommand {
@Override
void runCmd() throws Exception {
- if (!FunctionConfigUtils.areAllRequiredFieldsPresent(functionConfig)) {
+ if (!areAllRequiredFieldsPresent(functionConfig)) {
throw new RuntimeException("Missing arguments");
}
- fnAdmin.functions().updateFunction(functionConfig, userCodeFile);
+ admin.functions().updateFunction(functionConfig, userCodeFile);
print("Updated successfully");
}
}
@@ -557,7 +560,7 @@ void runCmd() throws Exception {
class ListFunctions extends NamespaceCommand {
@Override
void runCmd() throws Exception {
- print(fnAdmin.functions().getFunctions(tenant, namespace));
+ print(admin.functions().getFunctions(tenant, namespace));
}
}
@@ -630,18 +633,13 @@ void runCmd() throws Exception {
if (triggerFile == null && triggerValue == null) {
throw new RuntimeException("Either a trigger value or a trigger filepath needs to be specified");
}
- String retval = fnAdmin.functions().triggerFunction(tenant, namespace, functionName, triggerValue, triggerFile);
+ String retval = admin.functions().triggerFunction(tenant, namespace, functionName, triggerValue, triggerFile);
System.out.println(retval);
}
}
public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
super("functions", admin);
- if (admin instanceof PulsarAdminWithFunctions) {
- this.fnAdmin = (PulsarAdminWithFunctions) admin;
- } else {
- this.fnAdmin = new PulsarAdminWithFunctions(admin.getServiceUrl(), admin.getClientConfigData());
- }
localRunner = new LocalRunner();
creater = new CreateFunction();
deleter = new DeleteFunction();
@@ -704,4 +702,25 @@ StateGetter getStateGetter() {
TriggerFunction getTriggerer() {
return triggerer;
}
+
+ private static FunctionConfig.Builder loadConfig(File file) throws IOException {
+ String json = FunctionConfigUtils.convertYamlToJson(file);
+ FunctionConfig.Builder functionConfigBuilder = FunctionConfig.newBuilder();
+ Utils.mergeJson(json, functionConfigBuilder);
+ return functionConfigBuilder;
+ }
+
+ public static boolean areAllRequiredFieldsPresent(FunctionConfig functionConfig) {
+ return functionConfig.getTenant() != null && functionConfig.getNamespace() != null
+ && functionConfig.getName() != null && functionConfig.getClassName() != null
+ && (functionConfig.getInputsCount() > 0 || functionConfig.getCustomSerdeInputsCount() > 0)
+ && functionConfig.getParallelism() > 0;
+ }
+
+ private org.apache.pulsar.functions.proto.Function.FunctionConfig convert(FunctionConfig functionConfig)
+ throws IOException {
+ org.apache.pulsar.functions.proto.Function.FunctionConfig.Builder functionConfigBuilder = org.apache.pulsar.functions.proto.Function.FunctionConfig.newBuilder();
+ Utils.mergeJson(FunctionsImpl.printJson(functionConfig), functionConfigBuilder);
+ return functionConfigBuilder.build();
+ }
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/PulsarAdminWithFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/PulsarAdminWithFunctions.java
deleted file mode 100644
index aa17585616f5d..0000000000000
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/PulsarAdminWithFunctions.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.admin;
-
-import org.apache.pulsar.client.admin.internal.FunctionsImpl;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-
-/**
- * Pulsar client admin client with functions support.
- */
-public class PulsarAdminWithFunctions extends PulsarAdmin {
-
- private final Functions functions;
- private final ClientConfigurationData clientConf;
-
- public PulsarAdminWithFunctions(String serviceUrl, ClientConfigurationData pulsarConfig)
- throws PulsarClientException {
- super(serviceUrl, pulsarConfig);
- this.functions = new FunctionsImpl(root, auth);
- this.clientConf = pulsarConfig;
- }
-
- /**
- * @return the function management object
- */
- public Functions functions() {
- return functions;
- }
-}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/PulsarAdminWithFunctionsBuilderImpl.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/PulsarAdminWithFunctionsBuilderImpl.java
deleted file mode 100644
index 1cba31e8c630e..0000000000000
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/PulsarAdminWithFunctionsBuilderImpl.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.admin;
-
-import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-public class PulsarAdminWithFunctionsBuilderImpl extends PulsarAdminBuilderImpl {
- @Override
- public PulsarAdmin build() throws PulsarClientException {
- return new PulsarAdminWithFunctions(conf.getServiceUrl(), conf);
- }
-}
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index 1a0405bf9b0a1..5428f03c0e70f 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -33,6 +33,7 @@
proto
+ proto-shaded
api-java
utils
metrics
diff --git a/pulsar-functions/proto-shaded/pom.xml b/pulsar-functions/proto-shaded/pom.xml
new file mode 100644
index 0000000000000..981d8b4fadaf2
--- /dev/null
+++ b/pulsar-functions/proto-shaded/pom.xml
@@ -0,0 +1,93 @@
+
+
+
+ 4.0.0
+
+
+ org.apache.pulsar
+ pulsar-functions
+ 2.0.0-incubating-SNAPSHOT
+
+
+ pulsar-functions-proto-shaded
+ Pulsar Functions :: Proto-shaded
+
+
+
+ com.google.protobuf
+ protobuf-java
+ ${protobuf3.version}
+
+
+ ${project.groupId}
+ pulsar-functions-proto
+ ${project.version}
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.4.1.Final
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ package
+
+ shade
+
+
+ true
+ true
+ false
+
+
+ com.google.protobuf*:*
+ ${project.groupId}:pulsar-functions-proto
+
+
+
+
+ com.google.protobuf
+ org.apache.pulsar.functions.shaded.com.google.protobuf
+
+
+ org.apache.pulsar.functions.proto
+ org.apache.pulsar.functions.shaded.proto
+
+
+
+
+
+
+
+
+
+
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index 4e4413cd461bc..e4bde126097f2 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -43,6 +43,10 @@
org.apache.pulsar
pulsar-client-admin-original
+
+ org.apache.pulsar
+ pulsar-functions-proto
+
org.apache.pulsar
pulsar-client-original