Skip to content

Commit 0f76e80

Browse files
GeorryHuangApache9
authored and committed
HBASE-26263 [Rolling Upgrading] Persist the StoreFileTracker configurations to TableDescriptor for existing tables (#3700)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Reviewed-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
1 parent 30acffe commit 0f76e80

File tree

7 files changed

+480
-0
lines changed

7 files changed

+480
-0
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,11 @@ public TableDescriptorBuilder modifyColumnFamily(final ColumnFamilyDescriptor fa
440440
return this;
441441
}
442442

443+
/**
 * Removes the value stored under the given string key by delegating to
 * {@code desc.removeValue(key)}.
 *
 * @param key key of the value to remove
 * @return this builder, so that calls can be chained
 */
public TableDescriptorBuilder removeValue(final String key) {
444+
desc.removeValue(key);
445+
return this;
446+
}
447+
443448
public TableDescriptorBuilder removeValue(Bytes key) {
444449
desc.removeValue(key);
445450
return this;
@@ -788,6 +793,17 @@ private static <T> Bytes toBytesOrNull(T t, Function<T, byte[]> f) {
788793
}
789794
}
790795

796+
/**
797+
* Remove metadata represented by the key from the {@link #values} map.
798+
* Implemented by calling {@code setValue} with the key and a {@code null} String value.
799+
* @param key Key of the entry (key and value) to remove from the TableDescriptor
800+
* parameters.
801+
* @return the {@code ModifyableTableDescriptor} returned by the delegated {@code setValue} call
802+
*/
803+
public ModifyableTableDescriptor removeValue(final String key) {
804+
return setValue(key, (String) null);
805+
}
806+
791807
/**
792808
* Remove metadata represented by the key from the {@link #values} map
793809
*

hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,3 +584,14 @@ enum ClaimReplicationQueuesState {
584584
CLAIM_REPLICATION_QUEUES_DISPATCH = 1;
585585
CLAIM_REPLICATION_QUEUES_FINISH = 2;
586586
}
587+
588+
589+
// States of the table-descriptor modification state machine: a PREPARE step (1)
// followed by an UPDATE step (2).
enum ModifyTableDescriptorState {
590+
MODIFY_TABLE_DESCRIPTOR_PREPARE = 1;
591+
MODIFY_TABLE_DESCRIPTOR_UPDATE = 2;
592+
}
593+
594+
// Persisted state for a table-descriptor modification. The unmodified schema is
// always required; the modified schema is optional and may be absent.
message ModifyTableDescriptorStateData {
595+
required TableSchema unmodified_table_schema = 1;
596+
optional TableSchema modified_table_schema = 2;
597+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
import org.apache.hadoop.hbase.master.http.MasterStatusServlet;
131131
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
132132
import org.apache.hadoop.hbase.master.locking.LockManager;
133+
import org.apache.hadoop.hbase.master.migrate.RollingUpgradeChore;
133134
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
134135
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
135136
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
@@ -355,6 +356,7 @@ public class HMaster extends HRegionServer implements MasterServices {
355356
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
356357
private MobCompactionChore mobCompactChore;
357358
private MasterMobCompactionThread mobCompactThread;
359+
private RollingUpgradeChore rollingUpgradeChore;
358360
// used to synchronize the mobCompactionStates
359361
private final IdLock mobCompactionLock = new IdLock();
360362
// save the information of mob compactions in tables.
@@ -1212,6 +1214,9 @@ private void finishActiveMasterInitialization(MonitoredTask status)
12121214
LOG.debug("Balancer post startup initialization complete, took " + (
12131215
(EnvironmentEdgeManager.currentTime() - start) / 1000) + " seconds");
12141216
}
1217+
1218+
this.rollingUpgradeChore = new RollingUpgradeChore(this);
1219+
getChoreService().scheduleChore(rollingUpgradeChore);
12151220
}
12161221

12171222
private void createMissingCFsInMetaDuringUpgrade(
@@ -1695,6 +1700,7 @@ private void stopChores() {
16951700
shutdownChore(snapshotCleanerChore);
16961701
shutdownChore(hbckChore);
16971702
shutdownChore(regionsRecoveryChore);
1703+
shutdownChore(rollingUpgradeChore);
16981704
}
16991705
}
17001706

hbase-server/src/main/java/org/apache/hadoop/hbase/master/migrate/RollingUpgradeChore.java

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hbase.master.migrate;
20+
21+
import java.io.IOException;
22+
import java.util.ArrayList;
23+
import java.util.Iterator;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.stream.Collectors;
28+
import org.apache.commons.lang3.StringUtils;
29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.hbase.ScheduledChore;
31+
import org.apache.hadoop.hbase.Stoppable;
32+
import org.apache.hadoop.hbase.TableDescriptors;
33+
import org.apache.hadoop.hbase.client.TableDescriptor;
34+
import org.apache.hadoop.hbase.master.MasterServices;
35+
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
36+
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
37+
import org.apache.hadoop.hbase.regionserver.storefiletracker.MigrateStoreFileTrackerProcedure;
38+
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
39+
import org.apache.yetus.audience.InterfaceAudience;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
/**
44+
* To avoid too many migration/upgrade threads being submitted at the same time during master
45+
* initialization, RollingUpgradeChore handles all rolling-upgrade tasks.
46+
* */
47+
@InterfaceAudience.Private
48+
public class RollingUpgradeChore extends ScheduledChore {
49+
50+
// Configuration key for the chore period; the constructor reads it as an int number of seconds.
static final String ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY =
51+
"hbase.master.rolling.upgrade.chore.period.secs";
52+
static final int DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS = 10; // 10 seconds by default
53+
54+
// Configuration key for the chore delay; the constructor reads it as a long number of seconds.
static final String ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY =
55+
"hbase.master.rolling.upgrade.chore.delay.secs";
56+
static final long DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS = 30; // 30 seconds
57+
58+
// Upper bound on the number of tables picked (and procedures submitted) per chore round.
static final int CONCURRENT_PROCEDURES_COUNT = 5;
59+
60+
private final static Logger LOG = LoggerFactory.getLogger(RollingUpgradeChore.class);
61+
// Executor used to submit migration procedures and to poll whether they have finished.
ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
62+
// Source of all table descriptors that are scanned for a missing tracker setting.
private TableDescriptors tableDescriptors;
63+
// Procedures submitted in the previous round that have not been observed as finished yet.
private List<MigrateStoreFileTrackerProcedure> processingProcs = new ArrayList<>();
64+
65+
/**
 * Creates the chore from the configuration, master procedure executor and table descriptors
 * exposed by the given master services; the same object is also passed on as the stopper.
 *
 * @param masterServices provider of the configuration, procedure executor and table descriptors
 */
public RollingUpgradeChore(MasterServices masterServices) {
66+
this(masterServices.getConfiguration(), masterServices.getMasterProcedureExecutor(),
67+
masterServices.getTableDescriptors(), masterServices);
68+
}
69+
70+
/**
 * Initializes the scheduled chore, named after this class's simple name, with the period and
 * delay read from the configuration (both in seconds, falling back to the defaults declared
 * in this class), and stores the collaborators used by {@code chore()}.
 *
 * @param conf configuration the period and delay settings are read from
 * @param procedureExecutor executor used to submit and track migration procedures
 * @param tableDescriptors source of the table descriptors to inspect
 * @param stopper stopper handed to the {@code ScheduledChore} super constructor
 */
private RollingUpgradeChore(Configuration conf,
71+
ProcedureExecutor<MasterProcedureEnv> procedureExecutor, TableDescriptors tableDescriptors,
72+
Stoppable stopper) {
73+
super(RollingUpgradeChore.class.getSimpleName(), stopper, conf
74+
.getInt(ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY,
75+
DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS), conf
76+
.getLong(ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY,
77+
DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS),
78+
TimeUnit.SECONDS);
79+
this.procedureExecutor = procedureExecutor;
80+
this.tableDescriptors = tableDescriptors;
81+
}
82+
83+
/**
 * Runs one round of the StoreFileTracker migration check and shuts this chore down once
 * {@code isCompletelyMigrateSFT} reports that nothing is left to migrate.
 */
@Override
84+
protected void chore() {
85+
if (isCompletelyMigrateSFT(CONCURRENT_PROCEDURES_COUNT)) {
86+
LOG.info("All Rolling-Upgrade tasks are complete, shutdown RollingUpgradeChore!");
87+
shutdown();
88+
}
89+
}
90+
91+
/**
 * Performs one migration round and reports whether the migration is complete.
 * <p/>
 * Finished procedures are first dropped from {@code processingProcs}. If any are still
 * running, nothing new is submitted. Otherwise up to {@code concurrentCount} tables whose
 * descriptor has an empty {@code StoreFileTrackerFactory.TRACKER_IMPL} value are selected and
 * a {@code MigrateStoreFileTrackerProcedure} is submitted for each of them.
 *
 * @param concurrentCount maximum number of tables to select in this round
 * @return {@code true} only when no procedure is pending and no table with an empty tracker
 *         value was found; {@code false} when procedures are still running, when new ones
 *         were just submitted, or when loading the table descriptors failed
 */
private boolean isCompletelyMigrateSFT(int concurrentCount){
92+
Iterator<MigrateStoreFileTrackerProcedure> iter = processingProcs.iterator();
93+
while(iter.hasNext()){
94+
MigrateStoreFileTrackerProcedure proc = iter.next();
95+
if(procedureExecutor.isFinished(proc.getProcId())){
96+
iter.remove();
97+
}
98+
}
99+
// No new migration procedures will be submitted until
100+
// all procedures executed last time are completed.
101+
if (!processingProcs.isEmpty()) {
102+
return false;
103+
}
104+
105+
Map<String, TableDescriptor> migrateSFTTables;
106+
try {
107+
// Keep only descriptors without a tracker implementation value, capped at concurrentCount.
migrateSFTTables = tableDescriptors.getAll().entrySet().stream().filter(entry -> {
108+
TableDescriptor td = entry.getValue();
109+
return StringUtils.isEmpty(td.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
110+
}).limit(concurrentCount).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
111+
} catch (IOException e) {
112+
// Not treated as complete: returning false leaves the chore scheduled for another round.
LOG.warn("Failed to migrate StoreFileTracker", e);
113+
return false;
114+
}
115+
116+
if (migrateSFTTables.isEmpty()) {
117+
LOG.info("There is no table to migrate StoreFileTracker!");
118+
return true;
119+
}
120+
121+
for (Map.Entry<String, TableDescriptor> entry : migrateSFTTables.entrySet()) {
122+
TableDescriptor tableDescriptor = entry.getValue();
123+
MigrateStoreFileTrackerProcedure proc =
124+
new MigrateStoreFileTrackerProcedure(procedureExecutor.getEnvironment(), tableDescriptor);
125+
procedureExecutor.submitProcedure(proc);
126+
// Remember the procedure so the next round can wait for it to finish.
processingProcs.add(proc);
127+
}
128+
return false;
129+
}
130+
}
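
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableDescriptorProcedure.java

Lines changed: 161 additions & 0 deletions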
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.master.procedure;
19+
20+
import java.io.IOException;
21+
import java.util.Optional;
22+
import org.apache.hadoop.hbase.TableName;
23+
import org.apache.hadoop.hbase.client.TableDescriptor;
24+
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
25+
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
26+
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
27+
import org.apache.yetus.audience.InterfaceAudience;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
32+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyTableDescriptorState;
33+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyTableDescriptorStateData;
34+
35+
/**
36+
* The procedure will only update the table descriptor without reopening all the regions.
37+
* <p/>
38+
* It is usually used for migrating when upgrading, where we need to add something into the table
39+
* descriptor, such as the rs group information.
40+
*/
41+
@InterfaceAudience.Private
42+
public abstract class ModifyTableDescriptorProcedure
43+
extends AbstractStateMachineTableProcedure<ModifyTableDescriptorState> {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(ModifyTableDescriptorProcedure.class);
46+
47+
// Descriptor supplied at construction (or restored from serialized state); also provides the table name.
private TableDescriptor unmodifiedTableDescriptor;
48+
// Descriptor produced by modify() in the PREPARE state; stays null when no modification is needed.
private TableDescriptor modifiedTableDescriptor;
49+
50+
/**
 * No-argument constructor that leaves both descriptors unset.
 * NOTE(review): presumably used when the procedure is re-created and then populated through
 * {@code deserializeStateData} - confirm against the procedure framework.
 */
protected ModifyTableDescriptorProcedure() {
51+
}
52+
53+
/**
 * Creates the procedure for the table described by {@code unmodified}.
 *
 * @param env master procedure environment, passed to the super constructor
 * @param unmodified table descriptor to start from; it is later handed to {@code modify}
 */
protected ModifyTableDescriptorProcedure(MasterProcedureEnv env, TableDescriptor unmodified) {
54+
super(env);
55+
this.unmodifiedTableDescriptor = unmodified;
56+
}
57+
58+
/**
 * @return the table name taken from the unmodified table descriptor
 */
@Override
59+
public TableName getTableName() {
60+
return unmodifiedTableDescriptor.getTableName();
61+
}
62+
63+
/**
 * @return always {@code TableOperationType.EDIT}
 */
@Override
64+
public TableOperationType getTableOperationType() {
65+
return TableOperationType.EDIT;
66+
}
67+
68+
/**
69+
* Sub class should implement this method to modify the table descriptor, such as storing the rs
70+
* group information.
71+
* <p/>
72+
* Since the migrating is asynchronous, it is possible that users have already changed the rs
73+
* group for a table, in this case we do not need to modify the table descriptor any more, then
74+
* you could just return {@link Optional#empty()}.
75+
* @param env the master procedure environment
* @param current the table descriptor to modify; the PREPARE state passes the unmodified
*          descriptor held by this procedure
* @return the modified descriptor, or {@link Optional#empty()} when no update is needed
* @throws IOException if computing the modified descriptor fails
*/
76+
protected abstract Optional<TableDescriptor> modify(MasterProcedureEnv env,
77+
TableDescriptor current) throws IOException;
78+
79+
/**
 * Executes a single state of the procedure.
 * <p/>
 * PREPARE asks {@code modify} for a new descriptor: if one is returned it is remembered and
 * the procedure moves on to UPDATE, otherwise the procedure ends. UPDATE writes the modified
 * descriptor through the master's {@code TableDescriptors} and ends the procedure.
 * <p/>
 * An {@link IOException} marks the procedure as failed when rollback is supported for the
 * state; otherwise it is only logged. In both cases {@code Flow.HAS_MORE_STATE} is returned.
 *
 * @param env the master procedure environment
 * @param state the state to execute
 * @return {@code Flow.HAS_MORE_STATE} or {@code Flow.NO_MORE_STATE} as described above
 * @throws UnsupportedOperationException if the state is not one of the handled states
 */
@Override
80+
protected Flow executeFromState(MasterProcedureEnv env, ModifyTableDescriptorState state)
81+
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
82+
try {
83+
switch (state) {
84+
case MODIFY_TABLE_DESCRIPTOR_PREPARE:
85+
Optional<TableDescriptor> modified = modify(env, unmodifiedTableDescriptor);
86+
if (modified.isPresent()) {
87+
modifiedTableDescriptor = modified.get();
88+
setNextState(ModifyTableDescriptorState.MODIFY_TABLE_DESCRIPTOR_UPDATE);
89+
return Flow.HAS_MORE_STATE;
90+
} else {
91+
// do not need to modify
92+
return Flow.NO_MORE_STATE;
93+
}
94+
case MODIFY_TABLE_DESCRIPTOR_UPDATE:
95+
env.getMasterServices().getTableDescriptors().update(modifiedTableDescriptor);
96+
return Flow.NO_MORE_STATE;
97+
default:
98+
throw new UnsupportedOperationException("unhandled state=" + state);
99+
}
100+
} catch (IOException e) {
101+
if (isRollbackSupported(state)) {
102+
setFailure("master-modify-table-descriptor", e);
103+
} else {
104+
// No failure is recorded here; the method falls through to HAS_MORE_STATE below.
LOG.warn("Retriable error trying to modify table descriptor={} (in state={})",
105+
getTableName(), state, e);
106+
}
107+
}
108+
return Flow.HAS_MORE_STATE;
109+
}
110+
111+
/**
 * Rolls back the given state. Only the PREPARE state can be rolled back, and doing so is a
 * no-op; any other state is rejected.
 *
 * @param env the master procedure environment (unused)
 * @param state the state to roll back
 * @throws UnsupportedOperationException if {@code state} is not the PREPARE state
 */
@Override
112+
protected void rollbackState(MasterProcedureEnv env, ModifyTableDescriptorState state)
113+
throws IOException, InterruptedException {
114+
if (state == ModifyTableDescriptorState.MODIFY_TABLE_DESCRIPTOR_PREPARE) {
115+
return;
116+
}
117+
throw new UnsupportedOperationException("unhandled state=" + state);
118+
}
119+
120+
/**
 * @param state the state to check
 * @return {@code true} only for the PREPARE state
 */
@Override
121+
protected boolean isRollbackSupported(ModifyTableDescriptorState state) {
122+
return state == ModifyTableDescriptorState.MODIFY_TABLE_DESCRIPTOR_PREPARE;
123+
}
124+
125+
/**
 * @param stateId numeric id of a state
 * @return the result of {@code ModifyTableDescriptorState.forNumber(stateId)}
 */
@Override
126+
protected ModifyTableDescriptorState getState(int stateId) {
127+
return ModifyTableDescriptorState.forNumber(stateId);
128+
}
129+
130+
/**
 * @param state the state to convert
 * @return the number of the given state, as returned by {@code state.getNumber()}
 */
@Override
131+
protected int getStateId(ModifyTableDescriptorState state) {
132+
return state.getNumber();
133+
}
134+
135+
/**
 * @return the PREPARE state, in which the procedure starts
 */
@Override
136+
protected ModifyTableDescriptorState getInitialState() {
137+
return ModifyTableDescriptorState.MODIFY_TABLE_DESCRIPTOR_PREPARE;
138+
}
139+
140+
/**
 * Serializes the parent state followed by a {@code ModifyTableDescriptorStateData} message
 * holding the unmodified table schema and, when already computed, the modified one.
 *
 * @param serializer serializer the state is written to
 * @throws IOException if serialization fails
 */
@Override
141+
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
142+
super.serializeStateData(serializer);
143+
ModifyTableDescriptorStateData.Builder builder = ModifyTableDescriptorStateData.newBuilder()
144+
.setUnmodifiedTableSchema(ProtobufUtil.toTableSchema(unmodifiedTableDescriptor));
145+
// The modified descriptor is only assigned in the PREPARE state, so it may still be null.
if (modifiedTableDescriptor != null) {
146+
builder.setModifiedTableSchema(ProtobufUtil.toTableSchema(modifiedTableDescriptor));
147+
}
148+
serializer.serialize(builder.build());
149+
}
150+
151+
/**
 * Restores the parent state and then the unmodified table descriptor; the modified table
 * descriptor is restored only when the serialized message carries one.
 *
 * @param serializer serializer the state is read from
 * @throws IOException if deserialization fails
 */
@Override
152+
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
153+
super.deserializeStateData(serializer);
154+
ModifyTableDescriptorStateData data =
155+
serializer.deserialize(ModifyTableDescriptorStateData.class);
156+
unmodifiedTableDescriptor = ProtobufUtil.toTableDescriptor(data.getUnmodifiedTableSchema());
157+
if (data.hasModifiedTableSchema()) {
158+
modifiedTableDescriptor = ProtobufUtil.toTableDescriptor(data.getModifiedTableSchema());
159+
}
160+
}
161+
}

0 commit comments

Comments
 (0)