Skip to content

Commit 1281fb6

Browse files
authored
Merge 08e0969 into 6c52ddb
2 parents 6c52ddb + 08e0969 commit 1281fb6

File tree

5 files changed

+86
-89
lines changed

5 files changed

+86
-89
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp

Lines changed: 9 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -1,20 +1,12 @@
11
#include "dq_pq_meta_extractor.h"
22

3-
#include <optional>
4-
53
#include <ydb/library/yql/minikql/mkql_string_util.h>
64
#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h>
7-
#include <ydb/library/yql/public/udf/udf_data_type.h>
8-
#include <ydb/library/yql/public/udf/udf_value.h>
9-
10-
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
11-
12-
#include <util/generic/string.h>
135

146
namespace {
157
const std::unordered_map<TString, NYql::NDq::TPqMetaExtractor::TPqMetaExtractorLambda> ExtractorsMap = {
168
{
17-
"_yql_sys_create_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
9+
"_yql_sys_create_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
1810
using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>;
1911
return std::make_pair(
2012
NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetCreateTime().MicroSeconds())),
@@ -23,7 +15,7 @@ namespace {
2315
}
2416
},
2517
{
26-
"_yql_sys_tsp_write_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
18+
"_yql_sys_tsp_write_time", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
2719
using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>;
2820
return std::make_pair(
2921
NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetWriteTime().MicroSeconds())),
@@ -32,24 +24,24 @@ namespace {
3224
}
3325
},
3426
{
35-
"_yql_sys_partition_id", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
27+
"_yql_sys_partition_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
3628
using TDataType = NYql::NUdf::TDataType<ui64>;
3729
return std::make_pair(
38-
NYql::NUdf::TUnboxedValuePod(message.GetPartitionStream()->GetPartitionId()),
30+
NYql::NUdf::TUnboxedValuePod(message.GetPartitionSession()->GetPartitionId()),
3931
NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
4032
);
4133
}
4234
},
4335
{
44-
"_yql_sys_offset", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
36+
"_yql_sys_offset", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
4537
using TDataType = NYql::NUdf::TDataType<ui64>;
4638
return std::make_pair(
4739
NYql::NUdf::TUnboxedValuePod(message.GetOffset()),
4840
NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize);
4941
}
5042
},
5143
{
52-
"_yql_sys_message_group_id", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
44+
"_yql_sys_message_group_id", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
5345
const auto& data = message.GetMessageGroupId();
5446
return std::make_pair(
5547
NKikimr::NMiniKQL::MakeString(NYql::NUdf::TStringRef(data.Data(), data.Size())),
@@ -58,7 +50,7 @@ namespace {
5850
}
5951
},
6052
{
61-
"_yql_sys_seq_no", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
53+
"_yql_sys_seq_no", [](const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
6254
using TDataType = NYql::NUdf::TDataType<ui64>;
6355
return std::make_pair(
6456
NYql::NUdf::TUnboxedValuePod(message.GetSeqNo()),
@@ -72,14 +64,14 @@ namespace {
7264
namespace NYql::NDq {
7365

7466
TPqMetaExtractor::TPqMetaExtractor() {
75-
for (auto key : AllowedPqMetaSysColumns()) {
67+
for (const auto& key : AllowedPqMetaSysColumns()) {
7668
Y_ENSURE(
7769
ExtractorsMap.contains(key),
7870
"Pq metadata field " << key << " hasn't valid runtime extractor. You should add it.");
7971
}
8072
}
8173

82-
TPqMetaExtractor::TPqMetaExtractorLambda TPqMetaExtractor::FindExtractorLambda(TString sysColumn) const {
74+
TPqMetaExtractor::TPqMetaExtractorLambda TPqMetaExtractor::FindExtractorLambda(const TString& sysColumn) const {
8375
auto iter = ExtractorsMap.find(sysColumn);
8476
Y_ENSURE(iter != ExtractorsMap.end(), sysColumn);
8577

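[File name header missing from the extracted page; the diff below appears to be for dq_pq_meta_extractor.h, the header included at the top of the preceding file.]

Lines changed: 4 additions & 5 deletions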
Original file line number | Diff line number | Diff line change
@@ -1,21 +1,20 @@
11
#pragma once
22

3-
#include "ydb/library/yql/minikql/mkql_string_util.h"
4-
#include <optional>
3+
#include <functional>
54

65
#include <ydb/library/yql/public/udf/udf_data_type.h>
76
#include <ydb/library/yql/public/udf/udf_value.h>
87

9-
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
8+
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
109

1110
#include <util/generic/string.h>
1211

1312
namespace NYql::NDq {
1413
struct TPqMetaExtractor {
15-
using TPqMetaExtractorLambda = std::function<std::pair<NYql::NUdf::TUnboxedValuePod, i64>(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage&)>;
14+
using TPqMetaExtractorLambda = std::function<std::pair<NYql::NUdf::TUnboxedValuePod, i64>(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage&)>;
1615

1716
public:
1817
TPqMetaExtractor();
19-
TPqMetaExtractorLambda FindExtractorLambda(TString sysColumn) const;
18+
TPqMetaExtractorLambda FindExtractorLambda(const TString& sysColumn) const;
2019
};
2120
}

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

Lines changed: 48 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -17,14 +17,15 @@
1717
#include <ydb/library/yql/utils/log/log.h>
1818
#include <ydb/library/yql/utils/yql_panic.h>
1919

20-
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
20+
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
2121
#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>
2222

2323
#include <ydb/library/actors/core/actor.h>
2424
#include <ydb/library/actors/core/event_local.h>
2525
#include <ydb/library/actors/core/events.h>
2626
#include <ydb/library/actors/core/hfunc.h>
2727
#include <ydb/library/actors/core/log.h>
28+
#include <ydb/library/actors/log_backend/actor_log_backend.h>
2829
#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
2930

3031
#include <util/generic/algorithm.h>
@@ -131,8 +132,8 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
131132
IngressStats.Level = statsLevel;
132133
}
133134

134-
NYdb::NPersQueue::TPersQueueClientSettings GetPersQueueClientSettings() const {
135-
NYdb::NPersQueue::TPersQueueClientSettings opts;
135+
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings() const {
136+
NYdb::NTopic::TTopicClientSettings opts;
136137
opts.Database(SourceParams.GetDatabase())
137138
.DiscoveryEndpoint(SourceParams.GetEndpoint())
138139
.SslCredentials(NYdb::TSslCredentials(SourceParams.GetUseSsl()))
@@ -173,7 +174,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
173174
data->SetBlob(stateBlob);
174175

175176
DeferredCommits.emplace(checkpoint.GetId(), std::make_pair(std::move(CurrentDeferredCommit), CurrentDeferredCommitOffset));
176-
CurrentDeferredCommit = NYdb::NPersQueue::TDeferredCommit();
177+
CurrentDeferredCommit = NYdb::NTopic::TDeferredCommit();
177178
CurrentDeferredCommitOffset.Clear();
178179
}
179180

@@ -238,16 +239,16 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
238239
return IngressStats;
239240
}
240241

241-
NYdb::NPersQueue::TPersQueueClient& GetPersQueueClient() {
242-
if (!PersQueueClient) {
243-
PersQueueClient = std::make_unique<NYdb::NPersQueue::TPersQueueClient>(Driver, GetPersQueueClientSettings());
242+
NYdb::NTopic::TTopicClient& GetTopicClient() {
243+
if (!TopicClient) {
244+
TopicClient = std::make_unique<NYdb::NTopic::TTopicClient>(Driver, GetTopicClientSettings());
244245
}
245-
return *PersQueueClient;
246+
return *TopicClient;
246247
}
247248

248-
NYdb::NPersQueue::IReadSession& GetReadSession() {
249+
NYdb::NTopic::IReadSession& GetReadSession() {
249250
if (!ReadSession) {
250-
ReadSession = GetPersQueueClient().CreateReadSession(GetReadSessionSettings());
251+
ReadSession = GetTopicClient().CreateReadSession(GetReadSessionSettings());
251252
}
252253
return *ReadSession;
253254
}
@@ -272,7 +273,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
272273
ReadSession->Close(TDuration::Zero());
273274
ReadSession.reset();
274275
}
275-
PersQueueClient.reset();
276+
TopicClient.reset();
276277
TActor<TDqPqReadActor>::PassAway();
277278
}
278279

@@ -312,13 +313,13 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
312313

313314
ui32 batchItemsEstimatedCount = 0;
314315
for (auto& event : events) {
315-
if (const auto* val = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&event)) {
316+
if (const auto* val = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
316317
batchItemsEstimatedCount += val->GetMessages().size();
317318
}
318319
}
319320

320321
for (auto& event : events) {
321-
std::visit(TPQEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event);
322+
std::visit(TTopicEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event);
322323
}
323324
}
324325

@@ -374,26 +375,30 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
374375
TInstant::Now());
375376
}
376377

377-
NYdb::NPersQueue::TReadSessionSettings GetReadSessionSettings() const {
378-
NYdb::NPersQueue::TTopicReadSettings topicReadSettings;
378+
NYdb::NTopic::TReadSessionSettings GetReadSessionSettings() const {
379+
NYdb::NTopic::TTopicReadSettings topicReadSettings;
379380
topicReadSettings.Path(SourceParams.GetTopicPath());
380381
auto partitionsToRead = GetPartitionsToRead();
381382
SRC_LOG_D("PartitionsToRead: " << JoinSeq(", ", partitionsToRead));
382383
for (const auto partitionId : partitionsToRead) {
383-
topicReadSettings.AppendPartitionGroupIds(partitionId);
384+
topicReadSettings.AppendPartitionIds(partitionId);
384385
}
385386

386-
return NYdb::NPersQueue::TReadSessionSettings()
387-
.DisableClusterDiscovery(SourceParams.GetClusterType() == NPq::NProto::DataStreams)
387+
Y_UNUSED(RangesMode);
388+
TLog log(MakeHolder<TActorLogBackend>(NActors::TActivationContext::ActorSystem(), NKikimrServices::KQP_COMPUTE));
389+
return NYdb::NTopic::TReadSessionSettings()
390+
//.DisableClusterDiscovery(SourceParams.GetClusterType() == NPq::NProto::DataStreams)
388391
.AppendTopics(topicReadSettings)
389392
.ConsumerName(SourceParams.GetConsumerName())
390393
.MaxMemoryUsageBytes(BufferSize)
391-
.StartingMessageTimestamp(StartingMessageTimestamp)
392-
.RangesMode(RangesMode);
394+
.ReadFromTimestamp(StartingMessageTimestamp)
395+
.Log(log)
396+
;
397+
//.RangesMode(RangesMode);
393398
}
394399

395-
static TPartitionKey MakePartitionKey(const NYdb::NPersQueue::TPartitionStream::TPtr& partitionStreamPtr) {
396-
return std::make_pair(partitionStreamPtr->GetCluster(), partitionStreamPtr->GetPartitionId());
400+
static TPartitionKey MakePartitionKey(const NYdb::NTopic::TPartitionSession::TPtr& partitionSession) {
401+
return std::make_pair(partitionSession->GetTopicPath(), partitionSession->GetPartitionId());
397402
}
398403

399404
void SubscribeOnNextEvent() {
@@ -408,15 +413,15 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
408413

409414
struct TReadyBatch {
410415
public:
411-
TReadyBatch(TMaybe<TInstant> watermark, ui32 dataCapacity) : Watermark(watermark){
416+
TReadyBatch(TMaybe<TInstant> watermark, ui32 dataCapacity): Watermark(watermark){
412417
Data.reserve(dataCapacity);
413418
}
414419

415420
public:
416421
TMaybe<TInstant> Watermark;
417422
TUnboxedValueVector Data;
418423
i64 UsedSpace = 0;
419-
THashMap<NYdb::NPersQueue::TPartitionStream::TPtr, TList<std::pair<ui64, ui64>>> OffsetRanges; // [start, end)
424+
THashMap<NYdb::NTopic::TPartitionSession::TPtr, TList<std::pair<ui64, ui64>>> OffsetRanges; // [start, end)
420425
};
421426

422427
bool MaybeReturnReadyBatch(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, i64& usedSpace) {
@@ -431,17 +436,17 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
431436
watermark = readyBatch.Watermark;
432437
usedSpace = readyBatch.UsedSpace;
433438

434-
for (const auto& [partitionStream, ranges] : readyBatch.OffsetRanges) {
439+
for (const auto& [PartitionSession, ranges] : readyBatch.OffsetRanges) {
435440
for (const auto& [start, end] : ranges) {
436-
CurrentDeferredCommit.Add(partitionStream, start, end);
441+
CurrentDeferredCommit.Add(PartitionSession, start, end);
437442
if (!CurrentDeferredCommitOffset) {
438443
CurrentDeferredCommitOffset = std::make_pair(start, end);
439444
} else {
440445
CurrentDeferredCommitOffset->first = std::min(CurrentDeferredCommitOffset->first, start);
441446
CurrentDeferredCommitOffset->second = std::max(CurrentDeferredCommitOffset->second, end);
442447
}
443448
}
444-
PartitionToOffset[MakePartitionKey(partitionStream)] = ranges.back().second;
449+
PartitionToOffset[MakePartitionKey(PartitionSession)] = ranges.back().second;
445450
}
446451

447452
ReadyBuffer.pop();
@@ -470,9 +475,9 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
470475
ReadyBuffer.back().Watermark = watermark;
471476
}
472477

473-
struct TPQEventProcessor {
474-
void operator()(NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) {
475-
const auto partitionKey = MakePartitionKey(event.GetPartitionStream());
478+
struct TTopicEventProcessor {
479+
void operator()(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) {
480+
const auto partitionKey = MakePartitionKey(event.GetPartitionSession());
476481
for (const auto& message : event.GetMessages()) {
477482
const TString& data = message.GetData();
478483
Self.IngressStats.Bytes += data.size();
@@ -490,7 +495,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
490495
curBatch.Data.emplace_back(std::move(item));
491496
curBatch.UsedSpace += size;
492497

493-
auto& offsets = curBatch.OffsetRanges[message.GetPartitionStream()];
498+
auto& offsets = curBatch.OffsetRanges[message.GetPartitionSession()];
494499
if (!offsets.empty() && offsets.back().second == message.GetOffset()) {
495500
offsets.back().second = message.GetOffset() + 1;
496501
} else {
@@ -499,29 +504,29 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
499504
}
500505
}
501506

502-
void operator()(NYdb::NPersQueue::TSessionClosedEvent& ev) {
507+
void operator()(NYdb::NTopic::TSessionClosedEvent& ev) {
503508
ythrow yexception() << "Read session to topic \"" << Self.SourceParams.GetTopicPath()
504509
<< "\" was closed: " << ev.DebugString();
505510
}
506511

507-
void operator()(NYdb::NPersQueue::TReadSessionEvent::TCommitAcknowledgementEvent&) { }
512+
void operator()(NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent&) { }
508513

509-
void operator()(NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent& event) {
514+
void operator()(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent& event) {
510515
TMaybe<ui64> readOffset;
511-
const auto offsetIt = Self.PartitionToOffset.find(MakePartitionKey(event.GetPartitionStream()));
516+
const auto offsetIt = Self.PartitionToOffset.find(MakePartitionKey(event.GetPartitionSession()));
512517
if (offsetIt != Self.PartitionToOffset.end()) {
513518
readOffset = offsetIt->second;
514519
}
515520
event.Confirm(readOffset);
516521
}
517522

518-
void operator()(NYdb::NPersQueue::TReadSessionEvent::TDestroyPartitionStreamEvent& event) {
523+
void operator()(NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent& event) {
519524
event.Confirm();
520525
}
521526

522-
void operator()(NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamStatusEvent&) { }
527+
void operator()(NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent&) { }
523528

524-
void operator()(NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent&) { }
529+
void operator()(NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent&) { }
525530

526531
TReadyBatch& GetActiveBatch(const TPartitionKey& partitionKey, TInstant time) {
527532
if (Y_UNLIKELY(Self.ReadyBuffer.empty() || Self.ReadyBuffer.back().Watermark.Defined())) {
@@ -548,7 +553,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
548553
return Self.ReadyBuffer.emplace(Nothing(), BatchCapacity); // And open new batch
549554
}
550555

551-
std::pair<NUdf::TUnboxedValuePod, i64> CreateItem(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
556+
std::pair<NUdf::TUnboxedValuePod, i64> CreateItem(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
552557
const TString& data = message.GetData();
553558

554559
i64 usedSpace = 0;
@@ -589,14 +594,14 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
589594
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
590595
const NPq::NProto::TDqPqTopicSource SourceParams;
591596
const NPq::NProto::TDqReadTaskParams ReadParams;
592-
std::unique_ptr<NYdb::NPersQueue::TPersQueueClient> PersQueueClient;
593-
std::shared_ptr<NYdb::NPersQueue::IReadSession> ReadSession;
597+
std::unique_ptr<NYdb::NTopic::TTopicClient> TopicClient;
598+
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
594599
NThreading::TFuture<void> EventFuture;
595600
THashMap<TPartitionKey, ui64> PartitionToOffset; // {cluster, partition} -> offset of next event.
596601
TInstant StartingMessageTimestamp;
597602
const NActors::TActorId ComputeActorId;
598-
std::queue<std::pair<ui64, std::pair<NYdb::NPersQueue::TDeferredCommit, TDebugOffsets>>> DeferredCommits;
599-
NYdb::NPersQueue::TDeferredCommit CurrentDeferredCommit;
603+
std::queue<std::pair<ui64, std::pair<NYdb::NTopic::TDeferredCommit, TDebugOffsets>>> DeferredCommits;
604+
NYdb::NTopic::TDeferredCommit CurrentDeferredCommit;
600605
TDebugOffsets CurrentDeferredCommitOffset;
601606
bool SubscribedOnEvent = false;
602607
std::vector<std::tuple<TString, TPqMetaExtractor::TPqMetaExtractorLambda>> MetadataFields;

0 commit comments

Comments
 (0)