Skip to content

Commit 2131521

Browse files
Merge dabc172 into f9059f6
2 parents f9059f6 + dabc172 commit 2131521

File tree

4 files changed

+118
-12
lines changed

4 files changed

+118
-12
lines changed

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,21 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
525525
CompileQuery();
526526
}
527527

528+
bool AreAllTheTopicsAndPartitionsKnown() const {
529+
const NKikimrKqp::TTopicOperationsRequest& operations = QueryState->GetTopicOperations();
530+
for (const auto& topic : operations.GetTopics()) {
531+
auto path = CanonizePath(NPersQueue::GetFullTopicPath(TlsActivationContext->AsActorContext(),
532+
QueryState->GetDatabase(), topic.path()));
533+
534+
for (const auto& partition : topic.partitions()) {
535+
if (!QueryState->TxCtx->TopicOperations.HasThisPartitionAlreadyBeenAdded(path, partition.partition_id())) {
536+
return false;
537+
}
538+
}
539+
}
540+
return true;
541+
}
542+
528543
void AddOffsetsToTransaction() {
529544
YQL_ENSURE(QueryState);
530545
if (!PrepareQueryTransaction()) {
@@ -533,10 +548,25 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
533548

534549
QueryState->AddOffsetsToTransaction();
535550

536-
auto navigate = QueryState->BuildSchemeCacheNavigate();
551+
if (!AreAllTheTopicsAndPartitionsKnown()) {
552+
auto navigate = QueryState->BuildSchemeCacheNavigate();
553+
Become(&TKqpSessionActor::ExecuteState);
554+
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
555+
return;
556+
}
557+
558+
TString message;
559+
if (!QueryState->TryMergeTopicOffsets(QueryState->TopicOperations, message)) {
560+
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message;
561+
}
537562

538-
Become(&TKqpSessionActor::ExecuteState);
539-
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
563+
if (HasTopicWriteOperations() && !HasTopicWriteId()) {
564+
Become(&TKqpSessionActor::ExecuteState);
565+
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId, 0, QueryState->QueryId);
566+
return;
567+
}
568+
569+
ReplySuccess();
540570
}
541571

542572
void CompileQuery() {
@@ -2768,11 +2798,14 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
27682798
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message;
27692799
}
27702800

2801+
QueryState->TxCtx->TopicOperations.CacheSchemeCacheNavigate(response->ResultSet);
2802+
27712803
if (HasTopicWriteOperations() && !HasTopicWriteId()) {
27722804
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId, 0, QueryState->QueryId);
2773-
} else {
2774-
ReplySuccess();
2805+
return;
27752806
}
2807+
2808+
ReplySuccess();
27762809
}
27772810

27782811
void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
@@ -2782,7 +2815,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
27822815

27832816
YQL_ENSURE(QueryState);
27842817
YQL_ENSURE(QueryState->GetAction() == NKikimrKqp::QUERY_ACTION_TOPIC);
2818+
27852819
SetTopicWriteId(NLongTxService::TLockHandle(ev->Get()->TxId, TActivationContext::ActorSystem()));
2820+
27862821
ReplySuccess();
27872822
}
27882823

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,12 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
139139
{
140140
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == rhs.Topic_);
141141
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == rhs.Partition_);
142-
Y_ABORT_UNLESS(TabletId_.Empty() || TabletId_ == rhs.TabletId_);
143142

144143
if (Topic_.Empty()) {
145144
Topic_ = rhs.Topic_;
146145
Partition_ = rhs.Partition_;
146+
}
147+
if (TabletId_.Empty()) {
147148
TabletId_ = rhs.TabletId_;
148149
}
149150

@@ -363,6 +364,50 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
363364
return true;
364365
}
365366

367+
bool TTopicOperations::HasThisPartitionAlreadyBeenAdded(const TString& topicPath, ui32 partitionId)
368+
{
369+
if (Operations_.contains({topicPath, partitionId})) {
370+
return true;
371+
}
372+
if (!CachedNavigateResult_.contains(topicPath)) {
373+
return false;
374+
}
375+
376+
const NSchemeCache::TSchemeCacheNavigate::TEntry& entry =
377+
CachedNavigateResult_.at(topicPath);
378+
const NKikimrSchemeOp::TPersQueueGroupDescription& description =
379+
entry.PQGroupInfo->Description;
380+
381+
TString path = CanonizePath(entry.Path);
382+
Y_ABORT_UNLESS(path == topicPath,
383+
"path=%s, topicPath=%s",
384+
path.data(), topicPath.data());
385+
386+
for (const auto& partition : description.GetPartitions()) {
387+
if (partition.GetPartitionId() == partitionId) {
388+
TTopicPartition key{topicPath, partitionId};
389+
Operations_[key].SetTabletId(partition.GetTabletId());
390+
return true;
391+
}
392+
}
393+
394+
return false;
395+
}
396+
397+
void TTopicOperations::CacheSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results)
398+
{
399+
for (const auto& result : results) {
400+
if (result.Kind != NSchemeCache::TSchemeCacheNavigate::KindTopic) {
401+
continue;
402+
}
403+
if (!result.PQGroupInfo) {
404+
continue;
405+
}
406+
TString path = CanonizePath(result.Path);
407+
CachedNavigateResult_[path] = result;
408+
}
409+
}
410+
366411
void TTopicOperations::BuildTopicTxs(TTopicOperationTransactions& txs)
367412
{
368413
for (auto& [_, operations] : Operations_) {

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class TTopicOperations {
117117
bool ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results,
118118
Ydb::StatusIds_StatusCode& status,
119119
TString& message);
120+
void CacheSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results);
120121

121122
void BuildTopicTxs(TTopicOperationTransactions &txs);
122123

@@ -129,13 +130,17 @@ class TTopicOperations {
129130

130131
size_t GetSize() const;
131132

133+
bool HasThisPartitionAlreadyBeenAdded(const TString& topic, ui32 partitionId);
134+
132135
private:
133136
THashMap<TTopicPartition, TTopicPartitionOperations, TTopicPartition::THash> Operations_;
134137
bool HasReadOperations_ = false;
135138
bool HasWriteOperations_ = false;
136139

137140
TMaybe<TString> Consumer_;
138141
NLongTxService::TLockHandle WriteId_;
142+
143+
THashMap<TString, NSchemeCache::TSchemeCacheNavigate::TEntry> CachedNavigateResult_;
139144
};
140145

141146
}

ydb/core/persqueue/writer/writer.cpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ class TPartitionWriter : public TActorBootstrapped<TPartitionWriter>, private TR
223223
}
224224

225225
if (auto delay = RetryState->GetNextRetryDelay(code); delay.Defined()) {
226+
DEBUG("Repeat the request to KQP in " << *delay);
226227
Schedule(*delay, new TEvents::TEvWakeup());
227228
}
228229
}
@@ -254,6 +255,10 @@ class TPartitionWriter : public TActorBootstrapped<TPartitionWriter>, private TR
254255
/// GetWriteId
255256

256257
void GetWriteId(const TActorContext& ctx) {
258+
DEBUG("Start of a request to KQP for a WriteId. " <<
259+
"SessionId: " << Opts.SessionId <<
260+
" TxId: " << Opts.TxId);
261+
257262
auto ev = MakeWriteIdRequest();
258263
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
259264
Become(&TThis::StateGetWriteId);
@@ -269,7 +274,12 @@ class TPartitionWriter : public TActorBootstrapped<TPartitionWriter>, private TR
269274
}
270275
}
271276

272-
void HandleWriteId(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
277+
void HandleWriteId(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& /*ctx*/) {
278+
DEBUG("End of the request to KQP for the WriteId. " <<
279+
"SessionId: " << Opts.SessionId <<
280+
" TxId: " << Opts.TxId <<
281+
" Status: " << ev->Get()->Record.GetYdbStatus());
282+
273283
auto& record = ev->Get()->Record;
274284
switch (record.GetYdbStatus()) {
275285
case Ydb::StatusIds::SUCCESS:
@@ -283,10 +293,9 @@ class TPartitionWriter : public TActorBootstrapped<TPartitionWriter>, private TR
283293

284294
WriteId = NPQ::GetWriteId(record.GetResponse().GetTopicOperations());
285295

286-
LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY,
287-
"SessionId: " << Opts.SessionId <<
288-
" TxId: " << Opts.TxId <<
289-
" WriteId: " << WriteId);
296+
DEBUG("SessionId: " << Opts.SessionId <<
297+
" TxId: " << Opts.TxId <<
298+
" WriteId: " << WriteId);
290299

291300
GetOwnership();
292301
}
@@ -404,11 +413,20 @@ class TPartitionWriter : public TActorBootstrapped<TPartitionWriter>, private TR
404413
Y_ABORT_UNLESS(HasWriteId());
405414
Y_ABORT_UNLESS(HasSupportivePartitionId());
406415

416+
DEBUG("Start of a request to KQP to save PartitionId. " <<
417+
"SessionId: " << Opts.SessionId <<
418+
" TxId: " << Opts.TxId);
419+
407420
auto ev = MakeWriteIdRequest();
408421
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
409422
}
410423

411424
void HandlePartitionIdSaved(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
425+
DEBUG("End of a request to KQP to save PartitionId. " <<
426+
"SessionId: " << Opts.SessionId <<
427+
" TxId: " << Opts.TxId <<
428+
" Status: " << ev->Get()->Record.GetYdbStatus());
429+
412430
auto& record = ev->Get()->Record;
413431
switch (record.GetYdbStatus()) {
414432
case Ydb::StatusIds::SUCCESS:
@@ -968,7 +986,10 @@ class TPartitionWriter : public TActorBootstrapped<TPartitionWriter>, private TR
968986
using IRetryState = IRetryPolicy::IRetryState;
969987

970988
static IRetryPolicy::TPtr GetRetryPolicy() {
971-
return IRetryPolicy::GetExponentialBackoffPolicy(Retryable);
989+
return IRetryPolicy::GetExponentialBackoffPolicy(Retryable,
990+
TDuration::MilliSeconds(10),
991+
TDuration::MilliSeconds(10),
992+
TDuration::MilliSeconds(100));
972993
};
973994

974995
static ERetryErrorClass Retryable(Ydb::StatusIds::StatusCode code) {

0 commit comments

Comments
 (0)