Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 11 additions & 26 deletions ydb/library/yql/providers/dq/task_runner/tasks_runner_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,13 @@ class TLocalTaskRunner: public ITaskRunner {

/*______________________________________________________________________________________________*/

class TAbstractFactory: public IProxyFactory {
class TLocalFactory: public IProxyFactory {
public:
TAbstractFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
TLocalFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory,
std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache)
std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache,
bool terminateOnError)
: DeterministicMode(!!GetEnv("YQL_DETERMINISTIC_MODE"))
, RandomProvider(
DeterministicMode
Expand All @@ -210,6 +211,7 @@ class TAbstractFactory: public IProxyFactory {
: CreateDefaultTimeProvider())
, FunctionRegistry(functionRegistry)
, TaskTransformFactory(std::move(taskTransformFactory))
, TerminateOnError(terminateOnError)
{
ExecutionContext.FuncRegistry = FunctionRegistry;
ExecutionContext.ComputationFactory = compFactory;
Expand All @@ -218,29 +220,6 @@ class TAbstractFactory: public IProxyFactory {
ExecutionContext.PatternCache = patternCache;
}

protected:
bool DeterministicMode;
TIntrusivePtr<IRandomProvider> RandomProvider;
TIntrusivePtr<ITimeProvider> TimeProvider;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
TTaskTransformFactory TaskTransformFactory;

NDq::TDqTaskRunnerContext ExecutionContext;
};

/*______________________________________________________________________________________________*/

class TLocalFactory: public TAbstractFactory {
public:
TLocalFactory(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
NKikimr::NMiniKQL::TComputationNodeFactory compFactory,
TTaskTransformFactory taskTransformFactory,
std::shared_ptr<NKikimr::NMiniKQL::TComputationPatternLRUCache> patternCache,
bool terminateOnError)
: TAbstractFactory(functionRegistry, compFactory, taskTransformFactory, patternCache)
, TerminateOnError(terminateOnError)
{ }

ITaskRunner::TPtr GetOld(const TDqTaskSettings& task, const TString& traceId) override {
return new TLocalTaskRunner(task, Get(task, NDqProto::DQ_STATS_MODE_BASIC, traceId));
}
Expand Down Expand Up @@ -287,6 +266,12 @@ class TLocalFactory: public TAbstractFactory {
}

private:
bool DeterministicMode;
TIntrusivePtr<IRandomProvider> RandomProvider;
TIntrusivePtr<ITimeProvider> TimeProvider;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
TTaskTransformFactory TaskTransformFactory;
NDq::TDqTaskRunnerContext ExecutionContext;
const bool TerminateOnError;
};

Expand Down