Skip to content

Dynamic listing mass fix #1913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,16 @@ void TCheckpointCoordinator::Handle(NActors::TEvInterconnect::TEvNodeConnected::
}
}

void TCheckpointCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
CC_LOG_D("Handle undelivered");

if (const auto actorIt = AllActors.find(ev->Sender); actorIt != AllActors.end()) {
actorIt->second->EventsQueue.HandleUndelivered(ev);
}

NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnUndelivered(ev);
}

void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
CC_LOG_D("Got TEvPoison");
Send(ev->Sender, new NActors::TEvents::TEvPoisonTaken(), 0, ev->Cookie);
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoord
void Handle(NActors::TEvents::TEvPoison::TPtr&);
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&);
void HandleException(const std::exception& err);

Expand Down Expand Up @@ -89,13 +90,13 @@ class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoord
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle)

hFunc(NActors::TEvents::TEvPoison, Handle)
hFunc(NActors::TEvents::TEvUndelivered, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnUndelivered)
hFunc(NActors::TEvents::TEvWakeup, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnWakeup)

hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle)
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle),
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle)
hFunc(NActors::TEvents::TEvUndelivered, Handle)

ExceptionFunc(std::exception, HandleException)
, ExceptionFunc(std::exception, HandleException)
)

static constexpr char ActorName[] = "YQ_CHECKPOINT_COORDINATOR";
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1509,12 +1509,11 @@ class TKqpHost : public IKqpHost {
state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory;
state->Configuration->WriteThroughDqIntegration = true;
state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;
state->MaxTasksPerStage = SessionCtx->ConfigPtr()->MaxTasksPerStage.Get();

state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
state->Gateway = FederatedQuerySetup->HttpGateway;

auto dataSource = NYql::CreateS3DataSource(state, FederatedQuerySetup->HttpGateway);
auto dataSink = NYql::CreateS3DataSink(state, FederatedQuerySetup->HttpGateway);
auto dataSource = NYql::CreateS3DataSource(state);
auto dataSink = NYql::CreateS3DataSink(state);

TypesCtx->AddDataSource(NYql::S3ProviderName, std::move(dataSource));
TypesCtx->AddDataSink(NYql::S3ProviderName, std::move(dataSink));
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -947,11 +947,6 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");
auto& externalSource = *protoSource->MutableExternalSource();
google::protobuf::Any& settings = *externalSource.MutableSettings();
TString& sourceType = *externalSource.MutableType();
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType);
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");

// Partitioning
TVector<TString> partitionParams;
Expand All @@ -976,6 +971,12 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
externalSource.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson());
CreateStructuredTokenParser(token).ListReferences(SecretNames);
}

google::protobuf::Any& settings = *externalSource.MutableSettings();
TString& sourceType = *externalSource.MutableType();
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType, maxTasksPerStage);
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings type for its dq source node");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ STRICT_STFUNC_EXC(TDqComputeActorCheckpoints::StateFunc,
hFunc(TEvDqCompute::TEvRun, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
hFunc(NActors::TEvents::TEvUndelivered, Handle);
hFunc(TEvRetryQueuePrivate::TEvRetry, Handle);
hFunc(TEvents::TEvWakeup, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);,
Expand Down Expand Up @@ -393,6 +394,13 @@ void TDqComputeActorCheckpoints::Handle(NActors::TEvInterconnect::TEvNodeConnect
EventsQueue.HandleNodeConnected(ev->Get()->NodeId);
}

void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
LOG_D("Handle undelivered");
if (!EventsQueue.HandleUndelivered(ev)) {
LOG_E("TEvUndelivered: " << ev->Get()->SourceType);
}
}

void TDqComputeActorCheckpoints::Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
Y_UNUSED(ev);
EventsQueue.Retry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpo
void Handle(NActors::TEvents::TEvPoison::TPtr&);
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev);
void Handle(NActors::TEvents::TEvWakeup::TPtr& ev);
void HandleException(const std::exception& err);
Expand Down
10 changes: 10 additions & 0 deletions ydb/library/yql/dq/actors/compute/retry_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) {
}
}

bool TRetryEventsQueue::HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
if (ev->Sender == RecipientId && ev->Get()->Reason == NActors::TEvents::TEvUndelivered::Disconnected) {
Connected = false;
ScheduleRetry();
return true;
}

return false;
}

void TRetryEventsQueue::Retry() {
RetryScheduled = false;
if (!Connected) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/yql/dq/actors/compute/retry_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,15 @@ class TRetryEventsQueue {
return false;
}

bool HasPendingEvents() {
bool RemoveConfirmedEvents() {
RemoveConfirmedEvents(MyConfirmedSeqNo);
return !Events.empty();
}

void OnNewRecipientId(const NActors::TActorId& recipientId, bool unsubscribe = true);
void HandleNodeConnected(ui32 nodeId);
void HandleNodeDisconnected(ui32 nodeId);
bool HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Retry();
void Unsubscribe();

Expand Down Expand Up @@ -165,7 +166,7 @@ class TRetryEventsQueue {
THolder<T> ev = MakeHolder<T>();
ev->Record = Event->Record;
ev->Record.MutableTransportMeta()->SetConfirmedSeqNo(confirmedSeqNo);
return MakeHolder<NActors::IEventHandle>(Recipient, Sender, ev.Release(), 0, Cookie);
return MakeHolder<NActors::IEventHandle>(Recipient, Sender, ev.Release(), NActors::IEventHandle::FlagTrackDelivery, Cookie);
}

private:
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/integration/yql_dq_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class IDqIntegration {
virtual bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) = 0;
virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0;
virtual bool CanFallback() = 0;
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0;
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t maxPartitions) = 0;
virtual void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) = 0;
virtual void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) = 0;
virtual void Annotate(const TExprNode& node, THashMap<TString, TString>& params) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
return 0ULL;
}

void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType) override {
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override {
const TDqSource source(&node);
if (const auto maySettings = source.Settings().Maybe<TClSourceSettings>()) {
const auto settings = maySettings.Cast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ bool TDqIntegrationBase::CanFallback() {
return false;
}

void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&, size_t) {
}

void TDqIntegrationBase::FillSinkSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TDqIntegrationBase: public IDqIntegration {
bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) override;
TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
bool CanFallback() override;
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override;
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t) override;
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override;
void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) override;
void Annotate(const TExprNode& node, THashMap<TString, TString>& params) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ namespace NYql::NDqs {
TString sourceType;
if (dqSource) {
sourceSettings.ConstructInPlace();
dqIntegration->FillSourceSettings(*read, *sourceSettings, sourceType);
dqIntegration->FillSourceSettings(*read, *sourceSettings, sourceType, maxPartitions);
YQL_ENSURE(!sourceSettings->type_url().empty(), "Data source provider \"" << dataSourceName << "\" did't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceName << "\" did't fill dq source settings type for its dq source node");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
.Ptr();
::google::protobuf::Any settings;
TString sourceType;
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType);
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType, 1);
UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric");
UNIT_ASSERT(settings.Is<Generic::TSource>());
settings.UnpackTo(DqSourceSettings_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace NYql {
}

void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings,
TString& sourceType) override {
TString& sourceType, size_t) override {
const TDqSource source(&node);
if (const auto maybeSettings = source.Settings().Maybe<TGenSourceSettings>()) {
const auto settings = maybeSettings.Cast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
}
}

void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType) override {
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override {
if (auto maybeDqSource = TMaybeNode<TDqSource>(&node)) {
auto settings = maybeDqSource.Cast().Settings();
if (auto maybeTopicSource = TMaybeNode<TDqPqTopicSource>(settings.Raw())) {
Expand Down
Loading