Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -48,55 +48,57 @@ public CommitHandlerWithGroupCommit(
}

@Override
public void commit(Snapshot snapshot, boolean readOnly)
public void commit(TransactionContext context)
throws CommitException, UnknownTransactionStatusException {
if (!readOnly && !snapshot.hasWritesOrDeletes() && coordinatorWriteOmissionOnReadOnlyEnabled) {
cancelGroupCommitIfNeeded(snapshot.getId());
if (!context.readOnly
&& !context.snapshot.hasWritesOrDeletes()
&& coordinatorWriteOmissionOnReadOnlyEnabled) {
cancelGroupCommitIfNeeded(context.transactionId);
}

super.commit(snapshot, readOnly);
super.commit(context);
}

@Override
boolean canOnePhaseCommit(Snapshot snapshot) throws CommitException {
boolean canOnePhaseCommit(TransactionContext context) throws CommitException {
try {
return super.canOnePhaseCommit(snapshot);
return super.canOnePhaseCommit(context);
} catch (CommitException e) {
cancelGroupCommitIfNeeded(snapshot.getId());
cancelGroupCommitIfNeeded(context.transactionId);
throw e;
}
}

@Override
void onePhaseCommitRecords(Snapshot snapshot)
void onePhaseCommitRecords(TransactionContext context)
throws CommitConflictException, UnknownTransactionStatusException {
cancelGroupCommitIfNeeded(snapshot.getId());
super.onePhaseCommitRecords(snapshot);
cancelGroupCommitIfNeeded(context.transactionId);
super.onePhaseCommitRecords(context);
}

@Override
protected void onFailureBeforeCommit(Snapshot snapshot) {
cancelGroupCommitIfNeeded(snapshot.getId());
protected void onFailureBeforeCommit(TransactionContext context) {
cancelGroupCommitIfNeeded(context.transactionId);
}

private void commitStateViaGroupCommit(Snapshot snapshot)
private void commitStateViaGroupCommit(TransactionContext context)
throws CommitConflictException, UnknownTransactionStatusException {
String id = snapshot.getId();
String id = context.transactionId;
try {
// Group commit the state by internally calling `groupCommitState()` via the emitter.
groupCommitter.ready(id, snapshot);
groupCommitter.ready(id, context);
logger.debug(
"Transaction {} is committed successfully at {}", id, System.currentTimeMillis());
} catch (GroupCommitConflictException e) {
cancelGroupCommitIfNeeded(id);
// Throw a proper exception from this method if needed.
handleCommitConflict(snapshot, e);
handleCommitConflict(context, e);
} catch (GroupCommitException e) {
cancelGroupCommitIfNeeded(id);
Throwable cause = e.getCause();
if (cause instanceof CoordinatorConflictException) {
// Throw a proper exception from this method if needed.
handleCommitConflict(snapshot, (CoordinatorConflictException) cause);
handleCommitConflict(context, (CoordinatorConflictException) cause);
} else {
// Failed to access the coordinator state. The state is unknown.
throw new UnknownTransactionStatusException("Coordinator status is unknown", cause, id);
Expand All @@ -118,9 +120,9 @@ private void cancelGroupCommitIfNeeded(String id) {
}

@Override
public void commitState(Snapshot snapshot)
public void commitState(TransactionContext context)
throws CommitConflictException, UnknownTransactionStatusException {
commitStateViaGroupCommit(snapshot);
commitStateViaGroupCommit(context);
}

@Override
Expand All @@ -129,25 +131,25 @@ public TransactionState abortState(String id) throws UnknownTransactionStatusExc
return super.abortState(id);
}

private static class Emitter implements Emittable<String, String, Snapshot> {
private static class Emitter implements Emittable<String, String, TransactionContext> {
private final Coordinator coordinator;

public Emitter(Coordinator coordinator) {
this.coordinator = coordinator;
}

@Override
public void emitNormalGroup(String parentId, List<Snapshot> snapshots)
public void emitNormalGroup(String parentId, List<TransactionContext> contexts)
throws CoordinatorException {
if (snapshots.isEmpty()) {
if (contexts.isEmpty()) {
// This means all buffered transactions were manually rolled back. Nothing to do.
return;
}

// These transactions are contained in a normal group that has multiple transactions.
// Therefore, the transaction states should be put together in Coordinator.State.
List<String> transactionIds =
snapshots.stream().map(Snapshot::getId).collect(Collectors.toList());
contexts.stream().map(c -> c.transactionId).collect(Collectors.toList());

coordinator.putStateForGroupCommit(
parentId, transactionIds, TransactionState.COMMITTED, System.currentTimeMillis());
Expand All @@ -159,7 +161,8 @@ public void emitNormalGroup(String parentId, List<Snapshot> snapshots)
}

@Override
public void emitDelayedGroup(String fullId, Snapshot snapshot) throws CoordinatorException {
public void emitDelayedGroup(String fullId, TransactionContext context)
throws CoordinatorException {
// This transaction is contained in a delayed group that has only a single transaction.
// Therefore, the transaction state can be committed as if it's a normal commit (not a
// group commit).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,20 @@
@NotThreadSafe
public class ConsensusCommit extends AbstractDistributedTransaction {
private static final Logger logger = LoggerFactory.getLogger(ConsensusCommit.class);
private final TransactionContext context;
private final CrudHandler crud;
private final CommitHandler commit;
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
@Nullable private final CoordinatorGroupCommitter groupCommitter;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public ConsensusCommit(
TransactionContext context,
CrudHandler crud,
CommitHandler commit,
ConsensusCommitMutationOperationChecker mutationOperationChecker,
@Nullable CoordinatorGroupCommitter groupCommitter) {
this.context = checkNotNull(context);
this.crud = checkNotNull(crud);
this.commit = checkNotNull(commit);
this.mutationOperationChecker = mutationOperationChecker;
Expand All @@ -66,23 +69,23 @@ public ConsensusCommit(

@Override
public String getId() {
return crud.getSnapshot().getId();
return context.transactionId;
}

@Override
public Optional<Result> get(Get get) throws CrudException {
return crud.get(copyAndSetTargetToIfNot(get));
return crud.get(copyAndSetTargetToIfNot(get), context);
}

@Override
public List<Result> scan(Scan scan) throws CrudException {
return crud.scan(copyAndSetTargetToIfNot(scan));
return crud.scan(copyAndSetTargetToIfNot(scan), context);
}

@Override
public Scanner getScanner(Scan scan) throws CrudException {
scan = copyAndSetTargetToIfNot(scan);
return crud.getScanner(scan);
return crud.getScanner(scan, context);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -91,7 +94,7 @@ public Scanner getScanner(Scan scan) throws CrudException {
public void put(Put put) throws CrudException {
put = copyAndSetTargetToIfNot(put);
checkMutation(put);
crud.put(put);
crud.put(put, context);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -108,7 +111,7 @@ public void put(List<Put> puts) throws CrudException {
public void delete(Delete delete) throws CrudException {
delete = copyAndSetTargetToIfNot(delete);
checkMutation(delete);
crud.delete(delete);
crud.delete(delete, context);
}

/** @deprecated As of release 3.13.0. Will be removed in release 5.0.0. */
Expand All @@ -126,15 +129,15 @@ public void insert(Insert insert) throws CrudException {
insert = copyAndSetTargetToIfNot(insert);
Put put = ConsensusCommitUtils.createPutForInsert(insert);
checkMutation(put);
crud.put(put);
crud.put(put, context);
}

@Override
public void upsert(Upsert upsert) throws CrudException {
upsert = copyAndSetTargetToIfNot(upsert);
Put put = ConsensusCommitUtils.createPutForUpsert(upsert);
checkMutation(put);
crud.put(put);
crud.put(put, context);
}

@Override
Expand All @@ -144,7 +147,7 @@ public void update(Update update) throws CrudException {
Put put = ConsensusCommitUtils.createPutForUpdate(update);
checkMutation(put);
try {
crud.put(put);
crud.put(put, context);
} catch (UnsatisfiedConditionException e) {
if (update.getCondition().isPresent()) {
throw new UnsatisfiedConditionException(
Expand Down Expand Up @@ -179,13 +182,13 @@ public void mutate(List<? extends Mutation> mutations) throws CrudException {

@Override
public void commit() throws CommitException, UnknownTransactionStatusException {
if (!crud.areAllScannersClosed()) {
if (!context.areAllScannersClosed()) {
throw new IllegalStateException(CoreError.CONSENSUS_COMMIT_SCANNER_NOT_CLOSED.buildMessage());
}

// Execute implicit pre-read
try {
crud.readIfImplicitPreReadEnabled();
crud.readIfImplicitPreReadEnabled(context);
} catch (CrudConflictException e) {
throw new CommitConflictException(
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHILE_IMPLICIT_PRE_READ.buildMessage(
Expand All @@ -201,29 +204,34 @@ public void commit() throws CommitException, UnknownTransactionStatusException {
}

try {
crud.waitForRecoveryCompletionIfNecessary();
crud.waitForRecoveryCompletionIfNecessary(context);
} catch (CrudConflictException e) {
throw new CommitConflictException(e.getMessage(), e, getId());
} catch (CrudException e) {
throw new CommitException(e.getMessage(), e, getId());
}

commit.commit(crud.getSnapshot(), crud.isReadOnly());
commit.commit(context);
}

@Override
public void rollback() {
try {
crud.closeScanners();
context.closeScanners();
} catch (CrudException e) {
logger.warn("Failed to close the scanner. Transaction ID: {}", getId(), e);
}

if (groupCommitter != null && !crud.isReadOnly()) {
if (groupCommitter != null && !context.readOnly) {
groupCommitter.remove(getId());
}
}

@VisibleForTesting
TransactionContext getTransactionContext() {
return context;
}

@VisibleForTesting
CrudHandler getCrudHandler() {
return crud;
Expand All @@ -234,6 +242,11 @@ CommitHandler getCommitHandler() {
return commit;
}

@VisibleForTesting
void waitForRecoveryCompletion() throws CrudException {
crud.waitForRecoveryCompletion(context);
}

private void checkMutation(Mutation mutation) throws CrudException {
try {
mutationOperationChecker.check(mutation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public class ConsensusCommitManager extends AbstractDistributedTransactionManage
private final Coordinator coordinator;
private final ParallelExecutor parallelExecutor;
private final RecoveryExecutor recoveryExecutor;
private final CrudHandler crud;
protected final CommitHandler commit;
private final Isolation isolation;
private final boolean isIncludeMetadataEnabled;
private final ConsensusCommitMutationOperationChecker mutationOperationChecker;
@Nullable private final CoordinatorGroupCommitter groupCommitter;

Expand All @@ -75,9 +75,15 @@ public ConsensusCommitManager(
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
crud =
new CrudHandler(
storage,
recoveryExecutor,
tableMetadataManager,
config.isIncludeMetadataEnabled(),
parallelExecutor);
commit = createCommitHandler(config);
isolation = config.getIsolation();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}

Expand All @@ -96,9 +102,15 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
RecoveryHandler recovery = new RecoveryHandler(storage, coordinator, tableMetadataManager);
recoveryExecutor = new RecoveryExecutor(coordinator, recovery, tableMetadataManager);
groupCommitter = CoordinatorGroupCommitter.from(config).orElse(null);
crud =
new CrudHandler(
storage,
recoveryExecutor,
tableMetadataManager,
config.isIncludeMetadataEnabled(),
parallelExecutor);
commit = createCommitHandler(config);
isolation = config.getIsolation();
isIncludeMetadataEnabled = config.isIncludeMetadataEnabled();
mutationOperationChecker = new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}

Expand All @@ -111,9 +123,9 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
Coordinator coordinator,
ParallelExecutor parallelExecutor,
RecoveryExecutor recoveryExecutor,
CrudHandler crud,
CommitHandler commit,
Isolation isolation,
boolean isIncludeMetadataEnabled,
@Nullable CoordinatorGroupCommitter groupCommitter) {
super(databaseConfig);
this.storage = storage;
Expand All @@ -124,10 +136,10 @@ protected ConsensusCommitManager(DatabaseConfig databaseConfig) {
this.coordinator = coordinator;
this.parallelExecutor = parallelExecutor;
this.recoveryExecutor = recoveryExecutor;
this.crud = crud;
this.commit = commit;
this.groupCommitter = groupCommitter;
this.isolation = isolation;
this.isIncludeMetadataEnabled = isIncludeMetadataEnabled;
this.mutationOperationChecker =
new ConsensusCommitMutationOperationChecker(tableMetadataManager);
}
Expand Down Expand Up @@ -244,18 +256,10 @@ DistributedTransaction begin(
+ "anomalies");
}
Snapshot snapshot = new Snapshot(txId, isolation, tableMetadataManager, parallelExecutor);
CrudHandler crud =
new CrudHandler(
storage,
snapshot,
recoveryExecutor,
tableMetadataManager,
isIncludeMetadataEnabled,
parallelExecutor,
readOnly,
oneOperation);
TransactionContext context =
new TransactionContext(txId, snapshot, isolation, readOnly, oneOperation);
DistributedTransaction transaction =
new ConsensusCommit(crud, commit, mutationOperationChecker, groupCommitter);
new ConsensusCommit(context, crud, commit, mutationOperationChecker, groupCommitter);
if (readOnly) {
transaction = new ReadOnlyDistributedTransaction(transaction);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.scalar.db.transaction.consensuscommit;

import com.scalar.db.api.TransactionCrudOperable;

public interface ConsensusCommitScanner extends TransactionCrudOperable.Scanner {
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.Optional;

public class CoordinatorGroupCommitter
extends GroupCommitter<String, String, String, String, String, Snapshot> {
extends GroupCommitter<String, String, String, String, String, TransactionContext> {
CoordinatorGroupCommitter(GroupCommitConfig config) {
super("coordinator", config, new CoordinatorGroupCommitKeyManipulator());
}
Expand Down
Loading