Skip to content

Commit

Permalink
Update kafka topic dynamically
Browse files Browse the repository at this point in the history
Set sensitive as true in USM Users JSON content property

Trigger CI pipeline

[NIFI-13303] - Remove text indicating property verification is disabled. (apache#8884)

This closes apache#8884

[NIFI-13269] - Order parameter reference list alphabetically (apache#8885)

* [NIFI-13269] - Order parameter reference list alphabetically

* ran prettier:format to address minor code style issue

* update nfpr and nfel to sort combo entries the same as the combo entries in the property table (case insensitive)

This closes apache#8885

NIFI-13284: (apache#8891)

- Only saving canvas routes that are not edit or history.

This closes apache#8891

NIFI-12343 Added Max JSON Field String Length for Elasticsearch

This closes apache#8881

Signed-off-by: David Handermann <exceptionfactory@apache.org>

NIFI-13308 Upgraded Spring Framework from 6.1.7 to 6.1.8

- Upgraded Spring Boot from 3.2.5 to 3.2.6
- Upgraded Slack bolt-socket-mode from 1.39.2 to 1.39.3
- Upgraded maven-artifact from 3.9.6 to 3.9.7
- Upgraded mariadb-java-client from 3.3.3 to 3.4.0
- Upgraded software.amazon.awssdk from 2.25.55 to 2.25.60
- Upgraded com.amazonaws from 1.12.725 to 1.12.730
- Upgraded Jersey from 3.1.6 to 3.1.7
- Upgraded Netty from 4.1.109.Final to 4.1.110.Final
- Upgraded box-java-sdk from 4.9.0 to 4.9.1

This closes apache#8887

Signed-off-by: David Handermann <exceptionfactory@apache.org>

NIFI-13299: (apache#8894)

- Adding min validators where appropriate.

This closes apache#8894

NIFI-13289 add tooltips to NewCanvasItem (apache#8870)

This closes apache#8870

NIFI-13315 Fixed ListAzureBlobStorage_v12 fails when Record Writer is used

This closes apache#8897

Signed-off-by: Mark Bathori <mbathori@apache.org>

[NIFI-13312] - Restructure as an Nx monorepo (apache#8893)

* [NIFI-13312] - Restructure as an Nx monorepo

* restored lint:fix functionality, updated package-lock

This closes apache#8893

[NIFI-13234] update unauthorized canvas component colors (apache#8902)

* [NIFI-13234] update unautorized canvas component colors

* restore web font loader to ensure positions of canvas text is calculate correctly

This closes apache#8902

[NIFI-13246] move actions from details columns into menu (apache#8900)

* [NIFI-13246] move actions from details columns into menu

* move View Documentation menu option lower

This closes apache#8900

NIFI-13321: (apache#8901)

- Fixing the mocking of child components in unit tests.
- Removing any provided dependency that is no longer needed.

This closes apache#8901

NIFI-13265 Removed instantiation of Object arrays for log arguments

This closes apache#8896

Signed-off-by: David Handermann <exceptionfactory@apache.org>

NIFI-13309 Lookup compatible bundles even if previous flow was empty

Signed-off-by: Ferenc Kis <briansolo1985@gmail.com>

This closes apache#8888.

NIFI-13320 Upgraded Spring Boot from 3.2.6 to 3.3.0

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#8899.

NIFI-13267 - Bump NiFi NAR Maven plugin version (apache#8860)

* NIFI-13267 - Bump NiFi NAR Maven plugin version
* Review - adding Parameter Providers and Flow Analaysis Rules in c2-protocol-component-api ComponentManifest
* Review - fix build() of ComponentManifest

NIFI-13307 Replaced KeyStoreUtils with nifi-security-ssl Builders (apache#8895)

- Removed unused test classes from nifi-web-api

NIFI-13336 updating various deps for aws google azure and more

- com.amazonaws	* 1.12.730 1.12.733
- com.azure azure-sdk-bom 1.2.23 1.2.24
- com.google.cloud libraries-bom 26.39.0 26.40.0
- commons-cli 1.7.0 1.8.0
- commons-net 3.10.0 3.11.0
- io.fabric8 * 6.12.1 6.13.0
- org.apache.commons commons-compress 1.26.1 1.26.2
- software.amazon.awssdk 2.25.60 2.25.63
- com.google.apis	google-api-services-drive v3-rev20240327-2.0.0	 v3-rev20240521-2.0.0
- org.neo4j.driver	neo4j-java-driver 5.20.0 5.21.0
- org.springframework.integration	spring-integration-mail 6.2.4 6.2.5

Signed-off-by: Joseph Witt <joewitt@apache.org>
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#8907.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

[NIFI-13325] update dark mode theme density to match light mode (apache#8904)

* [NIFI-13325] update dark mode theme density to match light mode

* remove density from nifi themes as only colors are used from this theme

This closes apache#8904

NIFI-12801 Add local file upload option in PutHDFS processor

This closes apache#8415.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>

NIFI-13329 - Updating the standard content viewer to render an error message when there is an error formatting.

Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#8905.

NIFI-13342 restored sts dependency in aws service api

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes apache#8910

NIFI-11078: Adds Component UUID to Flow Configuration History Table (apache#8909)

This closes apache#8909

Update BinFiles not to write attributes to FlowFiles for auto-terminated ORIGINAL relationship

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes apache#8911

NIFI-13350: (apache#8912)

- Allowing parameters to be edited in New Parameter Context dialog.
- Ensuring the proper tab is selected in the Parameter Context dialog based on the current usage.

This closes apache#8912

NIFI-13138 Add Bundle extensions name and description in NiFi Registry

This closes apache#8740

Signed-off-by: David Handermann <exceptionfactory@apache.org>

NIFI-13352 Adjusted Shutdown handling in ListenOTLP and Test Class
This closes apache#8913

- Added quick duration for shutdown quiet period in ListenOTLP HttpServerFactory
- Added TestRunner.stop() to ListenOTLPTest to close listening sockets
- Increased Connect Timeout from 5 to 10 seconds in ListenOTLPTest

Signed-off-by: Joseph Witt <joewitt@apache.org>

NIFI-13337: (apache#8915)

- Adjust column widths of the queue listing table.

NIFI-13310 merged RAT declarations

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#8916.

NIFI-13231 Added App Private Key Auth to GitHub FlowRegistryClient

This closes apache#8890

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Co-authored-by: David Handermann <exceptionfactory@apache.org>

[NIFI-13355] move view cluster details and view flow configuration details into action kebab menus (apache#8921)

This closes apache#8921

NIFI-13288 Improved SplitXml and SplitAvro to call session.putAttributes()

This closes apache#8917

Signed-off-by: David Handermann <exceptionfactory@apache.org>

NIFI-13351 Improved QueryDatabaseTable Processors to call session.putAttributes()

This closes apache#8919

Signed-off-by: David Handermann <exceptionfactory@apache.org>

NIFI-13339 Set sensitive as true in USM Users JSON content on ListenTrapSNMP

This closes apache#8908

Signed-off-by: David Handermann <exceptionfactory@apache.org>

[NIFI-13353] improve anchor tag hover state styles in dark mode (apache#8920)

This closes apache#8920

NIFI-13357 Removed APP_INSTALLATION_TOKEN from GitHubFlowRegistryClient (apache#8923)

Signed-off-by: David Handermann <exceptionfactory@apache.org>

[NIFI-13349] align angular material and tailwind typography (apache#8918)

* [NIFI-13349] align angular material and tailwind typography

* override default tailwind fontSize configurations to match up with angular material typography configuration

* cleanup duplicate style

* add text-3xl tailwind configuration

* update primary-node-only to use text-sm

* replace .refresh-container with text-sm

* add comments for $subtitle-2 material typography config

* adjust $subtitle-2 font size and line height

This closes apache#8918

[NIFI-13331] set default table density to -4 for all listings in nifi (apache#8925)

This closes apache#8925

[NIFI-13361] determine extension description height base on $body-2 line-height configuration (apache#8927)

This closes apache#8927

[NIFI-13360] rename nifi theme to supplemental theme (apache#8926)

* [NIFI-13360] rename nifi theme to supplemental theme

* Update nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/apps/nifi/src/assets/styles/_app.scss

Co-authored-by: Rob Fellows <rob.fellows@gmail.com>

* Update nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-frontend/src/main/nifi/apps/nifi/src/assets/styles/_app.scss

Co-authored-by: Rob Fellows <rob.fellows@gmail.com>

---------

Co-authored-by: Rob Fellows <rob.fellows@gmail.com>

This closes apache#8926

NIFI-13030 Adding endpoint for comparing versions of registered flows

This closes apache#8670

Signed-off-by: Peter Gyori <pgyori@apache.org>

NIFI-13313: Remove old UI (apache#8906)

* NIFI-13313:
- Use nifi-web-frontend as the default UI hosted at /nifi no longer deploying nifi-web-ui.

* NIFI-13313:
- Adding logout complete page.
- Updating backend to redirect to new logout complete page.

* NIFI-13313:
- Remove nifi-web-ui module.

* NIFI-13313:
- Updating LICENSE and NOTICE files for dependencies that are no longer included.

* NIFI-13313:
- Updating README.
- Updating proxy config to mirror actual context path.

* NIFI-13313:
- Establishing rewrite rules for redirecting logout complete.
- Setting the default handler for when a request isn't handled to redirect the user to /nifi.

* NIFI-13313:
- Removing nifi-web-error module.

* NIFI-13313:
- Restoring /nifi/logout-complete path.

* NIFI-13313:
- Adding an error handler for the ui which handles redirects to a 404 page.

This closes apache#8906

NIFI-13365 - Fix unit tests running in kubernetes pod

NIFI-13364 Removed autorefresh and banner UI properties

This closes apache#8932

Signed-off-by: David Handermann <exceptionfactory@apache.org>

NIFI-13367: (apache#8933)

- Updating the page title to align with the root Process Group.

This closes apache#8933

NIFI-13368: (apache#8931)

- Allowing tooltip mouse listeners to be destroyed when necessary.
- Ensuring connection source/destination run status and validation errors are updated when deleted.

This closes apache#8931

Add dynamic topic change capability

[NIFI-13370] - dark-mode support for browser inputs (apache#8935)

This closes apache#8935

NIFI-13354: (apache#8936)

- Moving NiFi front end source into a top level maven module. This prepares for the introduction of other UIs that NiFi offers (like Custom UIs, Data Viewers, etc) to be colocated and share common components, styles, and dependencies.
- The nifi-web-frontend module which produces the war that is included in the server nar now no longer contains any source. It simply depends on the new nifi-frontend artifact that bundles all built UIs and unpacks its contents into the resulting war.
- Renaming nifi-web-frontend to nifi-ui. Now nifi-frontend at the top level will hold all frontend apps. In this commit the nifi app in nifi-frontend is bundled into a war called nifi-ui.

This closes apache#8936

NIFI-12983 Qdrant vector store support

Co-authored-by: Pierre Villard <pierre.villard.fr@gmail.com>
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#8590.

NIFI-13375 Added missing logging parameter in SplitRecord

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes apache#8941

NIFI-13378: Adds Version State as filterable option in Process Groups tab in Summary (apache#8945)

* NIFI-13378: Adds Version State as filterable option in Process Groups tab in Summary

* updated value of selections to enum values

This closes apache#8945

[NIFI-13371] update canvas context menu (apache#8949)

This closes apache#8949

NIFI-13373: Adding support for banner text (apache#8947)

* NIFI-13373:
- Adding support for banner text.

* NIFI-13373:
- Prettier.

* NIFI-13373:
- Removing unused property.

* NIFI-13373:
- Defining reponse payload when loading banner text.
- Removing banner text from login, logout, and error pages.

* NIFI-13373:
- Only loading the banner text when necessary.

This closes apache#8947

NIFI-13385: (apache#8953)

- Disabling nx daemon during builds.

This closes apache#8953

NIFI-13359 Tune ExecuteSQL/Record to create fewer transient flow files

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes apache#8928

NIFI-13383 Changed info log level to debug in XXEValidator (apache#8952)

Signed-off-by: David Handermann <exceptionfactory@apache.org>

NIFI-13266 Removed String concatenation in logging messages (apache#8940)

Signed-off-by: David Handermann <exceptionfactory@apache.org>

NIFI-13242 MiNiFi Sync Resource C2 command

Signed-off-by: Ferenc Erdei <erdei.ferenc90@gmail.com>
This closes apache#8898.

NIFI-13391: (apache#8960)

- Setting up different environment files to configure ngrx store.

NIFI-13379 Replaced use of deprecated com.nimbusds.oauth2.sdk.http.HTTPResponse method getContentAsJSONObject with API suggested replacement getBodyAsJSONObject.

This closes apache#8944

Signed-off-by: Mike Thomsen <mthomsen@apache.org>

Improving Junit test to better cover the feature

Remove unnecessary code
  • Loading branch information
andrealves23 committed Jun 17, 2024
1 parent 039cd2f commit 71ab750
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.FlowFileFilters;

import java.net.UnknownHostException;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -77,7 +81,8 @@
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "Set to true if the consumed message is a tombstone message")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerWhenEmpty
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
Expand All @@ -90,6 +95,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");

static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
static final AllowableValue TOPIC_FLOW_NAME = new AllowableValue("flowNames", "flowNames", "Topic is a full topic name or comma separated list of names in the flowfile");
static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");

static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
Expand All @@ -98,15 +104,15 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
.name("topic_type")
.displayName("Topic Name Format")
.description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression")
.required(true)
.allowableValues(TOPIC_NAME, TOPIC_PATTERN)
.allowableValues(TOPIC_NAME, TOPIC_PATTERN, TOPIC_FLOW_NAME)
.defaultValue(TOPIC_NAME)
.build();

Expand Down Expand Up @@ -273,6 +279,9 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
);
static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);

private volatile String topicFlowfile = "";
private volatile Boolean isToUpdateTopic = false;

private volatile ConsumerPool consumerPool = null;
private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());

Expand Down Expand Up @@ -332,6 +341,12 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va
}

private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {

if (isToUpdateTopic) {
isToUpdateTopic = false;
close();
}

ConsumerPool pool = consumerPool;
if (pool != null) {
return pool;
Expand Down Expand Up @@ -370,9 +385,13 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

final String topicListing = context.getProperty(ConsumeKafka_2_6.TOPICS).evaluateAttributeExpressions().getValue();
final String topicType = context.getProperty(ConsumeKafka_2_6.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final String topicListing;
if (topicType.equals(TOPIC_FLOW_NAME.getValue())) {
topicListing = topicFlowfile;
} else {
topicListing = context.getProperty(ConsumeKafka_2_6.TOPICS).evaluateAttributeExpressions().getValue();
}
final List<String> topics = new ArrayList<>();
final KeyEncoding keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).asAllowableValue(KeyEncoding.class);
final String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
Expand All @@ -396,7 +415,7 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co
throw new ProcessException("Could not determine localhost's hostname", uhe);
}

if (topicType.equals(TOPIC_NAME.getValue())) {
if (topicType.equals(TOPIC_NAME.getValue()) || topicType.equals(TOPIC_FLOW_NAME.getValue()) ) {
for (final String topic : topicListing.split(",")) {
final String trimmedName = topic.trim();
if (!trimmedName.isEmpty()) {
Expand Down Expand Up @@ -449,6 +468,18 @@ public void interruptActiveThreads() {

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final String topicType = getTopicType(context);
if (isFlowNameTopic(topicType)) {
processFlowNameTopic(context, session);

if (topicFlowfile.isEmpty()) {
getLogger().warn("Consumer will not be configured. Topic is empty.");
return;
}

getLogger().info("Consumer is configured with new topic name: {}", topicFlowfile);
}

final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
Expand All @@ -469,6 +500,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

if (!lease.commit()) {
context.yield();
if (topicType.equals(TOPIC_FLOW_NAME.getValue())) {
session.commit();
}
}
} catch (final WakeupException we) {
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
Expand All @@ -485,6 +519,30 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}
}

private String getTopicType(ProcessContext context) {
return context.getProperty(ConsumeKafka_2_6.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
}

private boolean isFlowNameTopic(String topicType) {
return topicType.equals(TOPIC_FLOW_NAME.getValue());
}

private void processFlowNameTopic(ProcessContext context, ProcessSession session) {
getLogger().info("Received flowfile; entering flow name configuration procedure");

final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 500));
if (!flowFiles.isEmpty()) {
String currentTopicFlowfile = context.getProperty(ConsumeKafka_2_6.TOPICS).evaluateAttributeExpressions(flowFiles.get(flowFiles.size() - 1)).getValue();
session.remove(flowFiles);

if (!currentTopicFlowfile.equalsIgnoreCase(topicFlowfile)) {
topicFlowfile = currentTopicFlowfile;
isToUpdateTopic = true;
}
getLogger().info("Received topic {} from flow file", topicFlowfile);
}
}

@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
try (final ConsumerPool consumerPool = createConsumerPool(context, verificationLogger)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
Expand All @@ -33,6 +34,8 @@
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -154,4 +157,34 @@ private SelfContainedKerberosUserService enableKerberosUserService(final TestRun
return kerberosUserService;
}

@Test
public void testDynamicTopicChange() {
ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6();
TestRunner runner = TestRunners.newTestRunner(consumeKafka);
runner.setValidateExpressionUsage(false);
runner.setProperty(ConsumeKafka_2_6.BOOTSTRAP_SERVERS, "localhost:1234");
runner.setProperty(ConsumeKafka_2_6.TOPICS, "${kafka.topic}");
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST);

Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("kafka.topic", "initial-topic");

runner.enqueue(new byte[0], flowFileAttributes);
runner.run();

runner.assertTransferCount(ConsumeKafka_2_6.REL_SUCCESS, 0);
PropertyValue topicsProperty = runner.getProcessContext().getProperty(ConsumeKafka_2_6.TOPICS);
String evaluatedTopic = topicsProperty.evaluateAttributeExpressions(flowFileAttributes).getValue();
assertEquals(evaluatedTopic, "initial-topic");

flowFileAttributes.put("kafka.topic", "updated-topic");
runner.enqueue(new byte[0], flowFileAttributes);
runner.run();

runner.assertTransferCount(ConsumeKafka_2_6.REL_SUCCESS, 0);
topicsProperty = runner.getProcessContext().getProperty(ConsumeKafka_2_6.TOPICS);
evaluatedTopic = topicsProperty.evaluateAttributeExpressions(flowFileAttributes).getValue();
assertEquals(evaluatedTopic, "updated-topic");
}
}

0 comments on commit 71ab750

Please sign in to comment.