Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.db.protocol.mqtt.TreeMessage;

import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.NotImplementedException;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -33,7 +34,7 @@
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {

@Override
public List<Message> format(ByteBuf payload) {
public List<Message> format(String topic, ByteBuf payload) {
// Suppose the payload is a json format
if (payload == null) {
return Collections.emptyList();
Expand All @@ -54,6 +55,12 @@ public List<Message> format(ByteBuf payload) {
return ret;
}

@Override
@Deprecated
public List<Message> format(ByteBuf payload) {
throw new NotImplementedException();
}

@Override
public String getName() {
// set the value of mqtt_payload_formatter in iotdb-common.properties as the following string:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.gson.JsonParseException;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.tsfile.enums.TSDataType;

import java.nio.charset.StandardCharsets;
Expand All @@ -50,7 +51,7 @@ public class JSONPayloadFormatter implements PayloadFormatter {
private static final Gson GSON = new GsonBuilder().create();

@Override
public List<Message> format(ByteBuf payload) {
public List<Message> format(String topic, ByteBuf payload) {
if (payload == null) {
return new ArrayList<>();
}
Expand Down Expand Up @@ -81,6 +82,12 @@ public List<Message> format(ByteBuf payload) {
throw new JsonParseException("payload is invalidate");
}

@Override
@Deprecated
public List<Message> format(ByteBuf payload) {
throw new NotImplementedException();
}

private List<Message> formatJson(JsonObject jsonObject) {
TreeMessage message = new TreeMessage();
message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iotdb.db.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
Expand Down Expand Up @@ -63,14 +64,16 @@ public LinePayloadFormatter() {
}

@Override
public List<Message> format(ByteBuf payload) {
public List<Message> format(String topic, ByteBuf payload) {
List<Message> messages = new ArrayList<>();
if (payload == null) {
return messages;
}

String txt = payload.toString(StandardCharsets.UTF_8);
String[] lines = txt.split(LINE_BREAK);
// '/' previously defined as a database name
String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/"));
for (String line : lines) {
if (line.trim().startsWith(WELL)) {
continue;
Expand All @@ -83,6 +86,9 @@ public List<Message> format(ByteBuf payload) {
continue;
}

// Parsing Database Name
message.setDatabase((database));

// Parsing Table Names
message.setTable(matcher.group(TABLE));

Expand Down Expand Up @@ -121,6 +127,12 @@ public List<Message> format(ByteBuf payload) {
return messages;
}

@Override
@Deprecated
public List<Message> format(ByteBuf payload) {
throw new NotImplementedException();
}

private boolean setTags(Matcher matcher, TableMessage message) {
List<String> tagKeys = new ArrayList<>();
List<Object> tagValues = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void onPublish(InterceptPublishMessage msg) {
topic,
payload);

List<Message> messages = payloadFormat.format(payload);
List<Message> messages = payloadFormat.format(topic, payload);
if (messages == null) {
return;
}
Expand All @@ -146,14 +146,7 @@ public void onPublish(InterceptPublishMessage msg) {
continue;
}
if (useTableInsert) {
TableMessage tableMessage = (TableMessage) message;
// '/' previously defined as a database name
String database =
!msg.getTopicName().contains("/")
? msg.getTopicName()
: msg.getTopicName().substring(0, msg.getTopicName().indexOf("/"));
tableMessage.setDatabase(database);
insertTable(tableMessage, session);
insertTable((TableMessage) message, session);
} else {
insertTree((TreeMessage) message, session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,20 @@ public interface PayloadFormatter {
* @param payload
* @return
*/
@Deprecated
List<Message> format(ByteBuf payload);

/**
* format a payload of a topic to a list of messages
*
* @param topic
* @param payload
* @return
*/
default List<Message> format(String topic, ByteBuf payload) {
return format(payload);
}

/**
* get the formatter name
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ public void formatJson() {
+ " }";

ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
String topic = "";

JSONPayloadFormatter formatter = new JSONPayloadFormatter();
TreeMessage message = (TreeMessage) formatter.format(buf).get(0);
TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(0);

assertEquals("root.sg.d1", message.getDevice());
assertEquals(Long.valueOf(1586076045524L), message.getTimestamp());
Expand All @@ -59,9 +60,10 @@ public void formatBatchJson() {
+ " }";

ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
String topic = "";

JSONPayloadFormatter formatter = new JSONPayloadFormatter();
TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);

assertEquals("root.sg.d1", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
Expand All @@ -88,9 +90,10 @@ public void formatJsonArray() {
+ "]";

ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
String topic = "";

JSONPayloadFormatter formatter = new JSONPayloadFormatter();
TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);

assertEquals("root.sg.d2", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
Expand All @@ -117,9 +120,10 @@ public void formatBatchJsonArray() {
+ "]";

ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
String topic = "";

JSONPayloadFormatter formatter = new JSONPayloadFormatter();
TreeMessage message = (TreeMessage) formatter.format(buf).get(3);
TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(3);

assertEquals("root.sg.d2", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ public void formatLine() {
"test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1";

ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
String topic = "";

LinePayloadFormatter formatter = new LinePayloadFormatter();
TableMessage message = (TableMessage) formatter.format(buf).get(0);
TableMessage message = (TableMessage) formatter.format(topic, buf).get(0);

assertEquals("test1", message.getTable());
assertEquals(Long.valueOf(1L), message.getTimestamp());
Expand All @@ -64,9 +65,10 @@ public void formatBatchLine() {
+ "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 ";

ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
String topic = "";

LinePayloadFormatter formatter = new LinePayloadFormatter();
TableMessage message = (TableMessage) formatter.format(buf).get(1);
TableMessage message = (TableMessage) formatter.format(topic, buf).get(1);

assertEquals("test2", message.getTable());
assertEquals(Long.valueOf(2L), message.getTimestamp());
Expand All @@ -82,9 +84,10 @@ public void formatLineAnnotation() {
+ " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 ";

ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
String topic = "";

LinePayloadFormatter formatter = new LinePayloadFormatter();
List<Message> message = formatter.format(buf);
List<Message> message = formatter.format(topic, buf);

assertEquals(1, message.size());
}
Expand Down
Loading