diff --git a/installer/cli/deploy/standalone/iotdb/docker-compose.dev.yml b/installer/cli/deploy/standalone/iotdb/docker-compose.dev.yml
index b01ab7c0fa..8cfad035c3 100644
--- a/installer/cli/deploy/standalone/iotdb/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/iotdb/docker-compose.dev.yml
@@ -16,5 +16,8 @@
version: "3.4"
services:
iotdb:
+ # 0.13.3 is the only recommended production ready version for edge side (standalone) deployment
+ image: apache/iotdb:0.13.3-node
ports:
- - 6667:6667
+ # rpc port
+ - "6667:6667"
diff --git a/installer/cli/deploy/standalone/iotdb/docker-compose.yml b/installer/cli/deploy/standalone/iotdb/docker-compose.yml
index eb315f3433..f45807b9d6 100644
--- a/installer/cli/deploy/standalone/iotdb/docker-compose.yml
+++ b/installer/cli/deploy/standalone/iotdb/docker-compose.yml
@@ -16,9 +16,10 @@
version: "3.4"
services:
iotdb:
- image: apache/iotdb
+ # 0.13.3 is the only recommended production ready version for edge side (standalone) deployment
+ image: apache/iotdb:0.13.3-node
volumes:
- - iotdb:/iotdb/data
+ - iotdb:/iotdb
logging:
driver: "json-file"
options:
diff --git a/pom.xml b/pom.xml
index e70ec21ed9..045286e085 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,7 +157,6 @@
2.2.0
0.10.0
22.3.1
- 1.0.0
1.5.0
2.1.0
2.3.2
@@ -1283,17 +1282,6 @@
flink-runtime_2.11
${flink.version}
-
- org.apache.iotdb
- iotdb-jdbc
- ${iotdb.version}
-
-
- javax.servlet
- servlet-api
-
-
-
org.apache.opennlp
opennlp-tools
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
index 0cea72e1df..16128fd3d6 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
@@ -55,7 +55,10 @@
org.apache.iotdb
- iotdb-jdbc
+ iotdb-session
+
+
+ 0.13.3
org.eclipse.milo
@@ -108,6 +111,18 @@
bcpkix-jdk15on
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
javax.xml.bind
diff --git a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
index 7e023167fb..df8300f710 100644
--- a/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
+++ b/streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java
@@ -18,166 +18,112 @@
package org.apache.streampipes.sinks.databases.jvm.iotdb;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.DbDataTypeFactory;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.DbDataTypes;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.ParameterInformation;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.SupportedDbEngines;
-import org.apache.streampipes.sinks.databases.jvm.jdbcclient.utils.SQLStatementUtils;
+import org.apache.streampipes.model.runtime.field.AbstractField;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
-import java.sql.SQLException;
-import java.sql.Statement;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
-public class IotDb extends JdbcClient implements EventSink {
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
- private IotDbParameters params;
- private static Logger log;
+public class IotDb implements EventSink {
- private String timestampField;
+ private static Logger logger;
- private final SupportedDbEngines dbEngine = SupportedDbEngines.IOT_DB;
+ private String timestampFieldId;
+ private String deviceId;
+
+ // IoTDB provides a connection pool (SessionPool) for Native API.
+ // Using the interface, we need to define the pool size.
+ // If we can not get a session connection in 60 seconds, there is a warning log but the program will hang.
+ // If a session has finished an operation, it will be put back to the pool automatically.
+ // If a session connection is broken, the session will be removed automatically and the pool will try to create a
+ // new session and redo the operation.
+ private SessionPool sessionPool;
@Override
- public void onInvocation(IotDbParameters parameters, EventSinkRuntimeContext runtimeContext)
- throws SpRuntimeException {
-
- this.params = parameters;
- log = parameters.getGraph().getLogger(IotDb.class);
- timestampField = parameters.getTimestampField();
-
- // tablename is the identifier for the storage group in the IoTDB Adapter (e.g. root.data.table1) in which all
- // time series are written
- //TODO: Add better regular expression
- initializeJdbc(
- parameters.getGraph().getInputStreams().get(0).getEventSchema(),
- parameters,
- dbEngine,
- log);
+ public void onInvocation(IotDbParameters parameters, EventSinkRuntimeContext runtimeContext) {
+ logger = parameters.getGraph().getLogger(IotDb.class);
+
+ deviceId = "root." + parameters.getDatabase() + "." + parameters.getDevice();
+ timestampFieldId = parameters.getTimestampField();
+ // In our case, the pool size is set to 2.
+ // One connection is for current requests, and the other is a backup for fast-recovery when connection dies.
+ sessionPool = new SessionPool.Builder().maxSize(2).enableCompression(false).host(parameters.getHost())
+ .port(parameters.getPort()).user(parameters.getUser()).password(parameters.getPassword()).build();
}
@Override
public void onEvent(Event event) {
- try {
- if (event.getRaw().containsKey("value")) {
- // Renaming value. Very ugly
- event.addField("value_1", event.getFieldBySelector("s0::value").getRawValue());
- event.removeFieldBySelector("s0::value");
- }
- save(event);
- } catch (SpRuntimeException e) {
- log.error(e.getMessage());
+ if (event == null) {
+ return;
}
- }
- @Override
- public void onDetach() throws SpRuntimeException {
- closeAll();
- }
-
- @Override
- protected void save(final Event event) throws SpRuntimeException {
- checkConnected();
- try {
- Long timestampValue = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
- event.removeFieldBySelector(timestampField);
- Statement statement;
- statement = connection.createStatement();
- StringBuilder sb1 = new StringBuilder();
- StringBuilder sb2 = new StringBuilder();
- //TODO: Check for SQL-Injection
- // Timestamp must be in the beginning of the values
- sb1.append("INSERT INTO ").append(this.params.getDbTable()).append("(timestamp, ");
- sb2.append(" VALUES (").append(timestampValue).append(", ");
- for (String s : event.getRaw().keySet()) {
- sb1.append(s).append(", ");
- if (event.getFieldByRuntimeName(s).getRawValue() instanceof String) {
- sb2.append("\"").append(event.getFieldByRuntimeName(s).getRawValue().toString()).append("\", ");
- } else {
- sb2.append(event.getFieldByRuntimeName(s).getRawValue().toString()).append(", ");
- }
- }
- sb1.setLength(sb1.length() - 2);
- sb2.setLength(sb2.length() - 2);
- sb1.append(") ").append(sb2).append(")");
- statement.execute(sb1.toString());
- } catch (SQLException e) {
- e.printStackTrace();
+ final AbstractField timestampAbstractField = event.getFieldBySelector(timestampFieldId);
+ final Long timestamp = timestampAbstractField.getAsPrimitive().getAsLong();
+ if (timestamp == null) {
+ return;
}
- }
- @Override
- protected void ensureDatabaseExists(String url, String databaseName) throws SpRuntimeException {
- SQLStatementUtils.checkRegEx(this.params.getDbTable(), "Storage Group name", this.dbDescription);
- try {
- Statement statement = connection.createStatement();
- statement.execute("SET STORAGE GROUP TO " + this.params.getDbTable());
- } catch (SQLException e) {
- // Storage group already exists
- //TODO: Catch other exceptions
+ final Map measurementValuePairs = event.getRaw();
+ // should be at least a timestamp field and a measurement field
+ if (measurementValuePairs.size() <= 1) {
+ return;
}
- }
- /**
- * Needs to be reimplemented since the IoTDB JDBC implementation does not support the methods used in the
- * JDBC-Client class
- *
- * @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
- * @throws SpRuntimeException
- */
- @Override
- protected void ensureTableExists(String url, String databaseName) throws SpRuntimeException {
- int index = 1;
- this.statementHandler.putEventParameterMap("timestamp",
- new ParameterInformation(index++, DbDataTypeFactory.getLong(dbEngine)));
- for (EventProperty eventProperty : this.tableDescription.getEventSchema().getEventProperties()) {
- try {
- if (eventProperty.getRuntimeName().equals(timestampField.substring(4))) {
- continue;
- }
- Statement statement = null;
- statement = connection.createStatement();
- // The identifier cannot be called "value"
- //TODO: Do not simply add a _1 but look instead, if the name is already taken
- String runtimeName = eventProperty.getRuntimeName();
- if (eventProperty.getRuntimeName().equals("value")) {
- runtimeName = "value_1";
- }
- DbDataTypes datatype = extractAndAddEventPropertyRuntimeType(eventProperty, index++);
-
- statement.execute("CREATE TIMESERIES "
- + params.getDbTable()
- + "."
- + runtimeName
- + " WITH DATATYPE="
- + datatype.toString()
- + ", ENCODING=PLAIN");
- } catch (SQLException e) {
- // Probably because it already exists
- //TODO: Add better exception handling
- e.printStackTrace();
+ final int measurementFieldCount = measurementValuePairs.size() - 1;
+ final List measurements = new ArrayList<>(measurementFieldCount);
+ final List types = new ArrayList<>(measurementFieldCount);
+ final List