Skip to content

Commit 7ff968c

Browse files
authored
[IOTDB-2027] Rollback invalid entry after wal writing failure (#4424)
1 parent aaadc5c commit 7ff968c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+100
-83
lines changed

server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.iotdb.db.qp.executor.PlanExecutor;
2828
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
2929
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
30-
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
3130

3231
import io.moquette.interception.AbstractInterceptHandler;
3332
import io.moquette.interception.messages.InterceptPublishMessage;
@@ -93,16 +92,15 @@ public void onPublish(InterceptPublishMessage msg) {
9392
continue;
9493
}
9594

96-
InsertRowPlan plan = new InsertRowPlan();
97-
plan.setTime(event.getTimestamp());
98-
plan.setMeasurements(event.getMeasurements().toArray(new String[0]));
99-
plan.setValues(event.getValues().toArray(new Object[0]));
100-
plan.setDataTypes(new TSDataType[event.getValues().size()]);
101-
plan.setNeedInferType(true);
102-
10395
boolean status = false;
10496
try {
105-
plan.setDeviceId(new PartialPath(event.getDevice()));
97+
PartialPath path = new PartialPath(event.getDevice());
98+
InsertRowPlan plan =
99+
new InsertRowPlan(
100+
path,
101+
event.getTimestamp(),
102+
event.getMeasurements().toArray(new String[0]),
103+
event.getValues().toArray(new String[0]));
106104
status = executeNonQuery(plan);
107105
} catch (Exception e) {
108106
LOG.warn(

server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@
7373
import org.apache.iotdb.db.qp.utils.EmptyOutputStream;
7474
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
7575

76+
import org.slf4j.Logger;
77+
import org.slf4j.LoggerFactory;
78+
7679
import java.io.DataOutputStream;
7780
import java.io.IOException;
7881
import java.nio.ByteBuffer;
@@ -82,6 +85,7 @@
8285

8386
/** This class is a abstract class for all type of PhysicalPlan. */
8487
public abstract class PhysicalPlan {
88+
private static final Logger logger = LoggerFactory.getLogger(PhysicalPlan.class);
8589

8690
private static final String SERIALIZATION_UNIMPLEMENTED = "serialization unimplemented";
8791

@@ -182,11 +186,27 @@ public void serialize(DataOutputStream stream) throws IOException {
182186

183187
/**
184188
* Serialize the plan into the given buffer. This is provided for WAL, so fields that can be
185-
* recovered will not be serialized.
189+
* recovered will not be serialized. If error occurs when serializing this plan, the buffer will
190+
* be reset.
186191
*
187192
* @param buffer
188193
*/
189194
public void serialize(ByteBuffer buffer) {
195+
buffer.mark();
196+
try {
197+
serializeImpl(buffer);
198+
} catch (UnsupportedOperationException e) {
199+
// ignore and throw
200+
throw e;
201+
} catch (Exception e) {
202+
logger.error(
203+
"Rollback buffer entry because error occurs when serializing this physical plan.", e);
204+
buffer.reset();
205+
throw e;
206+
}
207+
}
208+
209+
protected void serializeImpl(ByteBuffer buffer) {
190210
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
191211
}
192212

server/src/main/java/org/apache/iotdb/db/qp/physical/crud/DeletePlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void serialize(DataOutputStream stream) throws IOException {
151151
}
152152

153153
@Override
154-
public void serialize(ByteBuffer buffer) {
154+
public void serializeImpl(ByteBuffer buffer) {
155155
int type = PhysicalPlanType.DELETE.ordinal();
156156
buffer.put((byte) type);
157157
buffer.putLong(deleteStartTime);

server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ public void recoverFromFailure() {
258258
}
259259

260260
@Override
261-
public void serialize(ByteBuffer buffer) {
261+
public void serializeImpl(ByteBuffer buffer) {
262262
int type = PhysicalPlanType.MULTI_BATCH_INSERT.ordinal();
263263
buffer.put((byte) type);
264264
buffer.putInt(insertTabletPlanList.size());

server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ public void fillValues(ByteBuffer buffer) throws QueryProcessException {
478478
}
479479

480480
@Override
481-
public void serialize(ByteBuffer buffer) {
481+
public void serializeImpl(ByteBuffer buffer) {
482482
int type = PhysicalPlanType.INSERT.ordinal();
483483
buffer.put((byte) type);
484484
subSerialize(buffer);

server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public void serialize(DataOutputStream stream) throws IOException {
159159
}
160160

161161
@Override
162-
public void serialize(ByteBuffer buffer) {
162+
public void serializeImpl(ByteBuffer buffer) {
163163
int type = PhysicalPlanType.BATCH_INSERT_ONE_DEVICE.ordinal();
164164
buffer.put((byte) type);
165165

server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public int hashCode() {
168168
}
169169

170170
@Override
171-
public void serialize(ByteBuffer buffer) {
171+
public void serializeImpl(ByteBuffer buffer) {
172172
int type = PhysicalPlanType.BATCH_INSERT_ROWS.ordinal();
173173
buffer.put((byte) type);
174174
buffer.putInt(insertRowPlanList.size());

server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ private void writeValues(DataOutputStream stream) throws IOException {
247247
}
248248

249249
@Override
250-
public void serialize(ByteBuffer buffer) {
250+
public void serializeImpl(ByteBuffer buffer) {
251251
int type = PhysicalPlanType.BATCHINSERT.ordinal();
252252
buffer.put((byte) type);
253253
subSerialize(buffer);

server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SelectIntoPlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void serialize(DataOutputStream outputStream) throws IOException {
6868
}
6969

7070
@Override
71-
public void serialize(ByteBuffer buffer) {
71+
public void serializeImpl(ByteBuffer buffer) {
7272
buffer.put((byte) PhysicalPlanType.SELECT_INTO.ordinal());
7373

7474
queryPlan.serialize(buffer);

server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ActivateTemplatePlan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public PartialPath getPrefixPath() {
5757
}
5858

5959
@Override
60-
public void serialize(ByteBuffer buffer) {
60+
public void serializeImpl(ByteBuffer buffer) {
6161
buffer.put((byte) PhysicalPlanType.ACTIVATE_TEMPLATE.ordinal());
6262
ReadWriteIOUtils.write(prefixPath.getFullPath(), buffer);
6363
buffer.putLong(index);

0 commit comments

Comments
 (0)