Skip to content

Commit 45532c3

Browse files
authored
Merge 0fa490f into c0c27d9
2 parents c0c27d9 + 0fa490f commit 45532c3

File tree

12 files changed

+168
-136
lines changed

12 files changed

+168
-136
lines changed

ydb/core/backup/impl/local_partition_reader.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#include "local_partition_reader.h"
22
#include "logging.h"
33

4-
#include <ydb/library/actors/core/actor.h>
5-
#include <ydb/library/services/services.pb.h>
6-
74
#include <ydb/core/persqueue/events/global.h>
85
#include <ydb/core/protos/grpc_pq_old.pb.h>
96
#include <ydb/core/tx/replication/service/worker.h>
7+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
8+
#include <ydb/library/actors/core/actor.h>
9+
#include <ydb/library/services/services.pb.h>
1010

1111
using namespace NActors;
1212
using namespace NKikimr::NReplication::NService;
@@ -131,11 +131,11 @@ class TLocalPartitionReader
131131
}
132132

133133
auto gotOffset = Offset;
134-
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(readResult.ResultSize()));
134+
TVector<NReplication::TTopicMessage> records(::Reserve(readResult.ResultSize()));
135135

136136
for (auto& result : readResult.GetResult()) {
137137
gotOffset = std::max(gotOffset, result.GetOffset());
138-
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData(), TInstant::MilliSeconds(result.GetCreateTimestampMS()), result.GetSourceId(), result.GetSourceId(), result.GetSeqNo());
138+
records.emplace_back(result.GetOffset(), GetDeserializedData(result.GetData()).GetData());
139139
}
140140
SentOffset = gotOffset + 1;
141141

ydb/core/tx/replication/service/base_table_writer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/core/change_exchange/util.h>
99
#include <ydb/core/tablet_flat/flat_row_eggs.h>
1010
#include <ydb/core/tx/datashard/datashard.h>
11+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
1112
#include <ydb/core/tx/scheme_cache/helpers.h>
1213
#include <ydb/core/tx/tx_proxy/proxy.h>
1314
#include <ydb/library/actors/core/actor_bootstrapped.h>
@@ -434,8 +435,8 @@ class TLocalTableWriter
434435
TSet<TRowVersion> versionsWithoutTxId;
435436

436437
for (auto& r : ev->Get()->Records) {
437-
auto offset = r.Offset;
438-
auto& data = r.Data;
438+
auto offset = r.GetOffset();
439+
auto& data = r.GetData();
439440

440441
auto record = Parser->Parse(ev->Get()->Source, offset, std::move(data));
441442

ydb/core/tx/replication/service/topic_reader.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "topic_reader.h"
33
#include "worker.h"
44

5+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
56
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
67
#include <ydb/library/actors/core/actor.h>
78
#include <ydb/library/actors/core/hfunc.h>
@@ -54,11 +55,11 @@ class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
5455
LOG_D("Handle " << ev->Get()->ToString());
5556

5657
auto& result = ev->Get()->Result;
57-
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(result.Messages.size()));
58+
TVector<TTopicMessage> records(::Reserve(result.Messages.size()));
5859

5960
for (auto& msg : result.Messages) {
6061
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
61-
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime(), std::move(msg.GetMessageGroupId()), std::move(msg.GetProducerId()), msg.GetSeqNo());
62+
records.push_back(std::move(msg));
6263
}
6364

6465
Send(Worker, new TEvWorker::TEvData(result.PartitionId, ToString(result.PartitionId), std::move(records)));

ydb/core/tx/replication/service/transfer_writer.cpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22
#include "transfer_writer.h"
33
#include "worker.h"
44

5-
#include <ydb/library/actors/core/actor_bootstrapped.h>
6-
#include <ydb/library/actors/core/hfunc.h>
7-
#include <ydb/library/services/services.pb.h>
8-
9-
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
105
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
116
#include <ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h>
127
#include <ydb/core/kqp/runtime/kqp_write_table.h>
13-
#include <ydb/core/persqueue/purecalc/purecalc.h>
8+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
9+
#include <ydb/core/persqueue/purecalc/purecalc.h> // should be after topic_message
1410
#include <ydb/core/tx/scheme_cache/helpers.h>
1511
#include <ydb/core/tx/tx_proxy/upload_rows_common_impl.h>
12+
#include <ydb/library/actors/core/actor_bootstrapped.h>
13+
#include <ydb/library/actors/core/hfunc.h>
14+
#include <ydb/library/services/services.pb.h>
15+
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
1616

1717
#include <yql/essentials/providers/common/schema/parser/yql_type_parser.h>
1818
#include <yql/essentials/public/purecalc/helpers/stream/stream_from_vector.h>
@@ -593,7 +593,7 @@ class TTransferWriter
593593
ProcessData(ev->Get()->PartitionId, ev->Get()->Records);
594594
}
595595

596-
void ProcessData(const ui32 partitionId, const TVector<TEvWorker::TEvData::TRecord>& records) {
596+
void ProcessData(const ui32 partitionId, const TVector<TTopicMessage>& records) {
597597
if (!records) {
598598
Send(Worker, new TEvWorker::TEvGone(TEvWorker::TEvGone::DONE));
599599
return;
@@ -603,20 +603,20 @@ class TTransferWriter
603603

604604
for (auto& message : records) {
605605
NYdb::NTopic::NPurecalc::TMessage input;
606-
input.Data = std::move(message.Data);
607-
input.MessageGroupId = std::move(message.MessageGroupId);
606+
input.Data = std::move(message.GetData());
607+
input.MessageGroupId = std::move(message.GetMessageGroupId());
608608
input.Partition = partitionId;
609-
input.ProducerId = std::move(message.ProducerId);
610-
input.Offset = message.Offset;
611-
input.SeqNo = message.SeqNo;
609+
input.ProducerId = std::move(message.GetProducerId());
610+
input.Offset = message.GetOffset();
611+
input.SeqNo = message.GetSeqNo();
612612

613613
try {
614614
auto result = ProgramHolder->GetProgram()->Apply(NYql::NPureCalc::StreamFromVector(TVector{input}));
615615
while (auto* m = result->Fetch()) {
616616
TableState->AddData(m->Data);
617617
}
618618
} catch (const yexception& e) {
619-
ProcessingError = TStringBuilder() << "Error transform message: '" << message.Data << "': " << e.what();
619+
ProcessingError = TStringBuilder() << "Error transform message: " << e.what();
620620
break;
621621
}
622622
}
@@ -730,7 +730,7 @@ class TTransferWriter
730730

731731
std::optional<TActorId> PendingWorker;
732732
ui32 PendingPartitionId = 0;
733-
std::optional<TVector<TEvWorker::TEvData::TRecord>> PendingRecords;
733+
std::optional<TVector<TTopicMessage>> PendingRecords;
734734

735735
ui32 Attempt = 0;
736736
TDuration Delay = TDuration::Minutes(1);

ydb/core/tx/replication/service/transfer_writer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
namespace NKikimr {
88
struct TPathId;
99
}
10+
1011
namespace NKikimr::NReplication::NService {
1112

1213
IActor* CreateTransferWriter(const TString& transformLambda, const TPathId& tablePathId,

ydb/core/tx/replication/service/worker.cpp

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "worker.h"
44

55
#include <ydb/core/base/appdata.h>
6+
#include <ydb/core/tx/replication/ydb_proxy/topic_message.h>
67
#include <ydb/library/actors/core/actor_bootstrapped.h>
78
#include <ydb/library/actors/core/hfunc.h>
89
#include <ydb/library/services/services.pb.h>
@@ -13,48 +14,20 @@
1314

1415
namespace NKikimr::NReplication::NService {
1516

16-
TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo)
17-
: Offset(offset)
18-
, Data(data)
19-
, CreateTime(createTime)
20-
, MessageGroupId(messageGroupId)
21-
, ProducerId(producerId)
22-
, SeqNo(seqNo)
23-
{
24-
}
25-
26-
TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo)
27-
: Offset(offset)
28-
, Data(std::move(data))
29-
, CreateTime(createTime)
30-
, MessageGroupId(std::move(messageGroupId))
31-
, ProducerId(std::move(producerId))
32-
, SeqNo(seqNo)
33-
{
34-
}
35-
36-
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records)
17+
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, const TVector<TTopicMessage>& records)
3718
: PartitionId(partitionId)
3819
, Source(source)
3920
, Records(records)
4021
{
4122
}
4223

43-
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records)
24+
TEvWorker::TEvData::TEvData(ui32 partitionId, const TString& source, TVector<TTopicMessage>&& records)
4425
: PartitionId(partitionId)
4526
, Source(source)
4627
, Records(std::move(records))
4728
{
4829
}
4930

50-
void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const {
51-
out << "{"
52-
<< " Offset: " << Offset
53-
<< " Data: " << Data.size() << "b"
54-
<< " CreateTime: " << CreateTime.ToStringUpToSeconds()
55-
<< " }";
56-
}
57-
5831
TString TEvWorker::TEvData::ToString() const {
5932
return TStringBuilder() << ToStringHeader() << " {"
6033
<< " Source: " << Source
@@ -189,11 +162,11 @@ class TWorker: public TActorBootstrapped<TWorker> {
189162
if (InFlightData) {
190163
const auto& records = InFlightData->Records;
191164
auto it = MinElementBy(records, [](const auto& record) {
192-
return record.CreateTime;
165+
return record.GetCreateTime();
193166
});
194167

195168
if (it != records.end()) {
196-
Lag = TlsActivationContext->Now() - it->CreateTime;
169+
Lag = TlsActivationContext->Now() - it->GetCreateTime();
197170
}
198171
}
199172

ydb/core/tx/replication/service/worker.h

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88

99
#include <functional>
1010

11-
namespace NKikimr::NReplication::NService {
11+
namespace NKikimr::NReplication {
12+
13+
class TTopicMessage;
14+
15+
namespace NService {
1216

1317
struct TEvWorker {
1418
enum EEv {
@@ -30,25 +34,12 @@ struct TEvWorker {
3034
struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {};
3135

3236
struct TEvData: public TEventLocal<TEvData, EvData> {
33-
struct TRecord {
34-
ui64 Offset;
35-
TString Data;
36-
TInstant CreateTime;
37-
TString MessageGroupId;
38-
TString ProducerId;
39-
ui64 SeqNo;
40-
41-
explicit TRecord(ui64 offset, const TString& data, TInstant createTime, const TString& messageGroupId, const TString& producerId, ui64 seqNo);
42-
explicit TRecord(ui64 offset, TString&& data, TInstant createTime, TString&& messageGroupId, TString&& producerId, ui64 seqNo);
43-
void Out(IOutputStream& out) const;
44-
};
45-
4637
ui32 PartitionId;
4738
TString Source;
48-
TVector<TRecord> Records;
39+
TVector<TTopicMessage> Records;
4940

50-
explicit TEvData(ui32 partitionId, const TString& source, const TVector<TRecord>& records);
51-
explicit TEvData(ui32 partitionId, const TString& source, TVector<TRecord>&& records);
41+
explicit TEvData(ui32 partitionId, const TString& source, const TVector<TTopicMessage>& records);
42+
explicit TEvData(ui32 partitionId, const TString& source, TVector<TTopicMessage>&& records);
5243
TString ToString() const override;
5344
};
5445

@@ -89,8 +80,5 @@ IActor* CreateWorker(
8980
std::function<IActor*(void)>&& createReaderFn,
9081
std::function<IActor*(void)>&& createWriterFn);
9182

92-
}
93-
94-
Y_DECLARE_OUT_SPEC(inline, NKikimr::NReplication::NService::TEvWorker::TEvData::TRecord, o, x) {
95-
return x.Out(o);
96-
}
83+
} // NService
84+
} // NKikimr::NReplication
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#include "topic_message.h"
2+
3+
namespace NKikimr::NReplication {
4+
5+
using TDataEvent = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent;
6+
using ECodec = NYdb::NTopic::ECodec;
7+
8+
TTopicMessage::TTopicMessage(const TDataEvent::TMessageBase& msg, ECodec codec, ui64 uncompressedSize)
9+
: TDataEvent::TMessageInformation(
10+
msg.GetOffset(),
11+
msg.GetProducerId(),
12+
msg.GetSeqNo(),
13+
msg.GetCreateTime(),
14+
msg.GetWriteTime(),
15+
msg.GetMeta(),
16+
msg.GetMessageMeta(),
17+
uncompressedSize,
18+
msg.GetMessageGroupId()
19+
)
20+
, Codec(codec)
21+
, Data(msg.GetData())
22+
{
23+
}
24+
25+
TTopicMessage::TTopicMessage(const TDataEvent::TMessage& msg)
26+
: TTopicMessage(msg, ECodec::RAW, msg.GetData().size())
27+
{
28+
}
29+
30+
TTopicMessage::TTopicMessage(const TDataEvent::TCompressedMessage& msg)
31+
: TTopicMessage(msg, msg.GetCodec(), msg.GetUncompressedSize())
32+
{
33+
}
34+
35+
TTopicMessage::TTopicMessage(ui64 offset, const TString& data)
36+
: TDataEvent::TMessageInformation(offset, "", 0, TInstant::Zero(), TInstant::Zero(), nullptr, nullptr, 0, "")
37+
, Codec(ECodec::RAW)
38+
, Data(data)
39+
{
40+
}
41+
42+
ECodec TTopicMessage::GetCodec() const {
43+
return Codec;
44+
}
45+
46+
const TString& TTopicMessage::GetData() const {
47+
return Data;
48+
}
49+
50+
TString& TTopicMessage::GetData() {
51+
return Data;
52+
}
53+
54+
ui64 TTopicMessage::GetOffset() const {
55+
return Offset;
56+
}
57+
58+
ui64 TTopicMessage::GetSeqNo() const {
59+
return SeqNo;
60+
}
61+
62+
TInstant TTopicMessage::GetCreateTime() const {
63+
return CreateTime;
64+
}
65+
66+
TString TTopicMessage::GetMessageGroupId() const {
67+
return TString(MessageGroupId);
68+
}
69+
70+
TString TTopicMessage::GetProducerId() const {
71+
return TString(ProducerId);
72+
}
73+
74+
void TTopicMessage::Out(IOutputStream& out) const {
75+
out << "{"
76+
<< " Codec: " << Codec
77+
<< " Data: " << Data.size() << "b"
78+
<< " Offset: " << Offset
79+
<< " SeqNo: " << SeqNo
80+
<< " CreateTime: " << CreateTime
81+
<< " MessageGroupId: " << MessageGroupId
82+
<< " ProducerId: " << ProducerId
83+
<< " }";
84+
}
85+
86+
}
87+
88+
Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::TTopicMessage, o, x) {
89+
return x.Out(o);
90+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#pragma once
2+
3+
#include <ydb-cpp-sdk/client/topic/read_events.h>
4+
5+
namespace NKikimr::NReplication {
6+
7+
class TTopicMessage: protected NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation {
8+
using TDataEvent = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent;
9+
using ECodec = NYdb::NTopic::ECodec;
10+
11+
explicit TTopicMessage(const TDataEvent::TMessageBase& msg, ECodec codec, ui64 uncompressedSize);
12+
13+
public:
14+
explicit TTopicMessage(const TDataEvent::TMessage& msg);
15+
explicit TTopicMessage(const TDataEvent::TCompressedMessage& msg);
16+
explicit TTopicMessage(ui64 offset, const TString& data); // from scratch
17+
18+
ECodec GetCodec() const;
19+
const TString& GetData() const;
20+
TString& GetData();
21+
ui64 GetOffset() const;
22+
ui64 GetSeqNo() const;
23+
TInstant GetCreateTime() const;
24+
TString GetMessageGroupId() const;
25+
TString GetProducerId() const;
26+
void Out(IOutputStream& out) const;
27+
28+
private:
29+
ECodec Codec;
30+
TString Data;
31+
};
32+
33+
}

0 commit comments

Comments
 (0)