Commit 2b0032c: update

bobhan1 committed Nov 1, 2023
1 parent 3048ced commit 2b0032c
Showing 14 changed files with 182 additions and 531 deletions.
4 changes: 0 additions & 4 deletions be/src/olap/tablet.cpp
@@ -3105,10 +3105,6 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}

if (pos > 0) {
- LOG_INFO(
-         "[Tablet::calc_segment_delete_bitmap] generate_new_block_for_partial_update, pos: "
-         "{}",
-         pos);
auto partial_update_info = rowset_writer->get_partial_update_info();
DCHECK(partial_update_info);
RETURN_IF_ERROR(generate_new_block_for_partial_update(

@@ -1001,6 +1001,9 @@ public void complete() throws UserException {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
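// for partial updates (not from DELETE/UPDATE stmts), snapshot the table
// schema on the txn so commit can detect schema changes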
if (!isFromDeleteOrUpdateStmt && isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
}
}
}

@@ -230,6 +230,9 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
throw new UserException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes(table);
if (isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
} finally {
MetaLockUtils.readUnlockTables(tableList);

@@ -899,6 +899,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId) throws UserExc
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}

return planParams;
} finally {
@@ -919,6 +922,9 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, long txnId) thr
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}

return planParams;
} finally {

@@ -164,6 +164,9 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
throw new DdlException("txn does not exist: " + txn.getTxnId());
}
state.addTableIndexes(physicalOlapTableSink.getTargetTable());
if (physicalOlapTableSink.isFromNativeInsertStmt() && physicalOlapTableSink.isPartialUpdate()) {
state.setSchemaForPartialUpdate(physicalOlapTableSink.getTargetTable());
}

executor.setProfileType(ProfileType.LOAD);


@@ -2221,6 +2221,9 @@ private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
if (request.isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
plan.setTableName(table.getName());
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
@@ -2284,6 +2287,9 @@ private TPipelineFragmentParams generatePipelineStreamLoadPut(TStreamLoadPutRequ
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
if (request.isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
return plan;
} finally {

@@ -674,6 +674,33 @@ public void commitTransaction(List<Table> tableList, long transactionId, List<Ta
+ "] is prepare, not pre-committed.");
}

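// For partial-update transactions, fail the commit if an incompatible
// schema change happened on any involved table after the load began.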
if (transactionState.isPartialUpdate()) {
if (is2PC) {
Iterator<TableCommitInfo> tableCommitInfoIterator
= transactionState.getIdToTableCommitInfos().values().iterator();
while (tableCommitInfoIterator.hasNext()) {
TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
long tableId = tableCommitInfo.getTableId();
Table table = db.getTableNullable(tableId);
if (table instanceof OlapTable) {
if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] check schema compatibility failed");
}
}
}
} else {
for (Table table : tableList) {
if (table instanceof OlapTable) {
if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] check schema compatibility failed");
}
}
}
}
}

Set<Long> errorReplicaIds = Sets.newHashSet();
Set<Long> totalInvolvedBackends = Sets.newHashSet();
Map<Long, Set<Long>> tableToPartition = new HashMap<>();

@@ -17,7 +17,9 @@

package org.apache.doris.transaction;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
@@ -45,8 +47,10 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -265,6 +269,24 @@ public String toString() {
// no need to persist.
private String errMsg = "";

public class SchemaInfo {
public List<Column> schema;
public int schemaVersion;

public SchemaInfo(OlapTable olapTable) {
Map<Long, MaterializedIndexMeta> indexIdToMeta = olapTable.getIndexIdToMeta();
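// take the schema and schema version of the first index meta found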
for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
schema = indexMeta.getSchema();
schemaVersion = indexMeta.getSchemaVersion();
break;
}
}
}

private boolean isPartialUpdate = false;
// table id -> schema info
private Map<Long, SchemaInfo> txnSchemas = new HashMap<>();

public TransactionState() {
this.dbId = -1;
this.tableIdList = Lists.newArrayList();
@@ -725,4 +747,45 @@ public void clearErrorMsg() {
public String getErrMsg() {
return this.errMsg;
}

public void setSchemaForPartialUpdate(OlapTable olapTable) {
// the caller should hold the read lock of the table
isPartialUpdate = true;
txnSchemas.put(olapTable.getId(), new SchemaInfo(olapTable));
}

public boolean isPartialUpdate() {
return isPartialUpdate;
}

public SchemaInfo getTxnSchema(long id) {
return txnSchemas.get(id);
}

public boolean checkSchemaCompatibility(OlapTable olapTable) {
SchemaInfo currentSchemaInfo = new SchemaInfo(olapTable);
SchemaInfo txnSchemaInfo = txnSchemas.get(olapTable.getId());
if (txnSchemaInfo == null) {
return true;
}
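// the table schema has not changed since the snapshot: compatible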
if (txnSchemaInfo.schemaVersion >= currentSchemaInfo.schemaVersion) {
return true;
}
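// schema changed during the txn: incompatible only if a visible string
// column's length differs from the snapshot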
for (Column txnCol : txnSchemaInfo.schema) {
if (!txnCol.isVisible() || !txnCol.getType().isStringType()) {
continue;
}
int uniqueId = txnCol.getUniqueId();
Optional<Column> currentCol = currentSchemaInfo.schema.stream()
.filter(col -> col.getUniqueId() == uniqueId).findFirst();
if (currentCol.isPresent() && currentCol.get().getType().isStringType()) {
if (currentCol.get().getStrLen() != txnCol.getStrLen()) {
LOG.warn("Check schema compatibility failed, txnId={}, table={}",
transactionId, olapTable.getName());
return false;
}
}
}
return true;
}
}
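The net effect of the TransactionState changes: a partial-update load snapshots each target table's schema and schema version, and commit fails only when a visible string column's length changed after the snapshot. Below is a minimal standalone sketch of that check; the types Col and SchemaCheckSketch are hypothetical stand-ins, not the real Doris catalog classes.

import java.util.List;
import java.util.Optional;

// Simplified stand-in for org.apache.doris.catalog.Column (illustration only).
class Col {
    final int uniqueId;
    final boolean visible;
    final boolean isString;
    final int strLen;

    Col(int uniqueId, boolean visible, boolean isString, int strLen) {
        this.uniqueId = uniqueId;
        this.visible = visible;
        this.isString = isString;
        this.strLen = strLen;
    }
}

public class SchemaCheckSketch {
    // Mirrors the shape of checkSchemaCompatibility: compatible unless a
    // visible string column's length changed after the txn snapshot.
    static boolean compatible(List<Col> txnSchema, int txnVersion,
                              List<Col> curSchema, int curVersion) {
        if (txnVersion >= curVersion) {
            return true; // schema did not change since the snapshot
        }
        for (Col txnCol : txnSchema) {
            if (!txnCol.visible || !txnCol.isString) {
                continue; // only visible string columns are checked
            }
            Optional<Col> cur = curSchema.stream()
                    .filter(c -> c.uniqueId == txnCol.uniqueId).findFirst();
            if (cur.isPresent() && cur.get().isString
                    && cur.get().strLen != txnCol.strLen) {
                return false; // e.g. VARCHAR(10) -> VARCHAR(20) mid-transaction
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Col> snapshot = List.of(new Col(1, true, true, 10));
        List<Col> widened = List.of(new Col(1, true, true, 20));
        System.out.println(compatible(snapshot, 1, widened, 2)); // prints false
    }
}

Restricting the check to string-length changes presumably targets the case where a load planned against the old VARCHAR definition would conflict with a concurrently altered column length.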
65 changes: 59 additions & 6 deletions regression-test/conf/regression-conf.groovy
@@ -24,17 +24,18 @@ defaultDb = "regression_test"
// init cmd like: select @@session.tx_read_only
// each time we connect.
// add allowLoadLocalInfile so that JDBC can run MySQL LOAD DATA from the client.
jdbcUrl = "jdbc:mysql://127.0.0.1:9039/?useLocalSessionState=true&allowLoadLocalInfile=true"
targetJdbcUrl = "jdbc:mysql://127.0.0.1:9039/?useLocalSessionState=true&allowLoadLocalInfile=true"
jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
targetJdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
jdbcUser = "root"
jdbcPassword = ""

- feSourceThriftAddress = "127.0.0.1:9029"
- feTargetThriftAddress = "127.0.0.1:9029"
+ feSourceThriftAddress = "127.0.0.1:9020"
+ feTargetThriftAddress = "127.0.0.1:9020"
syncerAddress = "127.0.0.1:9190"
feSyncerUser = "root"
feSyncerPassword = ""

- feHttpAddress = "127.0.0.1:8039"
+ feHttpAddress = "127.0.0.1:8030"
feHttpUser = "root"
feHttpPassword = ""

@@ -46,6 +47,31 @@ pluginPath = "${DORIS_HOME}/regression-test/plugins"
realDataPath = "${DORIS_HOME}/regression-test/realdata"
sslCertificatePath = "${DORIS_HOME}/regression-test/ssl_default_certificate"

// suite configs
suites = {

//// equivalent to:
//// suites.test_suite_1.key1 = "val1"
//// suites.test_suite_1.key2 = "val2"
////
//test_suite_1 {
// key1 = "val1"
// key2 = "val2"
//}

//test_suite_2 {
// key3 = "val1"
// key4 = "val2"
//}
}

// docker image
image = ""
dockerEndDeleteFiles = false
dorisComposePath = "${DORIS_HOME}/docker/runtime/doris-compose/doris-compose.py"
// do not run docker tests, because the pipeline does not support building images yet
excludeDockerTest = true

// will test <group>/<suite>.groovy
// an empty group will test all groups
testGroups = ""
@@ -84,13 +110,23 @@ pg_14_port=5442
oracle_11_port=1521
sqlserver_2022_port=1433
clickhouse_22_port=8123
doris_port=9030
mariadb_10_port=3326

// hive catalog test config
- // To enable hive test, you need first start hive container.
+ // To enable hive/paimon tests, first start the hive container.
// See `docker/thirdparties/start-thirdparties-docker.sh`
enableHiveTest=false
enablePaimonTest=false
hms_port=9183
hdfs_port=8120
hiveServerPort=10000

// kafka test config
// to enable kafka tests, first start the kafka container
// See `docker/thirdparties/start-thirdparties-docker.sh`
enableKafkaTest=false
kafka_port=19193

// elasticsearch catalog test config
// See `docker/thirdparties/start-thirdparties-docker.sh`
@@ -105,9 +141,13 @@ enableExternalHiveTest = false
extHiveHmsHost = "***.**.**.**"
extHiveHmsPort = 7004
extHdfsPort = 4007
extHiveServerPort= 7001
extHiveHmsUser = "****"
extHiveHmsPassword= "***********"

//paimon catalog test config for bigdata
enableExternalPaimonTest = false

//mysql jdbc connector test config for bigdata
enableExternalMysqlTest = false
extMysqlHost = "***.**.**.**"
@@ -129,14 +169,27 @@ extEsPort = 9200
extEsUser = "*******"
extEsPassword = "***********"

enableObjStorageTest=false
enableMaxComputeTest=false
aliYunAk="***********"
dlfUid="***********"
aliYunSk="***********"
hwYunAk="***********"
hwYunSk="***********"

s3Endpoint = "cos.ap-hongkong.myqcloud.com"
s3BucketName = "doris-build-hk-1308700295"
s3Region = "ap-hongkong"

// iceberg rest catalog config
iceberg_rest_uri_port=18181

// If the number of failed suites exceeds this config,
// all following suites will be skipped to quit the run quickly.
// <=0 means no limit.
max_failure_num=0

// used for export tests
s3ExportBucketName = ""

externalEnvIp="127.0.0.1"
(diff truncated; remaining changed files not shown)
