Skip to content

Commit fb106cc

Browse files
authored
Move size checks to the parse phase (#767)
1 parent f2395c8 commit fb106cc

File tree

8 files changed

+57
-78
lines changed

8 files changed

+57
-78
lines changed

ydb/core/tx/datashard/check_write_unit.cpp

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -77,42 +77,6 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
7777
{
7878
for (const auto& key : writeTx->TxInfo().Keys) {
7979
if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) {
80-
ui64 keySize = 0;
81-
for (const auto& cell : key.Key->Range.From) {
82-
keySize += cell.Size();
83-
}
84-
if (keySize > NLimits::MaxWriteKeySize) {
85-
TString err = TStringBuilder()
86-
<< "Operation " << *op << " writes key of " << keySize
87-
<< " bytes which exceeds limit " << NLimits::MaxWriteKeySize
88-
<< " bytes at " << DataShard.TabletID();
89-
90-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID());
91-
op->Abort(EExecutionUnitKind::FinishProposeWrite);
92-
93-
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
94-
95-
return EExecutionStatus::Executed;
96-
}
97-
for (const auto& col : key.Key->Columns) {
98-
if (col.Operation == TKeyDesc::EColumnOperation::Set ||
99-
col.Operation == TKeyDesc::EColumnOperation::InplaceUpdate)
100-
{
101-
if (col.ImmediateUpdateSize > NLimits::MaxWriteValueSize) {
102-
TString err = TStringBuilder()
103-
<< "Transaction write column value of " << col.ImmediateUpdateSize
104-
<< " bytes is larger than the allowed threshold";
105-
106-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err, DataShard.TabletID());
107-
op->Abort(EExecutionUnitKind::FinishProposeWrite);
108-
109-
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
110-
111-
return EExecutionStatus::Executed;
112-
}
113-
}
114-
}
115-
11680
if (DataShard.IsSubDomainOutOfSpace()) {
11781
switch (key.Key->RowOperation) {
11882
case TKeyDesc::ERowOperation::Read:

ydb/core/tx/datashard/datashard__write.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,17 @@ bool TDataShard::TTxWrite::Execute(TTransactionContext& txc, const TActorContext
7272
}
7373

7474
TOperation::TPtr op = Self->Pipeline.BuildOperation(Ev, ReceivedAt, TieBreakerIndex, txc, ctx, ProposeTransactionSpan.GetTraceId());
75+
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
7576

7677
// Unsuccessful operation parse.
7778
if (op->IsAborted()) {
7879
LWTRACK(ProposeTransactionParsed, op->Orbit, false);
79-
Y_ABORT_UNLESS(op->Result());
80+
Y_ABORT_UNLESS(writeOp->GetWriteResult());
8081

8182
if (ProposeTransactionSpan) {
8283
ProposeTransactionSpan.EndError("TTxWrite:: unsuccessful operation parse");
8384
}
84-
ctx.Send(op->GetTarget(), op->Result().Release());
85+
ctx.Send(op->GetTarget(), writeOp->ReleaseWriteResult().release());
8586
return true;
8687
}
8788
LWTRACK(ProposeTransactionParsed, op->Orbit, true);

ydb/core/tx/datashard/datashard_pipeline.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,12 +1571,12 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
15711571
writeOp->OperationSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(traceId), "WriteOperation", NWilson::EFlags::AUTO_END);
15721572

15731573
auto badRequest = [&](const TString& error) {
1574-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, error, Self->TabletID());
1574+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << error << "at tablet# " << Self->TabletID(), Self->TabletID());
15751575
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, error);
15761576
};
15771577

15781578
if (!writeTx->Ready()) {
1579-
badRequest(TStringBuilder() << "Shard " << Self->TabletID() << " cannot parse tx " << writeOp->GetTxId() << ": " << writeOp->GetWriteTx()->GetError());
1579+
badRequest(TStringBuilder() << "Cannot parse tx " << writeOp->GetTxId() << ". " << writeOp->GetWriteTx()->GetErrCode() << ": " << writeOp->GetWriteTx()->GetErrStr());
15801580
return writeOp;
15811581
}
15821582

@@ -1599,7 +1599,7 @@ TOperation::TPtr TPipeline::BuildOperation(NEvents::TDataEvents::TEvWrite::TPtr&
15991599
// Make config checks for immediate op.
16001600
if (writeOp->IsImmediate()) {
16011601
if (Config.NoImmediate() || (Config.ForceOnlineRW())) {
1602-
LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Shard " << Self->TabletID() << " force immediate writeOp " << writeOp->GetTxId() << " to online according to config");
1602+
LOG_INFO_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, "Force immediate writeOp " << writeOp->GetTxId() << " to online according to config, at tablet #" << Self->TabletID());
16031603
writeOp->SetForceOnlineFlag();
16041604
} else {
16051605
if (Config.DirtyImmediate())

ydb/core/tx/datashard/datashard_ut_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
8080
const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
8181

8282
UNIT_ASSERT_VALUES_EQUAL(record.GetIssues().size(), 1);
83-
UNIT_ASSERT(record.GetIssues(0).message().Contains("Operation [0:100] writes key of 1049601 bytes which exceeds limit 1049600 bytes"));
83+
UNIT_ASSERT(record.GetIssues(0).message().Contains("Row key size of 1049601 bytes is larger than the allowed threshold 1049600"));
8484
}
8585

8686
Y_UNIT_TEST(WriteOnShard) {

ydb/core/tx/datashard/datashard_write_operation.cpp

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,36 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) {
129129
}
130130
}
131131

132+
for (ui32 rowIdx = 0; rowIdx < Matrix.GetRowCount(); ++rowIdx)
133+
{
134+
ui64 keyBytes = 0;
135+
for (ui16 keyColIdx = 0; keyColIdx < TableInfo->KeyColumnIds.size(); ++keyColIdx) {
136+
const auto& cellType = TableInfo->KeyColumnTypes[keyColIdx];
137+
const TCell& cell = Matrix.GetCell(rowIdx, keyColIdx);
138+
if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue<ui8>() > 127) {
139+
ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
140+
ErrStr = TStringBuilder() << "Keys with Uint8 column values >127 are currently prohibited";
141+
return false;
142+
}
143+
keyBytes += cell.IsNull() ? 1 : cell.Size();
144+
}
145+
146+
if (keyBytes > NLimits::MaxWriteKeySize) {
147+
ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
148+
ErrStr = TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize;
149+
return false;
150+
}
151+
152+
for (ui16 valueColIdx = TableInfo->KeyColumnIds.size(); valueColIdx < Matrix.GetColCount(); ++valueColIdx) {
153+
const TCell& cell = Matrix.GetCell(rowIdx, valueColIdx);
154+
if (cell.Size() > NLimits::MaxWriteValueSize) {
155+
ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
156+
ErrStr = TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize;
157+
return false;
158+
}
159+
}
160+
}
161+
132162
TableId = TTableId(tableIdRecord.ownerid(), tableIdRecord.GetTableId(), tableIdRecord.GetSchemaVersion());
133163
return true;
134164
}
@@ -213,6 +243,13 @@ void TValidatedWriteTx::ComputeTxSize() {
213243
TxSize = sizeof(TValidatedWriteTx);
214244
}
215245

246+
TWriteOperation* TWriteOperation::CastWriteOperation(TOperation::TPtr op)
247+
{
248+
TWriteOperation* writeOp = dynamic_cast<TWriteOperation*>(op.Get());
249+
Y_ABORT_UNLESS(writeOp);
250+
return writeOp;
251+
}
252+
216253
TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx)
217254
: TOperation(op)
218255
, Ev(ev)
@@ -228,9 +265,6 @@ TWriteOperation::TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::T
228265

229266
BuildWriteTx(self, txc, ctx);
230267

231-
// First time parsing, so we can fail
232-
Y_DEBUG_ABORT_UNLESS(WriteTx->Ready());
233-
234268
TrackMemory();
235269
}
236270

ydb/core/tx/datashard/datashard_write_operation.h

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ class TValidatedWriteTx: TNonCopyable {
2626
return 100;
2727
}
2828

29-
NKikimrTxDataShard::TError::EKind Code() const {
30-
return ErrCode;
31-
}
32-
const TString GetError() const {
33-
return ErrStr;
34-
}
35-
3629
const NEvents::TDataEvents::TEvWrite::TPtr& GetEv() const {
3730
return Ev;
3831
}
@@ -43,6 +36,7 @@ class TValidatedWriteTx: TNonCopyable {
4336

4437
const NKikimrDataEvents::TEvWrite::TOperation& RecordOperation() const {
4538
Y_ABORT_UNLESS(GetRecord().operations().size() == 1, "Only one operation is supported now");
39+
Y_ABORT_UNLESS(GetRecord().operations(0).GetType() == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, "Only UPSERT operation is supported now");
4640
return GetRecord().operations(0);
4741
}
4842

@@ -199,6 +193,8 @@ class TValidatedWriteTx: TNonCopyable {
199193
class TWriteOperation : public TOperation {
200194
friend class TWriteUnit;
201195
public:
196+
static TWriteOperation* CastWriteOperation(TOperation::TPtr op);
197+
202198
explicit TWriteOperation(const TBasicOpInfo& op, NEvents::TDataEvents::TEvWrite::TPtr ev, TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
203199

204200
~TWriteOperation();
@@ -316,6 +312,9 @@ class TWriteOperation : public TOperation {
316312
const TValidatedWriteTx::TPtr& GetWriteTx() const {
317313
return WriteTx;
318314
}
315+
TValidatedWriteTx::TPtr& GetWriteTx() {
316+
return WriteTx;
317+
}
319318
TValidatedWriteTx::TPtr BuildWriteTx(TDataShard* self, TTransactionContext& txc, const TActorContext& ctx);
320319

321320
void ClearWriteTx() {

ydb/core/tx/datashard/finish_propose_write_unit.cpp

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class TFinishProposeWriteUnit : public TExecutionUnit {
2525
void AddDiagnosticsResult(NEvents::TDataEvents::TEvWriteResult& res);
2626
void UpdateCounters(const TWriteOperation* writeOp, const TActorContext& ctx);
2727

28-
static TWriteOperation* CastWriteOperation(TOperation::TPtr op);
28+
2929
};
3030

3131
TFinishProposeWriteUnit::TFinishProposeWriteUnit(TDataShard &dataShard,
@@ -38,13 +38,6 @@ TFinishProposeWriteUnit::~TFinishProposeWriteUnit()
3838
{
3939
}
4040

41-
TWriteOperation* TFinishProposeWriteUnit::CastWriteOperation(TOperation::TPtr op)
42-
{
43-
TWriteOperation* writeOp = dynamic_cast<TWriteOperation*>(op.Get());
44-
Y_ABORT_UNLESS(writeOp);
45-
return writeOp;
46-
}
47-
4841
bool TFinishProposeWriteUnit::IsReadyToExecute(TOperation::TPtr) const
4942
{
5043
return true;
@@ -75,7 +68,7 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op,
7568
TTransactionContext &txc,
7669
const TActorContext &ctx)
7770
{
78-
TWriteOperation* writeOp = CastWriteOperation(op);
71+
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
7972
if (writeOp->GetWriteResult())
8073
UpdateCounters(writeOp, ctx);
8174

@@ -132,7 +125,7 @@ EExecutionStatus TFinishProposeWriteUnit::Execute(TOperation::TPtr op,
132125

133126
void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext &ctx)
134127
{
135-
TWriteOperation* writeOp = CastWriteOperation(op);
128+
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
136129

137130
if (!op->HasResultSentFlag()) {
138131
DataShard.IncCounter(COUNTER_PREPARE_COMPLETE);
@@ -155,7 +148,7 @@ void TFinishProposeWriteUnit::Complete(TOperation::TPtr op, const TActorContext
155148

156149
void TFinishProposeWriteUnit::CompleteRequest(TOperation::TPtr op, const TActorContext &ctx)
157150
{
158-
TWriteOperation* writeOp = CastWriteOperation(op);
151+
TWriteOperation* writeOp = TWriteOperation::CastWriteOperation(op);
159152
auto res = writeOp->ReleaseWriteResult();
160153

161154
TDuration duration = TAppData::TimeProvider->Now() - op->GetReceivedAt();

ydb/core/tx/datashard/write_unit.cpp

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ class TWriteUnit : public TExecutionUnit {
5353

5454
const ui32 writeTableId = localTableId;
5555
auto [readVersion, writeVersion] = self->GetReadWriteVersions(writeOp);
56+
writeTx->SetReadVersion(readVersion);
57+
writeTx->SetWriteVersion(writeVersion);
5658

5759
TDataShardUserDb userDb(*self, txc.DB, readVersion);
5860
TDataShardChangeGroupProvider groupProvider(*self, txc.DB);
@@ -68,34 +70,20 @@ class TWriteUnit : public TExecutionUnit {
6870
{
6971
key.clear();
7072
keyCells.clear();
73+
keyCells.reserve(TableInfo_.KeyColumnIds.size());
7174
ui64 keyBytes = 0;
7275
for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) {
7376
const auto& cellType = TableInfo_.KeyColumnTypes[keyColIdx];
7477
const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
75-
if (cellType.GetTypeId() == NScheme::NTypeIds::Uint8 && !cell.IsNull() && cell.AsValue<ui8>() > 127) {
76-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "Keys with Uint8 column values >127 are currently prohibited", self->TabletID());
77-
return;
78-
}
79-
8078
keyBytes += cell.Size();
8179
key.emplace_back(TRawTypeValue(cell.AsRef(), cellType));
8280
keyCells.emplace_back(cell);
8381
}
8482

85-
if (keyBytes > NLimits::MaxWriteKeySize) {
86-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row key size of " << keyBytes << " bytes is larger than the allowed threshold " << NLimits::MaxWriteKeySize, self->TabletID());
87-
return;
88-
}
89-
9083
value.clear();
9184
for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
9285
ui32 columnTag = writeTx->RecordOperation().GetColumnIds(valueColIdx);
9386
const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
94-
if (cell.Size() > NLimits::MaxWriteValueSize) {
95-
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Row cell size of " << cell.Size() << " bytes is larger than the allowed threshold " << NLimits::MaxWriteValueSize, self->TabletID());
96-
return;
97-
}
98-
9987
auto* col = TableInfo_.Columns.FindPtr(valueColIdx + 1);
10088
Y_ABORT_UNLESS(col);
10189

0 commit comments

Comments
 (0)