Skip to content

Commit

Permalink
[Enhancement] Add full clone and version GC for PseudoCluster (StarRo…
Browse files Browse the repository at this point in the history
…cks#10099)

This PR adds full clone and version GC functionality to PseudoCluster. It also fixes a bug in PseudoCluster query handling and a bug in the initialization timing of the TransactionChecker used by the new publish mechanism.
  • Loading branch information
decster authored Aug 22, 2022
1 parent e3364f2 commit 07a66e8
Show file tree
Hide file tree
Showing 15 changed files with 538 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void complete() throws UserException {
TOlapTableSink tSink = tDataSink.getOlap_table_sink();

tSink.setTable_id(dstTable.getId());
tSink.setTable_name(dstTable.getName());
tSink.setTuple_id(tupleDescriptor.getId().asInt());
int numReplicas = 1;
for (Partition partition : dstTable.getPartitions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,6 @@ public VisibleStateWaiter commitTransaction(long transactionId, List<TabletCommi
tableListString.append(table.getName());
stateListeners.add(listener);
}
transactionState.buildFinishChecker(db);
txnSpan.setAttribute("tables", tableListString.toString());

// before state transform
Expand All @@ -455,6 +454,7 @@ public VisibleStateWaiter commitTransaction(long transactionId, List<TabletCommi
// after state transform
transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated, callback, null);
}
transactionState.prepareFinishChecker(db);

// 6. update nextVersion because of the failure of persistent transaction resulting in error version
Span updateCatalogAfterCommittedSpan = TraceManager.startSpan("updateCatalogAfterCommitted", txnSpan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void preCommit(TransactionState txnState, List<TabletCommitInfo> tabletCo
// save the error replica ids for current tablet
// this param is used for log
Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
StringBuilder failedReplicaInfoSB = new StringBuilder();
int successReplicaNum = 0;
for (long tabletBackend : tabletBackends) {
Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend);
Expand All @@ -136,8 +137,12 @@ public void preCommit(TransactionState txnState, List<TabletCommitInfo> tabletCo
// if the backend load success but the backend has some errors previously, then it is not a normal replica
// ignore it but not log it
// for example, a replica is in clone state
if (replica.getLastFailedVersion() < 0) {
long lfv = replica.getLastFailedVersion();
if (lfv < 0) {
++successReplicaNum;
} else {
failedReplicaInfoSB.append(
String.format("[be:%d V:%d LFV:%d]", tabletBackend, replica.getVersion(), lfv));
}
} else {
errorBackendIdsForTablet.add(tabletBackend);
Expand All @@ -154,10 +159,12 @@ public void preCommit(TransactionState txnState, List<TabletCommitInfo> tabletCo
errorBackends.add(backend.getId() + ":" + backend.getHost());
}

LOG.warn("Fail to load files. tablet_id: {}, txn_id: {}, backends: {}",
String failedReplicaInfo = failedReplicaInfoSB.toString();
LOG.warn("Fail to load files. tablet_id: {}, txn_id: {}, backends: {} failed replicas: {}",
tablet.getId(), txnState.getTransactionId(),
Joiner.on(",").join(errorBackends));
throw new TabletQuorumFailedException(tablet.getId(), txnState.getTransactionId(), errorBackends);
Joiner.on(",").join(errorBackends), failedReplicaInfo);
throw new TabletQuorumFailedException(tablet.getId(), txnState.getTransactionId(), errorBackends,
failedReplicaInfo);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@
public class TabletQuorumFailedException extends TransactionException {

private static final String TABLET_QUORUM_FAILED_MSG = "Fail to load files. tablet_id: %s"
+ ", txn_id: %s, backends: %s";
+ ", txn_id: %s, backends: %s, replicas: %s";

private long tabletId;
private List<String> errorBackends = new ArrayList<String>();

private String replicaInfos;

public TabletQuorumFailedException(long tabletId, long transactionId,
List<String> errorBackends) {
List<String> errorBackends, String replicaInfos) {
super(String.format(TABLET_QUORUM_FAILED_MSG, tabletId, transactionId,
Joiner.on(",").join(errorBackends)));
Joiner.on(",").join(errorBackends), replicaInfos));
this.tabletId = tabletId;
this.errorBackends = errorBackends;
this.replicaInfos = replicaInfos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public String toString() {

// used for PublishDaemon to check whether this txn can be published
// not persisted, so need to rebuilt if FE restarts
private TransactionChecker finishChecker = null;
private volatile TransactionChecker finishChecker = null;
private long checkTimes = 0;
private Span txnSpan = null;
private String traceParent = null;
Expand Down Expand Up @@ -579,7 +579,7 @@ public boolean isExpired(long currentMillis) {
public boolean isTimeout(long currentMillis) {
return (transactionStatus == TransactionStatus.PREPARE && currentMillis - prepareTime > timeoutMs)
|| (transactionStatus == TransactionStatus.PREPARED && (currentMillis - commitTime)
/ 1000 > Config.prepared_transaction_default_timeout_second);
/ 1000 > Config.prepared_transaction_default_timeout_second);
}

/*
Expand Down Expand Up @@ -889,17 +889,27 @@ public boolean allPublishTasksFinishedOrQuorumWaitTimeout(Set<Long> publishError
}

// Note: caller should hold db lock
public void buildFinishChecker(Database db) {
this.finishChecker = TransactionChecker.create(this, db);
public void prepareFinishChecker(Database db) {
if (finishChecker == null) {
synchronized (this) {
if (finishChecker == null) {
finishChecker = TransactionChecker.create(this, db);
}
}
}
}

public boolean checkCanFinish() {
// this may happen if FE restarts
if (finishChecker == null) {
Database db = GlobalStateMgr.getCurrentState().getDb(dbId);
if (db == null) {
// consider txn finished if db is dropped
return true;
}
db.readLock();
try {
finishChecker = TransactionChecker.create(this, db);
prepareFinishChecker(db);
} finally {
db.readUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,10 @@ public synchronized Map<Long, TTablet> getAllTabletInfo() {
}
return tabletInfo;
}

/**
 * Invokes {@code versionGC()} on every tablet currently held in {@code tablets}.
 *
 * <p>The method is {@code synchronized}, so the sweep runs while holding this
 * object's monitor.
 */
public synchronized void maintenance() {
    tablets.values().forEach(Tablet::versionGC);
}
}
Loading

0 comments on commit 07a66e8

Please sign in to comment.