From 5dca130b725e3da1db813fa297ac3bcfe46c3c7a Mon Sep 17 00:00:00 2001 From: Philipp Zehnder Date: Wed, 17 Jul 2024 14:32:14 +0200 Subject: [PATCH] refactor(#2779): Fix the test for TestStringToStateProcessor (#3031) * refactor(#2779): Fix the test for TestStringToStateProcessor * refactor(#2779): Add a utils class to deal with the stream prefix selector * refactor(#2778): Fix junit test TestStringCounterProcessor (#3036) --- .../pom.xml | 6 + .../state/StringToStateProcessor.java | 7 +- .../counter/TestStringCounterProcessor.java | 186 +++++--------- .../state/TestStringToStateProcessor.java | 239 +++++++----------- .../ProcessingElementTestExecutor.java | 13 +- .../test/executors/StreamPrefix.java | 53 ++++ .../executors/TestConfigurationBuilder.java | 4 +- .../test/executors/StreamPrefixTest.java | 62 +++++ 8 files changed, 276 insertions(+), 294 deletions(-) create mode 100644 streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java create mode 100644 streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml b/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml index 26d9b8f430..4f05f1112d 100644 --- a/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml @@ -69,6 +69,12 @@ org.junit.jupiter junit-jupiter-params + + org.apache.streampipes + streampipes-test-utils-executors + 0.97.0-SNAPSHOT + test + diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java index 50ecbab838..08bcff5349 100644 --- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java @@ -38,8 +38,7 @@ import org.apache.streampipes.wrapper.params.compat.ProcessorParams; import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; -import com.google.common.collect.Lists; - +import java.util.ArrayList; import java.util.List; public class StringToStateProcessor extends StreamPipesDataProcessor { @@ -77,13 +76,13 @@ public void onInvocation(ProcessorParams parameters, @Override public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException { - List states = Lists.newArrayList(); + List states = new ArrayList<>(); for (String stateField : stateFields) { states.add(event.getFieldBySelector(stateField).getAsPrimitive().getAsString()); } - event.addField(CURRENT_STATE, states.toArray()); + event.addField(CURRENT_STATE, states); collector.collect(event); } diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java index 5887dda629..7db3fcb8a8 100644 --- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java @@ -18,133 +18,61 @@ package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter; -//@RunWith(Parameterized.class) +import org.apache.streampipes.test.executors.PrefixStrategy; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.StreamPrefix; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + public class TestStringCounterProcessor { -// private static final Logger LOG = LoggerFactory.getLogger(TestStringCounterProcessor.class); -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// return Arrays.asList(new Object[][] { -// {"Test", List.of("t1"), new Tuple3<>("", "", 0)}, -// {"Test", Arrays.asList("t1", "t2"), new Tuple3<>("t1", "t2", 1)}, -// {"Test", Arrays.asList("t1", "t2", "t1", "t2"), new Tuple3<>("t1", "t2", 2)}, -// {"Test", Arrays.asList("t1", "t2", "t1", "t3"), new Tuple3<>("t1", "t3", 1)} -// }); -// } -// -// @org.junit.runners.Parameterized.Parameter -// public String selectedFieldName; -// -// @org.junit.runners.Parameterized.Parameter(1) -// public List eventStrings; -// -// @org.junit.runners.Parameterized.Parameter(2) -// public Tuple3 expectedValue; -// -// @Test -// public void testStringCounter() { -// StringCounterProcessor stringCounter = new StringCounterProcessor(); -// DataProcessorDescription originalGraph = stringCounter.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// graph.setInputStreams(Collections -// .singletonList(EventStreamGenerator -// .makeStreamWithProperties(Collections.singletonList(selectedFieldName)))); -// graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties( -// Collections.singletonList(selectedFieldName))); -// graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() -// .setActualTopicName("output-topic"); -// -// MappingPropertyUnary mappingPropertyUnary = graph.getStaticProperties().stream() -// .filter(p -> p instanceof MappingPropertyUnary) -// .map((p -> (MappingPropertyUnary) p)) -// .filter(p -> p.getInternalName().equals(StringCounterProcessor.FIELD_ID)) -// .findFirst().orElse(null); -// assert mappingPropertyUnary != null; -// mappingPropertyUnary.setSelectedProperty("s0::" + selectedFieldName); -// ProcessorParams params = new ProcessorParams(graph); -// -// SpOutputCollector spOut = new SpOutputCollector() { -// @Override -// public void registerConsumer(String routeId, InternalEventProcessor> consumer) { -// } -// -// @Override -// public void unregisterConsumer(String routeId) { -// } -// -// @Override -// public void connect() throws SpRuntimeException { -// } -// -// @Override -// public void disconnect() throws SpRuntimeException { -// } -// -// @Override -// public void collect(Event event) { -// } -// }; -// -// stringCounter.onInvocation(params, spOut, null); -// Tuple3 tuple = sendEvents(stringCounter, spOut); -// LOG.info("Expected match count is {}.", expectedValue.x); -// LOG.info("Actual match count is {}.", tuple.x); -// assertEquals(expectedValue.x, tuple.x); -// LOG.info("Expected change from is {}.", expectedValue.k); -// LOG.info("Actual change from is {}.", tuple.k); -// assertEquals(expectedValue.k, tuple.k); -// LOG.info("Expected change to is {}.", expectedValue.k); -// LOG.info("Actual change to is {}.", tuple.k); -// assertEquals(expectedValue.v, tuple.v); -// } -// -// private Tuple3 sendEvents(StringCounterProcessor stringCounter, SpOutputCollector spOut) { -// int counter = 0; -// String changeFrom = "", changeTo = ""; -// List events = makeEvents(); -// for (Event event : events) { -// LOG.info("Sending event with value " -// + event.getFieldBySelector("s0::" + selectedFieldName).getAsPrimitive().getAsString()); -// stringCounter.onEvent(event, spOut); -// try { -// TimeUnit.MILLISECONDS.sleep(100); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// try { -// counter = event.getFieldBySelector(StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME) -// .getAsPrimitive() -// .getAsInt(); -// changeFrom = event.getFieldBySelector(StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME) -// .getAsPrimitive() -// .getAsString(); -// changeTo = event.getFieldBySelector(StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME) -// .getAsPrimitive() -// .getAsString(); -// LOG.info(changeFrom + " change to " + changeTo + ", value = " + counter); -// } catch (IllegalArgumentException e) { -// -// } -// } -// return new Tuple3<>(changeFrom, changeTo, counter); -// } -// -// -// private List makeEvents() { -// List events = Lists.newArrayList(); -// for (String eventSetting : eventStrings) { -// events.add(makeEvent(eventSetting)); -// } -// return events; -// } -// -// private Event makeEvent(String value) { -// Map map = Maps.newHashMap(); -// map.put(selectedFieldName, value); -// return EventFactory.fromMap(map, -// new SourceInfo("test" + "-topic", "s0"), -// new SchemaInfo(null, Lists.newArrayList())); -// } -} + private static final String KEY_1 = "key1"; + + private StringCounterProcessor processor; + + @BeforeEach + public void setup() { + processor = new StringCounterProcessor(); + } + + + static Stream arguments() { + return Stream.of( + Arguments.of( + List.of(Map.of(KEY_1, "v1"), Map.of(KEY_1, "v2")), + List.of(Map.of( + KEY_1, "v2", + StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME, "v1", + StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME, "v2", + StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME, 1 + )) + ) + ); + } + + @ParameterizedTest + @MethodSource("arguments") + public void testStringToState( + List> intpuEvents, + List> outputEvents + ) { + + var configuration = TestConfiguration + .builder() + .config(StringCounterProcessor.FIELD_ID, StreamPrefix.s0(KEY_1)) + .prefixStrategy(PrefixStrategy.SAME_PREFIX) + .build(); + + var testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + testExecutor.run(intpuEvents, outputEvents); + } + +} \ No newline at end of file diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java index 332b07d9e0..5278e44bd3 100644 --- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java @@ -18,159 +18,90 @@ package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state; -//@RunWith(Parameterized.class) +import org.apache.streampipes.test.executors.PrefixStrategy; +import org.apache.streampipes.test.executors.ProcessingElementTestExecutor; +import org.apache.streampipes.test.executors.StreamPrefix; +import org.apache.streampipes.test.executors.TestConfiguration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + public class TestStringToStateProcessor { -// -// private static final Logger LOG = LoggerFactory.getLogger(TestStringToStateProcessor.class); -// -// @org.junit.runners.Parameterized.Parameters -// public static Iterable data() { -// return Arrays.asList(new Object[][] { -// { -// List.of(), -// List.of("c1", "c2", "c3"), -// List.of(Arrays.asList("t1", "t2", "t3")), -// List.of() -// }, -// { -// List.of("c1"), -// List.of("c1", "c2", "c3"), -// List.of(Arrays.asList("t1", "t2", "t3")), -// List.of("t1") -// }, -// { -// List.of("c1", "c2"), -// List.of("c1", "c2", "c3"), -// List.of(Arrays.asList("t1", "t2", "t3")), -// Arrays.asList("t1", "t2") -// }, -// { -// List.of("c1", "c2"), -// List.of("c1", "c2", "c3"), -// Arrays.asList( -// Arrays.asList("t1-1", "t2-1", "t3-1"), -// Arrays.asList("t1-2", "t2-2", "t3-2") -// ), -// Arrays.asList("t1-2", "t2-2") -// }, -// { -// List.of("c1", "c2", "c3"), -// List.of("c1", "c2", "c3"), -// Arrays.asList( -// Arrays.asList("t1-1", "t2-1", "t3-1"), -// Arrays.asList("t1-2", "t2-2", "t3-2"), -// Arrays.asList("t1-3", "t2-3", "t3-3") -// ), -// Arrays.asList("t1-3", "t2-3", "t3-3") -// } -// }); -// } -// -// @org.junit.runners.Parameterized.Parameter -// public List selectedFieldNames; -// -// @org.junit.runners.Parameterized.Parameter(1) -// public List fieldNames; -// -// @org.junit.runners.Parameterized.Parameter(2) -// public List> eventStrings; -// -// @org.junit.runners.Parameterized.Parameter(3) -// public List expectedValue; -// -// private static final String DEFAULT_STREAM_NAME = "stream1"; -// -// @Test -// public void testStringToState() { -// StringToStateProcessor stringToStateProcessor = new StringToStateProcessor(); -// DataProcessorDescription originalGraph = stringToStateProcessor.declareModel(); -// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding()); -// -// DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph); -// graph.setInputStreams(Collections -// .singletonList(EventStreamGenerator -// .makeStreamWithProperties(Collections.singletonList("stream-in")))); -// graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("stream-out"))); -// graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition() -// .setActualTopicName("output-topic"); -// -// MappingPropertyNary mappingPropertyNary = graph.getStaticProperties().stream() -// .filter(p -> p instanceof MappingPropertyNary) -// .map(p -> (MappingPropertyNary) p) -// .filter(p -> p.getInternalName().equals(StringToStateProcessor.STRING_STATE_FIELD)) -// .findFirst().orElse(null); -// -// assert mappingPropertyNary != null; -// mappingPropertyNary.setSelectedProperties( -// selectedFieldNames.stream().map(field -> DEFAULT_STREAM_NAME + "::" + field).toList()); -// -// ProcessorParams params = new ProcessorParams(graph); -// -// SpOutputCollector spOutputCollector = new SpOutputCollector() { -// @Override -// public void registerConsumer(String routeId, InternalEventProcessor> consumer) { -// } -// -// @Override -// public void unregisterConsumer(String routeId) { -// } -// -// @Override -// public void connect() throws SpRuntimeException { -// } -// -// @Override -// public void disconnect() throws SpRuntimeException { -// } -// -// @Override -// public void collect(Event event) { -// } -// }; -// -// stringToStateProcessor.onInvocation(params, spOutputCollector, null); -// Object[] states = sendEvents(stringToStateProcessor, spOutputCollector); -// LOG.info("Expected states is {}.", expectedValue); -// LOG.info("Actual states is {}.", Arrays.toString(states)); -// assertArrayEquals(expectedValue.toArray(), states); -// } -// -// private Object[] sendEvents(StringToStateProcessor stateProcessor, SpOutputCollector spOut) { -// List events = makeEvents(); -// Object[] states = null; -// for (Event event : events) { -// stateProcessor.onEvent(event, spOut); -// try { -// TimeUnit.MILLISECONDS.sleep(100); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } -// try { -// states = (Object[]) event.getFieldBySelector(StringToStateProcessor.CURRENT_STATE) -// .getAsPrimitive().getRawValue(); -// LOG.info("Current states: " + Arrays.toString(states)); -// } catch (IllegalArgumentException e) { -// -// } -// } -// return states; -// } -// -// private List makeEvents() { -// List events = Lists.newArrayList(); -// for (List eventSetting : eventStrings) { -// events.add(makeEvent(eventSetting)); -// } -// return events; -// } -// -// private Event makeEvent(List value) { -// Map map = Maps.newHashMap(); -// for (int i = 0; i < selectedFieldNames.size(); i++) { -// map.put(selectedFieldNames.get(i), value.get(i)); -// } -// return EventFactory.fromMap(map, -// new SourceInfo("test-topic", DEFAULT_STREAM_NAME), -// new SchemaInfo(null, Lists.newArrayList())); -// } + + private static final String KEY_1 = "key1"; + private static final String KEY_2 = "key2"; + private static final String VALUE_1 = "value 1"; + private static final String VALUE_2 = "value 2"; + + private static final String PREFIX_KEY_1 = StreamPrefix.s0(KEY_1); + private static final String PREFIX_KEY_2 = StreamPrefix.s0(KEY_2); + + private StringToStateProcessor processor; + + @BeforeEach + public void setup() { + processor = new StringToStateProcessor(); + } + + static Stream arguments() { + return Stream.of( + Arguments.of( + Collections.emptyList(), + List.of(Map.of(KEY_1, VALUE_1)), + List.of(Map.of( + KEY_1, VALUE_1, + StringToStateProcessor.CURRENT_STATE, Collections.emptyList() + )) + ), + Arguments.of( + List.of(PREFIX_KEY_1), + List.of(Map.of( + KEY_1, VALUE_1 + )), + List.of(Map.of( + KEY_1, VALUE_1, + StringToStateProcessor.CURRENT_STATE, List.of(VALUE_1) + )) + ), + Arguments.of( + List.of(PREFIX_KEY_1, PREFIX_KEY_2), + List.of(Map.of( + KEY_1, VALUE_1, + KEY_2, VALUE_2 + )), + List.of(Map.of( + KEY_1, VALUE_1, + KEY_2, VALUE_2, + StringToStateProcessor.CURRENT_STATE, List.of(VALUE_1, VALUE_2) + )) + ) + ); + } + + @ParameterizedTest + @MethodSource("arguments") + public void testStringToState( + List selectedFieldNames, + List> intpuEvents, + List> outputEvents + ) { + + var configuration = TestConfiguration + .builder() + .config(StringToStateProcessor.STRING_STATE_FIELD, selectedFieldNames) + .prefixStrategy(PrefixStrategy.SAME_PREFIX) + .build(); + + var testExecutor = new ProcessingElementTestExecutor(processor, configuration); + + testExecutor.run(intpuEvents, outputEvents); + } + } diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java index f2d87197ad..b27f0e887a 100644 --- a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java +++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java @@ -44,6 +44,9 @@ import java.util.function.Consumer; import java.util.stream.IntStream; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class ProcessingElementTestExecutor { @@ -93,17 +96,17 @@ public void run( invocationConfig.accept(dataProcessorInvocation); } - var e = getProcessingElementParameterExtractor(dataProcessorInvocation); - var mockParams = Mockito.mock(IDataProcessorParameters.class); + var extractor = getProcessingElementParameterExtractor(dataProcessorInvocation); + var mockParams = mock(IDataProcessorParameters.class); - Mockito.when(mockParams.getModel()).thenReturn(dataProcessorInvocation); - Mockito.when(mockParams.extractor()).thenReturn(e); + when(mockParams.getModel()).thenReturn(dataProcessorInvocation); + when(mockParams.extractor()).thenReturn(extractor); // calls the onPipelineStarted method of the processor to initialize it processor.onPipelineStarted(mockParams, null, null); // mock the output collector to capture the output events and validate the results later - var mockCollector = Mockito.mock(SpOutputCollector.class); + var mockCollector = mock(SpOutputCollector.class); var spOutputCollectorCaptor = ArgumentCaptor.forClass(Event.class); diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java new file mode 100644 index 0000000000..3e69a0a421 --- /dev/null +++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.test.executors; + +/** + * Provides utility methods to append stream prefixes to property key values. + * This class is used to be used in unit tests. + * Consider integrating prefix configuration into the application's logic, possibly within the TestConfiguration class + */ +public class StreamPrefix { + public static final String S0 = "s0"; + public static final String S1 = "s1"; + + /** + * Appends the S0 prefix to a given property value. + * + * @param propertyValue The value to which the S0 prefix will be appended. + * @return A string with the S0 prefix followed by the property value. + */ + public static String s0(String propertyValue) { + return addPrefix(S0, propertyValue); + } + + /** + * Appends the S1 prefix to a given property value. + * + * @param propertyValue The value to which the S1 prefix will be appended. + * @return A string with the S1 prefix followed by the property value. + */ + public static String s1(String propertyValue) { + return addPrefix(S1, propertyValue); + } + + private static String addPrefix(String prefix, String propertyValue) { + return prefix + "::" + propertyValue; + } +} diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java index e2260c5366..ec20e24f77 100644 --- a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java +++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java @@ -45,8 +45,8 @@ public TestConfigurationBuilder config(Map config){ public TestConfigurationBuilder prefixStrategy(PrefixStrategy strategy){ this.eventPrefixes = switch (strategy){ - case SAME_PREFIX -> List.of("s0"); - case ALTERNATE -> List.of("s0", "s1"); + case SAME_PREFIX -> List.of(StreamPrefix.S0); + case ALTERNATE -> List.of(StreamPrefix.S0, StreamPrefix.S1); }; return this; } diff --git a/streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java b/streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java new file mode 100644 index 0000000000..c89fa55ac4 --- /dev/null +++ b/streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.test.executors; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class StreamPrefixTest { + + @Test + void s0_AppendsPropertyValueCorrectly() { + var result = StreamPrefix.s0("testValue"); + assertEquals("s0::testValue", result); + } + + @Test + void s1_AppendsPropertyValueCorrectly() { + var result = StreamPrefix.s1("anotherTestValue"); + assertEquals("s1::anotherTestValue", result); + } + + @Test + void s0_HandlesEmptyPropertyValue() { + var result = StreamPrefix.s0(""); + assertEquals("s0::", result); + } + + @Test + void s1_HandlesEmptyPropertyValue() { + var result = StreamPrefix.s1(""); + assertEquals("s1::", result); + } + + @Test + void s0_HandlesSpecialCharactersInPropertyValue() { + var result = StreamPrefix.s0("value$&*"); + assertEquals("s0::value$&*", result); + } + + @Test + void s1_HandlesSpecialCharactersInPropertyValue() { + var result = StreamPrefix.s1("value@#%"); + assertEquals("s1::value@#%", result); + } +} \ No newline at end of file