Skip to content

Commit

Permalink
NIFI-6797: Add support for specifying Partition via EL or RecordPath …
Browse files Browse the repository at this point in the history
…for PublishKafka(Record)_1_0 and PublishKafka(Record)_2_0

This closes #3834.

Signed-off-by: Bryan Bende <bbende@apache.org>
  • Loading branch information
markap14 authored and bbende committed Oct 23, 2019
1 parent ace23c3 commit 3543b9c
Show file tree
Hide file tree
Showing 18 changed files with 605 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down Expand Up @@ -92,6 +92,12 @@
<version>2.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
package org.apache.nifi.processors.kafka.pubsub;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
* Collection of implementation of common Kafka {@link Partitioner}s.
*/
Expand Down Expand Up @@ -59,4 +59,40 @@ private synchronized int next(int numberOfPartitions) {
return index++;
}
}

public static class RecordPathPartitioner implements Partitioner {
@Override
public int partition(final String topic, final Object key, final byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster cluster) {
// When this partitioner is used, it is always overridden by creating the ProducerRecord with the Partition directly specified. However, we must have a unique value
// to set in the Producer's config, so this class exists
return 0;
}

@Override
public void close() {
}

@Override
public void configure(final Map<String, ?> configs) {
}
}


public static class ExpressionLanguagePartitioner implements Partitioner {
@Override
public int partition(final String topic, final Object key, final byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster cluster) {
// When this partitioner is used, it is always overridden by creating the ProducerRecord with the Partition directly specified. However, we must have a unique value
// to set in the Producer's config, so this class exists
return 0;
}

@Override
public void close() {
}

@Override
public void configure(final Map<String, ?> configs) {
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,16 @@
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

Expand All @@ -60,11 +65,16 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;

@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 1.0 Producer API. "
+ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
Expand Down Expand Up @@ -98,6 +108,12 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
+ "the next Partition to Partition 2, and so on, wrapping as necessary.");
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
"DefaultPartitioner", "Messages will be assigned to random partitions.");
static final AllowableValue RECORD_PATH_PARTITIONING = new AllowableValue(Partitioners.RecordPathPartitioner.class.getName(),
"RecordPath Partitioner", "Interprets the <Partition> property as a RecordPath that will be evaluated against each Record to determine which partition the Record will go to. All Records " +
"that have the same value for the given RecordPath will go to the same Partition.");
static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(), "Expression Language Partitioner",
"Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
"so all Records in a given FlowFile will go to the same partition.");

static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
Expand Down Expand Up @@ -184,11 +200,20 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
.name("partitioner.class")
.displayName("Partitioner class")
.description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
.allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
.allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, RECORD_PATH_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING)
.defaultValue(RANDOM_PARTITIONING.getValue())
.required(false)
.build();

static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
.name("partition")
.displayName("Partition")
.description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the <Partitioner class> property.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();

static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("compression.type")
.displayName("Compression Type")
Expand Down Expand Up @@ -253,6 +278,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
private static final Set<Relationship> RELATIONSHIPS;

private volatile PublisherPool publisherPool = null;
private final RecordPathCache recordPathCache = new RecordPathCache(25);

static {
final List<PropertyDescriptor> properties = new ArrayList<>();
Expand All @@ -276,6 +302,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
properties.add(ACK_WAIT_TIME);
properties.add(METADATA_WAIT_TIME);
properties.add(PARTITION_CLASS);
properties.add(PARTITION);
properties.add(COMPRESSION_CODEC);

PROPERTIES = Collections.unmodifiableList(properties);
Expand Down Expand Up @@ -325,6 +352,32 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va
}
}

final String partitionClass = validationContext.getProperty(PARTITION_CLASS).getValue();
if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
if (rawRecordPath == null) {
results.add(new ValidationResult.Builder()
.subject("Partition")
.valid(false)
.explanation("The <Partition> property must be specified if using the RecordPath Partitioning class")
.build());
} else if (!validationContext.isExpressionLanguagePresent(rawRecordPath)) {
final ValidationResult result = new RecordPathValidator().validate(PARTITION.getDisplayName(), rawRecordPath, validationContext);
if (result != null) {
results.add(result);
}
}
} else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
if (rawRecordPath == null) {
results.add(new ValidationResult.Builder()
.subject("Partition")
.valid(false)
.explanation("The <Partition> property must be specified if using the Expression Language Partitioning class")
.build());
}
}

return results;
}

Expand Down Expand Up @@ -414,6 +467,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();

final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);

try {
session.read(flowFile, new InputStreamCallback() {
@Override
Expand All @@ -423,7 +478,7 @@ public void process(final InputStream in) throws IOException {
final RecordSet recordSet = reader.createRecordSet();

final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException(e);
}
Expand Down Expand Up @@ -460,4 +515,33 @@ public void process(final InputStream in) throws IOException {
}
}
}

private Function<Record, Integer> getPartitioner(final ProcessContext context, final FlowFile flowFile) {
final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
final String recordPath = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath compiled = recordPathCache.getCompiled(recordPath);

return record -> evaluateRecordPath(compiled, record);
} else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
final int hash = Objects.hashCode(partition);
return (record) -> hash;
}

return null;
}

private Integer evaluateRecordPath(final RecordPath recordPath, final Record record) {
final RecordPathResult result = recordPath.evaluate(record);
final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);

result.getSelectedFields().forEach(fieldValue -> {
final Object value = fieldValue.getValue();
final long hash = Objects.hashCode(value);
accumulator.accumulate(hash);
});

return accumulator.intValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,6 @@

package org.apache.nifi.processors.kafka.pubsub;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import javax.xml.bind.DatatypeConverter;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
Expand All @@ -60,6 +41,27 @@
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;

import javax.xml.bind.DatatypeConverter;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;

import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;

@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "1.0"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 1.0 Producer API."
+ "The messages to send may be individual FlowFiles or may be delimited, using a "
Expand Down Expand Up @@ -94,6 +96,9 @@ public class PublishKafka_1_0 extends AbstractProcessor {
+ "the next Partition to Partition 2, and so on, wrapping as necessary.");
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
"DefaultPartitioner", "Messages will be assigned to random partitions.");
static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(), "Expression Language Partitioner",
"Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
"so all Records in a given FlowFile will go to the same partition.");

static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
Expand Down Expand Up @@ -187,11 +192,20 @@ public class PublishKafka_1_0 extends AbstractProcessor {
.name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
.displayName("Partitioner class")
.description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
.allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
.allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING)
.defaultValue(RANDOM_PARTITIONING.getValue())
.required(false)
.build();

static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
.name("partition")
.displayName("Partition")
.description("Specifies which Partition Records will go to.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();

static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
.displayName("Compression Type")
Expand Down Expand Up @@ -273,6 +287,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
properties.add(ACK_WAIT_TIME);
properties.add(METADATA_WAIT_TIME);
properties.add(PARTITION_CLASS);
properties.add(PARTITION);
properties.add(COMPRESSION_CODEC);

PROPERTIES = Collections.unmodifiableList(properties);
Expand Down Expand Up @@ -322,6 +337,18 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va
}
}

final String partitionClass = validationContext.getProperty(PARTITION_CLASS).getValue();
if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
if (rawRecordPath == null) {
results.add(new ValidationResult.Builder()
.subject("Partition")
.valid(false)
.explanation("The <Partition> property must be specified if using the Expression Language Partitioning class")
.build());
}
}

return results;
}

Expand Down Expand Up @@ -413,11 +440,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
demarcatorBytes = null;
}

final Integer partition = getPartition(context, flowFile);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
}
}
});
Expand Down Expand Up @@ -469,4 +497,16 @@ private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext conte

return DatatypeConverter.parseHexBinary(uninterpretedKey);
}

private Integer getPartition(final ProcessContext context, final FlowFile flowFile) {
final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
final int hash = Objects.hashCode(partition);
return hash;
}

return null;
}

}
Loading

0 comments on commit 3543b9c

Please sign in to comment.