@@ -80,7 +80,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
80
80
, Callbacks(args.Callback)
81
81
, Counters(counters)
82
82
, TypeEnv(args.TypeEnv)
83
- , Alloc(args.Alloc)
84
83
, TxId(args.TxId)
85
84
, TableId(Settings.GetTable().GetOwnerId(), Settings.GetTable().GetTableId())
86
85
{
@@ -103,10 +102,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
103
102
virtual ~TKqpWriteActor () {
104
103
}
105
104
106
- TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator () {
107
- return TypeEnv.BindAllocator ();
108
- }
109
-
110
105
void CommitState (const NYql::NDqProto::TCheckpoint&) final {};
111
106
void LoadState (const NYql::NDqProto::TSinkState&) final {};
112
107
@@ -140,7 +135,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
140
135
Columns[index].PType ,
141
136
row.GetElement (index),
142
137
TypeEnv,
143
- /* copy */ true );
138
+ /* copy */ false );
144
139
}
145
140
BatchBuilder->AddRow (
146
141
TConstArrayRef<TCell>{cells.begin (), cells.begin () + KeyColumns.size ()},
@@ -250,80 +245,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
250
245
ProcessRows ();
251
246
}
252
247
253
- void RuntimeError (const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) {
254
- NYql::TIssue issue (message);
255
- for (const auto & i : subIssues) {
256
- issue.AddSubIssue (MakeIntrusive<NYql::TIssue>(i));
257
- }
258
-
259
- NYql::TIssues issues;
260
- issues.AddIssue (std::move (issue));
261
-
262
- Callbacks->OnAsyncOutputError (OutputIndex, std::move (issues), statusCode);
263
- }
264
-
265
- void PassAway () override {
266
- TActorBootstrapped<TKqpWriteActor>::PassAway ();
267
- }
268
-
269
- void BuildColumns () {
270
- KeyColumns.reserve (Settings.KeyColumnsSize ());
271
- i32 number = 0 ;
272
- for (const auto & column : Settings.GetKeyColumns ()) {
273
- KeyColumns.emplace_back (
274
- column.GetName (),
275
- column.GetId (),
276
- NScheme::TTypeInfo {
277
- static_cast <NScheme::TTypeId>(column.GetTypeId ()),
278
- column.GetTypeId () == NScheme::NTypeIds::Pg
279
- ? NPg::TypeDescFromPgTypeId (column.GetTypeInfo ().GetPgTypeId ())
280
- : nullptr
281
- },
282
- column.GetTypeInfo ().GetPgTypeMod (),
283
- number++
284
- );
285
- }
286
-
287
- ColumnIds.reserve (Settings.ColumnsSize ());
288
- Columns.reserve (Settings.ColumnsSize ());
289
- number = 0 ;
290
- for (const auto & column : Settings.GetColumns ()) {
291
- ColumnIds.push_back (column.GetId ());
292
- Columns.emplace_back (
293
- column.GetName (),
294
- column.GetId (),
295
- NScheme::TTypeInfo {
296
- static_cast <NScheme::TTypeId>(column.GetTypeId ()),
297
- column.GetTypeId () == NScheme::NTypeIds::Pg
298
- ? NPg::TypeDescFromPgTypeId (column.GetTypeInfo ().GetPgTypeId ())
299
- : nullptr
300
- },
301
- column.GetTypeInfo ().GetPgTypeMod (),
302
- number++
303
- );
304
- }
305
- }
306
-
307
- void PrepareBatchBuilder () {
308
- std::vector<std::pair<TString, NScheme::TTypeInfo>> columns;
309
- for (const auto & column : Columns) {
310
- columns.emplace_back (column.Name , column.PType );
311
- }
312
- std::set<std::string> notNullColumns;
313
- for (const auto & column : Settings.GetColumns ()) {
314
- if (column.GetNotNull ()) {
315
- notNullColumns.insert (column.GetName ());
316
- }
317
- }
318
-
319
- BatchBuilder = std::make_unique<NArrow::TArrowBatchBuilder>(arrow::Compression::UNCOMPRESSED, notNullColumns);
320
-
321
- TString err;
322
- if (!BatchBuilder->Start (columns, 0 , 0 , err)) {
323
- RuntimeError (" Failed to start batch builder: " + err, NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
324
- }
325
- }
326
-
327
248
void ProcessRows () {
328
249
SplitBatchByShards ();
329
250
SendNewBatchesToShards ();
@@ -333,27 +254,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
333
254
}
334
255
}
335
256
336
- NKikimr::NEvWrite::IShardsSplitter::IEvWriteDataAccessor::TPtr GetDataAccessor (
337
- const std::shared_ptr<arrow::RecordBatch>& batch) const {
338
- struct TDataAccessor : public NKikimr ::NEvWrite::IShardsSplitter::IEvWriteDataAccessor {
339
- std::shared_ptr<arrow::RecordBatch> Batch;
340
-
341
- TDataAccessor (const std::shared_ptr<arrow::RecordBatch>& batch)
342
- : Batch(batch) {
343
- }
344
-
345
- std::shared_ptr<arrow::RecordBatch> GetDeserializedBatch () const override {
346
- return Batch;
347
- }
348
-
349
- TString GetSerializedData () const override {
350
- return NArrow::SerializeBatchNoCompression (Batch);
351
- }
352
- };
353
-
354
- return std::make_shared<TDataAccessor>(batch);
355
- }
356
-
357
257
void SplitBatchByShards () {
358
258
if (!SchemeEntry || BatchBuilder->Bytes () == 0 ) {
359
259
return ;
@@ -485,6 +385,102 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
485
385
return result;
486
386
}
487
387
388
+ void RuntimeError (const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) {
389
+ NYql::TIssue issue (message);
390
+ for (const auto & i : subIssues) {
391
+ issue.AddSubIssue (MakeIntrusive<NYql::TIssue>(i));
392
+ }
393
+
394
+ NYql::TIssues issues;
395
+ issues.AddIssue (std::move (issue));
396
+
397
+ Callbacks->OnAsyncOutputError (OutputIndex, std::move (issues), statusCode);
398
+ }
399
+
400
+ void PassAway () override {
401
+ TActorBootstrapped<TKqpWriteActor>::PassAway ();
402
+ }
403
+
404
+ void BuildColumns () {
405
+ KeyColumns.reserve (Settings.KeyColumnsSize ());
406
+ i32 number = 0 ;
407
+ for (const auto & column : Settings.GetKeyColumns ()) {
408
+ KeyColumns.emplace_back (
409
+ column.GetName (),
410
+ column.GetId (),
411
+ NScheme::TTypeInfo {
412
+ static_cast <NScheme::TTypeId>(column.GetTypeId ()),
413
+ column.GetTypeId () == NScheme::NTypeIds::Pg
414
+ ? NPg::TypeDescFromPgTypeId (column.GetTypeInfo ().GetPgTypeId ())
415
+ : nullptr
416
+ },
417
+ column.GetTypeInfo ().GetPgTypeMod (),
418
+ number++
419
+ );
420
+ }
421
+
422
+ ColumnIds.reserve (Settings.ColumnsSize ());
423
+ Columns.reserve (Settings.ColumnsSize ());
424
+ number = 0 ;
425
+ for (const auto & column : Settings.GetColumns ()) {
426
+ ColumnIds.push_back (column.GetId ());
427
+ Columns.emplace_back (
428
+ column.GetName (),
429
+ column.GetId (),
430
+ NScheme::TTypeInfo {
431
+ static_cast <NScheme::TTypeId>(column.GetTypeId ()),
432
+ column.GetTypeId () == NScheme::NTypeIds::Pg
433
+ ? NPg::TypeDescFromPgTypeId (column.GetTypeInfo ().GetPgTypeId ())
434
+ : nullptr
435
+ },
436
+ column.GetTypeInfo ().GetPgTypeMod (),
437
+ number++
438
+ );
439
+ }
440
+ }
441
+
442
+ void PrepareBatchBuilder () {
443
+ std::vector<std::pair<TString, NScheme::TTypeInfo>> columns;
444
+ for (const auto & column : Columns) {
445
+ columns.emplace_back (column.Name , column.PType );
446
+ }
447
+ std::set<std::string> notNullColumns;
448
+ for (const auto & column : Settings.GetColumns ()) {
449
+ if (column.GetNotNull ()) {
450
+ notNullColumns.insert (column.GetName ());
451
+ }
452
+ }
453
+
454
+ BatchBuilder = std::make_unique<NArrow::TArrowBatchBuilder>(arrow::Compression::UNCOMPRESSED, notNullColumns);
455
+
456
+ TString err;
457
+ if (!BatchBuilder->Start (columns, 0 , 0 , err)) {
458
+ RuntimeError (" Failed to start batch builder: " + err, NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
459
+ }
460
+ }
461
+
462
+ NKikimr::NEvWrite::IShardsSplitter::IEvWriteDataAccessor::TPtr GetDataAccessor (
463
+ const std::shared_ptr<arrow::RecordBatch>& batch) const {
464
+ struct TDataAccessor : public NKikimr ::NEvWrite::IShardsSplitter::IEvWriteDataAccessor {
465
+ std::shared_ptr<arrow::RecordBatch> Batch;
466
+
467
+ TDataAccessor (const std::shared_ptr<arrow::RecordBatch>& batch)
468
+ : Batch(batch) {
469
+ }
470
+
471
+ std::shared_ptr<arrow::RecordBatch> GetDeserializedBatch () const override {
472
+ return Batch;
473
+ }
474
+
475
+ TString GetSerializedData () const override {
476
+ return NArrow::SerializeBatchNoCompression (Batch);
477
+ }
478
+ };
479
+
480
+ return std::make_shared<TDataAccessor>(batch);
481
+ }
482
+
483
+
488
484
NActors::TActorId TxProxyId = MakeTxProxyID();
489
485
NActors::TActorId PipeCacheId = NKikimr::MakePipePeNodeCacheID(false );
490
486
@@ -495,7 +491,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
495
491
NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks * Callbacks = nullptr ;
496
492
TIntrusivePtr<TKqpCounters> Counters;
497
493
const NMiniKQL::TTypeEnvironment& TypeEnv;
498
- std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
499
494
500
495
const NYql::NDq::TTxId TxId;
501
496
const TTableId TableId;
0 commit comments