Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
Expand Down Expand Up @@ -1260,7 +1261,8 @@ private class Node {
clusterIdService
);

lockManager = new HeapLockManager(systemConfiguration);
VolatileTxStateMetaStorage txStateVolatileStorage = new VolatileTxStateMetaStorage();
lockManager = new HeapLockManager(systemConfiguration, txStateVolatileStorage);

MetricManager metricManager = new NoOpMetricManager();

Expand Down Expand Up @@ -1452,6 +1454,7 @@ private class Node {
clusterService,
replicaSvc,
lockManager,
txStateVolatileStorage,
clockService,
new TransactionIdGenerator(addr.port()),
placementDriver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
Expand Down Expand Up @@ -616,6 +617,7 @@ public CompletableFuture<Boolean> invoke(Condition condition, Operation success,
clusterService,
replicaSvc,
lockManager,
new VolatileTxStateMetaStorage(),
clockService,
new TransactionIdGenerator(address.port()),
placementDriverManager.placementDriver(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public ZonePartitionReplicaListener(
txMessageSender
);

txRecoveryMessageHandler = new TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, txRecoveryEngine);
txRecoveryMessageHandler = new TxRecoveryMessageHandler(txStatePartitionStorage, replicationGroupId, txRecoveryEngine, txManager);

txCleanupRecoveryRequestHandler = new TxCleanupRecoveryRequestHandler(
txStatePartitionStorage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.ignite.internal.logger.IgniteLogger;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are also log messages mentioned in the ticket but not changed here:

  • Failed to acquire a lock
  • Transaction is already finished (changed only in InternalTableImpl)

This will require changes in HeapLockManager, ReadWriteTransactionImpl, InflightTransactionalOperationTracker, PartitionReplicaListener, maybe somewhere else.

import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.TransactionLogUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
Expand Down Expand Up @@ -152,7 +153,7 @@ private CompletableFuture<?> callCleanup(TxMeta txMeta, UUID txId) {
txMeta.commitTimestamp(),
txId
).exceptionally(throwable -> {
LOG.warn("Failed to cleanup transaction [txId={}].", throwable, txId);
LOG.warn("Failed to cleanup transaction [{}].", throwable, TransactionLogUtils.formatTxInfo(txId, txManager));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to use static method imports


return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException;
import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
import org.apache.ignite.internal.tx.PartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionLogUtils;
import org.apache.ignite.internal.tx.TransactionResult;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
Expand Down Expand Up @@ -203,14 +204,16 @@ private CompletableFuture<TransactionResult> finishAndCleanup(

// Let the client know a transaction has finished with a different outcome.
if (commit != (txMeta.txState() == COMMITTED)) {
LOG.error("Failed to finish a transaction that is already finished [txId={}, expectedState={}, actualState={}].",
txId,
LOG.error("Failed to finish a transaction that is already finished [{}, expectedState={}, actualState={}].",
TransactionLogUtils.formatTxInfo(txId, txManager),
commit ? COMMITTED : ABORTED,
txMeta.txState()
);

throw new MismatchingTransactionOutcomeInternalException(
"Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + txMeta.txState() + "].",
format("Failed to change the outcome of a finished transaction [{}, txState={}].",
TransactionLogUtils.formatTxInfo(txId, txManager),
txMeta.txState()),
new TransactionResult(txMeta.txState(), txMeta.commitTimestamp())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.partition.replicator.TxRecoveryEngine;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.TransactionLogUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
Expand All @@ -38,16 +40,19 @@ public class TxRecoveryMessageHandler {
private final TxStatePartitionStorage txStatePartitionStorage;
private final ZonePartitionId replicationGroupId;
private final TxRecoveryEngine txRecoveryEngine;
private final TxManager txManager;

/** Constructor. */
public TxRecoveryMessageHandler(
TxStatePartitionStorage txStatePartitionStorage,
ZonePartitionId replicationGroupId,
TxRecoveryEngine txRecoveryEngine
TxRecoveryEngine txRecoveryEngine,
TxManager txManager
) {
this.txStatePartitionStorage = txStatePartitionStorage;
this.replicationGroupId = replicationGroupId;
this.txRecoveryEngine = txRecoveryEngine;
this.txManager = txManager;
}

/**
Expand All @@ -67,7 +72,7 @@ public CompletableFuture<Void> handle(TxRecoveryMessage request, UUID senderId)
return txRecoveryEngine.runCleanupOnNode(replicationGroupId, txId, senderId);
}

LOG.info("Orphan transaction has to be aborted [tx={}, meta={}].", txId, txMeta);
LOG.info("Orphan transaction has to be aborted [{}, meta={}].", TransactionLogUtils.formatTxInfo(txId, txManager), txMeta);

return txRecoveryEngine.triggerTxRecovery(txId, senderId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.SchemaSyncService;
import org.apache.ignite.internal.tx.TransactionLogUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.message.TableWriteIntentSwitchReplicaRequest;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class WriteIntentSwitchRequestHandler {
private final ReliableCatalogVersions reliableCatalogVersions;
private final ReplicaTxFinishMarker txFinishMarker;
private final ReplicationRaftCommandApplicator raftCommandApplicator;
private final TxManager txManager;

/** Constructor. */
public WriteIntentSwitchRequestHandler(
Expand All @@ -100,6 +102,7 @@ public WriteIntentSwitchRequestHandler(
this.clockService = clockService;
this.replicationGroupId = replicationGroupId;
this.tableAwareReplicaRequestPreProcessor = tableAwareReplicaRequestPreProcessor;
this.txManager = txManager;

reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
txFinishMarker = new ReplicaTxFinishMarker(txManager);
Expand Down Expand Up @@ -202,7 +205,8 @@ private CompletableFuture<Object> applyCommandToGroup(WriteIntentSwitchReplicaRe
.applyCommandWithExceptionHandling(wiSwitchCmd)
.whenComplete((res, ex) -> {
if (ex != null && !ReplicatorRecoverableExceptions.isRecoverable(ex)) {
LOG.warn("Failed to complete transaction cleanup command [txId=" + request.txId() + ']', ex);
LOG.warn("Failed to complete transaction cleanup command [{}]", ex,
TransactionLogUtils.formatTxInfo(request.txId(), txManager));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Measurement;
Expand Down Expand Up @@ -62,7 +63,7 @@ public class LockManagerBenchmark {
*/
@Setup
public void setUp() {
lockManager = new HeapLockManager(DEFAULT_SLOTS);
lockManager = new HeapLockManager(DEFAULT_SLOTS, new VolatileTxStateMetaStorage());
lockManager.start(new WaitDieDeadlockPreventionPolicy());
generator = new TransactionIdGenerator(0);
clock = new TestHybridClock(() -> 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
Expand Down Expand Up @@ -469,7 +470,7 @@ private PartialNode startPartialNode(
message -> threadPoolsManager.partitionOperationsExecutor()
);

var lockManager = new HeapLockManager(systemConfiguration);
var lockManager = new HeapLockManager(systemConfiguration, new VolatileTxStateMetaStorage());

var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);

Expand Down Expand Up @@ -629,6 +630,7 @@ public CompletableFuture<Boolean> invoke(Condition condition, List<Operation> su
clusterSvc.topologyService(),
replicaService,
lockManager,
new VolatileTxStateMetaStorage(),
clockService,
new TransactionIdGenerator(idx),
placementDriverManager.placementDriver(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.vault.VaultManager;
Expand Down Expand Up @@ -1038,7 +1039,8 @@ public class IgniteImpl implements Ignite {

var transactionInflights = new TransactionInflights(placementDriverMgr.placementDriver(), clockService);

LockManager lockMgr = new HeapLockManager(systemConfiguration);
VolatileTxStateMetaStorage txStateVolatileStorage = new VolatileTxStateMetaStorage();
LockManager lockMgr = new HeapLockManager(systemConfiguration, txStateVolatileStorage);

// TODO: IGNITE-19344 - use nodeId that is validated on join (and probably generated differently).
txManager = new TxManagerImpl(
Expand All @@ -1049,6 +1051,7 @@ public class IgniteImpl implements Ignite {
clusterSvc.topologyService(),
replicaSvc,
lockMgr,
txStateVolatileStorage,
clockService,
new TransactionIdGenerator(() -> clusterSvc.nodeName().hashCode()),
placementDriverMgr.placementDriver(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.ignite.internal.sql.engine;

import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;

import org.apache.ignite.internal.sql.engine.exec.TransactionalOperationTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.tx.TransactionException;

Expand All @@ -30,9 +32,11 @@
*/
class InflightTransactionalOperationTracker implements TransactionalOperationTracker {
private final TransactionInflights delegate;
private final TxManager txManager;

InflightTransactionalOperationTracker(TransactionInflights delegate) {
InflightTransactionalOperationTracker(TransactionInflights delegate, TxManager txManager) {
this.delegate = delegate;
this.txManager = txManager;
}

@Override
Expand All @@ -41,7 +45,8 @@ public void registerOperationStart(InternalTransaction tx) {
boolean result = tx.isReadOnly() ? delegate.addScanInflight(tx.id()) : delegate.track(tx.id());

if (!result) {
throw new TransactionException(TX_ALREADY_FINISHED_ERR, format("Transaction is already finished [tx={}]", tx));
throw new TransactionException(TX_ALREADY_FINISHED_ERR, format("Transaction is already finished [tx={}, {}]",
tx, formatTxInfo(tx.id(), txManager)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public SqlQueryProcessor(
this.placementDriver = placementDriver;
this.clusterCfg = clusterCfg;
this.nodeCfg = nodeCfg;
this.txTracker = new InflightTransactionalOperationTracker(transactionInflights);
this.txTracker = new InflightTransactionalOperationTracker(transactionInflights, txManager);
this.txManager = txManager;
this.commonScheduler = commonScheduler;
this.killCommandHandler = killCommandHandler;
Expand Down
Loading