Skip to content

immediate writing with no tx #8697

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Complex
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Simple
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalDataSource
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalTable
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme [*/*]*
ydb/core/kqp/ut/scheme KqpOlapScheme.DropThenAddColumn
Expand Down
15 changes: 12 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
if (!writeMeta.HasLongTxId()) {
auto operation = Self->OperationsManager->GetOperationVerified((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
operation->OnWriteFinish(txc, aggr->GetWriteIds());
if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
operation->OnWriteFinish(txc, aggr->GetWriteIds(), operation->GetBehaviour() == EOperationBehaviour::NoTxWrite);
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
Self->OperationsManager->AddTemporaryTxLink(operation->GetLockId());
Self->OperationsManager->CommitTransactionOnExecute(*Self, operation->GetLockId(), txc, Self->GetLastTxSnapshot());
} else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
NKikimrTxColumnShard::TCommitWriteTxBody proto;
proto.SetLockId(operation->GetLockId());
TString txBody;
Expand Down Expand Up @@ -145,11 +150,15 @@ void TTxWrite::Complete(const TActorContext& ctx) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteMeta();
if (!writeMeta.HasLongTxId()) {
auto op = Self->GetOperationsManager().GetOperationVerified(NOlap::TWriteId(writeMeta.GetWriteId()));
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock) {
if (op->GetBehaviour() == EOperationBehaviour::WriteWithLock || op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto evWrite = std::make_shared<NOlap::NTxInteractions::TEvWriteWriter>(writeMeta.GetTableId(),
buffer.GetAggregations()[i]->GetRecordBatch(), Self->GetIndexOptional()->GetVersionedIndex().GetPrimaryKey());
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
}
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot());
}

}
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
Expand Down
31 changes: 16 additions & 15 deletions ydb/core/tx/columnshard/operations/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void TOperationsManager::CommitTransactionOnExecute(
opPtr->CommitOnExecute(owner, txc, snapshot);
commited.emplace_back(opPtr);
}
OnTransactionFinishOnExecute(commited, txId, txc);
OnTransactionFinishOnExecute(commited, lock, txId, txc);
}

void TOperationsManager::CommitTransactionOnComplete(
Expand All @@ -101,7 +101,7 @@ void TOperationsManager::CommitTransactionOnComplete(
opPtr->CommitOnComplete(owner, snapshot);
commited.emplace_back(opPtr);
}
OnTransactionFinishOnComplete(commited, txId);
OnTransactionFinishOnComplete(commited, lock, txId);
}

void TOperationsManager::AbortTransactionOnExecute(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
Expand All @@ -118,7 +118,7 @@ void TOperationsManager::AbortTransactionOnExecute(TColumnShard& owner, const ui
aborted.emplace_back(opPtr);
}

OnTransactionFinishOnExecute(aborted, txId, txc);
OnTransactionFinishOnExecute(aborted, *lock, txId, txc);
}

void TOperationsManager::AbortTransactionOnComplete(TColumnShard& owner, const ui64 txId) {
Expand All @@ -135,7 +135,7 @@ void TOperationsManager::AbortTransactionOnComplete(TColumnShard& owner, const u
aborted.emplace_back(opPtr);
}

OnTransactionFinishOnComplete(aborted, txId);
OnTransactionFinishOnComplete(aborted, *lock, txId);
}

TWriteOperation::TPtr TOperationsManager::GetOperation(const TWriteId writeId) const {
Expand All @@ -147,24 +147,20 @@ TWriteOperation::TPtr TOperationsManager::GetOperation(const TWriteId writeId) c
}

void TOperationsManager::OnTransactionFinishOnExecute(
const TVector<TWriteOperation::TPtr>& operations, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
const ui64 lockId = GetLockForTxVerified(txId);
auto itLock = LockFeatures.find(lockId);
AFL_VERIFY(itLock != LockFeatures.end());
const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
for (auto&& op : operations) {
RemoveOperationOnExecute(op, txc);
}
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::OperationTxIds>().Key(txId, lockId).Delete();
db.Table<Schema::OperationTxIds>().Key(txId, lock.GetLockId()).Delete();
}

void TOperationsManager::OnTransactionFinishOnComplete(
const TVector<TWriteOperation::TPtr>& operations, const ui64 txId) {
const ui64 lockId = GetLockForTxVerified(txId);
auto itLock = LockFeatures.find(lockId);
AFL_VERIFY(itLock != LockFeatures.end());
itLock->second.RemoveInteractions(InteractionsContext);
LockFeatures.erase(lockId);
const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId) {
{
lock.RemoveInteractions(InteractionsContext);
LockFeatures.erase(lock.GetLockId());
}
Tx2Lock.erase(txId);
for (auto&& op : operations) {
RemoveOperationOnComplete(op);
Expand Down Expand Up @@ -233,6 +229,11 @@ EOperationBehaviour TOperationsManager::GetBehaviour(const NEvents::TDataEvents:
return EOperationBehaviour::Undefined;
}

if (!evWrite.Record.HasLockTxId() && !evWrite.Record.HasLockNodeId() &&
evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) {
return EOperationBehaviour::NoTxWrite;
}

if (evWrite.Record.HasTxId() && evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_PREPARE) {
return EOperationBehaviour::InTxWrite;
}
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/tx/columnshard/operations/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class TOperationsManager {
TWriteId LastWriteId = TWriteId(0);

public:

bool Load(NTabletFlatExecutor::TTransactionContext& txc);
void AddEventForTx(TColumnShard& owner, const ui64 txId, const std::shared_ptr<NOlap::NTxInteractions::ITxEventWriter>& writer);
void AddEventForLock(TColumnShard& owner, const ui64 lockId, const std::shared_ptr<NOlap::NTxInteractions::ITxEventWriter>& writer);
Expand All @@ -139,6 +140,9 @@ class TOperationsManager {
TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot);
void CommitTransactionOnComplete(
TColumnShard& owner, const ui64 txId, const NOlap::TSnapshot& snapshot);
void AddTemporaryTxLink(const ui64 lockId) {
AFL_VERIFY(Tx2Lock.emplace(lockId, lockId).second);
}
void LinkTransactionOnExecute(const ui64 lockId, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
void LinkTransactionOnComplete(const ui64 lockId, const ui64 txId);
void AbortTransactionOnExecute(TColumnShard& owner, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
Expand Down Expand Up @@ -198,7 +202,8 @@ class TOperationsManager {
TWriteId BuildNextWriteId();
void RemoveOperationOnExecute(const TWriteOperation::TPtr& op, NTabletFlatExecutor::TTransactionContext& txc);
void RemoveOperationOnComplete(const TWriteOperation::TPtr& op);
void OnTransactionFinishOnExecute(const TVector<TWriteOperation::TPtr>& operations, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
void OnTransactionFinishOnComplete(const TVector<TWriteOperation::TPtr>& operations, const ui64 txId);
void OnTransactionFinishOnExecute(const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId,
NTabletFlatExecutor::TTransactionContext& txc);
void OnTransactionFinishOnComplete(const TVector<TWriteOperation::TPtr>& operations, const TLockFeatures& lock, const ui64 txId);
};
} // namespace NKikimr::NColumnShard
6 changes: 5 additions & 1 deletion ydb/core/tx/columnshard/operations/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ void TWriteOperation::CommitOnComplete(TColumnShard& owner, const NOlap::TSnapsh
owner.UpdateInsertTableCounters();
}

void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds) {
void TWriteOperation::OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds, const bool ephemeralFlag) {
Y_ABORT_UNLESS(Status == EOperationStatus::Started);
Status = EOperationStatus::Prepared;
GlobalWriteIds = globalWriteIds;

if (ephemeralFlag) {
return;
}

NIceDb::TNiceDb db(txc.DB);
NKikimrTxColumnShard::TInternalOperationData proto;
ToProto(proto);
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/operations/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ enum class EOperationBehaviour : ui32 {
InTxWrite = 2,
WriteWithLock = 3,
CommitWriteLock = 4,
AbortWriteLock = 5
AbortWriteLock = 5,
NoTxWrite = 6
};

class TWriteOperation {
Expand All @@ -61,7 +62,7 @@ class TWriteOperation {

void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source,
const std::shared_ptr<NOlap::ISnapshotSchema>& schema, const TActorContext& ctx);
void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds);
void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const TVector<TWriteId>& globalWriteIds, const bool ephemeralFlag);
void CommitOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const;
void CommitOnComplete(TColumnShard& owner, const NOlap::TSnapshot& snapshot) const;
void AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const;
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/data_events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ struct TDataEvents {
return result;
}

static std::unique_ptr<TEvWriteResult> BuildCompleted(const ui64 origin) {
auto result = std::make_unique<TEvWriteResult>();
result->Record.SetOrigin(origin);
result->Record.SetStatus(NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
return result;
}

static std::unique_ptr<TEvWriteResult> BuildCompleted(const ui64 origin, const ui64 txId) {
auto result = std::make_unique<TEvWriteResult>();
result->Record.SetOrigin(origin);
Expand Down
Loading