Skip to content

HBASE-26263 [Rolling Upgrading] Persist the StoreFileTracker configur… #3700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Nov 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d75b0ce
HBASE-26064 Introduce a StoreFileTracker to abstract the store file t…
Apache9 Jul 29, 2021
cab6140
HBASE-25988 Store the store file list by a file (#3578)
Apache9 Aug 26, 2021
708b7c1
HBASE-26079 Use StoreFileTracker when splitting and merging (#3617)
wchevreuil Sep 8, 2021
41ed7e2
HBASE-26224 Introduce a MigrationStoreFileTracker to support migratin…
Apache9 Sep 9, 2021
f2addb1
HBASE-26246 Persist the StoreFileTracker configurations to TableDescr…
wchevreuil Sep 12, 2021
5c35744
HBASE-26248 Should find a suitable way to let users specify the store…
Apache9 Sep 14, 2021
767a3e1
HBASE-26264 Add more checks to prevent misconfiguration on store file…
Apache9 Sep 15, 2021
be5a148
HBASE-26280 Use store file tracker when snapshoting (#3685)
Apache9 Sep 17, 2021
d3556bf
HBASE-26326 CreateTableProcedure fails when FileBasedStoreFileTracker…
wchevreuil Oct 13, 2021
36a4846
HBASE-26263 [Rolling Upgrading] Persist the StoreFileTracker configur…
GeorryHuang Sep 27, 2021
b633a52
modification
GeorryHuang Sep 28, 2021
d2fece5
typo
GeorryHuang Oct 19, 2021
de2d050
limit the count of table
GeorryHuang Oct 19, 2021
e1ede25
Merge branch 'HBASE-26067' into HBASE-26263
GeorryHuang Oct 22, 2021
a2bed30
Update MigrationStoreFileTracker.java
GeorryHuang Oct 22, 2021
144a29d
Modify according to comments
GeorryHuang Nov 2, 2021
3d05b10
Remove empty line
GeorryHuang Nov 2, 2021
5dfd1c3
Add unit to conf string
GeorryHuang Nov 3, 2021
ffdff2f
Added UT
GeorryHuang Nov 4, 2021
f84ea4a
Fixed Checkstyle
GeorryHuang Nov 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.apache.hadoop.hbase.master.http.MasterStatusServlet;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.migrate.RollingUpgradeChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
Expand Down Expand Up @@ -376,6 +377,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
private ReplicationBarrierCleaner replicationBarrierCleaner;
private MobFileCleanerChore mobFileCleanerChore;
private MobFileCompactionChore mobFileCompactionChore;
private RollingUpgradeChore rollingUpgradeChore;
// used to synchronize the mobCompactionStates
private final IdLock mobCompactionLock = new IdLock();
// save the information of mob compactions in tables.
Expand Down Expand Up @@ -1222,6 +1224,9 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
LOG.debug("Balancer post startup initialization complete, took " + (
(EnvironmentEdgeManager.currentTime() - start) / 1000) + " seconds");
}

this.rollingUpgradeChore = new RollingUpgradeChore(this);
getChoreService().scheduleChore(rollingUpgradeChore);
}

private void createMissingCFsInMetaDuringUpgrade(
Expand Down Expand Up @@ -1713,6 +1718,7 @@ protected void stopChores() {
shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore);
shutdownChore(regionsRecoveryChore);
shutdownChore(rollingUpgradeChore);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.master.migrate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.MigrateStoreFileTrackerProcedure;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* To avoid too many migrating/upgrade threads to be submitted at the time during master
* initialization, RollingUpgradeChore handles all rolling-upgrade tasks.
* */
@InterfaceAudience.Private
public class RollingUpgradeChore extends ScheduledChore {

static final String ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY =
"hbase.master.rolling.upgrade.chore.period.secs";
static final int DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS = 10; // 10 seconds by default

static final String ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY =
"hbase.master.rolling.upgrade.chore.delay.secs";
static final long DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS = 30; // 30 seconds

static final int CONCURRENT_PROCEDURES_COUNT = 5;

private final static Logger LOG = LoggerFactory.getLogger(RollingUpgradeChore.class);
ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
private TableDescriptors tableDescriptors;
private List<MigrateStoreFileTrackerProcedure> processingProcs = new ArrayList<>();

public RollingUpgradeChore(MasterServices masterServices) {
this(masterServices.getConfiguration(), masterServices.getMasterProcedureExecutor(),
masterServices.getTableDescriptors(), masterServices);
}

private RollingUpgradeChore(Configuration conf,
ProcedureExecutor<MasterProcedureEnv> procedureExecutor, TableDescriptors tableDescriptors,
Stoppable stopper) {
super(RollingUpgradeChore.class.getSimpleName(), stopper, conf
.getInt(ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY,
DFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS), conf
.getLong(ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY,
DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS),
TimeUnit.SECONDS);
this.procedureExecutor = procedureExecutor;
this.tableDescriptors = tableDescriptors;
}

@Override
protected void chore() {
if (isCompletelyMigrateSFT(CONCURRENT_PROCEDURES_COUNT)) {
LOG.info("All Rolling-Upgrade tasks are complete, shutdown RollingUpgradeChore!");
shutdown();
}
}

private boolean isCompletelyMigrateSFT(int concurrentCount){
Iterator<MigrateStoreFileTrackerProcedure> iter = processingProcs.iterator();
while(iter.hasNext()){
MigrateStoreFileTrackerProcedure proc = iter.next();
if(procedureExecutor.isFinished(proc.getProcId())){
iter.remove();
}
}
// No new migration procedures will be submitted until
// all procedures executed last time are completed.
if (!processingProcs.isEmpty()) {
return false;
}

Map<String, TableDescriptor> migrateSFTTables;
try {
migrateSFTTables = tableDescriptors.getAll().entrySet().stream().filter(entry -> {
TableDescriptor td = entry.getValue();
return StringUtils.isEmpty(td.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
}).limit(concurrentCount).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
} catch (IOException e) {
LOG.warn("Failed to migrate StoreFileTracker", e);
return false;
}

if (migrateSFTTables.isEmpty()) {
LOG.info("There is no table to migrate StoreFileTracker!");
return true;
}

for (Map.Entry<String, TableDescriptor> entry : migrateSFTTables.entrySet()) {
TableDescriptor tableDescriptor = entry.getValue();
MigrateStoreFileTrackerProcedure proc =
new MigrateStoreFileTrackerProcedure(procedureExecutor.getEnvironment(), tableDescriptor);
procedureExecutor.submitProcedure(proc);
processingProcs.add(proc);
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.util.Optional;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ModifyTableDescriptorProcedure;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Procedure for migrating StoreFileTracker information to table descriptor.
*/
@InterfaceAudience.Private
public class MigrateStoreFileTrackerProcedure extends ModifyTableDescriptorProcedure {

public MigrateStoreFileTrackerProcedure(){}

public MigrateStoreFileTrackerProcedure(MasterProcedureEnv env, TableDescriptor unmodified) {
super(env, unmodified);
}

@Override
protected Optional<TableDescriptor> modify(MasterProcedureEnv env, TableDescriptor current) {
if (StringUtils.isEmpty(current.getValue(StoreFileTrackerFactory.TRACKER_IMPL))) {
TableDescriptor td =
StoreFileTrackerFactory.updateWithTrackerConfigs(env.getMasterConfiguration(), current);
return Optional.of(td);
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.migrate;

import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(MediumTests.class)
public class TestMigrateStoreFileTracker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMigrateStoreFileTracker.class);
private final static String[] tables = new String[] { "t1", "t2", "t3", "t4", "t5", "t6" };
private final static String famStr = "f1";
private final static byte[] fam = Bytes.toBytes(famStr);

private HBaseTestingUtil HTU;
private Configuration conf;
private TableDescriptor tableDescriptor;

@Before
public void setUp() throws Exception {
conf = HBaseConfiguration.create();
//Speed up the launch of RollingUpgradeChore
conf.setInt(RollingUpgradeChore.ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY, 1);
conf.setLong(RollingUpgradeChore.ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY, 1);
HTU = new HBaseTestingUtil(conf);
HTU.startMiniCluster();
}

@After
public void tearDown() throws Exception {
HTU.shutdownMiniCluster();
}

@Test
public void testMigrateStoreFileTracker() throws IOException, InterruptedException {
//create tables to test
for (int i = 0; i < tables.length; i++) {
tableDescriptor = HTU.createModifyableTableDescriptor(tables[i])
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam).build()).build();
HTU.createTable(tableDescriptor, null);
}
TableDescriptors tableDescriptors = HTU.getMiniHBaseCluster().getMaster().getTableDescriptors();
for (int i = 0; i < tables.length; i++) {
TableDescriptor tdAfterCreated = tableDescriptors.get(TableName.valueOf(tables[i]));
//make sure that TRACKER_IMPL was set by default after tables have been created.
Assert.assertNotNull(tdAfterCreated.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
//Remove StoreFileTracker impl from tableDescriptor
TableDescriptor tdRemovedSFT = TableDescriptorBuilder.newBuilder(tdAfterCreated)
.removeValue(StoreFileTrackerFactory.TRACKER_IMPL).build();
tableDescriptors.update(tdRemovedSFT);
}
HTU.getMiniHBaseCluster().stopMaster(0).join();
HTU.getMiniHBaseCluster().startMaster();
HTU.getMiniHBaseCluster().waitForActiveAndReadyMaster(30000);
//wait until all tables have been migrated
TableDescriptors tds = HTU.getMiniHBaseCluster().getMaster().getTableDescriptors();
HTU.waitFor(30000, () -> {
try {
for (int i = 0; i < tables.length; i++) {
TableDescriptor td = tds.get(TableName.valueOf(tables[i]));
if (StringUtils.isEmpty(td.getValue(StoreFileTrackerFactory.TRACKER_IMPL))) {
return false;
}
}
return true;
} catch (IOException e) {
return false;
}
});
}
}