Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,14 @@ public void accept(final @NotNull List<DataPoint> dataPoints) {
try {
final var jsonMap=objectMapper.readValue((String)jsonDataPoint.getTagValue(), typeRef);
final var value = jsonMap.get("value");
if(value!=null) {
if(value!=null && jsonMap.size() == 1) {
return dataPointFactory.create(jsonDataPoint.getTagName(), value);
} else {
} else if(value!=null && jsonMap.size() > 1) {
return dataPointFactory.create(jsonDataPoint.getTagName(), jsonMap);
}else {
throw new RuntimeException("No value entry in JSON message");
}
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException(e);
}
}).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.hivemq.edge.adapters.opcua;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.hivemq.adapter.sdk.api.events.EventService;
import com.hivemq.adapter.sdk.api.factories.DataPointFactory;
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterMetricsService;
Expand Down Expand Up @@ -55,6 +57,7 @@

class OpcUaClientConnection {
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaClientConnection.class);
private static final @NotNull Gson GSON = new GsonBuilder().disableHtmlEscaping().create();

private final @NotNull OpcUaSpecificAdapterConfig config;
private final @NotNull List<OpcuaTag> tags;
Expand Down Expand Up @@ -206,7 +209,7 @@ void destroy() {
log.debug("Creating new OPC UA subscription");
final OpcUaSubscription subscription = new OpcUaSubscription(client);
subscription.setPublishingInterval((double) config.getOpcuaToMqttConfig().publishingInterval());
subscription.setSubscriptionListener(new OpcUaSubscriptionListener(protocolAdapterMetricsService, tagStreamingService, eventService, adapterId, tags, client, dataPointFactory));
subscription.setSubscriptionListener(new OpcUaSubscriptionListener(protocolAdapterMetricsService, tagStreamingService, eventService, adapterId, tags, client, dataPointFactory, GSON));
try {
subscription.create();
return subscription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Objects;

public class OpcuaTagDefinition implements TagDefinition {

@JsonProperty(value = "node", required = true)
Expand All @@ -30,35 +32,45 @@ public class OpcuaTagDefinition implements TagDefinition {
required = true)
private final @NotNull String node;

@JsonProperty(value = "collectAllProperties")
@ModuleConfigField(title = "Collect all properties of the node",
description = "OPC UA defines a set of properties for each node. If this is enabled, all properties will be collected and sent to the MQTT broker.")
private final @NotNull boolean collectAllProperties;

@JsonCreator
public OpcuaTagDefinition(@JsonProperty(value = "node", required = true) final @NotNull String node) {
public OpcuaTagDefinition(
@JsonProperty(value = "node", required = true) final @NotNull String node,
@JsonProperty(value = "collectAllProperties", defaultValue = "false") final @Nullable Boolean collectAllProperties) {
this.node = node;
if(collectAllProperties == null) {
this.collectAllProperties = false;
} else {
this.collectAllProperties = collectAllProperties;
}
}

public @NotNull String getNode() {
return node;
}

@Override
public boolean equals(final @Nullable Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
public boolean isCollectAllProperties() {
return collectAllProperties;
}

@Override
public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) return false;
final OpcuaTagDefinition that = (OpcuaTagDefinition) o;
return node.equals(that.node);
return isCollectAllProperties() == that.isCollectAllProperties() && Objects.equals(getNode(), that.getNode());
}

@Override
public int hashCode() {
return node.hashCode();
return Objects.hash(getNode(), isCollectAllProperties());
}

@Override
public @NotNull String toString() {
return "OpcuaTagDefinition{" + "node='" + node + '\'' + '}';
public String toString() {
return "OpcuaTagDefinition{" + "node='" + node + '\'' + ", collectAllProperties=" + collectAllProperties + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.hivemq.edge.adapters.opcua.listeners;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.hivemq.adapter.sdk.api.events.EventService;
import com.hivemq.adapter.sdk.api.events.model.Event;
import com.hivemq.adapter.sdk.api.factories.DataPointFactory;
Expand All @@ -31,11 +33,7 @@
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -46,8 +44,6 @@

public class OpcUaSubscriptionListener implements OpcUaSubscription.SubscriptionListener {

private static final Logger log = LoggerFactory.getLogger(OpcUaSubscriptionListener.class);

private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService;
private final @NotNull ProtocolAdapterTagStreamingService tagStreamingService;
private final @NotNull EventService eventService;
Expand All @@ -56,6 +52,7 @@ public class OpcUaSubscriptionListener implements OpcUaSubscription.Subscription
private final @NotNull Map<NodeId, OpcuaTag> nodeIdToTag;
private final @NotNull OpcUaClient client;
private final @NotNull DataPointFactory dataPointFactory;
private final @NotNull Gson gson;

public OpcUaSubscriptionListener(
final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService,
Expand All @@ -64,16 +61,19 @@ public OpcUaSubscriptionListener(
final @NotNull String adapterId,
final @NotNull List<OpcuaTag> tags,
final @NotNull OpcUaClient client,
final @NotNull DataPointFactory dataPointFactory) {
final @NotNull DataPointFactory dataPointFactory,
final @NotNull Gson gson) {
this.protocolAdapterMetricsService = protocolAdapterMetricsService;
this.tagStreamingService = tagStreamingService;
this.eventService = eventService;
this.adapterId = adapterId;
this.client = client;
this.dataPointFactory = dataPointFactory;
this.gson = gson;
nodeIdToTag = tags.stream()
.collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
}

@Override
public void onKeepAliveReceived(final @NotNull OpcUaSubscription subscription) {
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_KEEPALIVE_COUNT);
Expand Down Expand Up @@ -106,7 +106,8 @@ public void onDataReceived(
}
try {
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_DATA_RECEIVED_COUNT);
final String payload = extractPayload(client, values.get(i));
final var currentValue = values.get(i);
final String payload = extractPayload(client, currentValue, gson, tag.getDefinition().isCollectAllProperties());
tagStreamingService.feed(tn, List.of(dataPointFactory.createJsonDataPoint(tn, payload)));
} catch (final Throwable e) {
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_DATA_ERROR_COUNT);
Expand All @@ -115,15 +116,47 @@ public void onDataReceived(
}
}

private static @NotNull String extractPayload(final @NotNull OpcUaClient client, final @NotNull DataValue value)
throws UaException {
private static @NotNull String extractPayload(
final @NotNull OpcUaClient client,
final @NotNull DataValue value,
final @NotNull Gson gson,
final boolean collectAllProperties) throws UaException {

if (value.getValue().getValue() == null) {
return "";
}
final var jsonObject = OpcUaToJsonConverter.convertPayload(client.getDynamicEncodingContext(), value, gson);

return jsonObject
.map(json -> {
final var ret = new JsonObject();
ret.add("value", json);
if(collectAllProperties) {
if(value.getServerPicoseconds() != null) {
ret.addProperty("serverPicoseconds", value.getServerPicoseconds().longValue());
} else {
ret.addProperty("serverPicoseconds", 0);
}
if(value.getSourcePicoseconds() != null) {
ret.addProperty("sourcePicoseconds", value.getSourcePicoseconds().longValue());
} else {
ret.addProperty("sourcePicoseconds", 0);
}
if(value.getSourceTime() != null) {
ret.addProperty("sourceTime", value.getSourceTime().getUtcTime());
} else {
ret.addProperty("sourceTime", 0);
}
if(value.getServerTime() != null) {
ret.addProperty("serverTime", value.getServerTime().getUtcTime());
} else {
ret.addProperty("serverTime", 0);
}
}
return ret;
})
.map(gson::toJson)
.orElseGet(() -> gson.toJson(new JsonObject()));

final ByteBuffer byteBuffer = OpcUaToJsonConverter.convertPayload(client.getDynamicEncodingContext(), value);
final byte[] buffer = new byte[byteBuffer.remaining()];
byteBuffer.get(buffer);
return new String(buffer, StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Base64;
import java.util.Optional;
import java.util.UUID;

import static com.hivemq.edge.adapters.opcua.Constants.EMPTY_BYTES;
Expand All @@ -59,15 +60,15 @@ public class OpcUaToJsonConverter {

private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaToJsonConverter.class);

private static final @NotNull Gson GSON = new GsonBuilder().disableHtmlEscaping().create();
private static final @NotNull Base64.Encoder BASE_64 = Base64.getEncoder();

public static @NotNull ByteBuffer convertPayload(
public static @NotNull Optional<JsonElement> convertPayload(
final @NotNull EncodingContext serializationContext,
final @NotNull DataValue dataValue) {
final @NotNull DataValue dataValue,
final @NotNull Gson gson) {
final Object value = dataValue.getValue().getValue();
if (value == null) {
return ByteBuffer.wrap(EMPTY_BYTES);
return Optional.empty();
}
final JsonObject jsonObject = new JsonObject();
if (value instanceof final DataValue v) {
Expand All @@ -89,15 +90,15 @@ public class OpcUaToJsonConverter {
jsonObject.add("serverPicoseconds", new JsonPrimitive(v.getServerPicoseconds().intValue()));
}
}
jsonObject.add("value", convertValue(value, serializationContext));
return ByteBuffer.wrap(GSON.toJson(jsonObject).getBytes(StandardCharsets.UTF_8));
return Optional.ofNullable(convertValue(value, serializationContext, gson));
}

private static JsonElement convertValue(
final @NotNull Object value,
final @NotNull EncodingContext serializationContext) {
final @NotNull EncodingContext serializationContext,
final @NotNull Gson gson) {
if (value instanceof final DataValue dv) {
return convertValue(dv.getValue(), serializationContext);
return convertValue(dv.getValue(), serializationContext, gson);
} else if (value instanceof final Boolean b) {
return new JsonPrimitive(b);
} else if (value instanceof final Byte b) {
Expand Down Expand Up @@ -162,31 +163,31 @@ private static JsonElement convertValue(
} else if (value instanceof final ExtensionObject eo) {
try {
final Object decodedValue = eo.decode(serializationContext);
return convertValue(decodedValue, serializationContext);
return convertValue(decodedValue, serializationContext, gson);
} catch (final Throwable t) {
log.debug("Not able to decode body of OPC UA ExtensionObject, using undecoded body value instead", t);
return convertValue(eo.getBody(), serializationContext);
return convertValue(eo.getBody(), serializationContext, gson);
}
} else if (value instanceof final Variant variant) {
final Object variantValue = variant.getValue();
return variantValue != null ? convertValue(variantValue, serializationContext) : null;
return variantValue != null ? convertValue(variantValue, serializationContext, gson) : null;
} else if (value instanceof final DiagnosticInfo info) {
return convertDiagnosticInfo(info);
} else if (value instanceof final DynamicStructType struct) {
final JsonObject structRoot = new JsonObject();
struct.getMembers()
.forEach((key, value1) -> structRoot.add(key, convertValue(value1, serializationContext)));
.forEach((key, value1) -> structRoot.add(key, convertValue(value1, serializationContext, gson)));
return structRoot;
} else if (value.getClass().isArray()) {
final Object[] values = (Object[]) value;
final JsonArray ret = new JsonArray();
Arrays.asList(values).forEach(in -> ret.add(convertValue(in, serializationContext)));
Arrays.asList(values).forEach(in -> ret.add(convertValue(in, serializationContext, gson)));
return ret;
}

log.warn("No explicit converter for OPC UA type {} falling back to best effort json",
value.getClass().getSimpleName());
return GSON.toJsonTree(value);
return gson.toJsonTree(value);
}

private static @NotNull JsonObject convertNodeId(final @NotNull NodeId nodeId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void before() {
}

@NotNull
protected OpcUaProtocolAdapter createAndStartAdapter(final @NotNull String subcribedNodeId)
protected OpcUaProtocolAdapter createAndStartAdapter(final @NotNull String subcribedNodeId, final @Nullable Boolean collectAllProperties)
throws Exception {

final OpcUaToMqttConfig opcuaToMqttConfig =
Expand All @@ -107,7 +107,7 @@ protected OpcUaProtocolAdapter createAndStartAdapter(final @NotNull String subcr
null);

when(protocolAdapterInput.getConfig()).thenReturn(config);
when(protocolAdapterInput.getTags()).thenReturn(List.of(new OpcuaTag(subcribedNodeId, "", new OpcuaTagDefinition(subcribedNodeId))));
when(protocolAdapterInput.getTags()).thenReturn(List.of(new OpcuaTag(subcribedNodeId, "", new OpcuaTagDefinition(subcribedNodeId, collectAllProperties))));
final OpcUaProtocolAdapter protocolAdapter =
new OpcUaProtocolAdapter(OpcUaProtocolAdapterInformation.INSTANCE, protocolAdapterInput);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void whenTypeSubscriptionPresent_thenReceiveMsg(
final String nodeId =
opcUaServerExtension.getTestNamespace().addNode("Test" + name + "Node", typeId, () -> serverValue, 999);

final OpcUaProtocolAdapter protocolAdapter = createAndStartAdapter(nodeId);
final OpcUaProtocolAdapter protocolAdapter = createAndStartAdapter(nodeId, null);
assertEquals(ProtocolAdapterState.ConnectionStatus.CONNECTED,
protocolAdapter.getProtocolAdapterState().getConnectionStatus());
final var received = expectAdapterPublish();
Expand Down
Loading
Loading