Skip to content

Commit

Permalink
Set key for message when using function publish (#4005)
Browse files Browse the repository at this point in the history
* Allow to configure TypedMessageBuilder through a Map conf object

* Use constants for message confs

* Reverted previous change

* Use Long instead of Number

* Set key for message when using function publish

* fix unit test

* fix python test

* improving impl

* improving implementation

* add tests and examples

* fix bug

* fix bug

* fixing comments

* fix example

* addressing comments

* fix function
  • Loading branch information
jerrypeng authored Apr 13, 2019
1 parent de6bc11 commit 23b1418
Show file tree
Hide file tree
Showing 11 changed files with 743 additions and 29 deletions.

Large diffs are not rendered by default.

30 changes: 29 additions & 1 deletion pulsar-client-cpp/python/pulsar/functions/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,43 @@ def get_secret(self, secret_name):
"""Returns the secret value associated with the name. None if nothing was found"""
pass

@abstractmethod
def get_partition_key(self):
"""Returns the partition key of the input message if one exists"""
pass


@abstractmethod
def record_metric(self, metric_name, metric_value):
"""Records metric_value for the metric named metric_name. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
pass

@abstractmethod
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
"""
DEPRECATED
Publishes message to topic_name by first serializing the message using serde_class_name serde
The message will have properties specified if any

NOTE(review): another publish() taking message_conf is defined right after this one.
Python has no method overloading, so when both definitions live in the same class body
the later one replaces this one and this signature is never reachable -- confirm, and
consider merging the two into a single backward-compatible signature.
"""
pass

@abstractmethod
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
"""Publishes message to topic_name by first serializing the message using serde_class_name serde
The message will have properties specified if any"""
The message will have properties specified if any
The available options for message_conf:
properties,
partition_key,
sequence_id,
replication_clusters,
disable_replication,
event_timestamp
"""
pass

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public interface Context {

/**
* The id of the function that we are executing
*
* @return The function id
*/
String getFunctionId();
Expand Down Expand Up @@ -119,16 +120,17 @@ public interface Context {
/**
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);


/**
* Increment the builtin distributed counter referred by key
* but don't wait for the completion of the increment operation
*
* @param key The name of the key
* @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);
Expand All @@ -153,15 +155,15 @@ public interface Context {
/**
* Update the state value for the key.
*
* @param key name of the key
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);

/**
* Update the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @param key name of the key
* @param value state value of the key
*/
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
Expand Down Expand Up @@ -218,19 +220,17 @@ public interface Context {
* Record a user defined metric.
*
* @param metricName The name of the metric
* @param value The value of the metric
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);

/**
* Publish an object using serDe for serializing to the topic.
* Publish an object using serDe or schema class for serializing to the topic.
*
* @param topicName
* The name of the topic for publishing
* @param object
* The object that needs to be published
* @param schemaOrSerdeClassName
* Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
* of the custom schema class
* @return A future that completes when the framework is done publishing the message
*/
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
Expand All @@ -239,9 +239,30 @@ public interface Context {
* Publish an object to the topic using default schemas.
*
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @param object The object that needs to be published
* @return A future that completes when the framework is done publishing the message
*/
<O> CompletableFuture<Void> publish(String topicName, O object);

/**
* Publish an object using serDe or schema class for serializing to the topic.
*
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
* of the custom schema class
* @param messageConf A map of configurations to set for the message that will be published
* The available options are:
*
* "key" - Partition Key
* "properties" - Map of properties
* "eventTime"
* "sequenceId"
* "replicationClusters"
* "disableReplication"
*
* @return A future that completes when the framework is done publishing the message
*/
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName, Map<String, Object> messageConf);

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
Expand Down Expand Up @@ -258,7 +259,7 @@ public String getSecret(String secretName) {
return null;
}
}

private void ensureStateEnabled() {
checkState(null != stateContext, "State is not enabled.");
}
Expand Down Expand Up @@ -336,11 +337,17 @@ public <O> CompletableFuture<Void> publish(String topicName, O object) {
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
return publish(topicName, object, schemaOrSerdeClassName, null);
}

@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName, Map<String, Object> messageConf) {
return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false), messageConf);
}

@SuppressWarnings("unchecked")
public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema, Map<String, Object> messageConf) {
Producer<O> producer = (Producer<O>) publishProducers.get(topicName);

if (producer == null) {
Expand Down Expand Up @@ -382,7 +389,11 @@ public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O>
}
}

CompletableFuture<Void> future = producer.sendAsync(object).thenApply(msgId -> null);
TypedMessageBuilder<O> messageBuilder = producer.newMessage();
if (messageConf != null) {
messageBuilder.loadConf(messageConf);
}
CompletableFuture<Void> future = messageBuilder.value(object).sendAsync().thenApply(msgId -> null);
future.exceptionally(e -> {
this.statsManager.incrSysExceptions(e);
logger.error("Failed to publish to topic {} with error {}", topicName, e);
Expand Down
14 changes: 13 additions & 1 deletion pulsar-functions/instance/src/main/python/contextimpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def get_message_properties(self):
def get_current_message_topic_name(self):
return self.message.topic_name()

def get_partition_key(self):
"""Returns the partition key of the current message, as reported by self.message.partition_key()"""
return self.message.partition_key()

def get_function_name(self):
return self.instance_config.function_details.name

Expand Down Expand Up @@ -147,6 +150,9 @@ def callback_wrapper(self, callback, topic, message_id, result, msg):
callback(result, msg)

def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
# Deprecated variant: forwards to the message_conf form of publish(), wrapping
# `properties` as message_conf["properties"].
# NOTE(review): the publish() defined immediately below reuses this name. Python keeps
# only the last definition in a class body, so this variant looks unreachable (a
# positional `properties` argument would land in `compression_type`) -- confirm, and
# consider merging the two signatures into one.
self.publish(topic_name, message, serde_class_name=serde_class_name, compression_type=compression_type, callback=callback, message_conf={"properties": properties})

def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
# Just make sure that user supplied values are properly typed
topic_name = str(topic_name)
serde_class_name = str(serde_class_name)
Expand All @@ -172,7 +178,13 @@ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", p
self.publish_serializers[serde_class_name] = serde_klass()

output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message))
self.publish_producers[topic_name].send_async(output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), properties=properties)

if message_conf:
self.publish_producers[topic_name].send_async(
output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), **message_conf)
else:
self.publish_producers[topic_name].send_async(
output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()))

def ack(self, msgid, topic):
topic_consumer = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import io.prometheus.client.CollectorRegistry;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
Expand All @@ -38,10 +42,14 @@
import java.util.concurrent.CompletableFuture;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -72,12 +80,22 @@ public void setup() {
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));

TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
when(producer.newMessage()).thenReturn(messageBuilder);

context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
ComponentType.FUNCTION, null);
context.setCurrentMessageContext(new Record<String>() {
@Override
public String getValue() {
return null;
}
});
}

@Test(expectedExceptions = IllegalStateException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def test_context_publish(self):

msg = Message()
msg.message_id = Mock(return_value="test_message_id")
msg.partition_key = Mock(return_value="test_key")
context_impl.set_current_message_context(msg, "test_topic_name")

context_impl.publish("test_topic_name", "test_message")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.HashMap;
import java.util.Map;

/**
* Example function that uses the built in publish function in the context
* to publish to a desired topic based on config and setting various message configurations to be passed along.
*
*/
public class PublishFunctionWithMessageConf implements Function<String, Void> {
    /**
     * Appends "!" to the input and publishes the result to the topic named by the
     * "publish-topic" user config value (default "publishtopic").
     *
     * The outgoing message carries the input record's properties plus an "input_topic"
     * property, the input record's key when it has one, and the current wall-clock time
     * in milliseconds as its event time.
     *
     * @param input   the input message payload
     * @param context the function context used to read config and publish
     * @return always null; the output is delivered through context.publish
     */
    @Override
    public Void process(String input, Context context) {
        String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
        String output = String.format("%s!", input);

        Map<String, String> properties = new HashMap<>();
        // The topic name is an Optional: record it only when present instead of calling
        // get() unconditionally, which would throw NoSuchElementException on an empty value.
        context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));
        // Added after "input_topic" so the record's own properties win on a key clash.
        properties.putAll(context.getCurrentRecord().getProperties());

        Map<String, Object> messageConf = new HashMap<>();
        messageConf.put(TypedMessageBuilder.CONF_PROPERTIES, properties);
        // Propagate the partition key only when the input record carries one.
        context.getCurrentRecord().getKey().ifPresent(key -> messageConf.put(TypedMessageBuilder.CONF_KEY, key));
        messageConf.put(TypedMessageBuilder.CONF_EVENT_TIME, System.currentTimeMillis());

        // A null schema/serde class name is passed through unchanged, as in the original example.
        context.publish(publishTopic, output, null, messageConf);
        return null;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import time
from pulsar import Function

# Example function that uses the built in publish function in the context
# to publish to a desired topic based on config
class PublishFunctionWithMessageConf(Function):
    """Example function that republishes each input with '!' appended.

    The target topic comes from the "publish-topic" user config value and
    falls back to "publishtopic". The outgoing message is given properties,
    a partition key and an event timestamp through ``message_conf``.
    """

    def __init__(self):
        pass

    def process(self, input, context):
        """Publish ``input + '!'`` to the configured topic; returns None."""
        publish_topic = "publishtopic"
        if "publish-topic" in context.get_user_config_map():
            publish_topic = context.get_user_config_value("publish-topic")

        # Start from the input topic, then overlay the input message's own
        # properties so they win on a key clash (same precedence as before).
        properties = {"input_topic": context.get_current_message_topic_name()}
        properties.update(context.get_message_properties())

        context.publish(
            publish_topic,
            input + '!',
            message_conf={
                "properties": properties,
                "partition_key": context.get_partition_key(),
                # The event timestamp is expressed in milliseconds (the Java
                # example uses System.currentTimeMillis()); time.time()
                # returns seconds, so scale it before truncating.
                "event_timestamp": int(time.time() * 1000),
            })
        return
Loading

0 comments on commit 23b1418

Please sign in to comment.