Skip to content

Commit 5223e44

Browse files
tkalkirillsergey-chugunov-1985
authored andcommitted
IGNITE-13910 Missing segment is not released - Fixes #8612.
Signed-off-by: Sergey Chugunov <sergey.chugunov@gmail.com>
1 parent 9123e97 commit 5223e44

File tree

5 files changed

+198
-15
lines changed

5 files changed

+198
-15
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -584,18 +584,21 @@ private boolean isArchiverEnabled() {
584584
}
585585

586586
/**
587-
* Collect wal segment files from low pointer (include) to high pointer (not include) and reserve low pointer.
587+
* Collects WAL segments from the archive only if they are all present.
588+
* Will wait for the last segment to be archived if it is not.
589+
* If there are missing segments an empty collection is returned.
588590
*
589-
* @param low Low bound.
590-
* @param high High bound.
591+
* @param low Low bound (include).
592+
* @param high High bound (not include).
593+
* @return WAL segments from the archive, or an empty collection if at
594+
* least a segment between {@code low} and {@code high} is missing.
595+
* @throws IgniteCheckedException If failed.
591596
*/
592-
public Collection<File> getAndReserveWalFiles(WALPointer low, WALPointer high) throws IgniteCheckedException {
593-
final long awaitIdx = high.index() - 1;
594-
595-
segmentAware.awaitSegmentArchived(awaitIdx);
596-
597-
if (!reserve(low))
598-
throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]");
597+
public Collection<File> getWalFilesFromArchive(
598+
WALPointer low,
599+
WALPointer high
600+
) throws IgniteCheckedException {
601+
segmentAware.awaitSegmentArchived(high.index() - 1);
599602

600603
List<File> res = new ArrayList<>();
601604

@@ -610,11 +613,10 @@ public Collection<File> getAndReserveWalFiles(WALPointer low, WALPointer high) t
610613
else if (fileZip.exists())
611614
res.add(fileZip);
612615
else {
613-
if (log.isInfoEnabled()) {
616+
if (log.isInfoEnabled())
614617
log.info("Segment not found: " + file.getName() + "/" + fileZip.getName());
615618

616-
log.info("Stopped iteration on idx: " + i);
617-
}
619+
res.clear();
618620

619621
break;
620622
}
@@ -1008,7 +1010,7 @@ private FileWriteHandle closeBufAndRollover(
10081010

10091011
// Segment presence check.
10101012
if (reserved && !hasIndex(start.index())) {
1011-
segmentAware.reserve(start.index());
1013+
segmentAware.release(start.index());
10121014

10131015
reserved = false;
10141016
}

modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,9 @@
307307
*/
308308
@SuppressWarnings({"UnusedReturnValue"})
309309
public abstract class IgniteUtils {
310+
/** */
311+
public static final long KB = 1024L;
312+
310313
/** */
311314
public static final long MB = 1024L * 1024;
312315

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cache.persistence.db.wal;
19+
20+
import java.io.File;
21+
import java.util.Arrays;
22+
import java.util.Collection;
23+
import java.util.concurrent.CountDownLatch;
24+
import org.apache.ignite.IgniteCheckedException;
25+
import org.apache.ignite.cluster.ClusterState;
26+
import org.apache.ignite.configuration.CacheConfiguration;
27+
import org.apache.ignite.configuration.DataRegionConfiguration;
28+
import org.apache.ignite.configuration.DataStorageConfiguration;
29+
import org.apache.ignite.configuration.IgniteConfiguration;
30+
import org.apache.ignite.internal.IgniteEx;
31+
import org.apache.ignite.internal.IgniteInternalFuture;
32+
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
33+
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
34+
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
35+
import org.apache.ignite.internal.util.typedef.internal.U;
36+
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
37+
import org.junit.Test;
38+
39+
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
40+
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
41+
42+
/**
43+
* Class for testing WAL manager.
44+
*/
45+
public class WriteAheadLogManagerSelfTest extends GridCommonAbstractTest {
46+
/** {@inheritDoc} */
47+
@Override protected void beforeTest() throws Exception {
48+
super.beforeTest();
49+
50+
stopAllGrids();
51+
cleanPersistenceDir();
52+
}
53+
54+
/** {@inheritDoc} */
55+
@Override protected void afterTest() throws Exception {
56+
super.afterTest();
57+
58+
stopAllGrids();
59+
cleanPersistenceDir();
60+
}
61+
62+
/** {@inheritDoc} */
63+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
64+
return super.getConfiguration(igniteInstanceName)
65+
.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
66+
.setDataStorageConfiguration(
67+
new DataStorageConfiguration()
68+
.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
69+
);
70+
}
71+
72+
/** {@inheritDoc} */
73+
@Override protected IgniteEx startGrids(int cnt) throws Exception {
74+
IgniteEx n = super.startGrids(cnt);
75+
76+
n.cluster().state(ClusterState.ACTIVE);
77+
awaitPartitionMapExchange();
78+
79+
return n;
80+
}
81+
82+
/**
83+
* Checking the correctness of WAL segment reservation.
84+
*
85+
* @throws Exception If failed.
86+
*/
87+
@Test
88+
public void testReservation() throws Exception {
89+
IgniteEx n = startGrids(1);
90+
91+
for (int i = 0; walMgr(n).lastArchivedSegment() < 2; i++)
92+
n.cache(DEFAULT_CACHE_NAME).put(i, new byte[(int)(10 * U.KB)]);
93+
94+
forceCheckpoint();
95+
96+
assertTrue(walMgr(n).lastArchivedSegment() >= 2);
97+
assertTrue(walMgr(n).lastTruncatedSegment() == -1);
98+
99+
WALPointer segment0WalPtr = new WALPointer(0, 0, 0);
100+
assertTrue(walMgr(n).reserve(segment0WalPtr));
101+
assertTrue(walMgr(n).reserved(segment0WalPtr));
102+
103+
WALPointer segment1WalPtr = new WALPointer(1, 0, 0);
104+
105+
// Delete segment manually.
106+
FileDescriptor segment1 = Arrays.stream(walMgr(n).walArchiveFiles())
107+
.filter(fd -> fd.idx() == segment1WalPtr.index()).findAny().orElseThrow(AssertionError::new);
108+
109+
assertTrue(segment1.file().delete());
110+
111+
assertFalse(walMgr(n).reserve(segment1WalPtr));
112+
assertTrue(walMgr(n).reserved(segment1WalPtr));
113+
114+
walMgr(n).release(segment0WalPtr);
115+
assertFalse(walMgr(n).reserved(segment0WalPtr));
116+
assertFalse(walMgr(n).reserved(segment1WalPtr));
117+
118+
assertEquals(1, walMgr(n).truncate(segment1WalPtr));
119+
assertFalse(walMgr(n).reserve(segment0WalPtr));
120+
assertFalse(walMgr(n).reserve(segment1WalPtr));
121+
assertFalse(walMgr(n).reserved(segment0WalPtr));
122+
assertFalse(walMgr(n).reserved(segment1WalPtr));
123+
124+
WALPointer segmentMaxWalPtr = new WALPointer(Long.MAX_VALUE, 0, 0);
125+
assertFalse(walMgr(n).reserve(segmentMaxWalPtr));
126+
assertFalse(walMgr(n).reserved(segmentMaxWalPtr));
127+
}
128+
129+
/**
130+
* Checking the correctness of the method {@link FileWriteAheadLogManager#getWalFilesFromArchive}.
131+
*
132+
* @throws Exception If failed.
133+
*/
134+
@Test
135+
public void testGetWalFilesFromArchive() throws Exception {
136+
IgniteEx n = startGrids(1);
137+
138+
WALPointer segment0WalPtr = new WALPointer(0, 0, 0);
139+
WALPointer segment1WalPtr = new WALPointer(1, 0, 0);
140+
WALPointer segment2WalPtr = new WALPointer(2, 0, 0);
141+
142+
CountDownLatch startLatch = new CountDownLatch(1);
143+
IgniteInternalFuture<Collection<File>> fut = runAsync(() -> {
144+
startLatch.countDown();
145+
146+
return walMgr(n).getWalFilesFromArchive(segment0WalPtr, segment2WalPtr);
147+
});
148+
149+
startLatch.await();
150+
151+
// Check that the expected archiving segment 1.
152+
assertThrows(log, () -> fut.get(1_000), IgniteCheckedException.class, null);
153+
154+
for (int i = 0; walMgr(n).lastArchivedSegment() < 2; i++)
155+
n.cache(DEFAULT_CACHE_NAME).put(i, new byte[(int)(10 * U.KB)]);
156+
157+
assertEquals(2, fut.get(getTestTimeout()).size());
158+
159+
forceCheckpoint();
160+
161+
assertEquals(1, walMgr(n).truncate(segment1WalPtr));
162+
assertEquals(0, walMgr(n).getWalFilesFromArchive(segment0WalPtr, segment2WalPtr).size());
163+
assertEquals(1, walMgr(n).getWalFilesFromArchive(segment1WalPtr, segment2WalPtr).size());
164+
}
165+
166+
/**
167+
* Getting WAL manager.
168+
*
169+
* @param n Node.
170+
* @return WAL manager.
171+
*/
172+
private FileWriteAheadLogManager walMgr(IgniteEx n) {
173+
return (FileWriteAheadLogManager)n.context().cache().context().wal();
174+
}
175+
}

modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -827,7 +827,7 @@ protected IgniteEx startGrid() throws Exception {
827827
* @return First started grid.
828828
* @throws Exception If failed.
829829
*/
830-
protected final IgniteEx startGrids(int cnt) throws Exception {
830+
protected IgniteEx startGrids(int cnt) throws Exception {
831831
assert cnt > 0;
832832

833833
IgniteEx ignite = null;

modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveFsyncTest;
8383
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveLogOnlyTest;
8484
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRolloverTypesTest;
85+
import org.apache.ignite.internal.processors.cache.persistence.db.wal.WriteAheadLogManagerSelfTest;
8586
import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests;
8687
import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteFsyncReplayWalIteratorInvalidCrcTest;
8788
import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgnitePureJavaCrcCompatibility;
@@ -237,5 +238,7 @@ public static void addRealPageStoreTests(List<Class<?>> suite, Collection<Class>
237238
GridTestUtils.addTestIfNeeded(suite, HistoricalRebalanceHeuristicsTest.class, ignoredTests);
238239

239240
GridTestUtils.addTestIfNeeded(suite, IgniteLocalWalSizeTest.class, ignoredTests);
241+
242+
GridTestUtils.addTestIfNeeded(suite, WriteAheadLogManagerSelfTest.class, ignoredTests);
240243
}
241244
}

0 commit comments

Comments
 (0)