Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,10 @@ private void initIfNeeded(

grpStates = remap(stateRec);
}
else
initEx = new IgniteCheckedException(
else {
throw new IgniteCheckedException(
"Failed to find checkpoint record at the given WAL pointer: " + ptr);
}
}
catch (IgniteCheckedException e) {
initEx = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.lang.IgniteThrowableBiPredicate;
Expand Down Expand Up @@ -195,9 +196,10 @@ public Collection<Long> checkpoints() {
* finished yet.
*
* @param entry Entry to add.
* @param cacheStates Cache states map.
*/
public void addCheckpoint(CheckpointEntry entry) {
addCpToEarliestCpMap(entry);
public void addCheckpoint(CheckpointEntry entry, Map<Integer, CacheState> cacheStates) {
addCpCacheStatesToEarliestCpMap(entry, cacheStates);

histMap.put(entry.timestamp(), entry);
}
Expand Down Expand Up @@ -232,7 +234,7 @@ private void updateEarliestCpMap(CheckpointEntry entry) {
iter.remove();
}

addCpToEarliestCpMap(entry);
addCpGroupStatesToEarliestCpMap(entry, states);
}
catch (IgniteCheckedException ex) {
U.warn(log, "Failed to process checkpoint: " + (entry != null ? entry : "none"), ex);
Expand Down Expand Up @@ -261,31 +263,56 @@ public CheckpointEntry lastCheckpointMarkingAsInapplicable(Integer grpId) {
* Add last checkpoint to map of the earliest checkpoints.
*
* @param entry Checkpoint entry.
* @param cacheStates Cache states map.
*/
private void addCpToEarliestCpMap(CheckpointEntry entry) {
try {
Map<Integer, CheckpointEntry.GroupState> states = entry.groupState(wal);

for (Integer grpId : states.keySet()) {
CheckpointEntry.GroupState grpState = states.get(grpId);
private void addCpCacheStatesToEarliestCpMap(CheckpointEntry entry, Map<Integer, CacheState> cacheStates) {
for (Integer grpId : cacheStates.keySet()) {
CacheState cacheState = cacheStates.get(grpId);

for (int pIdx = 0; pIdx < grpState.size(); pIdx++) {
int part = grpState.getPartitionByIndex(pIdx);
for (int pIdx = 0; pIdx < cacheState.size(); pIdx++) {
int part = cacheState.partitionByIndex(pIdx);

GroupPartitionId grpPartKey = new GroupPartitionId(grpId, part);
GroupPartitionId grpPartKey = new GroupPartitionId(grpId, part);

if (!earliestCp.containsKey(grpPartKey))
earliestCp.put(grpPartKey, entry);
}
addPartitionToEarliestCheckpoints(grpPartKey, entry);
}
}
catch (IgniteCheckedException ex) {
U.warn(log, "Failed to process checkpoint: " + (entry != null ? entry : "none"), ex);
}

earliestCp.clear();
/**
* Add last checkpoint to map of the earliest checkpoints.
*
* @param entry Checkpoint entry.
* @param cacheGrpStates Group states map.
*/
private void addCpGroupStatesToEarliestCpMap(
CheckpointEntry entry,
Map<Integer, CheckpointEntry.GroupState> cacheGrpStates
) {
for (Integer grpId : cacheGrpStates.keySet()) {
CheckpointEntry.GroupState grpState = cacheGrpStates.get(grpId);

for (int pIdx = 0; pIdx < grpState.size(); pIdx++) {
int part = grpState.getPartitionByIndex(pIdx);

GroupPartitionId grpPartKey = new GroupPartitionId(grpId, part);

addPartitionToEarliestCheckpoints(grpPartKey, entry);
}
}
}

/**
* Add entry to earliest checkpoint map. Ignore is such key is already present.
*
* @param grpPartKey Key that consists of cache group id and partition index.
* @param entry Checkpoint entry.
*/
private void addPartitionToEarliestCheckpoints(GroupPartitionId grpPartKey, CheckpointEntry entry) {
if (!earliestCp.containsKey(grpPartKey))
earliestCp.put(grpPartKey, entry);
}

/**
* @return {@code true} if there is space for next checkpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,7 @@ private CheckpointEntry createCheckPointEntry(

Map<Integer, CacheState> cacheGrpStates = null;

// Do not hold groups state in-memory if there is no space in the checkpoint history to prevent possible OOM.
// In this case the actual group states will be readed from WAL by demand.
if (rec != null && cpHistory.hasSpace())
if (rec != null)
cacheGrpStates = rec.cacheGroupStates();

return new CheckpointEntry(cpTs, ptr, cpId, cacheGrpStates);
Expand Down Expand Up @@ -426,7 +424,7 @@ public CheckpointEntry writeCheckpointEntry(
);

if (type == CheckpointEntryType.START)
cpHistory.addCheckpoint(entry);
cpHistory.addCheckpoint(entry, rec.cacheGroupStates());

writeCheckpointEntry(tmpWriteBuf, entry, type, skipSync);

Expand Down