Skip to content

Commit

Permalink
Add CLI command to get available built-in functions (apache#16822)
Browse files Browse the repository at this point in the history
(cherry picked from commit 34eb74d)
  • Loading branch information
cbornet authored and nicoloboschi committed Sep 29, 2022
1 parent 4bd86dc commit 35d3f6b
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down Expand Up @@ -751,6 +752,23 @@ public void reloadBuiltinFunctions() throws IOException {
functions().reloadBuiltinFunctions(clientAppId(), clientAuthData());
}

@GET
@ApiOperation(
value = "Fetches the list of built-in Pulsar functions",
response = FunctionDefinition.class,
responseContainer = "List"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 408, message = "Request timeout")
})
@Path("/builtins")
@Produces(MediaType.APPLICATION_JSON)
public List<FunctionDefinition> getBuiltinFunction() {
return functions().getBuiltinFunctions(clientAppId(), clientAuthData());
}

@PUT
@ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true)
@ApiResponses(value = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down Expand Up @@ -828,6 +829,19 @@ CompletableFuture<Void> downloadFunctionAsync(
@Deprecated
Set<String> getSinks() throws PulsarAdminException;

/**
* Fetches a list of supported Pulsar Functions currently running in cluster mode.
*
* @throws PulsarAdminException
* Unexpected error
*/
List<FunctionDefinition> getBuiltInFunctions() throws PulsarAdminException;

/**
* Fetches a list of supported Pulsar Functions currently running in cluster mode asynchronously.
*/
CompletableFuture<List<FunctionDefinition>> getBuiltInFunctionsAsync();

/**
* Fetch the current state associated with a Pulsar Function.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
Expand Down Expand Up @@ -739,6 +740,35 @@ public Set<String> getSinks() throws PulsarAdminException {
.map(ConnectorDefinition::getName).collect(Collectors.toSet());
}

@Override
public List<FunctionDefinition> getBuiltInFunctions() throws PulsarAdminException {
return sync(this::getBuiltInFunctionsAsync);
}

@Override
public CompletableFuture<List<FunctionDefinition>> getBuiltInFunctionsAsync() {
WebTarget path = functions.path("builtins");
final CompletableFuture<List<FunctionDefinition>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
if (response.getStatus() != Response.Status.OK.getStatusCode()) {
future.completeExceptionally(getApiException(response));
} else {
future.complete(response.readEntity(
new GenericType<List<FunctionDefinition>>() {}));
}
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

public List<WorkerInfo> getCluster() throws PulsarAdminException {
try {
return request(functions.path("cluster")).get(new GenericType<List<WorkerInfo>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.text.WordUtils;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -1236,6 +1237,7 @@ public CmdFunctions(Supplier<PulsarAdmin> admin) throws PulsarClientException {
jcommander.addCommand("upload", getUploader());
jcommander.addCommand("download", getDownloader());
jcommander.addCommand("reload", new ReloadBuiltInFunctions());
jcommander.addCommand("available-functions", new ListBuiltInFunctions());
}

@VisibleForTesting
Expand Down Expand Up @@ -1325,4 +1327,17 @@ private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functio
}
}

@Parameters(commandDescription = "Get the list of Pulsar Functions supported by Pulsar cluster")
public class ListBuiltInFunctions extends BaseCommand {
@Override
void runCmd() throws Exception {
getAdmin().functions().getBuiltInFunctions()
.forEach(function -> {
System.out.println(function.getName());
System.out.println(WordUtils.wrap(function.getDescription(), 80));
System.out.println("----------------------------------------");
});
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;

Expand All @@ -42,6 +45,11 @@ public Path getFunctionArchive(String functionType) {
return functions.get(functionType).getArchivePath();
}

public List<FunctionDefinition> getFunctionDefinitions() {
return functions.values().stream().map(FunctionArchive::getFunctionDefinition)
.collect(Collectors.toList());
}

public void reloadFunctions(WorkerConfig workerConfig) throws IOException {
this.functions = FunctionUtils.searchForFunctions(workerConfig.getFunctionsDirectory());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.functions.WorkerInfo;
Expand Down Expand Up @@ -689,6 +690,19 @@ public void reloadBuiltinFunctions(String clientRole, AuthenticationDataSource a
worker().getFunctionsManager().reloadFunctions(worker().getWorkerConfig());
}

@Override
public List<FunctionDefinition> getBuiltinFunctions(String clientRole,
AuthenticationDataSource authenticationData) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}

if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole, authenticationData)) {
throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
}
return this.worker().getFunctionsManager().getFunctionDefinitions();
}

private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
final String namespace,
final String componentName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
Expand Down Expand Up @@ -366,6 +367,23 @@ public void reloadBuiltinFunctions() throws IOException {
functions().reloadBuiltinFunctions(clientAppId(), clientAuthData());
}

@GET
@ApiOperation(
value = "Fetches the list of built-in Pulsar functions",
response = FunctionDefinition.class,
responseContainer = "List"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 408, message = "Request timeout")
})
@Path("/builtins")
@Produces(MediaType.APPLICATION_JSON)
public List<FunctionDefinition> getBuiltinFunctions() {
return functions().getBuiltinFunctions(clientAppId(), clientAuthData());
}

@GET
@Path("/{tenant}/{namespace}/{functionName}/state/{key}")
public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData;
Expand Down Expand Up @@ -80,7 +82,9 @@ FunctionInstanceStatusData getFunctionInstanceStatus(final String tenant,
final String clientRole,
final AuthenticationDataSource clientAuthenticationDataHttps);


void reloadBuiltinFunctions(String clientRole, AuthenticationDataSource clientAuthenticationDataHttps)
throws IOException;

List<FunctionDefinition> getBuiltinFunctions(String clientRole,
AuthenticationDataSource clientAuthenticationDataHttps);
}

0 comments on commit 35d3f6b

Please sign in to comment.