Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,15 @@ TSStatus processPlanLocally(PhysicalPlan plan) {
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);

// if a single log exceeds the threshold
// we need to return error code to the client as in server mode
if (log.serialize().capacity() + Integer.BYTES
>= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
logger.error(
"Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ "or reduce the size of requests you send.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. We need to control the log size. We can't let him have this anomaly.
  2. If an exception occurs, the server obtains the exception and then splits the log file or uses other methods to apply the log file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. We need to control the log size. We can't let him have this anomaly.
  2. If an exception occurs, the server obtains the exception and then splits the log file or uses other methods to apply the log file.

When this problem occurs, the service can only be restarted. And the current modification method is consistent with the single server mode. So the operation of splitting logs involves several modules, we may need to control the plan size but not only log.
We may need to spend some time on this part in the future to make it better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs further discussion, is some error happened during raft log apply, maybe only one raft group failed, it will lose some data, and cause the raft group not consistent with each other, how to deal with the exception?

For now, I think it ok just throw the exception and not let the log be committed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, Thanks for submit a new issues for further discussion in jira, this pr could merged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all your suggesions,I will merge the pr, further discussion can be found here #3856

return StatusUtils.INTERNAL_ERROR;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like EXECUTE_STATEMENT_ERROR should be a more proper status code to tell user.

}
logManager.append(log);
}
Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime);
Expand Down Expand Up @@ -1077,6 +1086,14 @@ private TSStatus processPlanLocallyV2(PhysicalPlan plan) {
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);

startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
// just like processPlanLocally,we need to check the size of log
if (log.serialize().capacity() + Integer.BYTES
>= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
logger.error(
"Log cannot fit into buffer, please increase raft_log_buffer_size;"
+ "or reduce the size of requests you send.");
return StatusUtils.INTERNAL_ERROR;
}
// logDispatcher will serialize log, and set log size, and we will use the size after it
logManager.append(log);
Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.calOperationCostTimeFromStart(startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ public void write(PhysicalPlan plan) throws IOException {
sync();
}
} catch (BufferOverflowException e) {
// if the size of a single plan bigger than logBufferWorking
// we need to clear the buffer to drop something wrong that has written.
logBufferWorking.clear();
throw new IOException("Log cannot fit into the buffer, please increase wal_buffer_size", e);
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,32 @@ public void testOverSizedWAL() throws IOException, IllegalPathException {
}
assertTrue(caught);

// if last insertplan failed by overflow,it can not take affect the next insertplan
InsertRowPlan bwInsertPlan2 =
new InsertRowPlan(
new PartialPath("root.logTestDevice.oversize"),
100,
new String[] {"s1", "s2", "s3", "s4"},
new TSDataType[] {
TSDataType.DOUBLE, TSDataType.INT64, TSDataType.TEXT, TSDataType.BOOLEAN
},
// try to apply a insertplan whose size will fill the entire logBufferWorking
new String[] {
"1.0",
"15",
new String(
new char
[(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2) - 109]),
"false"
});
caught = false;
try {
logNode.write(bwInsertPlan2);
} catch (IOException e) {
caught = true;
}
assertFalse(caught);

ByteBuffer[] array = logNode.delete();
for (ByteBuffer byteBuffer : array) {
MmapUtil.clean((MappedByteBuffer) byteBuffer);
Expand Down