Skip to content

Commit

Permalink
Merge pull request #1166 from SteveYurongSu/iotdb-session-client
Browse files Browse the repository at this point in the history
[#1146] Session based implementation of IoTDB sink (fix issues in current JDBC based implementation)
  • Loading branch information
tenthe authored Jan 27, 2023
2 parents 540650a + 56f464c commit 3abfd88
Show file tree
Hide file tree
Showing 13 changed files with 271 additions and 243 deletions.
5 changes: 4 additions & 1 deletion installer/cli/deploy/standalone/iotdb/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@
version: "3.4"
services:
iotdb:
# 0.13.3 is the only recommended production ready version for edge side (standalone) deployment
image: apache/iotdb:0.13.3-node
ports:
- 6667:6667
# rpc port
- "6667:6667"
5 changes: 3 additions & 2 deletions installer/cli/deploy/standalone/iotdb/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
version: "3.4"
services:
iotdb:
image: apache/iotdb
# 0.13.3 is the only recommended production ready version for edge side (standalone) deployment
image: apache/iotdb:0.13.3-node
volumes:
- iotdb:/iotdb/data
- iotdb:/iotdb
logging:
driver: "json-file"
options:
Expand Down
12 changes: 0 additions & 12 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@
<errorprone-annotations.version>2.2.0</errorprone-annotations.version>
<google-maps-services.version>0.10.0</google-maps-services.version>
<graalvm.js.version>22.3.1</graalvm.js.version>
<iotdb.version>1.0.0</iotdb.version>
<java-websocket.version>1.5.0</java-websocket.version>
<jakarta-websocket-client-api.version>2.1.0</jakarta-websocket-client-api.version>
<jaxb-runtime.version>2.3.2</jaxb-runtime.version>
Expand Down Expand Up @@ -1283,17 +1282,6 @@
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-jdbc</artifactId>
<version>${iotdb.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.opennlp</groupId>
<artifactId>opennlp-tools</artifactId>
Expand Down
17 changes: 16 additions & 1 deletion streampipes-extensions/streampipes-sinks-databases-jvm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-jdbc</artifactId>
<artifactId>iotdb-session</artifactId>
<!-- 0.13.3 is the only recommended production ready version for edge side (standalone) deployment -->
<!-- Should be worked for IoTDB version >= 0.13.0. -->
<version>0.13.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
Expand Down Expand Up @@ -108,6 +111,18 @@
<artifactId>bcpkix-jdk15on</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<!-- remove these javax dependencies whenever possible -->
<dependency>
<groupId>javax.xml.bind</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,166 +18,112 @@

package org.apache.streampipes.sinks.databases.jvm.iotdb;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.JdbcClient;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.DbDataTypeFactory;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.DbDataTypes;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.ParameterInformation;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.model.SupportedDbEngines;
import org.apache.streampipes.sinks.databases.jvm.jdbcclient.utils.SQLStatementUtils;
import org.apache.streampipes.model.runtime.field.AbstractField;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;

import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;

public class IotDb extends JdbcClient implements EventSink<IotDbParameters> {
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

private IotDbParameters params;
private static Logger log;
public class IotDb implements EventSink<IotDbParameters> {

private String timestampField;
private static Logger logger;

private final SupportedDbEngines dbEngine = SupportedDbEngines.IOT_DB;
private String timestampFieldId;
private String deviceId;

// IoTDB provides a connection pool (SessionPool) for Native API.
// Using the interface, we need to define the pool size.
// If we can not get a session connection in 60 seconds, there is a warning log but the program will hang.
// If a session has finished an operation, it will be put back to the pool automatically.
// If a session connection is broken, the session will be removed automatically and the pool will try to create a
// new session and redo the operation.
private SessionPool sessionPool;

@Override
public void onInvocation(IotDbParameters parameters, EventSinkRuntimeContext runtimeContext)
throws SpRuntimeException {

this.params = parameters;
log = parameters.getGraph().getLogger(IotDb.class);
timestampField = parameters.getTimestampField();

// tablename is the identifier for the storage group in the IoTDB Adapter (e.g. root.data.table1) in which all
// time series are written
//TODO: Add better regular expression
initializeJdbc(
parameters.getGraph().getInputStreams().get(0).getEventSchema(),
parameters,
dbEngine,
log);
public void onInvocation(IotDbParameters parameters, EventSinkRuntimeContext runtimeContext) {
logger = parameters.getGraph().getLogger(IotDb.class);

deviceId = "root." + parameters.getDatabase() + "." + parameters.getDevice();
timestampFieldId = parameters.getTimestampField();

// In our case, the pool size is set to 2.
// One connection is for current requests, and the other is a backup for fast-recovery when connection dies.
sessionPool = new SessionPool.Builder().maxSize(2).enableCompression(false).host(parameters.getHost())
.port(parameters.getPort()).user(parameters.getUser()).password(parameters.getPassword()).build();
}

@Override
public void onEvent(Event event) {
try {
if (event.getRaw().containsKey("value")) {
// Renaming value. Very ugly
event.addField("value_1", event.getFieldBySelector("s0::value").getRawValue());
event.removeFieldBySelector("s0::value");
}
save(event);
} catch (SpRuntimeException e) {
log.error(e.getMessage());
if (event == null) {
return;
}
}

@Override
public void onDetach() throws SpRuntimeException {
closeAll();
}

@Override
protected void save(final Event event) throws SpRuntimeException {
checkConnected();
try {
Long timestampValue = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
event.removeFieldBySelector(timestampField);
Statement statement;
statement = connection.createStatement();
StringBuilder sb1 = new StringBuilder();
StringBuilder sb2 = new StringBuilder();
//TODO: Check for SQL-Injection
// Timestamp must be in the beginning of the values
sb1.append("INSERT INTO ").append(this.params.getDbTable()).append("(timestamp, ");
sb2.append(" VALUES (").append(timestampValue).append(", ");
for (String s : event.getRaw().keySet()) {
sb1.append(s).append(", ");
if (event.getFieldByRuntimeName(s).getRawValue() instanceof String) {
sb2.append("\"").append(event.getFieldByRuntimeName(s).getRawValue().toString()).append("\", ");
} else {
sb2.append(event.getFieldByRuntimeName(s).getRawValue().toString()).append(", ");
}
}
sb1.setLength(sb1.length() - 2);
sb2.setLength(sb2.length() - 2);
sb1.append(") ").append(sb2).append(")");
statement.execute(sb1.toString());
} catch (SQLException e) {
e.printStackTrace();
final AbstractField timestampAbstractField = event.getFieldBySelector(timestampFieldId);
final Long timestamp = timestampAbstractField.getAsPrimitive().getAsLong();
if (timestamp == null) {
return;
}
}

@Override
protected void ensureDatabaseExists(String url, String databaseName) throws SpRuntimeException {
SQLStatementUtils.checkRegEx(this.params.getDbTable(), "Storage Group name", this.dbDescription);
try {
Statement statement = connection.createStatement();
statement.execute("SET STORAGE GROUP TO " + this.params.getDbTable());
} catch (SQLException e) {
// Storage group already exists
//TODO: Catch other exceptions
final Map<String, Object> measurementValuePairs = event.getRaw();
// should be at least a timestamp field and a measurement field
if (measurementValuePairs.size() <= 1) {
return;
}
}

/**
* Needs to be reimplemented since the IoTDB JDBC implementation does not support the methods used in the
* JDBC-Client class
*
* @param url The JDBC url containing the needed information (e.g. "jdbc:iotdb://127.0.0.1:6667/")
* @throws SpRuntimeException
*/
@Override
protected void ensureTableExists(String url, String databaseName) throws SpRuntimeException {
int index = 1;
this.statementHandler.putEventParameterMap("timestamp",
new ParameterInformation(index++, DbDataTypeFactory.getLong(dbEngine)));
for (EventProperty eventProperty : this.tableDescription.getEventSchema().getEventProperties()) {
try {
if (eventProperty.getRuntimeName().equals(timestampField.substring(4))) {
continue;
}
Statement statement = null;
statement = connection.createStatement();
// The identifier cannot be called "value"
//TODO: Do not simply add a _1 but look instead, if the name is already taken
String runtimeName = eventProperty.getRuntimeName();
if (eventProperty.getRuntimeName().equals("value")) {
runtimeName = "value_1";
}
DbDataTypes datatype = extractAndAddEventPropertyRuntimeType(eventProperty, index++);

statement.execute("CREATE TIMESERIES "
+ params.getDbTable()
+ "."
+ runtimeName
+ " WITH DATATYPE="
+ datatype.toString()
+ ", ENCODING=PLAIN");
} catch (SQLException e) {
// Probably because it already exists
//TODO: Add better exception handling
e.printStackTrace();
final int measurementFieldCount = measurementValuePairs.size() - 1;
final List<String> measurements = new ArrayList<>(measurementFieldCount);
final List<TSDataType> types = new ArrayList<>(measurementFieldCount);
final List<Object> values = new ArrayList<>(measurementFieldCount);

for (Map.Entry<String, Object> measurementValuePair : measurementValuePairs.entrySet()) {
if (timestampAbstractField.getFieldNameIn().equals(measurementValuePair.getKey())) {
continue;
}

measurements.add(measurementValuePair.getKey());

final Object value = measurementValuePair.getValue();
if (value instanceof Integer) {
types.add(TSDataType.INT32);
values.add(value);
} else if (value instanceof Long) {
types.add(TSDataType.INT64);
values.add(value);
} else if (value instanceof Float) {
types.add(TSDataType.FLOAT);
values.add(value);
} else if (value instanceof Double) {
types.add(TSDataType.DOUBLE);
values.add(value);
} else if (value instanceof Boolean) {
types.add(TSDataType.BOOLEAN);
values.add(value);
} else {
types.add(TSDataType.TEXT);
values.add(Binary.valueOf(value.toString()));
}
}
//tableExists = true;
}

private DbDataTypes extractAndAddEventPropertyRuntimeType(EventProperty eventProperty, int index) {
// Supported datatypes can be found here: https://iotdb.apache.org/#/Documents/0.8.0/chap2/sec2
DbDataTypes dataType = DbDataTypes.TEXT;
if (eventProperty instanceof EventPropertyPrimitive) {
dataType = DbDataTypeFactory.getFromUri(((EventPropertyPrimitive) eventProperty).getRuntimeType(),
SupportedDbEngines.IOT_DB);
this.statementHandler.putEventParameterMap(eventProperty.getRuntimeName(),
new ParameterInformation(index, dataType));
try {
sessionPool.insertRecord(deviceId, timestamp, measurements, types, values);
} catch (IoTDBConnectionException | StatementExecutionException e) {
logger.error("Failed to save event to IoTDB, because: " + e.getMessage());
e.printStackTrace();
}
}

return dataType;
@Override
public void onDetach() {
sessionPool.close();
}
}
Loading

0 comments on commit 3abfd88

Please sign in to comment.