Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix](partial update) Fix core when successfully schema change and load during a partial update #26210

Merged
merged 4 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update
  • Loading branch information
bobhan1 committed Nov 2, 2023
commit 8c312df9684b1262e579c6700aee278a0ef4f07f
4 changes: 0 additions & 4 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3109,10 +3109,6 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}

if (pos > 0) {
LOG_INFO(
"[Tablet::calc_segment_delete_bitmap] generate_new_block_for_partial_update, pos: "
"{}",
pos);
auto partial_update_info = rowset_writer->get_partial_update_info();
DCHECK(partial_update_info);
RETURN_IF_ERROR(generate_new_block_for_partial_update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,9 @@ public void complete() throws UserException {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
if (!isFromDeleteOrUpdateStmt && isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
throw new UserException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes(table);
if (isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
} finally {
MetaLockUtils.readUnlockTables(tableList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId) throws UserExc
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}

return planParams;
} finally {
Expand All @@ -919,6 +922,9 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, long txnId) thr
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}

return planParams;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
throw new DdlException("txn does not exist: " + txn.getTxnId());
}
state.addTableIndexes(physicalOlapTableSink.getTargetTable());
if (physicalOlapTableSink.isFromNativeInsertStmt() && physicalOlapTableSink.isPartialUpdate()) {
state.setSchemaForPartialUpdate(physicalOlapTableSink.getTargetTable());
}

executor.setProfileType(ProfileType.LOAD);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2221,6 +2221,9 @@ private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
if (request.isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
plan.setTableName(table.getName());
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
Expand Down Expand Up @@ -2284,6 +2287,9 @@ private TPipelineFragmentParams generatePipelineStreamLoadPut(TStreamLoadPutRequ
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
if (request.isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
return plan;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,33 @@ public void commitTransaction(List<Table> tableList, long transactionId, List<Ta
+ "] is prepare, not pre-committed.");
}

if (transactionState.isPartialUpdate()) {
if (is2PC) {
Iterator<TableCommitInfo> tableCommitInfoIterator
= transactionState.getIdToTableCommitInfos().values().iterator();
while (tableCommitInfoIterator.hasNext()) {
TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
long tableId = tableCommitInfo.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
if (table != null && table instanceof OlapTable) {
if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] check schema compatibility failed");
}
}
}
} else {
for (Table table : tableList) {
if (table instanceof OlapTable) {
if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] check schema compatibility failed");
}
}
}
}
}

Set<Long> errorReplicaIds = Sets.newHashSet();
Set<Long> totalInvolvedBackends = Sets.newHashSet();
Map<Long, Set<Long>> tableToPartition = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.doris.transaction;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
Expand Down Expand Up @@ -45,8 +47,10 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -265,6 +269,24 @@ public String toString() {
// no need to persist.
private String errMsg = "";

public class SchemaInfo {
public List<Column> schema;
public int schemaVersion;

public SchemaInfo(OlapTable olapTable) {
Map<Long, MaterializedIndexMeta> indexIdToMeta = olapTable.getIndexIdToMeta();
for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
schema = indexMeta.getSchema();
schemaVersion = indexMeta.getSchemaVersion();
break;
}
}
}

private boolean isPartialUpdate = false;
// table id -> schema info
private Map<Long, SchemaInfo> txnSchemas = new HashMap<>();

public TransactionState() {
this.dbId = -1;
this.tableIdList = Lists.newArrayList();
Expand Down Expand Up @@ -725,4 +747,45 @@ public void clearErrorMsg() {
public String getErrMsg() {
return this.errMsg;
}

public void setSchemaForPartialUpdate(OlapTable olapTable) {
// the caller should hold the read lock of the table
isPartialUpdate = true;
txnSchemas.put(olapTable.getId(), new SchemaInfo(olapTable));
}

public boolean isPartialUpdate() {
return isPartialUpdate;
}

public SchemaInfo getTxnSchema(long id) {
return txnSchemas.get(id);
}

public boolean checkSchemaCompatibility(OlapTable olapTable) {
SchemaInfo currentSchemaInfo = new SchemaInfo(olapTable);
SchemaInfo txnSchemaInfo = txnSchemas.get(olapTable.getId());
if (txnSchemaInfo == null) {
return true;
}
if (txnSchemaInfo.schemaVersion >= currentSchemaInfo.schemaVersion) {
return true;
}
for (Column txnCol : txnSchemaInfo.schema) {
if (!txnCol.isVisible() || !txnCol.getType().isStringType()) {
continue;
}
int uniqueId = txnCol.getUniqueId();
Optional<Column> currentCol = currentSchemaInfo.schema.stream()
.filter(col -> col.getUniqueId() == uniqueId).findFirst();
if (currentCol.isPresent() && currentCol.get().getType().isStringType()) {
if (currentCol.get().getStrLen() != txnCol.getStrLen()) {
LOG.warn("Check schema compatibility failed, txnId={}, table={}",
transactionId, olapTable.getName());
return false;
}
}
}
return true;
}
}
65 changes: 59 additions & 6 deletions regression-test/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ defaultDb = "regression_test"
// init cmd like: select @@session.tx_read_only
// at each time we connect.
// add allowLoadLocalInfile so that the jdbc can execute mysql load data from client.
jdbcUrl = "jdbc:mysql://127.0.0.1:9039/?useLocalSessionState=true&allowLoadLocalInfile=true"
targetJdbcUrl = "jdbc:mysql://127.0.0.1:9039/?useLocalSessionState=true&allowLoadLocalInfile=true"
jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
targetJdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
jdbcUser = "root"
jdbcPassword = ""

feSourceThriftAddress = "127.0.0.1:9029"
feTargetThriftAddress = "127.0.0.1:9029"
feSourceThriftAddress = "127.0.0.1:9020"
feTargetThriftAddress = "127.0.0.1:9020"
syncerAddress = "127.0.0.1:9190"
feSyncerUser = "root"
feSyncerPassword = ""

feHttpAddress = "127.0.0.1:8039"
feHttpAddress = "127.0.0.1:8030"
feHttpUser = "root"
feHttpPassword = ""

Expand All @@ -46,6 +47,31 @@ pluginPath = "${DORIS_HOME}/regression-test/plugins"
realDataPath = "${DORIS_HOME}/regression-test/realdata"
sslCertificatePath = "${DORIS_HOME}/regression-test/ssl_default_certificate"

// suite configs
suites = {

//// equals to:
//// suites.test_suite_1.key1 = "val1"
//// suites.test_suite_1.key2 = "val2"
////
//test_suite_1 {
// key1 = "val1"
// key2 = "val2"
//}

//test_suite_2 {
// key3 = "val1"
// key4 = "val2"
//}
}

// docker image
image = ""
dockerEndDeleteFiles = false
dorisComposePath = "${DORIS_HOME}/docker/runtime/doris-compose/doris-compose.py"
// do run docker test because pipeline not support build image now
excludeDockerTest = true

// will test <group>/<suite>.groovy
// empty group will test all group
testGroups = ""
Expand Down Expand Up @@ -84,13 +110,23 @@ pg_14_port=5442
oracle_11_port=1521
sqlserver_2022_port=1433
clickhouse_22_port=8123
doris_port=9030
mariadb_10_port=3326

// hive catalog test config
// To enable hive test, you need first start hive container.
// To enable hive/paimon test, you need first start hive container.
// See `docker/thirdparties/start-thirdparties-docker.sh`
enableHiveTest=false
enablePaimonTest=false
hms_port=9183
hdfs_port=8120
hiveServerPort=10000

// kafka test config
// to enable kafka test, you need firstly to start kafka container
// See `docker/thirdparties/start-thirdparties-docker.sh`
enableKafkaTest=false
kafka_port=19193

// elasticsearch catalog test config
// See `docker/thirdparties/start-thirdparties-docker.sh`
Expand All @@ -105,9 +141,13 @@ enableExternalHiveTest = false
extHiveHmsHost = "***.**.**.**"
extHiveHmsPort = 7004
extHdfsPort = 4007
extHiveServerPort= 7001
extHiveHmsUser = "****"
extHiveHmsPassword= "***********"

//paimon catalog test config for bigdata
enableExternalPaimonTest = false

//mysql jdbc connector test config for bigdata
enableExternalMysqlTest = false
extMysqlHost = "***.**.**.**"
Expand All @@ -129,14 +169,27 @@ extEsPort = 9200
extEsUser = "*******"
extEsPassword = "***********"

enableObjStorageTest=false
enableMaxComputeTest=false
aliYunAk="***********"
dlfUid="***********"
aliYunSk="***********"
hwYunAk="***********"
hwYunSk="***********"

s3Endpoint = "cos.ap-hongkong.myqcloud.com"
s3BucketName = "doris-build-hk-1308700295"
s3Region = "ap-hongkong"

// iceberg rest catalog config
iceberg_rest_uri_port=18181

// If the failure suite num exceeds this config
// all following suite will be skipped to fast quit the run.
// <=0 means no limit.
max_failure_num=0

// used for exporting test
s3ExportBucketName = ""

externalEnvIp="127.0.0.1"
Loading