Motivation
Pulsar IO connectors make it possible to connect Pulsar to an external system:
A Source reads continuously from an external system and writes to a Pulsar topic.
A Sink reads continuously from a Pulsar topic and writes to an external system.
Sources and Sinks are written in Java.
Pulsar also has a lightweight computing system named Pulsar Functions. A Pulsar Function reads from one or more topics, applies user logic written in Java, Python or Go and writes to an output topic.
When using Pulsar IO connectors, the format of the data read from a Source or written to a Sink is defined by the connector code. However, there are many situations where a user wants to transform this data before using it. Currently, the user has to either:
write a custom connector that transforms the data as needed, which means writing a lot of code that cannot be reused, and packaging and managing custom connectors;
write a Function to transform the data after it has been written to a topic by a Source, or before it is read from a topic by a Sink. This is inefficient because it requires an intermediate topic, which means additional storage, IO and latency.
Considering all this, it would be useful to apply a Function on the fly to a connector without going through an intermediate topic.
Goal
This PIP defines the changes needed to be able to apply a preprocessing Function on-the-fly to a Sink.
The preprocessing function can be a built-in function, a package function, or a function loaded from an HTTP URL or a file path.
Sources, Sinks and Functions are based on the same runtime process that:
reads from a Source. For Sinks and Functions, this Source is a PulsarSource consuming from a Pulsar topic.
applies a Function. For Sources and Sinks, this Function is IdentityFunction, which returns the data it gets without modification.
writes to a Sink. For Sources and Functions, this Sink is a PulsarSink writing to a Pulsar topic.
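The shared read, process, write loop can be pictured as three pluggable slots. The sketch below is a simplified, single-threaded illustration only. `SimpleSource`, `SimpleFunction`, `SimpleSink` and `RuntimeLoopSketch` are hypothetical stand-ins, not Pulsar's actual `Source`, `Function` and `Sink` APIs. It shows that a plain Sink runs with an identity function in the middle slot, and that a preprocessing function simply takes that slot's place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

// Simplified model of the shared runtime loop: read from a source,
// apply a function, write to a sink. All types here are hypothetical.
final class RuntimeLoopSketch {

    interface SimpleSource<T> { T read(); }               // returns null when exhausted
    interface SimpleFunction<I, O> { O process(I input); }
    interface SimpleSink<T> { void write(T value); }

    // Stand-in for IdentityFunction: returns its input unchanged.
    static final class IdentityFunction<T> implements SimpleFunction<T, T> {
        @Override
        public T process(T input) { return input; }
    }

    // Drains the source, passing every value through the function into the sink.
    static <I, O> void run(SimpleSource<I> source, SimpleFunction<I, O> function, SimpleSink<O> sink) {
        for (I input = source.read(); input != null; input = source.read()) {
            sink.write(function.process(input));
        }
    }

    static SimpleSource<String> fromList(List<String> values) {
        Iterator<String> it = values.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    static List<String> demo() {
        List<String> out = new ArrayList<>();
        SimpleSink<String> collect = out::add;
        // Plain Sink today: the function slot holds IdentityFunction.
        SimpleFunction<String, String> identity = new IdentityFunction<>();
        run(fromList(Arrays.asList("a", "b")), identity, collect);
        // With a preprocessing function, the same slot holds user logic.
        SimpleFunction<String, String> upperCase = s -> s.toUpperCase(Locale.ROOT);
        run(fromList(Arrays.asList("c")), upperCase, collect);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, C]
    }
}
```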
This PIP reuses this mechanism and allows configuring a Function other than IdentityFunction for Sinks.
Only Functions returning a Record will be allowed, which ensures that the Function sets the Schema explicitly.
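To illustrate why returning a Record matters, here is a minimal sketch of a preprocessing function whose output carries its own schema. `SketchSchema`, `SketchRecord` and `SketchFunction` are hypothetical simplifications. In real code the function would implement `org.apache.pulsar.functions.api.Function` and return an `org.apache.pulsar.functions.api.Record`, whose `getSchema()` tells the Sink how to interpret the value.

```java
import java.util.Locale;

// Hypothetical, simplified stand-ins for Pulsar's Schema, Record and Function types.
final class PreprocessFunctionSketch {

    // A schema is reduced here to a name describing the value type.
    static final class SketchSchema {
        final String name;
        SketchSchema(String name) { this.name = name; }
    }

    // Output record: the value plus the schema the sink should use for it.
    static final class SketchRecord<T> {
        private final T value;
        private final SketchSchema schema;
        SketchRecord(T value, SketchSchema schema) { this.value = value; this.schema = schema; }
        T getValue() { return value; }
        SketchSchema getSchema() { return schema; }
    }

    interface SketchFunction<I, O> { O process(I input) throws Exception; }

    // Example preprocessing function: upper-cases a string and declares the
    // output schema explicitly, which is what returning a record guarantees.
    static final class UpperCaseFunction implements SketchFunction<String, SketchRecord<String>> {
        static final SketchSchema STRING = new SketchSchema("STRING");

        @Override
        public SketchRecord<String> process(String input) {
            return new SketchRecord<>(input.toUpperCase(Locale.ROOT), STRING);
        }
    }

    public static void main(String[] args) {
        SketchRecord<String> out = new UpperCaseFunction().process("hello");
        System.out.println(out.getValue() + " / " + out.getSchema().name); // HELLO / STRING
    }
}
```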
Out of the scope of this PIP, for future work:
Applying a post-processing Function to a Source
Loading the Function jar through the Sink CLI
API Changes
Admin CLI
The following options will be added to the create, update and localrun commands of the pulsar-admin sinks CLI:
preprocess-function: the preprocess function applied before the Sink. It starts with builtin:// for built-in functions, function:// for package functions, or http:// or file:// for functions loaded from a URL or a file path.
preprocess-function-classname: the preprocess function class name (optional if the function is a NAR)
preprocess-function-config: the configuration of the preprocess function in the same format as the user-config parameter of the functions create CLI command.
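The prefixes accepted by preprocess-function determine where the function code comes from. The following hypothetical helper (`PreprocessFunctionLocation` is not part of Pulsar) classifies a value by its documented prefix and rejects anything else. It is only a sketch of that rule. The example package name `function://public/default/my-fn@1.0` is illustrative.

```java
import java.util.Locale;

// Hypothetical helper mirroring the documented prefixes of preprocess-function.
final class PreprocessFunctionLocation {

    enum Kind { BUILTIN, PACKAGE, HTTP, FILE }

    static Kind classify(String location) {
        if (location == null || location.isEmpty()) {
            throw new IllegalArgumentException("preprocess-function must not be empty");
        }
        String lower = location.toLowerCase(Locale.ROOT);
        if (lower.startsWith("builtin://")) return Kind.BUILTIN;   // built-in function
        if (lower.startsWith("function://")) return Kind.PACKAGE;  // package management
        if (lower.startsWith("http://")) return Kind.HTTP;         // downloaded from a URL
        if (lower.startsWith("file://")) return Kind.FILE;         // local file path
        throw new IllegalArgumentException("Unsupported preprocess-function location: " + location);
    }

    public static void main(String[] args) {
        System.out.println(classify("builtin://my-transform"));              // BUILTIN
        System.out.println(classify("function://public/default/my-fn@1.0")); // PACKAGE
    }
}
```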
The corresponding fields will be added to SinkConfig.
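For illustration only, the new SinkConfig fields could look like the plain class below. Only `preprocessFunction` is named elsewhere in this PIP (in the LocalRunner section). The names `preprocessFunctionClassName` and `preprocessFunctionConfig`, and the `Map<String, Object>` type of the config, are assumptions derived from the CLI options. `SinkConfigSketch` itself is hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the fields this PIP adds to SinkConfig. Field names
// other than preprocessFunction are assumptions based on the CLI option names.
final class SinkConfigSketch {
    private String preprocessFunction;           // CLI option: preprocess-function
    private String preprocessFunctionClassName;  // CLI option: preprocess-function-classname
    private Map<String, Object> preprocessFunctionConfig = new HashMap<>(); // CLI option: preprocess-function-config

    String getPreprocessFunction() { return preprocessFunction; }
    void setPreprocessFunction(String value) { preprocessFunction = value; }

    String getPreprocessFunctionClassName() { return preprocessFunctionClassName; }
    void setPreprocessFunctionClassName(String value) { preprocessFunctionClassName = value; }

    Map<String, Object> getPreprocessFunctionConfig() { return Collections.unmodifiableMap(preprocessFunctionConfig); }
    void setPreprocessFunctionConfig(Map<String, Object> value) { preprocessFunctionConfig = new HashMap<>(value); }

    // A preprocessing function is applied only when its location is set.
    boolean hasPreprocessFunction() {
        return preprocessFunction != null && !preprocessFunction.isEmpty();
    }

    public static void main(String[] args) {
        SinkConfigSketch config = new SinkConfigSketch();
        config.setPreprocessFunction("builtin://my-transform");
        config.setPreprocessFunctionConfig(Collections.<String, Object>singletonMap("field", "value"));
        System.out.println(config.hasPreprocessFunction()); // true
    }
}
```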
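Function definition
The field extraFunctionPackageLocation will be added to the protobuf structure FunctionMetaData. This field will be filled with the location of the extra function to apply when registering a sink, and used in the Runtime to load the function code.
Runtime
The parameters extraFunctionFile and originalExtraFunctionFileName will be added to RuntimeFactory::createContainer.
Instance function cache
A field extraFunctionId, holding the UUID cache key of the extra function, will be added to InstanceConfig.
JavaInstanceStarter
The following parameters will be added to JavaInstanceStarter: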
--extra_function_jar: the path to the extra function jar
--extra_function_id: the extra function UUID cache key
These parameters are then used by the ThreadRuntime to load the function from the FunctionCacheManager or create it there if needed.
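The sketch below roughly shows how a launcher might forward these two values. It appends the flags to a JavaInstanceStarter argument list only when an extra function is configured. `ExtraFunctionArgsSketch` and `withExtraFunction` are hypothetical. The base arguments in `main` are purely illustrative, and a real command line carries many more arguments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical helper: adds the extra-function parameters to a
// JavaInstanceStarter command line when a preprocessing function is configured.
final class ExtraFunctionArgsSketch {

    static List<String> withExtraFunction(List<String> baseArgs, String extraFunctionJar, String extraFunctionId) {
        List<String> args = new ArrayList<>(baseArgs);
        if (extraFunctionJar != null) {
            // The cache key is required so the ThreadRuntime can find the cached function.
            Objects.requireNonNull(extraFunctionId, "extraFunctionId");
            args.add("--extra_function_jar");
            args.add(extraFunctionJar);
            args.add("--extra_function_id");
            args.add(extraFunctionId);
        }
        return args;
    }

    public static void main(String[] args) {
        List<String> base = Arrays.asList("--jar", "/tmp/my-sink.nar"); // illustrative only
        System.out.println(withExtraFunction(base, "/tmp/my-sink__sink-function.nar",
                "123e4567-e89b-12d3-a456-426614174000"));
    }
}
```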
Download the extra function
The statefulset spawned in KubernetesRuntime needs to be able to download the extra function's code via the API.
An extra-function query parameter will be added to the download function HTTP endpoint:
```java
@Path("/{tenant}/{namespace}/{functionName}/download")
public StreamingOutput downloadFunction(
        @ApiParam(value = "The tenant of functions")
        final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of functions")
        final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of functions")
        final @PathParam("functionName") String functionName,
        @ApiParam(value = "Whether to download the extra-function")
        final @QueryParam("extra-function") boolean extraFunction) {
    // returns the extra function package instead of the sink package when extraFunction is true
}
```
If extraFunction is true then the extra function will be returned instead of the sink.
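The sketch below shows the selection rule for this endpoint in isolation. `DownloadSelectionSketch` and `StoredMetadata` are hypothetical simplifications of the stored `FunctionMetaData`. The error raised when no extra function is registered is an assumption, because the PIP does not specify that case.

```java
// Hypothetical model of how the download endpoint could pick which package
// to stream back, based on the extra-function query parameter.
final class DownloadSelectionSketch {

    // Simplified view of the stored metadata (FunctionMetaData in the PIP).
    static final class StoredMetadata {
        final String packageLocation;              // the sink (connector) package
        final String extraFunctionPackageLocation; // the preprocessing function package, or null

        StoredMetadata(String packageLocation, String extraFunctionPackageLocation) {
            this.packageLocation = packageLocation;
            this.extraFunctionPackageLocation = extraFunctionPackageLocation;
        }
    }

    static String packageToDownload(StoredMetadata metadata, boolean extraFunction) {
        if (!extraFunction) {
            return metadata.packageLocation;
        }
        if (metadata.extraFunctionPackageLocation == null) {
            // How the real endpoint reports this case is not specified by the PIP.
            throw new IllegalStateException("No extra function registered for this sink");
        }
        return metadata.extraFunctionPackageLocation;
    }

    public static void main(String[] args) {
        StoredMetadata meta = new StoredMetadata("sink-package", "extra-function-package");
        System.out.println(packageToDownload(meta, true)); // extra-function-package
    }
}
```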
The Java admin SDK will have the following methods added:
```java
/**
 * Download Function Code.
 *
 * @param destinationFile
 *            file where data should be downloaded to
 * @param tenant
 *            Tenant name
 * @param namespace
 *            Namespace name
 * @param function
 *            Function name
 * @param extraFunction
 *            Whether to download the extra-function (for sources and sinks)
 * @throws PulsarAdminException
 */
void downloadFunction(String destinationFile, String tenant, String namespace, String function,
        boolean extraFunction) throws PulsarAdminException;

/**
 * Download Function Code asynchronously.
 *
 * @param destinationFile
 *            file where data should be downloaded to
 * @param tenant
 *            Tenant name
 * @param namespace
 *            Namespace name
 * @param function
 *            Function name
 * @param extraFunction
 *            Whether to download the extra-function (for sources and sinks)
 */
CompletableFuture<Void> downloadFunctionAsync(
        String destinationFile, String tenant, String namespace, String function, boolean extraFunction);
```
The parameter --extra-function will be added to the admin CLI command functions download.
Implementation
Pulsar-admin
Add the admin CLI options for creating, updating and locally running a sink (see API Changes).
Pulsar broker
On the broker API, in registerSink/updateSink, if a preprocessing function is present in the Sink config, we:
validate the function
get the function classloader (from the built-in functions, or by downloading the package file)
load the function
inspect the function's type arguments: use the first one (the input type) as the Sink type, and verify that the second one (the output type) is Record.
use the function classloader instead of the sink classloader to verify that custom schemas, SerDes and crypto key readers can be loaded and are valid.
get the function package location and fill the protobuf extraFunctionPackageLocation field with it. A name for this preprocessing function is generated from the sink name so it can be referenced when stored in BookKeeper or in package management. The name of the preprocessing function is {sink name}__sink-function.
set the functionDetails with the preprocessing function config (function class name and function userConfig)
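The type inspection and naming steps above can be sketched with plain Java reflection. This is an illustration under simplifying assumptions. `SketchFunction` and `SketchRecord` stand in for Pulsar's `Function` and `Record`. Only interfaces implemented directly by the class are examined, and `sinkTypeOf` and `preprocessFunctionName` are hypothetical helpers, not the broker's actual code.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Locale;

final class PreprocessTypeCheckSketch {

    // Hypothetical stand-ins for Pulsar's Function and Record types.
    interface SketchFunction<I, O> { O process(I input); }
    interface SketchRecord<T> { T getValue(); }

    // Accepted: returns a record.
    static final class UpperCase implements SketchFunction<String, SketchRecord<String>> {
        @Override
        public SketchRecord<String> process(String input) {
            String value = input.toUpperCase(Locale.ROOT);
            return () -> value;
        }
    }

    // Rejected: returns a bare value, so no schema is set explicitly.
    static final class Length implements SketchFunction<String, Integer> {
        @Override
        public Integer process(String input) { return input.length(); }
    }

    // Returns the function's input type (used as the Sink type) after checking
    // that its output type is SketchRecord. Only direct interfaces are examined.
    static Type sinkTypeOf(Class<?> functionClass) {
        for (Type type : functionClass.getGenericInterfaces()) {
            if (type instanceof ParameterizedType
                    && ((ParameterizedType) type).getRawType() == SketchFunction.class) {
                Type[] typeArgs = ((ParameterizedType) type).getActualTypeArguments();
                Type output = typeArgs[1];
                Type outputRaw = output instanceof ParameterizedType
                        ? ((ParameterizedType) output).getRawType()
                        : output;
                if (outputRaw != SketchRecord.class) {
                    throw new IllegalArgumentException("Preprocess function must return a Record, got " + output);
                }
                return typeArgs[0];
            }
        }
        throw new IllegalArgumentException(functionClass.getName() + " does not implement SketchFunction");
    }

    // Name under which the preprocessing function package is stored.
    static String preprocessFunctionName(String sinkName) {
        return sinkName + "__sink-function";
    }

    public static void main(String[] args) {
        System.out.println(sinkTypeOf(UpperCase.class));       // class java.lang.String
        System.out.println(preprocessFunctionName("my-sink")); // my-sink__sink-function
    }
}
```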
The extra-function parameter is added to the functions download CLI command (--extra-function), the admin SDK and the HTTP API (see API Changes).
Function worker
When the InstanceConfig is created, a UUID is assigned to the extraFunctionId field. This field serves as a cache key for the extra function (see API Changes).
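Using the UUID as a cache key means the extra function is loaded once per key and then reused. The following sketch shows that create-if-absent behaviour with `ConcurrentHashMap.computeIfAbsent`. `ExtraFunctionCacheSketch` is a hypothetical stand-in for the FunctionCacheManager. Its values are generic placeholders for what would in practice be a classloader built from the jar path.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for the function cache: one loaded entry (in practice
// a classloader) per extraFunctionId, created on first use and then reused.
final class ExtraFunctionCacheSketch<V> {
    private final Map<String, V> entries = new ConcurrentHashMap<>();
    private final Function<String, V> loader; // builds an entry from a jar path

    ExtraFunctionCacheSketch(Function<String, V> loader) {
        this.loader = loader;
    }

    // Generated once when the InstanceConfig is created.
    static String newExtraFunctionId() {
        return UUID.randomUUID().toString();
    }

    // Returns the cached entry for this key, loading it from jarPath if absent.
    V getOrCreate(String extraFunctionId, String jarPath) {
        return entries.computeIfAbsent(extraFunctionId, key -> loader.apply(jarPath));
    }

    public static void main(String[] args) {
        ExtraFunctionCacheSketch<String> cache = new ExtraFunctionCacheSketch<>(path -> "loader for " + path);
        String id = newExtraFunctionId();
        System.out.println(cache.getOrCreate(id, "/tmp/extra.nar")); // loader for /tmp/extra.nar
    }
}
```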
When the FunctionActioner starts the function, if extraFunctionPackageLocation is present, the extra function is handled the same way as the connector:
if the runtime is not externally managed, the extra function code is downloaded from the extraFunctionPackageLocation and the Runtime is created with the extra package file path and original name (see API changes to RuntimeFactory::createContainer)
if the runtime is externally managed, the Runtime is created with the extraFunctionPackageLocation and original name.
Depending on the configured runtime, if there’s an extra function file:
For the ThreadRuntime, the extra function classloader is obtained with the instance extraFunctionId cache key, then this classloader is passed to the JavaInstanceRunnable. The JavaInstanceRunnable then switches between the connector classloader and the extra function classloader accordingly.
For the ProcessRuntime, the path to the extra function jar is added to the --extra_function_jar parameter in the JavaInstanceStarter command. The JavaInstanceStarter then uses it when creating its ThreadRuntime.
For the KubernetesRuntime, a command is added to the statefulset exec command to download the extra function using the --extra-function flag of the functions download command, and the path to this downloaded jar is added to the --extra_function_jar parameter of the JavaInstanceStarter command.
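Switching classloaders per record is easy to get wrong if the previous loader is not restored. The following sketch shows one common way to do it: set the thread context classloader around each call and restore it in a `finally` block. Whether the JavaInstanceRunnable uses exactly this mechanism is an assumption. `ClassLoaderSwitchSketch`, `withContextClassLoader` and `processRecord` are hypothetical names.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class ClassLoaderSwitchSketch {

    // Runs an action with the given context classloader, then restores the previous one.
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    // Per record: run the preprocessing function under its own classloader,
    // then hand the result to the sink under the connector classloader.
    static <I, O> void processRecord(I input, ClassLoader functionLoader, ClassLoader connectorLoader,
                                     Function<I, O> function, Consumer<O> sink) {
        O output = withContextClassLoader(functionLoader, () -> function.apply(input));
        withContextClassLoader(connectorLoader, () -> {
            sink.accept(output);
            return null;
        });
    }

    public static void main(String[] args) {
        ClassLoader functionLoader = new URLClassLoader(new URL[0]);  // placeholder loaders
        ClassLoader connectorLoader = new URLClassLoader(new URL[0]);
        Function<String, String> upperCase = s -> s.toUpperCase(Locale.ROOT);
        Consumer<String> print = System.out::println;
        processRecord("hello", functionLoader, connectorLoader, upperCase, print); // HELLO
    }
}
```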
LocalRunner
If sinkConfig has a preprocessFunction, the LocalRunner will use the same methods as in the broker to get the function file and functionDetails and use them when spawning the Runtime.
Rejected Alternatives
N/A