Skip to content

Commit

Permalink
Revert "[Fix](partial update) Fix core when successfully schema chang…
Browse files Browse the repository at this point in the history
…e and load during a partial update (apache#26210)"

This reverts commit 6983736.
  • Loading branch information
zhannngchen committed Jan 5, 2024
1 parent 9f38dfc commit 8700a7f
Show file tree
Hide file tree
Showing 9 changed files with 0 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1032,9 +1032,6 @@ public void complete() throws UserException {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,6 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
throw new UserException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes(table);
if (isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
} finally {
MetaLockUtils.readUnlockTables(tableList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,9 +896,6 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, long txnId) throws UserExc
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}

return planParams;
} finally {
Expand All @@ -919,9 +916,6 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, long txnId) thr
throw new MetaNotFoundException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}

return planParams;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,6 @@ public void finalizeSink(DataSink sink, boolean isPartialUpdate, boolean isFromI
throw new AnalysisException("txn does not exist: " + txnId);
}
state.addTableIndexes((OlapTable) table);
if (isPartialUpdate) {
state.setSchemaForPartialUpdate((OlapTable) table);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2049,9 +2049,6 @@ private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
if (request.isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
plan.setTableName(table.getName());
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
Expand Down Expand Up @@ -2115,9 +2112,6 @@ private TPipelineFragmentParams generatePipelineStreamLoadPut(TStreamLoadPutRequ
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
if (request.isPartialUpdate()) {
txnState.setSchemaForPartialUpdate(table);
}
}
return plan;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,35 +682,6 @@ public void commitTransaction(List<Table> tableList, long transactionId, List<Ta
+ "] is prepare, not pre-committed.");
}

if (transactionState.isPartialUpdate()) {
if (is2PC) {
Iterator<TableCommitInfo> tableCommitInfoIterator
= transactionState.getIdToTableCommitInfos().values().iterator();
while (tableCommitInfoIterator.hasNext()) {
TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
long tableId = tableCommitInfo.getTableId();
OlapTable table = (OlapTable) db.getTableNullable(tableId);
if (table != null && table instanceof OlapTable) {
if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] check schema compatibility failed, partial update can't commit with"
+ " old schema sucessfully .");
}
}
}
} else {
for (Table table : tableList) {
if (table instanceof OlapTable) {
if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
throw new TransactionCommitFailedException("transaction [" + transactionId
+ "] check schema compatibility failed, partial update can't commit with"
+ " old schema sucessfully .");
}
}
}
}
}

Set<Long> errorReplicaIds = Sets.newHashSet();
Set<Long> totalInvolvedBackends = Sets.newHashSet();
Map<Long, Set<Long>> tableToPartition = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.doris.transaction;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
Expand Down Expand Up @@ -47,10 +45,8 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -276,24 +272,6 @@ public String toString() {
// no need to persist.
private String errMsg = "";

public class SchemaInfo {
public List<Column> schema;
public int schemaVersion;

public SchemaInfo(OlapTable olapTable) {
Map<Long, MaterializedIndexMeta> indexIdToMeta = olapTable.getIndexIdToMeta();
for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
schema = indexMeta.getSchema();
schemaVersion = indexMeta.getSchemaVersion();
break;
}
}
}

private boolean isPartialUpdate = false;
// table id -> schema info
private Map<Long, SchemaInfo> txnSchemas = new HashMap<>();

public TransactionState() {
this.dbId = -1;
this.tableIdList = Lists.newArrayList();
Expand Down Expand Up @@ -773,46 +751,4 @@ public void pruneAfterVisible() {
tableIdToTotalNumDeltaRows.clear();
}

public void setSchemaForPartialUpdate(OlapTable olapTable) {
// the caller should hold the read lock of the table
isPartialUpdate = true;
txnSchemas.put(olapTable.getId(), new SchemaInfo(olapTable));
}

public boolean isPartialUpdate() {
return isPartialUpdate;
}

public SchemaInfo getTxnSchema(long id) {
return txnSchemas.get(id);
}

public boolean checkSchemaCompatibility(OlapTable olapTable) {
SchemaInfo currentSchemaInfo = new SchemaInfo(olapTable);
SchemaInfo txnSchemaInfo = txnSchemas.get(olapTable.getId());
if (txnSchemaInfo == null) {
return true;
}
if (txnSchemaInfo.schemaVersion >= currentSchemaInfo.schemaVersion) {
return true;
}
for (Column txnCol : txnSchemaInfo.schema) {
if (!txnCol.isVisible() || !txnCol.getType().isStringType()) {
continue;
}
int uniqueId = txnCol.getUniqueId();
Optional<Column> currentCol = currentSchemaInfo.schema.stream()
.filter(col -> col.getUniqueId() == uniqueId).findFirst();
// for now Doris's light schema change only supports adding columns,
// dropping columns, and type conversions that increase the varchar length
if (currentCol.isPresent() && currentCol.get().getType().isStringType()) {
if (currentCol.get().getStrLen() != txnCol.getStrLen()) {
LOG.warn("Check schema compatibility failed, txnId={}, table={}",
transactionId, olapTable.getName());
return false;
}
}
}
return true;
}
}

This file was deleted.

This file was deleted.

0 comments on commit 8700a7f

Please sign in to comment.