Skip to content

Fix hbase tablegroup operations being hold for a long time #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1993,6 +1993,9 @@ public ObPair<Long, ObTableParam> getTableInternal(String tableName, TableEntry
ObPartitionLocationInfo obPartitionLocationInfo = null;
if (ObGlobal.obVsnMajor() >= 4) {
obPartitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
if (obPartitionLocationInfo.getPartitionLocation() == null) {
throw new ObTableNotExistException("partition location is null after refresh, table: { " + tableName + " } may not exist");
}
replica = getPartitionLocation(obPartitionLocationInfo, route);
/**
* Normally, getOrRefreshPartitionInfo makes sure that a thread only continues if it finds the leader
Expand Down Expand Up @@ -2145,6 +2148,9 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
for (Long partId : partIds) {
long tabletId = getTabletIdByPartId(tableEntry, partId);
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
if (locationInfo.getPartitionLocation() == null) {
throw new ObTableNotExistException("partition location is null after refresh, table: { " + tableName + " } may not exist");
}
replicas.add(new ObPair<>(partId, getPartitionLocation(locationInfo, route)));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1294,19 +1294,20 @@ private static ObPartitionEntry getPartitionLocationFromResultSetByTablet(TableE
}
location.addReplicaLocation(replica);

if (location.getLeader() != null && partitionLocationInfo.initialized.compareAndSet(false, true)) {
partitionLocationInfo.initializationLatch.countDown();
if (location.getLeader() != null) {
partitionLocationInfo.initialized.compareAndSet(false, true);
} else if (rs.isLast() && location.getLeader() == null) {
partitionLocationInfo.initializationLatch.countDown();
RUNTIME.error(LCD.convert("01-00028"), partitionId, partitionEntry, tableEntry);
RUNTIME.error(format(
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
partitionId, partitionEntry, tableEntry));
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
partitionId, partitionEntry, tableEntry));
throw new ObTablePartitionNoMasterException(format(
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
partitionId, partitionEntry, tableEntry));
"partition=%d has no leader partitionEntry=%s original tableEntry=%s",
partitionId, partitionEntry, tableEntry));
}
}
partitionLocationInfo.initializationLatch.countDown();

return partitionEntry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
private ObReadConsistency readConsistency = ObReadConsistency.STRONG;
// ObRowKey objs: [startKey, MIN_OBJECT, MIN_OBJECT]
public List<ObObj> currentStartKey;
protected ObTableClient client;
protected ObTableClient client;

/*
* Get pcode.
*/
Expand Down Expand Up @@ -146,8 +146,10 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
route.setBlackList(failedServerList);
}
if (ObGlobal.obVsnMajor() >= 4) {
TableEntry tableEntry = client.getOrRefreshTableEntry(indexTableName, false, false, false);
client.refreshTableLocationByTabletId(tableEntry, indexTableName, client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft()));
TableEntry tableEntry = client.getOrRefreshTableEntry(indexTableName,
false, false, false);
client.refreshTableLocationByTabletId(tableEntry, indexTableName,
client.getTabletIdByPartId(tableEntry, partIdWithIndex.getLeft()));
}

subObTable = client
Expand Down Expand Up @@ -238,13 +240,21 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
throw e;
}
} else if (e instanceof ObTableException) {
if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e)
.getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode)
&& ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery())
|| (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()))
if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| ((ObTableException) e).getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode || ((ObTableException) e)
.getErrorCode() == ResultCodes.OB_SCHEMA_ERROR.errorCode)
&& ((request instanceof ObTableQueryAsyncRequest && ((ObTableQueryAsyncRequest) request)
.getObTableQueryRequest().getTableQuery().isHbaseQuery()) || (request instanceof ObTableQueryRequest && ((ObTableQueryRequest) request)
.getTableQuery().isHbaseQuery()))
&& client.getTableGroupInverted().get(indexTableName) != null) {
// table not exists && hbase mode && table group exists , three condition both
client.eraseTableGroupFromCache(tableName);
String newIndexTableName = client.tryGetTableNameFromTableGroupCache(tableName, true);
if (indexTableName.equalsIgnoreCase(newIndexTableName)) {
throw new ObTableNotExistException("multi column-family operations contain not existed table name", ResultCodes.OB_ERR_UNKNOWN_TABLE.errorCode);
} else {
indexTableName = newIndexTableName;
}
}
if (((ObTableException) e).isNeedRefreshTableEntry()) {
needRefreshTableEntry = true;
Expand All @@ -256,11 +266,10 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
throw e;
}
} else {
String logMessage = String.format(
String logMessage = String
.format(
"exhaust retry while meet NeedRefresh Exception, table name: %s, batch ops refresh table, errorCode: %d",
indexTableName,
((ObTableException) e).getErrorCode()
);
indexTableName, ((ObTableException) e).getErrorCode());
logger.warn(logMessage, e);
client.calculateContinuousFailure(indexTableName, e.getMessage());
throw new ObTableRetryExhaustedException(logMessage, e);
Expand Down Expand Up @@ -434,7 +443,8 @@ protected void checkStatus() throws IllegalStateException {
}

if (closed) {
throw new IllegalStateException("table " + indexTableName + " query stream result is closed");
throw new IllegalStateException("table " + indexTableName
+ " query stream result is closed");
}
}

Expand Down Expand Up @@ -565,7 +575,7 @@ public void init() throws Exception {
if (tableQuery.getBatchSize() == -1) {
if (!expectant.isEmpty()) {
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
.iterator();
.iterator();
int retryTimes = 0;
while (it.hasNext()) {
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
Expand All @@ -579,10 +589,11 @@ public void init() throws Exception {
retryTimes++;
if (retryTimes > client.getRuntimeRetryTimes()) {
RUNTIME.error("Fail to get refresh table entry response after {}",
retryTimes);
retryTimes);
throw new ObTableRetryExhaustedException(
"Fail to get refresh table entry response after " + retryTimes +
"errorCode:" + ((ObTableNeedFetchAllException) e).getErrorCode());
"Fail to get refresh table entry response after " + retryTimes
+ "errorCode:"
+ ((ObTableNeedFetchAllException) e).getErrorCode());

}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public void addOperation(Get get) throws Exception {
Object[] rowKeyValues = get.getRowKey().getValues();
String[] propertiesNames = get.getSelectColumns();
ObTableSingleOpEntity entity = ObTableSingleOpEntity.getInstance(rowKeyNames, rowKeyValues,
propertiesNames, null);
propertiesNames, null);
ObTableSingleOp singleOp = new ObTableSingleOp();
singleOp.setSingleOpType(ObTableOperationType.GET);
singleOp.addEntity(entity);
Expand Down Expand Up @@ -344,8 +344,8 @@ public List<Object> executeWithResult() throws Exception {
}
} else {
results.add(ExceptionUtil.convertToObTableException(result.getExecuteHost(),
result.getExecutePort(), result.getSequence(), result.getUniqueId(), errCode,
result.getHeader().getErrMsg()));
result.getExecutePort(), result.getSequence(), result.getUniqueId(), errCode,
result.getHeader().getErrMsg()));
}
}
return results;
Expand Down Expand Up @@ -393,16 +393,33 @@ public Map<Long, Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSing
if (this.entityType == ObTableEntityType.HKV && obTableClient.isTableGroupName(tableName)) {
real_tableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, false);
}
ObPair<Long, ObTableParam> tableObPair= obTableClient.getTable(real_tableName, rowKey,
false, false, obTableClient.getRoute(false));
long lsId = tableObPair.getRight().getLsId();
ObPair<Long, ObTableParam> tableObPair = null;
long lsId = INVALID_LS_ID;
try {
tableObPair = obTableClient.getTable(real_tableName, rowKey,
false, false, obTableClient.getRoute(false));
lsId = tableObPair.getRight().getLsId();
} catch (ObTableNotExistException e) {
if (this.entityType == ObTableEntityType.HKV
&& obTableClient.isTableGroupName(tableName)
&& obTableClient.getTableGroupInverted().get(real_tableName) != null) {
obTableClient.eraseTableGroupFromCache(tableName);
real_tableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, true);
tableObPair = obTableClient.getTable(real_tableName, rowKey,
false, false, obTableClient.getRoute(false));
lsId = tableObPair.getRight().getLsId();
} else {
throw e;
}
}

Map<Long, ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>>> tabletOperations
= lsOperationsMap.computeIfAbsent(lsId, k -> new HashMap<>());
// if ls id not exists

ObPair<Long, ObTableParam> finalTableObPair = tableObPair;
ObPair<ObTableParam, List<ObPair<Integer, ObTableSingleOp>>> singleOperations =
tabletOperations.computeIfAbsent(tableObPair.getLeft(), k -> new ObPair<>(tableObPair.getRight(), new ArrayList<>()));
tabletOperations.computeIfAbsent(tableObPair.getLeft(), k -> new ObPair<>(finalTableObPair.getRight(), new ArrayList<>()));
// if tablet id not exists
singleOperations.getRight().add(operationsWithIndex.get(i));
}
Expand Down Expand Up @@ -565,6 +582,20 @@ public void partitionExecute(ObTableSingleOpResult[] results,
} else if (ex instanceof ObTableException
&& ((ObTableException) ex).isNeedRefreshTableEntry()) {
needRefreshTableEntry = true;
if ((((ObTableException) ex).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode ||
((ObTableException) ex).getErrorCode() == ResultCodes.OB_SCHEMA_ERROR.errorCode)
&& obTableClient.isTableGroupName(tableName)
&& obTableClient.getTableGroupInverted().get(realTableName) != null) {
// TABLE_NOT_EXIST + tableName is tableGroup + TableGroup cache is not empty
// means tableGroupName cache need to refresh
obTableClient.eraseTableGroupFromCache(tableName);
String newRealTableName = obTableClient.tryGetTableNameFromTableGroupCache(tableName, true);
if (realTableName.equalsIgnoreCase(newRealTableName)) {
throw new ObTableNotExistException("multi column-family operations contain not existed table name", ResultCodes.OB_ERR_UNKNOWN_TABLE.errorCode);
} else {
realTableName = newRealTableName;
}
}
if (obTableClient.isRetryOnChangeMasterTimes()
&& (tryTimes - 1) < obTableClient.getRuntimeRetryTimes()) {
if (ex instanceof ObTableNeedFetchAllException) {
Expand Down
Loading