Skip to content

Commit

Permalink
refactor(#2779): Fix the test for TestStringToStateProcessor (#3031)
Browse files Browse the repository at this point in the history
* refactor(#2779): Fix the test for TestStringToStateProcessor

* refactor(#2779): Add a utils class to deal with the stream prefix selector

* refactor(#2778): Fix junit test TestStringCounterProcessor (#3036)
  • Loading branch information
tenthe authored Jul 17, 2024
1 parent 95f18b6 commit 5dca130
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils-executors</artifactId>
<version>0.97.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;

public class StringToStateProcessor extends StreamPipesDataProcessor {
Expand Down Expand Up @@ -77,13 +76,13 @@ public void onInvocation(ProcessorParams parameters,

@Override
public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
List<String> states = Lists.newArrayList();
List<String> states = new ArrayList<>();

for (String stateField : stateFields) {
states.add(event.getFieldBySelector(stateField).getAsPrimitive().getAsString());
}

event.addField(CURRENT_STATE, states.toArray());
event.addField(CURRENT_STATE, states);
collector.collect(event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,133 +18,61 @@

package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;

//@RunWith(Parameterized.class)
import org.apache.streampipes.test.executors.PrefixStrategy;
import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
import org.apache.streampipes.test.executors.StreamPrefix;
import org.apache.streampipes.test.executors.TestConfiguration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class TestStringCounterProcessor {
// private static final Logger LOG = LoggerFactory.getLogger(TestStringCounterProcessor.class);
//
// @org.junit.runners.Parameterized.Parameters
// public static Iterable<Object[]> data() {
// return Arrays.asList(new Object[][] {
// {"Test", List.of("t1"), new Tuple3<>("", "", 0)},
// {"Test", Arrays.asList("t1", "t2"), new Tuple3<>("t1", "t2", 1)},
// {"Test", Arrays.asList("t1", "t2", "t1", "t2"), new Tuple3<>("t1", "t2", 2)},
// {"Test", Arrays.asList("t1", "t2", "t1", "t3"), new Tuple3<>("t1", "t3", 1)}
// });
// }
//
// @org.junit.runners.Parameterized.Parameter
// public String selectedFieldName;
//
// @org.junit.runners.Parameterized.Parameter(1)
// public List<String> eventStrings;
//
// @org.junit.runners.Parameterized.Parameter(2)
// public Tuple3<String, String, Integer> expectedValue;
//
// @Test
// public void testStringCounter() {
// StringCounterProcessor stringCounter = new StringCounterProcessor();
// DataProcessorDescription originalGraph = stringCounter.declareModel();
// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
//
// DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
// graph.setInputStreams(Collections
// .singletonList(EventStreamGenerator
// .makeStreamWithProperties(Collections.singletonList(selectedFieldName))));
// graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(
// Collections.singletonList(selectedFieldName)));
// graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
// .setActualTopicName("output-topic");
//
// MappingPropertyUnary mappingPropertyUnary = graph.getStaticProperties().stream()
// .filter(p -> p instanceof MappingPropertyUnary)
// .map((p -> (MappingPropertyUnary) p))
// .filter(p -> p.getInternalName().equals(StringCounterProcessor.FIELD_ID))
// .findFirst().orElse(null);
// assert mappingPropertyUnary != null;
// mappingPropertyUnary.setSelectedProperty("s0::" + selectedFieldName);
// ProcessorParams params = new ProcessorParams(graph);
//
// SpOutputCollector spOut = new SpOutputCollector() {
// @Override
// public void registerConsumer(String routeId, InternalEventProcessor<Map<String, Object>> consumer) {
// }
//
// @Override
// public void unregisterConsumer(String routeId) {
// }
//
// @Override
// public void connect() throws SpRuntimeException {
// }
//
// @Override
// public void disconnect() throws SpRuntimeException {
// }
//
// @Override
// public void collect(Event event) {
// }
// };
//
// stringCounter.onInvocation(params, spOut, null);
// Tuple3<String, String, Integer> tuple = sendEvents(stringCounter, spOut);
// LOG.info("Expected match count is {}.", expectedValue.x);
// LOG.info("Actual match count is {}.", tuple.x);
// assertEquals(expectedValue.x, tuple.x);
// LOG.info("Expected change from is {}.", expectedValue.k);
// LOG.info("Actual change from is {}.", tuple.k);
// assertEquals(expectedValue.k, tuple.k);
// LOG.info("Expected change to is {}.", expectedValue.k);
// LOG.info("Actual change to is {}.", tuple.k);
// assertEquals(expectedValue.v, tuple.v);
// }
//
// private Tuple3<String, String, Integer> sendEvents(StringCounterProcessor stringCounter, SpOutputCollector spOut) {
// int counter = 0;
// String changeFrom = "", changeTo = "";
// List<Event> events = makeEvents();
// for (Event event : events) {
// LOG.info("Sending event with value "
// + event.getFieldBySelector("s0::" + selectedFieldName).getAsPrimitive().getAsString());
// stringCounter.onEvent(event, spOut);
// try {
// TimeUnit.MILLISECONDS.sleep(100);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// try {
// counter = event.getFieldBySelector(StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME)
// .getAsPrimitive()
// .getAsInt();
// changeFrom = event.getFieldBySelector(StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME)
// .getAsPrimitive()
// .getAsString();
// changeTo = event.getFieldBySelector(StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME)
// .getAsPrimitive()
// .getAsString();
// LOG.info(changeFrom + " change to " + changeTo + ", value = " + counter);
// } catch (IllegalArgumentException e) {
//
// }
// }
// return new Tuple3<>(changeFrom, changeTo, counter);
// }
//
//
// private List<Event> makeEvents() {
// List<Event> events = Lists.newArrayList();
// for (String eventSetting : eventStrings) {
// events.add(makeEvent(eventSetting));
// }
// return events;
// }
//
// private Event makeEvent(String value) {
// Map<String, Object> map = Maps.newHashMap();
// map.put(selectedFieldName, value);
// return EventFactory.fromMap(map,
// new SourceInfo("test" + "-topic", "s0"),
// new SchemaInfo(null, Lists.newArrayList()));
// }
}
private static final String KEY_1 = "key1";

private StringCounterProcessor processor;

@BeforeEach
public void setup() {
processor = new StringCounterProcessor();
}


static Stream<Arguments> arguments() {
return Stream.of(
Arguments.of(
List.of(Map.of(KEY_1, "v1"), Map.of(KEY_1, "v2")),
List.of(Map.of(
KEY_1, "v2",
StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME, "v1",
StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME, "v2",
StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME, 1
))
)
);
}

@ParameterizedTest
@MethodSource("arguments")
public void testStringToState(
List<Map<String, Object>> intpuEvents,
List<Map<String, Object>> outputEvents
) {

var configuration = TestConfiguration
.builder()
.config(StringCounterProcessor.FIELD_ID, StreamPrefix.s0(KEY_1))
.prefixStrategy(PrefixStrategy.SAME_PREFIX)
.build();

var testExecutor = new ProcessingElementTestExecutor(processor, configuration);

testExecutor.run(intpuEvents, outputEvents);
}

}
Loading

0 comments on commit 5dca130

Please sign in to comment.