Skip to content

YQ-3561 add features for FQ-run #14826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct TDummyTopic {
TString TopicName;
TMaybe<TString> Path;
size_t PartitionsCount;
bool CancelOnFileFinish = false;
};

// Dummy Pq gateway for tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ class TFileTopicReadSession : public NYdb::NTopic::IReadSession {
constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);

public:
TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "")
TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = "", bool cancelOnFileFinish = false)
: File_(std::move(file))
, Session_(std::move(session))
, ProducerId_(producerId)
, FilePoller_([this] () {
PollFileForChanges();
})
, Counters_()
, CancelOnFileFinish_(cancelOnFileFinish)
{
Pool_.Start(1);
}
Expand Down Expand Up @@ -123,7 +124,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
size_t size = 0;
ui64 maxBatchRowSize = 100;

while (size_t read = fi.ReadLine(rawMsg)) {
while (fi.ReadLine(rawMsg)) {
msgs.emplace_back(MakeNextMessage(rawMsg));
MsgOffset_++;
if (!maxBatchRowSize--) {
Expand All @@ -133,6 +134,8 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
}
if (!msgs.empty()) {
EventsQ_.Push(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, Session_), size);
} else if (CancelOnFileFinish_) {
EventsQ_.Push(NYdb::NTopic::TSessionClosedEvent(NYdb::EStatus::CANCELLED, {NYdb::NIssue::TIssue("PQ file topic was finished")}), size);
}

Sleep(FILE_POLL_PERIOD);
Expand All @@ -145,6 +148,7 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
TString ProducerId_;
std::thread FilePoller_;
NYdb::NTopic::TReaderCounters::TPtr Counters_;
bool CancelOnFileFinish_ = false;

TThreadPool Pool_;
size_t MsgOffset_ = 0;
Expand Down Expand Up @@ -336,6 +340,10 @@ struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession {
}
};

// Creates a file-backed topic client; the topic map is taken by value and
// moved into the client, so the caller's argument is not copied twice.
TFileTopicClient::TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics)
    : Topics_(std::move(topics))
{
}

std::shared_ptr<NYdb::NTopic::IReadSession> TFileTopicClient::CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) {
Y_ENSURE(!settings.Topics_.empty());
const auto& topic = settings.Topics_.front();
Expand All @@ -358,7 +366,8 @@ std::shared_ptr<NYdb::NTopic::IReadSession> TFileTopicClient::CreateReadSession(
ui64 sessionId = 0;
return std::make_shared<TFileTopicReadSession>(
TFile(*filePath, EOpenMode::TEnum::RdOnly),
MakeIntrusive<TDummyPartitionSession>(sessionId, TString{topicPath}, partitionId)
MakeIntrusive<TDummyPartitionSession>(sessionId, TString{topicPath}, partitionId),
"", topicsIt->second.CancelOnFileFinish
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace NYql {
struct TFileTopicClient : public ITopicClient {
TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics): Topics_(topics) {}
explicit TFileTopicClient(THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> topics);

NYdb::TAsyncStatus CreateTopic(const TString& path, const NYdb::NTopic::TCreateTopicSettings& settings = {}) override;

Expand Down Expand Up @@ -32,5 +32,7 @@ struct TFileTopicClient : public ITopicClient {

private:
THashMap<TDummyPqGateway::TClusterNPath, TDummyTopic> Topics_;
bool CancelOnFileFinish_ = false;
};
}

}
2 changes: 2 additions & 0 deletions ydb/tests/tools/fqrun/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ sync_dir
*.conf
*.parquet
*.json
*.svg
*.txt
11 changes: 10 additions & 1 deletion ydb/tests/tools/fqrun/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

Tool can be used to execute streaming queries by using FQ proxy infrastructure.

To profile memory allocations, build fqrun with the ya make flags `-D PROFILE_MEMORY_ALLOCATIONS -D CXXFLAGS=-DPROFILE_MEMORY_ALLOCATIONS`.

## Scripts

* `flame_graph.sh` - script for collecting flame graphs in svg format, usage:
```(bash)
./flame_graph.sh [graph collection time in seconds]
```

## Examples

### Queries
Expand All @@ -25,7 +34,7 @@ Tool can be used to execute streaming queries by using FQ proxy infrastructure.
./fqrun -M 32000
```

Monitoring endpoint: https://localhost:32000
Monitoring endpoint: http://localhost:32000

* gRPC endpoint:
```(bash)
Expand Down
46 changes: 46 additions & 0 deletions ydb/tests/tools/fqrun/configuration/as_config.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 10
Name: "System"
}
Executor {
Type: BASIC
Threads: 6
SpinThreshold: 1
Name: "User"
}
Executor {
Type: BASIC
Threads: 1
SpinThreshold: 1
Name: "Batch"
}
Executor {
Type: IO
Threads: 1
Name: "IO"
}
Executor {
Type: BASIC
Threads: 2
SpinThreshold: 10
Name: "IC"
TimePerMailboxMicroSecs: 100
}

Scheduler {
Resolution: 64
SpinThreshold: 0
ProgressThreshold: 10000
}

SysExecutor: 0
UserExecutor: 1
IoExecutor: 3
BatchExecutor: 2

ServiceExecutor {
ServiceName: "Interconnect"
ExecutorId: 4
}
25 changes: 25 additions & 0 deletions ydb/tests/tools/fqrun/flame_graph.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env bash

set -eux

# Remove the intermediate perf artifacts when the script exits.
# `-f` is required: the trap also fires when the script stops early (before the
# files exist), and under `set -e` a failing `rm` would abort the trap before
# the second file is removed and replace the script's exit status.
function cleanup {
    sudo rm -f ./profdata
    rm -f ./profdata.txt
}
trap cleanup EXIT

# At most one optional argument is accepted: the collection time in seconds.
# Report the problem on stderr and exit with a valid (0-255) failure status.
if [ $# -gt 1 ]; then
    echo "Too many arguments" >&2
    exit 1
fi

# Collect the PIDs of all fqrun processes owned by the current user as a
# comma-separated list, which is the form `perf record --pid` accepts.
# With `set -e` the script stops here if no fqrun process is found.
fqrun_pid=$(pgrep -u "$USER" -d, fqrun)

# Sample call stacks at 50 Hz for the requested time (30 seconds by default).
sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid "$fqrun_pid" -v -o profdata -- sleep "${1:-30}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А нужно ли тут sudo? (возможно, в некоторых случаях -- да, но в некоторых случаях -- тот же пользователь, отсутствие параноидальных настроек с ptrace -- работает и без него). (Насколько я понимаю, это просто скопировано из kqprun, так что те же вопросы можно адресовать и к тому скрипту)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Иногда без sudo у меня perf ругался, оставил чтобы работало скорее всегда (если прав на sudo нет, что редко, можно в ручную убрать).

Это ведь не засоряет лишними файлами fs для супер пользователя?

Copy link
Collaborator

@yumkam yumkam Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perf record вроде ничего трогать не должен, и не сильно страшен; только вот он сохраняет perf.data с 0600 root:root, что побуждает и perf script запускать с sudo, а он делает всякое-страшное (и насчёт загрязнения не уверен)
В общем, чем с меньшими правами всякое запускается -- тем меньше шансов случайно отстрелить.
Я бы скорее вставил в скрипт что-то вроде $SUDO perf [...] и оставил выставление SUDO=sudo ./flame-graph.sh на пользователя (если у него без sudo не работает)
Но можно оставить и как есть

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Хорошее замечание, поправлю в следующем PR

# Dump the recorded samples as text (profdata was written via sudo, so it is read via sudo too).
sudo perf script -i profdata > profdata.txt

# Resolve the directory of this script so the tool path works from any cwd.
SCRIPT_DIR=$(cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)

flame_graph_tool="$SCRIPT_DIR/../../../../contrib/tools/flame-graph"

# Paths are quoted so a checkout located under a directory with spaces still works.
"${flame_graph_tool}/stackcollapse-perf.pl" profdata.txt | "${flame_graph_tool}/flamegraph.pl" > profdata.svg
65 changes: 63 additions & 2 deletions ydb/tests/tools/fqrun/fqrun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@

#include <util/datetime/base.h>

#include <ydb/core/blob_depot/mon_main.h>
#include <ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h>
#include <ydb/tests/tools/fqrun/src/fq_runner.h>
#include <ydb/tests/tools/kqprun/runlib/application.h>
#include <ydb/tests/tools/kqprun/runlib/utils.h>

#ifdef PROFILE_MEMORY_ALLOCATIONS
#include <library/cpp/lfalloc/alloc_profiler/profiler.h>
#endif

using namespace NKikimrRun;

namespace NFqRun {
Expand Down Expand Up @@ -156,6 +161,21 @@ class TMain : public TMainBase {
}
});

// Actor system configuration option. The value "-" leaves
// RunnerOptions.FqSettings.ActorSystemConfig unset; any other value is read as
// a text-format TActorSystemConfig file.
options.AddLongOption("as-cfg", "File with actor system config (TActorSystemConfig), use '-' for default")
    .RequiredArgument("file")
    .DefaultValue("./configuration/as_config.conf")
    .Handler1([this](const NLastGetopt::TOptsParser* option) {
        const TString configPath(option->CurValOrDef());
        if (configPath != "-") {
            // Start from a freshly default-constructed config, then fill it from the file.
            auto& actorSystemConfig = RunnerOptions.FqSettings.ActorSystemConfig.emplace();
            if (!google::protobuf::TextFormat::ParseFromString(LoadFile(configPath), &actorSystemConfig)) {
                ythrow yexception() << "Bad format of actor system configuration";
            }
        }
    });

options.AddLongOption("emulate-s3", "Enable readings by s3 provider from files, `bucket` value in connection - path to folder with files")
.NoArgument()
.SetFlag(&RunnerOptions.FqSettings.EmulateS3);
Expand All @@ -165,17 +185,27 @@ class TMain : public TMainBase {
.Handler1([this](const NLastGetopt::TOptsParser* option) {
TStringBuf topicName, others;
TStringBuf(option->CurVal()).Split('@', topicName, others);

TStringBuf path, partitionCountStr;
TStringBuf(others).Split(':', path, partitionCountStr);
size_t partitionCount = !partitionCountStr.empty() ? FromString<size_t>(partitionCountStr) : 1;
if (!partitionCount) {
ythrow yexception() << "Topic partition count should be at least one";
}
if (topicName.empty() || path.empty()) {
ythrow yexception() << "Incorrect PQ file mapping, expected form topic@path[:partitions_count]" << Endl;
ythrow yexception() << "Incorrect PQ file mapping, expected form topic@path[:partitions_count]";
}
if (!PqFilesMapping.emplace(topicName, NYql::TDummyTopic("pq", TString(topicName), TString(path), partitionCount)).second) {
ythrow yexception() << "Got duplicated topic name: " << topicName;
}
});

// Marks the named topic so that its settings request cancellation once the
// backing file has been read; the entry is created on first use.
options.AddLongOption("cancel-on-file-finish", "Cancel emulate YDS topics when topic file finished")
    .RequiredArgument("topic")
    .Handler1([this](const NLastGetopt::TOptsParser* option) {
        auto& topicSettings = TopicsSettings[option->CurVal()];
        topicSettings.CancelOnFileFinish = true;
    });

// Outputs

options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)")
Expand Down Expand Up @@ -210,6 +240,7 @@ class TMain : public TMainBase {
ExecutionOptions.Validate(RunnerOptions);

RunnerOptions.FqSettings.YqlToken = GetEnv(YQL_TOKEN_VARIABLE);
RunnerOptions.FqSettings.FunctionRegistry = CreateFunctionRegistry().Get();

auto& gatewayConfig = *RunnerOptions.FqSettings.FqConfig.mutable_gateways();
FillTokens(gatewayConfig.mutable_pq());
Expand All @@ -224,14 +255,39 @@ class TMain : public TMainBase {

if (!PqFilesMapping.empty()) {
auto fileGateway = MakeIntrusive<NYql::TDummyPqGateway>();
for (const auto& [_, topic] : PqFilesMapping) {
for (auto [_, topic] : PqFilesMapping) {
if (const auto it = TopicsSettings.find(topic.TopicName); it != TopicsSettings.end()) {
topic.CancelOnFileFinish = it->second.CancelOnFileFinish;
TopicsSettings.erase(it);
}
fileGateway->AddDummyTopic(topic);
}
RunnerOptions.FqSettings.PqGateway = std::move(fileGateway);
}
if (!TopicsSettings.empty()) {
ythrow yexception() << "Found topic settings for not existing topic: '" << TopicsSettings.begin()->first << "'";
}

// Builds with PROFILE_MEMORY_ALLOCATIONS start allocation sampling right before the script runs.
#ifdef PROFILE_MEMORY_ALLOCATIONS
if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) {
Cout << CoutColors.Cyan() << "Starting profile memory allocations" << CoutColors.Default() << Endl;
}
NAllocProfiler::StartAllocationSampling(true);
#else
// Without profiling support a requested profile output cannot be produced, so fail early.
// NOTE(review): ProfileAllocationsOutput is declared outside this view (presumably in TMainBase) - confirm.
if (ProfileAllocationsOutput) {
ythrow yexception() << "Profile memory allocations disabled, please rebuild fqrun with flag `-D PROFILE_MEMORY_ALLOCATIONS`";
}
#endif

RunScript(ExecutionOptions, RunnerOptions);

// Builds with PROFILE_MEMORY_ALLOCATIONS finalize profiling once the script has run.
#ifdef PROFILE_MEMORY_ALLOCATIONS
if (RunnerOptions.FqSettings.VerboseLevel >= EVerbose::Info) {
Cout << CoutColors.Cyan() << "Finishing profile memory allocations" << CoutColors.Default() << Endl;
}
// NOTE(review): FinishProfileMemoryAllocations is defined outside this view - presumably it writes the collected profile; confirm.
FinishProfileMemoryAllocations();
#endif

return 0;
}

Expand All @@ -249,6 +305,11 @@ class TMain : public TMainBase {
TExecutionOptions ExecutionOptions;
TRunnerOptions RunnerOptions;
std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping;

// Per-topic settings collected from command line options.
struct TTopicSettings {
// Set to true by the `cancel-on-file-finish` option for the given topic.
bool CancelOnFileFinish = false;
};
// Topic name -> settings. Entries are applied to the matching PQ file topic and erased;
// any entry left afterwards refers to a topic that does not exist and is reported as an error.
std::unordered_map<TString, TTopicSettings> TopicsSettings;
};

} // anonymous namespace
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/tools/fqrun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/tests/tools/kqprun/runlib/settings.h>

#include <yql/essentials/minikql/mkql_function_registry.h>

namespace NFqRun {

constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";
Expand All @@ -27,8 +29,10 @@ struct TFqSetupSettings : public NKikimrRun::TServerSettings {

TString YqlToken;
NYql::IPqGateway::TPtr PqGateway;
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
NFq::NConfig::TConfig FqConfig;
NKikimrConfig::TLogConfig LogConfig;
std::optional<NKikimrConfig::TActorSystemConfig> ActorSystemConfig;
};

struct TRunnerOptions {
Expand Down
12 changes: 12 additions & 0 deletions ydb/tests/tools/fqrun/src/fq_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ class TFqSetup::TImpl {
serverSettings.SetLoggerInitializer(loggerInitializer);
}

// Installs a function registry factory on the server settings that returns
// Settings.FunctionRegistry. Does nothing when no registry was provided.
void SetFunctionRegistry(NKikimr::Tests::TServerSettings& serverSettings) const {
    if (!Settings.FunctionRegistry) {
        return;
    }

    // The factory ignores the type registry argument and always returns the preconfigured registry.
    serverSettings.SetFrFactory([this](const NKikimr::NScheme::TTypeRegistry&) {
        return Settings.FunctionRegistry.Get();
    });
}

NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) {
NKikimr::Tests::TServerSettings serverSettings(PortManager.GetPort());

Expand All @@ -52,9 +60,13 @@ class TFqSetup::TImpl {

NKikimrConfig::TAppConfig config;
*config.MutableLogConfig() = Settings.LogConfig;
if (Settings.ActorSystemConfig) {
*config.MutableActorSystemConfig() = *Settings.ActorSystemConfig;
}
serverSettings.SetAppConfig(config);

SetLoggerSettings(serverSettings);
SetFunctionRegistry(serverSettings);

if (Settings.MonitoringEnabled) {
serverSettings.InitKikimrRunConfig();
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/fqrun/src/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ SRCS(
PEERDIR(
library/cpp/colorizer
library/cpp/testing/unittest
util
ydb/core/fq/libs/config/protos
ydb/core/fq/libs/control_plane_proxy/events
ydb/core/fq/libs/init
Expand All @@ -20,6 +19,7 @@ PEERDIR(
ydb/library/security
ydb/library/yql/providers/pq/provider
ydb/tests/tools/kqprun/runlib
yql/essentials/minikql
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

рекомендую сортировать списки. util лучше убери, он всегда неявно подключается

Copy link
Collaborator Author

@GrigoriyPA GrigoriyPA Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Я стараюсь везде сортировать список PEERDIR по возрастанию, где тут нарушился порядок? С ходу не видно

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ну вот ты добавил строчку не по алфавиту

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

util тоже не по алфавиту был


YQL_LAST_ABI_VERSION()
Expand Down
Loading
Loading