Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.thrift.TLogEntry;

import java.nio.Buffer;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -37,7 +36,7 @@ public class Batch {

private long logEntriesNumFromWAL = 0L;

private long serializedSize;
private long memorySize;
// indicates whether this batch has been successfully synchronized to another node
private boolean synced;

Expand All @@ -60,14 +59,12 @@ public void addTLogEntry(TLogEntry entry) {
if (entry.fromWAL) {
logEntriesNumFromWAL++;
}
// TODO Maybe we need to add in additional fields for more accurate calculations
serializedSize +=
entry.getData() == null ? 0 : entry.getData().stream().mapToInt(Buffer::capacity).sum();
memorySize += entry.getMemorySize();
}

public boolean canAccumulate() {
return logEntries.size() < config.getReplication().getMaxLogEntriesNumPerBatch()
&& serializedSize < config.getReplication().getMaxSizePerBatch();
&& memorySize < config.getReplication().getMaxSizePerBatch();
}

public long getStartIndex() {
Expand All @@ -94,8 +91,8 @@ public boolean isEmpty() {
return logEntries.isEmpty();
}

public long getSerializedSize() {
return serializedSize;
public long getMemorySize() {
return memorySize;
}

public long getLogEntriesNumFromWAL() {
Expand All @@ -111,8 +108,8 @@ public String toString() {
+ endIndex
+ ", size="
+ logEntries.size()
+ ", serializedSize="
+ serializedSize
+ ", memorySize="
+ memorySize
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ private boolean constructBatchFromWAL(long currentIndex, long maxIndex, Batch lo
data.buildSerializedRequests();
// construct request from wal
logBatches.addTLogEntry(
new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), true));
new TLogEntry(
data.getSerializedRequests(), data.getSearchIndex(), true, data.getMemorySize()));
}
// In the case of corrupt Data, we return true so that we can send a batch as soon as
// possible, avoiding potential duplication
Expand All @@ -577,7 +578,11 @@ private boolean constructBatchFromWAL(long currentIndex, long maxIndex, Batch lo
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, Batch logBatches) {
logBatches.addTLogEntry(
new TLogEntry(request.getSerializedRequests(), request.getSearchIndex(), false));
new TLogEntry(
request.getSerializedRequests(),
request.getSearchIndex(),
false,
request.getMemorySize()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public SyncStatus(IndexController controller, IoTConsensusConfig config) {
*/
public synchronized void addNextBatch(Batch batch) throws InterruptedException {
while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
|| !iotConsensusMemoryManager.reserve(batch.getSerializedSize(), false)) {
|| !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) {
wait();
}
pendingBatches.add(batch);
Expand All @@ -64,7 +64,7 @@ public synchronized void removeBatch(Batch batch) {
while (current.isSynced()) {
controller.update(current.getEndIndex(), false);
iterator.remove();
iotConsensusMemoryManager.free(current.getSerializedSize(), false);
iotConsensusMemoryManager.free(current.getMemorySize(), false);
if (iterator.hasNext()) {
current = iterator.next();
} else {
Expand All @@ -79,7 +79,7 @@ public synchronized void removeBatch(Batch batch) {
public synchronized void free() {
long size = 0;
for (Batch pendingBatch : pendingBatches) {
size += pendingBatch.getSerializedSize();
size += pendingBatch.getMemorySize();
}
pendingBatches.clear();
controller.update(0L, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct TLogEntry {
1: required list<binary> data
2: required i64 searchIndex
3: required bool fromWAL
4: required i64 memorySize
}

struct TSyncLogEntriesReq {
Expand Down
Loading