22#include " transfer_writer.h"
33#include " worker.h"
44
5- #include < ydb/library/actors/core/actor_bootstrapped.h>
6- #include < ydb/library/actors/core/hfunc.h>
7- #include < ydb/library/services/services.pb.h>
8-
9- #include < ydb/public/lib/scheme_types/scheme_type_id.h>
105#include < ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
116#include < ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h>
127#include < ydb/core/kqp/runtime/kqp_write_table.h>
13- #include < ydb/core/persqueue/purecalc/purecalc.h>
8+ #include < ydb/core/tx/replication/ydb_proxy/topic_message.h>
9+ #include < ydb/core/persqueue/purecalc/purecalc.h> // should be after topic_message
1410#include < ydb/core/tx/scheme_cache/helpers.h>
1511#include < ydb/core/tx/tx_proxy/upload_rows_common_impl.h>
12+ #include < ydb/library/actors/core/actor_bootstrapped.h>
13+ #include < ydb/library/actors/core/hfunc.h>
14+ #include < ydb/library/services/services.pb.h>
15+ #include < ydb/public/lib/scheme_types/scheme_type_id.h>
1616
1717#include < yql/essentials/providers/common/schema/parser/yql_type_parser.h>
1818#include < yql/essentials/public/purecalc/helpers/stream/stream_from_vector.h>
@@ -593,7 +593,7 @@ class TTransferWriter
593593 ProcessData (ev->Get ()->PartitionId , ev->Get ()->Records );
594594 }
595595
596- void ProcessData (const ui32 partitionId, const TVector<TEvWorker::TEvData::TRecord >& records) {
596+ void ProcessData (const ui32 partitionId, const TVector<TTopicMessage >& records) {
597597 if (!records) {
598598 Send (Worker, new TEvWorker::TEvGone (TEvWorker::TEvGone::DONE));
599599 return ;
@@ -603,20 +603,20 @@ class TTransferWriter
603603
604604 for (auto & message : records) {
605605 NYdb::NTopic::NPurecalc::TMessage input;
606- input.Data = std::move (message.Data );
607- input.MessageGroupId = std::move (message.MessageGroupId );
606+ input.Data = std::move (message.GetData () );
607+ input.MessageGroupId = std::move (message.GetMessageGroupId () );
608608 input.Partition = partitionId;
609- input.ProducerId = std::move (message.ProducerId );
610- input.Offset = message.Offset ;
611- input.SeqNo = message.SeqNo ;
609+ input.ProducerId = std::move (message.GetProducerId () );
610+ input.Offset = message.GetOffset () ;
611+ input.SeqNo = message.GetSeqNo () ;
612612
613613 try {
614614 auto result = ProgramHolder->GetProgram ()->Apply (NYql::NPureCalc::StreamFromVector (TVector{input}));
615615 while (auto * m = result->Fetch ()) {
616616 TableState->AddData (m->Data );
617617 }
618618 } catch (const yexception& e) {
619- ProcessingError = TStringBuilder () << " Error transform message: ' " << message. Data << " ': " << e.what ();
619+ ProcessingError = TStringBuilder () << " Error transform message: " << e.what ();
620620 break ;
621621 }
622622 }
@@ -730,7 +730,7 @@ class TTransferWriter
730730
731731 std::optional<TActorId> PendingWorker;
732732 ui32 PendingPartitionId = 0 ;
733- std::optional<TVector<TEvWorker::TEvData::TRecord >> PendingRecords;
733+ std::optional<TVector<TTopicMessage >> PendingRecords;
734734
735735 ui32 Attempt = 0 ;
736736 TDuration Delay = TDuration::Minutes(1 );
0 commit comments