22
33#include < ydb/core/engine/mkql_proto.h>
44#include < ydb/core/engine/minikql/flat_local_tx_factory.h>
5+ #include < ydb/core/tx/data_events/events.h>
6+ #include < ydb/core/tx/data_events/payload_helper.h>
57#include < ydb/core/tx/schemeshard/schemeshard.h>
68#include < ydb/core/tx/tx_proxy/proxy.h>
79#include < ydb/core/persqueue/events/global.h>
@@ -2256,31 +2258,31 @@ namespace NSchemeShardUT_Private {
22562258 NKikimr::NPQ::CmdWrite (&runtime, tabletId, edge, partitionId, " sourceid0" , msgSeqNo, data, false , {}, true , cookie, 0 );
22572259 }
22582260
2259- void WriteRow (TTestActorRuntime& runtime, const TString& key, const TString& value, ui64 tabletId) {
2261+ void UpdateRow (TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId) {
22602262 NKikimrMiniKQL::TResult result;
22612263 TString error;
22622264 NKikimrProto::EReplyStatus status = LocalMiniKQL (runtime, tabletId, Sprintf (R"(
22632265 (
2264- (let key '( '('key (Utf8 '%s ) ) ) )
2266+ (let key '( '('key (Uint32 '%d ) ) ) )
22652267 (let row '( '('value (Utf8 '%s) ) ) )
2266- (return (AsList (UpdateRow '__user__Table key row) ))
2268+ (return (AsList (UpdateRow '__user__%s key row) ))
22672269 )
2268- )" , key.c_str (), value .c_str ()), result, error);
2270+ )" , key, value .c_str (), table .c_str ()), result, error);
22692271
22702272 UNIT_ASSERT_VALUES_EQUAL_C (status, NKikimrProto::EReplyStatus::OK, error);
22712273 UNIT_ASSERT_VALUES_EQUAL (error, " " );
22722274 }
22732275
2274- void WriteRowPg (TTestActorRuntime& runtime, const TString& key, ui32 value, ui64 tabletId) {
2276+ void UpdateRowPg (TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId) {
22752277 NKikimrMiniKQL::TResult result;
22762278 TString error;
22772279 NKikimrProto::EReplyStatus status = LocalMiniKQL (runtime, tabletId, Sprintf (R"(
22782280 (
2279- (let key '( '('key (Utf8 '%s ) ) ) )
2281+ (let key '( '('key (Utf8 '%d ) ) ) )
22802282 (let row '( '('value (PgConst '%u (PgType 'int4)) ) ) )
2281- (return (AsList (UpdateRow '__user__Table key row) ))
2283+ (return (AsList (UpdateRow '__user__%s key row) ))
22822284 )
2283- )" , key.c_str (), value ), result, error);
2285+ )" , key, value, table .c_str ()), result, error);
22842286
22852287 UNIT_ASSERT_VALUES_EQUAL_C (status, NKikimrProto::EReplyStatus::OK, error);
22862288 UNIT_ASSERT_VALUES_EQUAL (error, " " );
@@ -2291,6 +2293,7 @@ namespace NSchemeShardUT_Private {
22912293 auto tableDesc = DescribePath (runtime, tablePath, true , true );
22922294 const auto & tablePartitions = tableDesc.GetPathDescription ().GetTablePartitions ();
22932295 UNIT_ASSERT (partitionIdx < tablePartitions.size ());
2296+ const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId ();
22942297
22952298 auto ev = MakeHolder<TEvDataShard::TEvUploadRowsRequest>();
22962299 ev->Record .SetTableId (tableDesc.GetPathId ());
@@ -2314,7 +2317,34 @@ namespace NSchemeShardUT_Private {
23142317 }
23152318
23162319 const auto & sender = runtime.AllocateEdgeActor ();
2317- ForwardToTablet (runtime, tablePartitions[partitionIdx]. GetDatashardId () , sender, ev.Release ());
2320+ ForwardToTablet (runtime, datashardTabletId , sender, ev.Release ());
23182321 runtime.GrabEdgeEvent <TEvDataShard::TEvUploadRowsResponse>(sender);
23192322 }
2323+
2324+ void WriteRow (TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected) {
2325+ auto tableDesc = DescribePath (runtime, tablePath, true , true );
2326+ const auto & tablePartitions = tableDesc.GetPathDescription ().GetTablePartitions ();
2327+ UNIT_ASSERT (partitionIdx < tablePartitions.size ());
2328+ const ui64 tableId = tableDesc.GetPathId ();
2329+ const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId ();
2330+
2331+ const auto & sender = runtime.AllocateEdgeActor ();
2332+
2333+ std::vector<ui32> columnIds{1 , 2 };
2334+
2335+ TVector<TCell> cells{TCell ((const char *)&key, sizeof (ui32)), TCell (value.c_str (), value.size ())};
2336+
2337+ TSerializedCellMatrix matrix (cells, 1 , 2 );
2338+
2339+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
2340+ ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload (std::move (matrix.ReleaseBuffer ()));
2341+ evWrite->AddOperation (NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1 , columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
2342+
2343+ ForwardToTablet (runtime, datashardTabletId, sender, evWrite.release ());
2344+
2345+ auto ev = runtime.GrabEdgeEventRethrow <NEvents::TDataEvents::TEvWriteResult>(sender);
2346+ auto status = ev->Get ()->Record .GetStatus ();
2347+
2348+ UNIT_ASSERT_C (successIsExpected == (status == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED), " Status: " << ev->Get ()->Record .GetStatus () << " Issues: " << ev->Get ()->Record .GetIssues ());
2349+ }
23202350}
0 commit comments