Skip to content

Commit

Permalink
[#1672] Make processor dimension sensitive. (#1673)
Browse files Browse the repository at this point in the history
* [#1672] Make processor dimension sensitive.

* [#1672] Fix pom and add unit tests

* Clean up formatting

* [htfix] Fix checkstyle error
  • Loading branch information
obermeier authored Jun 12, 2023
1 parent 1079acc commit ca4e0c2
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 2 deletions.
5 changes: 5 additions & 0 deletions streampipes-extensions-management/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<propertyExpansion>
checkstyle.config.base.path=${project.parent.basedir}/tools/maven
</propertyExpansion>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<propertyExpansion>
checkstyle.config.base.path=${project.parent.parent.basedir}/tools/maven
</propertyExpansion>
</configuration>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
Expand All @@ -36,14 +37,25 @@
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

public class ChangedValueDetectionProcessor
extends StreamPipesDataProcessor {

private static Logger logger = LoggerFactory.getLogger(ChangedValueDetectionProcessor.class);

public static final String COMPARE_FIELD_ID = "compare";
public static final String CHANGE_FIELD_NAME = "change_detected";
private Integer maxStateSize = 5000; // Max number of stored states

private String compareParameter;
private Object lastObject = null;

private HashMap<String, Object> dimensionsState = new HashMap<>();

//TODO: Change Icon
@Override
Expand Down Expand Up @@ -73,16 +85,36 @@ public void onInvocation(ProcessorParams parameters,
public void onEvent(Event inputEvent, SpOutputCollector collector) throws SpRuntimeException {
Object newObject = inputEvent.getFieldBySelector(compareParameter).getRawValue();

String dimensionKey = this.getDimensionKey(inputEvent);
var lastObject = this.dimensionsState.get(dimensionKey);

if (lastObject == null && this.dimensionsState.size() > maxStateSize) {
throw new SpRuntimeException("Max state size reached " + maxStateSize);
}

if (newObject != null) {
if (!newObject.equals(lastObject)) {
lastObject = newObject;
dimensionsState.put(dimensionKey, newObject);
//TODO: Better handling of multiple timestamps (if the field "change_detected" is already in the input)?
inputEvent.addField(CHANGE_FIELD_NAME, System.currentTimeMillis());
collector.collect(inputEvent);
}
}
}

String getDimensionKey(Event inputEvent) {

List<EventProperty> eventProperties = inputEvent.getSchemaInfo().getEventSchema().getEventProperties();

return eventProperties.stream()
.filter(ep ->
ep.getPropertyScope() != null
&& ep.getPropertyScope().equals(PropertyScope.DIMENSION_PROPERTY.name())
)
.map(ep -> inputEvent.getFieldByRuntimeName(ep.getRuntimeName()).getRawValue().toString())
.sorted()
.collect(Collectors.joining());
}
@Override
public void onDetach() throws SpRuntimeException {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.transformation.jvm.processor.value.change;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.test.generator.InvocationGraphGenerator;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;

import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

public class TestChangedValueDetectionProcessor {

@Test
public void getDimensionKeyForMultipleDimensionProperties() {
var processor = new ChangedValueDetectionProcessor();
var event = this.createTestEvent();

assertEquals("l1sensor01", processor.getDimensionKey(event));
}

@Test
public void getDimensionKeyForOneDimension() {
var processor = new ChangedValueDetectionProcessor();
var event = this.createTestEvent();

// Create event with no DIMENSION_PROPERTY
event.getSchemaInfo()
.getEventSchema()
.getEventProperties().get(0)
.setPropertyScope(PropertyScope.MEASUREMENT_PROPERTY.name());

assertEquals("l1", processor.getDimensionKey(event));
}

@Test
public void getDimensionKeyForOneDimensionProperty() {
var processor = new ChangedValueDetectionProcessor();
var event = this.createTestEvent(4711);

// Remove second dimension property
event.removeFieldBySelector("location");
event.getSchemaInfo().getEventSchema().getEventProperties().remove(1);

assertEquals("sensor01", processor.getDimensionKey(event));
}

@Test
public void detectChangedValue() {
var processor = new ChangedValueDetectionProcessor();

// Set "s0::value" as COMPARE_FIELD_ID
DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(processor.declareModel());

graph.getStaticProperties().stream()
.filter(p -> p instanceof MappingPropertyUnary)
.map((p -> (MappingPropertyUnary) p))
.findFirst().get().setSelectedProperty("s0::value");

ProcessorParams params = new ProcessorParams(graph);
processor.onInvocation(params, null, null);

var event = this.createTestEvent();

StoreEventCollector collector = new StoreEventCollector();

processor.onEvent(event, collector);
processor.onEvent(event, collector);
processor.onEvent(event, collector);
processor.onEvent(this.createTestEvent(4711), collector); // Change value
processor.onEvent(event, collector);
processor.onEvent(event, collector);
processor.onEvent(event, collector);

assertEquals(3, collector.getEvents().size());
}

@Test
public void detectChangedValueMultiDim() {
var processor = new ChangedValueDetectionProcessor();

// Set "s0::value" as COMPARE_FIELD_ID
DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(processor.declareModel());

graph.getStaticProperties().stream()
.filter(p -> p instanceof MappingPropertyUnary)
.map((p -> (MappingPropertyUnary) p))
.findFirst().get().setSelectedProperty("s0::value");

ProcessorParams params = new ProcessorParams(graph);
processor.onInvocation(params, null, null);

StoreEventCollector collector = new StoreEventCollector();

processor.onEvent(this.createTestEvent(0, "loc_1"), collector);
processor.onEvent(this.createTestEvent(0, "loc_2"), collector);
processor.onEvent(this.createTestEvent(0, "loc_1"), collector);
processor.onEvent(this.createTestEvent(0, "loc_1"), collector);
processor.onEvent(this.createTestEvent(0, "loc_2"), collector);
processor.onEvent(this.createTestEvent(0, "loc_2"), collector);
processor.onEvent(this.createTestEvent(1, "loc_1"), collector);
processor.onEvent(this.createTestEvent(1, "loc_2"), collector);

assertEquals(4, collector.getEvents().size());
}

private Event createTestEvent() {
return this.createTestEvent(12, "l1");
}

private Event createTestEvent(Integer value) {
return this.createTestEvent(value, "l1");
}

private Event createTestEvent(Integer value, String location) {

var eventSchema = GuessSchemaBuilder.create()
.property(PrimitivePropertyBuilder
.create(Datatypes.String, "sensorId")
.scope(PropertyScope.DIMENSION_PROPERTY)
.build())
.property(PrimitivePropertyBuilder
.create(Datatypes.Double, "location")
.scope(PropertyScope.DIMENSION_PROPERTY)
.build())
.property(PrimitivePropertyBuilder
.create(Datatypes.Integer, "value")
.scope(PropertyScope.MEASUREMENT_PROPERTY)
.build())
.build().eventSchema;

Map<String, Object> map = new HashMap<>();
map.put("sensorId", "sensor01");
map.put("location", location);
map.put("value", value);

return EventFactory.fromMap(map, new SourceInfo("", "s0"), new SchemaInfo(eventSchema, new ArrayList<>()));
}

class StoreEventCollector implements SpOutputCollector {

ArrayList<Event> events = new ArrayList<Event>();

@Override
public void registerConsumer(String routeId, InternalEventProcessor<Map<String, Object>> consumer) {}

@Override
public void unregisterConsumer(String routeId) {}

@Override
public void connect() throws SpRuntimeException {}

@Override
public void disconnect() throws SpRuntimeException {}

@Override
public void collect(Event event) {
events.add(event);
}

public List<Event> getEvents() {
return this.events;
}

}
}

0 comments on commit ca4e0c2

Please sign in to comment.