Skip to content

Commit b369668

Browse files
committed
HBASE-27543 Possible missing data when using MOB
1 parent 382681e commit b369668

File tree

2 files changed

+97
-7
lines changed

2 files changed

+97
-7
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,13 @@ S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind
128128
}
129129

130130
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */
131-
protected static class FileDetails {
131+
public static class FileDetails {
132132
/** Maximum key count after compaction (for blooms) */
133133
public long maxKeyCount = 0;
134134
/** Earliest put timestamp if major compaction */
135135
public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
136136
/** Latest put timestamp */
137-
public long latestPutTs = HConstants.LATEST_TIMESTAMP;
137+
public long latestPutTs = 0;
138138
/** The last key in the files we're compacting. */
139139
public long maxSeqId = 0;
140140
/** Latest memstore read point found in any of the involved files */
@@ -154,11 +154,12 @@ protected static class FileDetails {
154154
* @param major If major compaction
155155
* @return The result.
156156
*/
157-
private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
158-
boolean major) throws IOException {
157+
static FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, long keepSeqIdPeriod,
158+
boolean allFiles, boolean major, Compression.Algorithm majorCompactionCompression,
159+
Compression.Algorithm minorCompactionCompression) throws IOException {
159160
FileDetails fd = new FileDetails();
160161
long oldestHFileTimestampToKeepMVCC =
161-
EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
162+
EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * keepSeqIdPeriod);
162163

163164
for (HStoreFile file : filesToCompact) {
164165
if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
@@ -216,8 +217,9 @@ private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolea
216217
}
217218
}
218219
tmp = fileInfo.get(TIMERANGE_KEY);
219-
fd.latestPutTs =
220+
long latestPutTs =
220221
tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
222+
fd.latestPutTs = Math.max(fd.latestPutTs, latestPutTs);
221223
LOG.debug(
222224
"Compacting {}, keycount={}, bloomtype={}, size={}, "
223225
+ "encoding={}, compression={}, seqNum={}{}",
@@ -328,7 +330,9 @@ private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, Sc
328330
protected final List<Path> compact(final CompactionRequestImpl request,
329331
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
330332
ThroughputController throughputController, User user) throws IOException {
331-
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
333+
FileDetails fd =
334+
getFileDetails(request.getFiles(), keepSeqIdPeriod, request.isAllFiles(), request.isMajor(),
335+
majorCompactionCompression, minorCompactionCompression);
332336

333337
// Find the smallest read point across all the Scanners.
334338
long smallestReadPoint = getSmallestReadPoint();
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver.compactions;
19+
20+
import static org.junit.Assert.assertEquals;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
import org.apache.hadoop.hbase.HConstants;
29+
import org.apache.hadoop.hbase.io.compress.Compression;
30+
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
31+
import org.apache.hadoop.hbase.io.hfile.HFile;
32+
import org.apache.hadoop.hbase.regionserver.BloomType;
33+
import org.apache.hadoop.hbase.regionserver.HStoreFile;
34+
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
35+
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
36+
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails;
37+
import org.junit.Test;
38+
import org.mockito.Mockito;
39+
40+
public class TestFileDetails {
41+
42+
@Test public void testLatestPutTs() throws IOException {
43+
List<HStoreFile> sfs = new ArrayList<>(3);
44+
long curTS = System.currentTimeMillis();
45+
Map<byte[], byte[]> fileInfo = new HashMap<>();
46+
TimeRangeTracker tracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, 0, 3000);
47+
fileInfo.put(HStoreFile.TIMERANGE_KEY, TimeRangeTracker.toByteArray(tracker));
48+
sfs.add(createStoreFile(curTS, fileInfo));
49+
tracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, 0, 2000);
50+
fileInfo.put(HStoreFile.TIMERANGE_KEY, TimeRangeTracker.toByteArray(tracker));
51+
sfs.add(createStoreFile(curTS, fileInfo));
52+
tracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, 0, 1000);
53+
fileInfo.put(HStoreFile.TIMERANGE_KEY, TimeRangeTracker.toByteArray(tracker));
54+
sfs.add(createStoreFile(curTS, fileInfo));
55+
56+
FileDetails fd = Compactor.getFileDetails(sfs, HConstants.MIN_KEEP_SEQID_PERIOD, false, false,
57+
Compression.Algorithm.NONE, Compression.Algorithm.NONE);
58+
assertEquals(3000, fd.latestPutTs);
59+
60+
// when TIMERANGE_KEY is null
61+
fileInfo.clear();
62+
sfs.add(createStoreFile(curTS, fileInfo));
63+
fd = Compactor.getFileDetails(sfs, HConstants.MIN_KEEP_SEQID_PERIOD, false, false,
64+
Compression.Algorithm.NONE, Compression.Algorithm.NONE);
65+
assertEquals(HConstants.LATEST_TIMESTAMP, fd.latestPutTs);
66+
}
67+
68+
private static HStoreFile createStoreFile(long curTS, Map<byte[], byte[]> fileInfo)
69+
throws IOException {
70+
HStoreFile sf = Mockito.mock(HStoreFile.class);
71+
Mockito.doReturn(curTS).when(sf).getModificationTimestamp();
72+
Mockito.doReturn(0L).when(sf).getMaxSequenceId();
73+
StoreFileReader reader = Mockito.mock(StoreFileReader.class);
74+
Mockito.doReturn(0L).when(reader).getEntries();
75+
Mockito.doReturn(new HashMap<>(fileInfo)).when(reader).loadFileInfo();
76+
Mockito.doReturn(0L).when(reader).length();
77+
Mockito.doReturn(false).when(reader).isBulkLoaded();
78+
Mockito.doReturn(BloomType.NONE).when(reader).getBloomFilterType();
79+
HFile.Reader hfr = Mockito.mock(HFile.Reader.class);
80+
Mockito.doReturn(DataBlockEncoding.NONE).when(hfr).getDataBlockEncoding();
81+
Mockito.doReturn(hfr).when(reader).getHFileReader();
82+
Mockito.doReturn(reader).when(sf).getReader();
83+
return sf;
84+
}
85+
86+
}

0 commit comments

Comments
 (0)