Skip to content

YQ RD optimized purecalc memory usage #11394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>

#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace NFq::NRowDispatcher {


Expand All @@ -17,6 +19,7 @@ struct TActorFactory : public IActorFactory {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const override {

Expand All @@ -29,6 +32,7 @@ struct TActorFactory : public IActorFactory {
partitionId,
std::move(driver),
credentialsProviderFactory,
pureCalcProgramFactory,
counters,
pqGateway
);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/library/actors/core/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq::NRowDispatcher {

Expand All @@ -20,6 +21,7 @@ struct IActorFactory : public TThrRefBase {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const = 0;
};
Expand Down
16 changes: 9 additions & 7 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,15 @@ class TJsonFilter::TImpl {
TImpl(const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback)
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Sql(GenerateSql(whereFilter)) {
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());

// Program should be stateless because input values are
// allocated on another allocator and should be released
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
Program = factory->MakePushStreamProgram(
Program = pureCalcProgramFactory->MakePushStreamProgram(
TFilterInputSpec(MakeInputSchema(columns, types)),
TFilterOutputSpec(MakeOutputSchema()),
Sql,
Expand Down Expand Up @@ -311,8 +311,9 @@ TJsonFilter::TJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback)) {
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory)) {
}

TJsonFilter::~TJsonFilter() {
Expand All @@ -330,8 +331,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback));
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory));
}

} // namespace NFq
7 changes: 5 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq {

Expand All @@ -13,7 +14,8 @@ class TJsonFilter {
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback);
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);

~TJsonFilter();

Expand All @@ -29,6 +31,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TJsonFilter::TCallback callback);
TJsonFilter::TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);

} // namespace NFq
4 changes: 4 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/yql/dq/actors/common/retry_queue.h>
#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/events/events.h>
Expand Down Expand Up @@ -119,6 +120,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {

NConfig::TRowDispatcherConfig Config;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
TYqSharedResources::TPtr YqSharedResources;
TMaybe<TActorId> CoordinatorActorId;
TSet<TActorId> CoordinatorChangedSubscribers;
Expand Down Expand Up @@ -264,6 +266,7 @@ TRowDispatcher::TRowDispatcher(
const NYql::IPqGateway::TPtr& pqGateway)
: Config(config)
, CredentialsProviderFactory(credentialsProviderFactory)
, PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
, YqSharedResources(yqSharedResources)
, CredentialsFactory(credentialsFactory)
, LogPrefix("RowDispatcher: ")
Expand Down Expand Up @@ -436,6 +439,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
CredentialsFactory,
ev->Get()->Record.GetToken(),
source.GetAddBearerToToken()),
PureCalcProgramFactory,
Counters,
PqGateway
);
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 PartitionId;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
NYql::ITopicClient::TPtr TopicClient;
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
const i64 BufferSize;
Expand Down Expand Up @@ -179,6 +180,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down Expand Up @@ -268,6 +270,7 @@ TTopicSession::TTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
: TopicPath(topicPath)
Expand All @@ -277,6 +280,7 @@ TTopicSession::TTopicSession(
, PartitionId(partitionId)
, Driver(std::move(driver))
, CredentialsProviderFactory(credentialsProviderFactory)
, PureCalcProgramFactory(pureCalcProgramFactory)
, BufferSize(16_MB)
, LogPrefix("TopicSession")
, Config(config)
Expand Down Expand Up @@ -734,7 +738,8 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
predicate,
[&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){
Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId));
});
},
PureCalcProgramFactory);
} else {
ClientsWithoutPredicate.insert(ev->Sender);
}
Expand Down Expand Up @@ -959,9 +964,10 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) {
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, pqGateway));
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway));
}

} // namespace NFq
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/topic_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

#include <ydb/library/actors/core/actor.h>

Expand All @@ -24,6 +25,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down
21 changes: 16 additions & 5 deletions ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/testlib/actor_helpers.h>

#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <library/cpp/testing/unittest/registar.h>

Expand All @@ -22,16 +23,24 @@ class TFixture : public NUnitTest::TBaseFixture {

public:
TFixture()
: Runtime(true)
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
, Runtime(true)
, Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
{}

// Crash handler for the test fixture: writes a header line and the current
// backtrace to Cerr, then terminates the process via abort().
// The signal number argument is unnamed and ignored.
// NOTE(review): stream output and backtrace formatting are presumably not
// async-signal-safe; likely acceptable for a test-only diagnostic — confirm.
static void SegmentationFaultHandler(int) {
Cerr << "segmentation fault call stack:" << Endl;
FormatBackTrace(&Cerr);
abort();
}

// Per-test setup: enables YDB backtrace formatting, registers
// SegmentationFaultHandler for SIGSEGV, initializes the test actor runtime,
// and sets the FQ_ROW_DISPATCHER log priority to PRI_DEBUG.
// The test context argument is unused.
void SetUp(NUnitTest::TTestContext&) override {
NKikimr::EnableYDBBacktraceFormat();
// Install the crash handler before the runtime starts so that a segfault
// during initialization also prints a call stack.
signal(SIGSEGV, &SegmentationFaultHandler);

TAutoPtr<TAppPrepare> app = new TAppPrepare();
Runtime.Initialize(app->Unwrap());
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG);

// NOTE(review): EnableYDBBacktraceFormat() is already called at the top of
// this body; this second call looks redundant (possibly the pre-change line
// left over from the diff rendering) — confirm before removing.
NKikimr::EnableYDBBacktraceFormat();
}

void TearDown(NUnitTest::TTestContext& /* context */) override {
Expand All @@ -55,7 +64,8 @@ class TFixture : public NUnitTest::TBaseFixture {
columns,
types,
whereFilter,
callback);
callback,
PureCalcProgramFactory);
}

const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function<NYql::NUdf::TUnboxedValuePod(size_t)> valueCreator) {
Expand Down Expand Up @@ -90,8 +100,9 @@ class TFixture : public NUnitTest::TBaseFixture {
});
}

TActorSystemStub actorSystemStub;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
NActors::TTestActorRuntime Runtime;
TActorSystemStub ActorSystemStub;
std::unique_ptr<NFq::TJsonFilter> Filter;

NKikimr::NMiniKQL::TScopedAlloc Alloc;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory {
ui32 /*partitionId*/,
NYdb::TDriver /*driver*/,
std::shared_ptr<NYdb::ICredentialsProviderFactory> /*credentialsProviderFactory*/,
NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/,
const ::NMonitoring::TDynamicCounterPtr& /*counters*/,
const NYql::IPqGateway::TPtr& /*pqGateway*/) const override {
auto actorId = Runtime.AllocateEdgeActor();
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/tests/fq/pq_async_io/ut_helpers.h>

#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace {

Expand All @@ -24,10 +25,11 @@ const ui64 TimeoutBeforeStartSessionSec = 3;
const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;

class TFixture : public NUnitTest::TBaseFixture {

public:
TFixture()
: Runtime(true) {}
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
, Runtime(true)
{}

void SetUp(NUnitTest::TTestContext&) override {
TAutoPtr<TAppPrepare> app = new TAppPrepare();
Expand Down Expand Up @@ -68,6 +70,7 @@ class TFixture : public NUnitTest::TBaseFixture {
0,
Driver,
CredentialsProviderFactory,
PureCalcProgramFactory,
MakeIntrusive<NMonitoring::TDynamicCounters>(),
CreatePqNativeGateway(pqServices)
).release());
Expand Down Expand Up @@ -155,8 +158,9 @@ class TFixture : public NUnitTest::TBaseFixture {
return eventHolder->Get()->Record.MessagesSize();
}

TActorSystemStub actorSystemStub;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
NActors::TTestActorRuntime Runtime;
TActorSystemStub ActorSystemStub;
NActors::TActorId TopicSession;
NActors::TActorId RowDispatcherActorId;
NYdb::TDriver Driver = NYdb::TDriver(NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")));
Expand Down
Loading