
Commit 826e54c

HBASE-25988 Store the store file list by a file (#3578)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
1 parent 04e1980 commit 826e54c

File tree: 9 files changed, +451 -19 lines changed

hbase-protocol-shaded/src/main/protobuf/StoreFileTracker.proto

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
syntax = "proto2";
// This file contains protocol buffers that are used for the store file tracker.
package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "StoreFileTrackerProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message StoreFileEntry {
  required string name = 1;
  required uint64 size = 2;
}

message StoreFileList {
  required uint64 timestamp = 1;
  repeated StoreFileEntry store_file = 2;
}

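For orientation, here is a minimal sketch of how the generated classes round-trip these messages. This uses only the standard protobuf-java generated API (newBuilder/toByteArray/parseFrom); the wrapper class name and the example file name are ours, not part of the commit:

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

public class StoreFileListRoundTrip {
  public static void main(String[] args) throws Exception {
    // Build a list holding one store file entry; field names follow the .proto above.
    StoreFileList list = StoreFileList.newBuilder()
        .setTimestamp(System.currentTimeMillis())
        .addStoreFile(StoreFileEntry.newBuilder()
            .setName("example-hfile-name") // hypothetical name, for illustration only
            .setSize(4096L)
            .build())
        .build();
    // Serialize and parse back: essentially the bytes the tracker persists on disk.
    byte[] bytes = list.toByteArray();
    StoreFileList parsed = StoreFileList.parseFrom(bytes);
    System.out.println(parsed.getStoreFileCount() + " entry, ts=" + parsed.getTimestamp());
  }
}
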
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java

Lines changed: 5 additions & 0 deletions
@@ -22,6 +22,7 @@
 import java.util.function.Supplier;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -109,6 +110,10 @@ public RegionCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;
   }

+  public TableName getTableName() {
+    return getRegionInfo().getTable();
+  }
+
   public RegionInfo getRegionInfo() {
     return regionFileSystem.getRegionInfo();
   }

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java

Lines changed: 4 additions & 4 deletions
@@ -173,9 +173,9 @@ public StoreFlusher getStoreFlusher() {
     return this.storeFlusher;
   }

-  private StoreFileTracker createStoreFileTracker(HStore store) {
-    return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(),
-      store.isPrimaryReplicaStore(), store.getStoreContext());
+  private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
+    return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
+      store.getStoreContext());
   }

   /**
@@ -206,7 +206,7 @@ protected final void createComponentsOnce(Configuration conf, HStore store,
     this.ctx = store.getStoreContext();
     this.coprocessorHost = store.getHRegion().getCoprocessorHost();
     this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
-    this.storeFileTracker = createStoreFileTracker(store);
+    this.storeFileTracker = createStoreFileTracker(conf, store);
     assert compactor != null && compactionPolicy != null && storeFileManager != null &&
       storeFlusher != null && storeFileTracker != null;
   }
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java

Lines changed: 2 additions & 3 deletions
@@ -21,7 +21,6 @@
 import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.StoreContext;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -33,9 +32,9 @@
 @InterfaceAudience.Private
 class DefaultStoreFileTracker extends StoreFileTrackerBase {

-  public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+  public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
     StoreContext ctx) {
-    super(conf, tableName, isPrimaryReplica, ctx);
+    super(conf, isPrimaryReplica, ctx);
   }

   @Override
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java

Lines changed: 142 additions & 0 deletions
@@ -0,0 +1,142 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
 * A file based store file tracker.
 * <p/>
 * With this tracking method, the store file list is persisted into a file, so we can write new
 * store files directly to the final data directory, since broken files will simply never be
 * loaded. This greatly reduces the time for flush and compaction on some object storages, where a
 * rename is actually a copy. It also avoids listing when loading the store file list, which speeds
 * up the loading of store files since listing is not a fast operation on most object storages
 * either.
 */
@InterfaceAudience.Private
public class FileBasedStoreFileTracker extends StoreFileTrackerBase {

  private final StoreFileListFile backedFile;

  private final Map<String, StoreFileInfo> storefiles = new HashMap<>();

  public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
    super(conf, isPrimaryReplica, ctx);
    backedFile = new StoreFileListFile(ctx);
  }

  @Override
  public List<StoreFileInfo> load() throws IOException {
    StoreFileList list = backedFile.load();
    if (list == null) {
      return Collections.emptyList();
    }
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    List<StoreFileInfo> infos = new ArrayList<>();
    for (StoreFileEntry entry : list.getStoreFileList()) {
      infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
        ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
        new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName())));
    }
    // In general, for a primary replica the load method should only be called once, during
    // initialization, so we do not need synchronization here. And for secondary replicas, though
    // the load method could be called multiple times, we will never call the other methods, so
    // skipping synchronization would also be fine.
    // But we have a refreshStoreFiles method in the Region interface, which can be called by CPs,
    // and we have a RefreshHFilesEndpoint example to expose the refreshStoreFiles method as RPC,
    // so for safety, let's still keep the synchronized here.
    synchronized (storefiles) {
      for (StoreFileInfo info : infos) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
    return infos;
  }

  @Override
  protected boolean requireWritingToTmpDirFirst() {
    return false;
  }

  private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
    return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize())
      .build();
  }

  @Override
  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
    synchronized (storefiles) {
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      for (StoreFileInfo info : storefiles.values()) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      for (StoreFileInfo info : newFiles) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
      for (StoreFileInfo info : newFiles) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
  }

  @Override
  protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
    Collection<StoreFileInfo> newFiles) throws IOException {
    Set<String> compactedFileNames =
      compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
    synchronized (storefiles) {
      StoreFileList.Builder builder = StoreFileList.newBuilder();
      storefiles.forEach((name, info) -> {
        if (compactedFileNames.contains(name)) {
          return;
        }
        builder.addStoreFile(toStoreFileEntry(info));
      });
      for (StoreFileInfo info : newFiles) {
        builder.addStoreFile(toStoreFileEntry(info));
      }
      backedFile.update(builder);
      for (String name : compactedFileNames) {
        storefiles.remove(name);
      }
      for (StoreFileInfo info : newFiles) {
        storefiles.put(info.getPath().getName(), info);
      }
    }
  }
}
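
A note on the update pattern above: doAddNewStoreFiles and doAddCompactionResults never patch the persisted list in place. Each call rebuilds the complete list (surviving files plus new files) into a fresh builder, persists it via backedFile.update, and only then mutates the in-memory map, so a failed write leaves the previous snapshot intact. A minimal sketch of that write-then-mutate pattern in isolation (plain Java, with a name-to-size map standing in for the StoreFileInfo map; all file names here are hypothetical):

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class SnapshotRebuildSketch {
  public static void main(String[] args) {
    // In-memory tracked files, name -> size (stands in for Map<String, StoreFileInfo>).
    Map<String, Long> tracked = new LinkedHashMap<>();
    tracked.put("hfile-a", 100L);
    tracked.put("hfile-b", 200L);

    // A compaction replaced hfile-a and hfile-b with hfile-c.
    Set<String> compacted = new HashSet<>(Arrays.asList("hfile-a", "hfile-b"));
    Map<String, Long> newFiles = Map.of("hfile-c", 280L);

    // 1. Rebuild the complete snapshot: current files minus compacted, plus new files.
    Map<String, Long> snapshot = new LinkedHashMap<>(tracked);
    snapshot.keySet().removeAll(compacted);
    snapshot.putAll(newFiles);

    // 2. "Persist" the snapshot first (backedFile.update in the real code)...
    System.out.println("persisted: " + snapshot); // persisted: {hfile-c=280}

    // 3. ...and only then mutate the in-memory state, so a failed write leaves
    //    both the on-disk snapshot and the map untouched.
    tracked.keySet().removeAll(compacted);
    tracked.putAll(newFiles);
  }
}
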
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java

Lines changed: 142 additions & 0 deletions
@@ -0,0 +1,142 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
 * To fully avoid listing, we use two files for tracking here. When loading, we will try to read
 * both files; if only one exists, we trust it, and if both exist, we compare the timestamps and
 * trust the newer one. We record in memory which one we trusted, and when we need to update the
 * store file list, we write to the other file.
 * <p/>
 * In this way, we avoid having to list the directory to find the store file list file.
 */
@InterfaceAudience.Private
class StoreFileListFile {

  private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);

  private static final String TRACK_FILE_DIR = ".filelist";

  private static final String TRACK_FILE = "f1";

  private static final String TRACK_FILE_ROTATE = "f2";

  private final StoreContext ctx;

  private final Path trackFileDir;

  private final Path[] trackFiles = new Path[2];

  // this is used to make sure that we do not go backwards
  private long prevTimestamp = -1;

  private int nextTrackFile = -1;

  StoreFileListFile(StoreContext ctx) {
    this.ctx = ctx;
    trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
    trackFiles[0] = new Path(trackFileDir, TRACK_FILE);
    trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE);
  }

  private StoreFileList load(Path path) throws IOException {
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    byte[] bytes;
    try (FSDataInputStream in = fs.open(path)) {
      bytes = ByteStreams.toByteArray(in);
    }
    // Read all the bytes first and then parse them, so we will only throw
    // InvalidProtocolBufferException here. This is very important for the upper layer to
    // determine whether this is the normal case, where the file does not exist or is incomplete.
    // If there is another type of exception, the upper layer should throw it out instead of just
    // ignoring it, otherwise it will lead to data loss.
    return StoreFileList.parseFrom(bytes);
  }

  private int select(StoreFileList[] lists) {
    if (lists[0] == null) {
      return 1;
    }
    if (lists[1] == null) {
      return 0;
    }
    return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
  }

  StoreFileList load() throws IOException {
    StoreFileList[] lists = new StoreFileList[2];
    for (int i = 0; i < 2; i++) {
      try {
        lists[i] = load(trackFiles[i]);
      } catch (FileNotFoundException | InvalidProtocolBufferException e) {
        // this is a normal case, so use info level and do not log the stacktrace
        LOG.info("Failed to load track file {}: {}", trackFiles[i], e);
      }
    }
    int winnerIndex = select(lists);
    if (lists[winnerIndex] != null) {
      nextTrackFile = 1 - winnerIndex;
      prevTimestamp = lists[winnerIndex].getTimestamp();
    } else {
      nextTrackFile = 0;
    }
    return lists[winnerIndex];
  }

  /**
   * We will set the timestamp in this method, so just pass the builder in.
   */
  void update(StoreFileList.Builder builder) throws IOException {
    Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update");
    FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
    long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
    try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
      builder.setTimestamp(timestamp).build().writeTo(out);
    }
    // record timestamp
    prevTimestamp = timestamp;
    // rotate the file
    nextTrackFile = 1 - nextTrackFile;
    try {
      fs.delete(trackFiles[nextTrackFile], false);
    } catch (IOException e) {
      // we will create the new file with overwrite = true, so this is not a big deal; deleting
      // only speeds up loading, as then we do not need to read this file when loading (we will
      // hit FileNotFoundException)
      LOG.debug("Failed to delete old track file {}, not a big deal, just ignore",
        trackFiles[nextTrackFile], e);
    }
  }
}
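
The two-file rotation above is easy to get wrong, so a standalone sketch of just the select-and-rotate state machine may help (plain Java; a Long timestamp per slot stands in for the protobuf file contents, and the class name is ours, not from the commit):

import java.util.Arrays;

public class TrackFileRotationSketch {
  // Simulated contents of f1 and f2: the stored timestamp, or null if the file is
  // missing/unparseable (the FileNotFoundException / InvalidProtocolBufferException case).
  private final Long[] files = new Long[2];

  private long prevTimestamp = -1;
  private int nextTrackFile = -1;

  // Mirrors StoreFileListFile.load(): trust the newer file, plan to write the other one.
  Long load() {
    int winner = files[0] == null ? 1 : files[1] == null ? 0
      : files[0] >= files[1] ? 0 : 1;
    if (files[winner] != null) {
      nextTrackFile = 1 - winner;
      prevTimestamp = files[winner];
      return files[winner];
    }
    nextTrackFile = 0; // neither file readable: start fresh with f1
    return null;
  }

  // Mirrors update(): write the non-trusted slot with a strictly increasing timestamp,
  // then flip the pointer and best-effort delete the now-stale slot.
  void update() {
    long timestamp = Math.max(prevTimestamp + 1, System.currentTimeMillis());
    files[nextTrackFile] = timestamp;
    prevTimestamp = timestamp;
    nextTrackFile = 1 - nextTrackFile;
    files[nextTrackFile] = null; // stands in for fs.delete(...)
  }

  public static void main(String[] args) {
    TrackFileRotationSketch sketch = new TrackFileRotationSketch();
    sketch.load();   // nothing on disk yet: trust nothing, write f1 next
    sketch.update(); // writes f1, deletes f2
    sketch.update(); // writes f2, deletes f1
    System.out.println(Arrays.toString(sketch.files)); // [null, <second timestamp>]
  }
}

Note how the Math.max(prevTimestamp + 1, now) in update() keeps timestamps strictly increasing even if the wall clock goes backwards, so select() in a later load() can never prefer a stale file.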
