Skip to content

Commit

Permalink
[ODA-318] Removed handling of single event
Browse files Browse the repository at this point in the history
  • Loading branch information
adrimenen committed Feb 16, 2022
1 parent 42cb0bf commit 770662e
Show file tree
Hide file tree
Showing 41 changed files with 163 additions and 370 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,8 @@ public AbstractDatastreamsEvent(EventPublisher eventPublisher) {
}

@Override
public void publish(String deviceId, String datastreamId, List<String> path, Long at, Object value) {
public void publish(String deviceId, List<String> path, Map<String, Map<Long,Object>> events) {
String[] pathArray = Optional.ofNullable(path).map(list -> list.toArray(new String[0])).orElse(null);
eventPublisher.publishEvent(deviceId, datastreamId, pathArray, at, value);
}

@Override
public void publishGroup(String deviceId, List<String> path, Map<String, Map<Long,Object>> events) {
String[] pathArray = Optional.ofNullable(path).map(list -> list.toArray(new String[0])).orElse(null);
eventPublisher.publishGroupEvents(deviceId, pathArray, events);
eventPublisher.publishEvents(deviceId, pathArray, events);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@
public interface DatastreamsEvent {
void registerToEventSource();
void unregisterFromEventSource();
void publish(String deviceId, String datastreamId, List<String> path, Long at, Object value);
void publishGroup(String deviceId, List<String> path, Map<String, Map<Long, Object>> events);
void publish(String deviceId, List<String> path, Map<String, Map<Long, Object>> events);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

public interface EventPublisher extends AutoCloseable {

void publishEvent(String deviceId, String datastreamId, String[] path, Long at, Object value);
void publishGroupEvents(String deviceId, String[] path, Map<String, Map<Long,Object>> events);
void publishEvents(String deviceId, String[] path, Map<String, Map<Long,Object>> events);
@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,7 @@ public EventPublisherProxy(BundleContext bundleContext) {
}

@Override
public void publishEvent(String deviceId, String datastreamId, String[] path, Long at, Object value) {
Map<String, Object> eventProperties = new HashMap<>();
eventProperties.put(DEVICE_ID_PROPERTY_NAME, deviceId);
eventProperties.put(DATASTREAM_ID_PROPERTY_NAME, datastreamId);
eventProperties.put(PATH_PROPERTY_NAME, path);
eventProperties.put(AT_PROPERTY_NAME, at);
eventProperties.put(VALUE_PROPERTY_NAME, value);
Event event = new Event(EVENT_TOPIC, eventProperties);

eventAdmin.sendEvent(event);
}

@Override
public void publishGroupEvents(String deviceId, String[] path, Map<String, Map<Long,Object>> events) {
public void publishEvents(String deviceId, String[] path, Map<String, Map<Long,Object>> events) {
List<Map<String, Object>> eventList = new ArrayList<>();
events.entrySet().stream()
.forEach(event -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package es.amplia.oda.core.commons.osgi.proxies;

import javafx.beans.binding.ObjectExpression;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -58,20 +57,6 @@ public void testConstructor() throws Exception {
PowerMockito.verifyNew(EventAdminProxy.class).withArguments(eq(mockedContext));
}

@Test
public void testPublishEvent() {
testEventPublisher.publishEvent(TEST_DEVICE_ID, TEST_DATASTREAM_ID, TEST_PATH, TEST_AT, TEST_VALUE);

verify(mockedEventAdmin).sendEvent(eventCaptor.capture());
Event capturedEvent = eventCaptor.getValue();
assertEquals(EVENT_TOPIC, capturedEvent.getTopic());
assertEquals(TEST_DEVICE_ID, capturedEvent.getProperty(DEVICE_ID_PROPERTY_NAME));
assertEquals(TEST_DATASTREAM_ID, capturedEvent.getProperty(DATASTREAM_ID_PROPERTY_NAME));
assertEquals(TEST_PATH, capturedEvent.getProperty(PATH_PROPERTY_NAME));
assertEquals(TEST_AT, capturedEvent.getProperty(AT_PROPERTY_NAME));
assertEquals(TEST_VALUE, capturedEvent.getProperty(VALUE_PROPERTY_NAME));
}

@Test
public void testPublishGroupEvents() {
Map<String, Map<Long, Object>> events = new HashMap<>();
Expand All @@ -82,7 +67,7 @@ public void testPublishGroupEvents() {
events.put(TEST_DATASTREAM_ID, event1);
events.put(TEST_DATASTREAM_ID_2, event2);

testEventPublisher.publishGroupEvents(TEST_DEVICE_ID, TEST_PATH, events);
testEventPublisher.publishEvents(TEST_DEVICE_ID, TEST_PATH, events);

verify(mockedEventAdmin).sendEvent(eventCaptor.capture());
Event capturedEvent = eventCaptor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class AdcDatastreamsEvent extends AbstractDatastreamsEvent {

Expand Down Expand Up @@ -47,7 +49,11 @@ public void registerToEventSource() {

private void publishEvent(AdcEvent event) {
float value = (float) (((max - min) * event.getScaledValue()) + min);
publish("", datastreamId, Collections.emptyList(), event.getEpochTime(), value);
Map<String, Map<Long, Object>> events = new HashMap<>();
Map<Long, Object> data = new HashMap<>();
data.put(event.getEpochTime(), value);
events.put(datastreamId, data);
publish("", Collections.emptyList(), events);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public void testRegisterToEventSource() {

capturedListener.channelValueChanged(mockedEvent);

verify(mockedEventPublisher).publishEvent(eq(""), eq(TEST_DATASTREAM), eq(new String[0]), anyLong(),
eq((TEST_MAX - TEST_MIN) * TEST_VALUE - TEST_MIN));
verify(mockedEventPublisher).publishEvents(eq(""), eq(new String[0]), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

class GpioDatastreamsEvent extends AbstractDatastreamsEvent {

private static final Logger LOGGER = LoggerFactory.getLogger(GpioDatastreamsEvent.class);
Expand Down Expand Up @@ -46,7 +49,11 @@ public void registerToEventSource() {
}

void publishValue(boolean value) {
publish("", datastreamId, null, System.currentTimeMillis(), value);
Map<String, Map<Long, Object>> event = new HashMap<>();
Map<Long, Object> data = new HashMap<>();
data.put(System.currentTimeMillis(), value);
event.put(datastreamId, data);
publish("", null, event);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.mockito.runners.MockitoJUnitRunner;
import org.powermock.reflect.Whitebox;

import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -107,7 +110,7 @@ public void testPublishEvent() {

testGpioDatastreamsEvent.publishValue(testValue);

verify(mockedEventPublisher).publishEvent(eq(""), eq(TEST_DATASTREAM_ID), eq(null), anyLong(), eq(testValue));
verify(mockedEventPublisher).publishEvents(eq(""), eq(null), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -78,7 +80,11 @@ private void tryDeserializeAsLoraStatus(byte[] data) {
try {
LoraStatusPacket loraStayAlive = serializer.deserialize(data, LoraStatusPacket.class);
if(loraStayAlive != null && loraStayAlive.getStat() != null) {
this.publish(deviceId, "lora", null, System.currentTimeMillis(), loraStayAlive);
Map<String, Map<Long, Object>> event = new HashMap<>();
Map<Long, Object> eventData = new HashMap<>();
eventData.put(System.currentTimeMillis(), loraStayAlive);
event.put("lora", eventData);
this.publish(deviceId, null, event);
LOGGER.info("Sent LoRa status message at: {}", System.currentTimeMillis());
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("LoRa packet content: {}", loraStayAlive.toShortString());
Expand All @@ -95,7 +101,11 @@ private void tryDeserializeAsLoraData(byte[] data) {
try {
LoraDataPacket loraPacket = serializer.deserialize(data, LoraDataPacket.class);
if(loraPacket != null && loraPacket.getRxpk() != null) {
this.publish(deviceId, "lora", null, System.currentTimeMillis(), loraPacket);
Map<String, Map<Long, Object>> event = new HashMap<>();
Map<Long, Object> eventData = new HashMap<>();
eventData.put(System.currentTimeMillis(), loraPacket);
event.put("lora", eventData);
this.publish(deviceId, null, event);
LOGGER.info("Sent LoRa data message");
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("LoRa packet content: {}", loraPacket.toShortString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ public void testResgisterToEventSourceAndReadALoraStatus() throws InterruptedExc
when(mockedSerializer.deserialize(eq(LORA_STATUS_BYTE_ARRAY_EXPECTED_TO_DESERIALIZE), eq(LoraDataPacket.class)))
.thenReturn(null);
when(mockedStatus.getStat()).thenReturn(mockedStat);
doNothing().when(mockedPublisher).publishEvent(any(), any(), any(), any(), any());
doNothing().when(mockedPublisher).publishEvents(any(), any(), any());

testDatastreamsEvent.registerToEventSource();
TimeUnit.SECONDS.sleep(3);

verify(mockedSerializer, atLeastOnce()).deserialize(eq(LORA_STATUS_BYTE_ARRAY_EXPECTED_TO_DESERIALIZE), eq(LoraStatusPacket.class));
verify(mockedSerializer, atLeastOnce()).deserialize(eq(LORA_STATUS_BYTE_ARRAY_EXPECTED_TO_DESERIALIZE), eq(LoraDataPacket.class));
verify(mockedPublisher, atLeastOnce()).publishEvent(eq("gatewayForUnitTests"), eq("lora"), (String[]) isNull(), anyLong(), eq(mockedStatus));
verify(mockedPublisher, atLeastOnce()).publishEvents(eq("gatewayForUnitTests"), (String[]) isNull(), any());
Thread thread = Whitebox.getInternalState(testDatastreamsEvent, "readingThread");
assertNotNull(thread);
assertTrue(thread.isAlive());
Expand All @@ -203,14 +203,14 @@ public void testResgisterToEventSourceAndReadALoraData() throws InterruptedExcep
when(mockedSerializer.deserialize(eq(LORA_DATA_BYTE_ARRAY_EXPECTED_TO_DESERIALIZE), eq(LoraDataPacket.class)))
.thenReturn(mockedData);
when(mockedData.getRxpk()).thenReturn(Collections.singletonList(mockedRxpk));
doNothing().when(mockedPublisher).publishEvent(any(), any(), any(), any(), any());
doNothing().when(mockedPublisher).publishEvents(any(), any(), any());

testDatastreamsEvent.registerToEventSource();
TimeUnit.SECONDS.sleep(3);

verify(mockedSerializer, atLeastOnce()).deserialize(eq(LORA_DATA_BYTE_ARRAY_EXPECTED_TO_DESERIALIZE), eq(LoraStatusPacket.class));
verify(mockedSerializer, atLeastOnce()).deserialize(eq(LORA_DATA_BYTE_ARRAY_EXPECTED_TO_DESERIALIZE), eq(LoraDataPacket.class));
verify(mockedPublisher, atLeastOnce()).publishEvent(eq("gatewayForUnitTests"), eq("lora"), (String[]) isNull(), anyLong(), eq(mockedData));
verify(mockedPublisher, atLeastOnce()).publishEvents(eq("gatewayForUnitTests"), (String[]) isNull(), any());
Thread thread = Whitebox.getInternalState(testDatastreamsEvent, "readingThread");
assertNotNull(thread);
assertTrue(thread.isAlive());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,15 @@ public void messageArrived(String topic, MqttMessage mqttMessage) {
String deviceId = extractDeviceIdFromTopic(topic);
DeviceEventMessage deviceEvent =
serializer.deserialize(mqttMessage.getPayload(), DeviceEventMessage.class);
if(deviceEvent.getDatastreams().size() == 1) {
deviceEvent.getDatastreams().stream()
.filter(event -> hasPermission(deviceId, event.getDatastreamId()))
.forEach(event -> publish(deviceId, event.getDatastreamId(), deviceEvent.getPath(),
event.getAt(), event.getValue()));
} else {
deviceEvent.getDatastreams().stream()
.filter(event -> hasPermission(deviceId, event.getDatastreamId()))
.forEach(event -> {
Map<Long, Object> entry = new HashMap<>();
entry.put(event.getAt(), event.getValue());
events.put(event.getDatastreamId(), entry);
});
if(events.size() > 0)
publishGroup(deviceId, deviceEvent.getPath(), events);
}
deviceEvent.getDatastreams().stream()
.filter(event -> hasPermission(deviceId, event.getDatastreamId()))
.forEach(event -> {
Map<Long, Object> entry = new HashMap<>();
entry.put(event.getAt(), event.getValue());
events.put(event.getDatastreamId(), entry);
});
if(events.size() > 0)
publish(deviceId, deviceEvent.getPath(), events);
} catch (Exception e) {
LOGGER.error("Error dispatching device event from MQTT message {}: {}", mqttMessage, e);
}
Expand All @@ -108,14 +101,17 @@ class DatastreamEventMessageListener implements MqttMessageListener {

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
Map<String,Map<Long,Object>> events = new HashMap<>();
Map<Long,Object> eventInfo = new HashMap<>();
try {
LOGGER.info("Message arrived to the {} topic", topic);
DatastreamInfo datastreamInfo = extractDeviceInfoFromTopic(topic);
if (hasPermission(datastreamInfo.getDeviceId(), datastreamInfo.getDatastreamId())) {
DatastreamEvent event =
serializer.deserialize(mqttMessage.getPayload(), DatastreamEvent.class);
publish(datastreamInfo.getDeviceId(), datastreamInfo.getDatastreamId(), event.getPath(),
event.getAt(), event.getValue());
eventInfo.put(event.getAt(), event.getValue());
events.put(datastreamInfo.getDatastreamId(), eventInfo);
publish(datastreamInfo.getDeviceId(), event.getPath(), events);
}

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testDeviceEventMessageListenerMessageArriveWithPath() throws MqttExc
verify(mockedPermissionManager).hasReadPermission(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID));
verify(mockedPermissionManager).hasReadPermission(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID_2));
verify(mockedEventPublisher)
.publishGroupEvents(eq(TEST_DEVICE_ID), aryEq(expectedPath), eq(events));
.publishEvents(eq(TEST_DEVICE_ID), aryEq(expectedPath), eq(events));
}

@Test
Expand Down Expand Up @@ -132,8 +132,11 @@ public void testOneDeviceEventMessageListenerMessageArriveWithPath() throws Mqtt
verify(mockedSerializer)
.deserialize(aryEq(TEST_PAYLOAD), eq(MqttDatastreamsEvent.DeviceEventMessage.class));
verify(mockedPermissionManager).hasReadPermission(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID));
verify(mockedEventPublisher)
.publishEvent(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID), aryEq(expectedPath), eq(TEST_AT), eq(TEST_VALUE));
Map<String, Map<Long, Object>> eventToCompare = new HashMap<>();
Map<Long, Object> dataToCompare = new HashMap<>();
dataToCompare.put(TEST_AT, TEST_VALUE);
eventToCompare.put(TEST_DATASTREAM_ID, dataToCompare);
verify(mockedEventPublisher).publishEvents(eq(TEST_DEVICE_ID), aryEq(expectedPath), eq(eventToCompare));
}

@Test
Expand Down Expand Up @@ -168,7 +171,7 @@ public void testDeviceEventMessageListenerMessageArriveWithoutPath() throws Mqtt
verify(mockedPermissionManager).hasReadPermission(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID_2));

verify(mockedEventPublisher)
.publishGroupEvents(eq(TEST_DEVICE_ID), eq(null), eq(events));
.publishEvents(eq(TEST_DEVICE_ID), eq(null), eq(events));
}

@Test
Expand All @@ -191,9 +194,12 @@ public void testOneDeviceEventMessageListenerMessageArriveWithoutPath() throws M

verify(mockedSerializer).deserialize(aryEq(TEST_PAYLOAD), eq(MqttDatastreamsEvent.DeviceEventMessage.class));
verify(mockedPermissionManager).hasReadPermission(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID));

Map<String, Map<Long, Object>> eventToCompare = new HashMap<>();
Map<Long, Object> dataToCompare = new HashMap<>();
dataToCompare.put(TEST_AT, TEST_VALUE);
eventToCompare.put(TEST_DATASTREAM_ID, dataToCompare);
verify(mockedEventPublisher)
.publishEvent(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID), eq(null), eq(TEST_AT), eq(TEST_VALUE));
.publishEvents(eq(TEST_DEVICE_ID), eq(null), eq(eventToCompare));
}

@Test
Expand Down Expand Up @@ -263,8 +269,12 @@ public void testDatastreamEventMessageListenerMessageArrive() throws MqttExcepti

verify(mockedPermissionManager).hasReadPermission(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID));
verify(mockedSerializer).deserialize(aryEq(TEST_PAYLOAD), eq(MqttDatastreamsEvent.DatastreamEvent.class));
Map<String, Map<Long, Object>> eventToCompare = new HashMap<>();
Map<Long, Object> dataToCompare = new HashMap<>();
dataToCompare.put(TEST_AT, TEST_VALUE);
eventToCompare.put(TEST_DATASTREAM_ID, dataToCompare);
verify(mockedEventPublisher)
.publishEvent(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID), aryEq(expectedPath), eq(TEST_AT), eq(TEST_VALUE));
.publishEvents(eq(TEST_DEVICE_ID), aryEq(expectedPath), eq(eventToCompare));
}

@Test
Expand All @@ -286,8 +296,12 @@ public void testDatastreamEventMessageListenerMessageArriveWithoutPath() throws

verify(mockedPermissionManager).hasReadPermission(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID));
verify(mockedSerializer).deserialize(aryEq(TEST_PAYLOAD), eq(MqttDatastreamsEvent.DatastreamEvent.class));
Map<String, Map<Long, Object>> eventToCompare = new HashMap<>();
Map<Long, Object> dataToCompare = new HashMap<>();
dataToCompare.put(TEST_AT, TEST_VALUE);
eventToCompare.put(TEST_DATASTREAM_ID, dataToCompare);
verify(mockedEventPublisher)
.publishEvent(eq(TEST_DEVICE_ID), eq(TEST_DATASTREAM_ID), eq(null), eq(TEST_AT), eq(TEST_VALUE));
.publishEvents(eq(TEST_DEVICE_ID), eq(null), eq(eventToCompare));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,6 @@ public void loadDatastreamIdsToCollect(Collection<String> datastreamIds) {
datastreamIdsToCollect.addAll(datastreamIds);
}

@Override
public void publish(Event event) {
if (isEventFromCollectedDatastream(event)) {
LOGGER.info("Collected event {} of datastream {}", event, event.getDatastreamId());
collect(event);
} else {
eventDispatcher.publish(event);
}
}

@Override
public void publish(List<Event> events) {
List<Event> eventsToPublish = new ArrayList<>();
Expand Down Expand Up @@ -79,7 +69,7 @@ public void publishCollectedEvents(Collection<String> datastreamIds) {
for(String datastreamId : datastreamIds) {
List<Event> events = collectedEvents.remove(datastreamId);
if (events != null) {
events.forEach(event -> outputDatastreamPerDevice.merge(event.getDeviceId(), eventDispatcher.parse(event),
events.forEach(event -> outputDatastreamPerDevice.merge(event.getDeviceId(), eventDispatcher.parse(Collections.singletonList(event)),
this::mergeOutputDatastreams));
} else {
LOGGER.info("No events collected for {}", datastreamId);
Expand Down
Loading

0 comments on commit 770662e

Please sign in to comment.