Skip to content

Commit ffff7d5

Browse files
authored
Merge 5e90a37 into 1dd2b99
2 parents 1dd2b99 + 5e90a37 commit ffff7d5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+4032
-2149
lines changed

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "common.h"
3+
#include "format_handler/common/common.h"
44

55
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
66
#include <util/generic/ptr.h>

ydb/core/fq/libs/row_dispatcher/common.cpp

Lines changed: 0 additions & 47 deletions
This file was deleted.

ydb/core/fq/libs/row_dispatcher/common.h

Lines changed: 0 additions & 29 deletions
This file was deleted.
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#include "common.h"
2+
3+
#include <util/string/builder.h>
4+
5+
#include <ydb/library/yql/public/purecalc/common/interface.h>
6+
7+
namespace NFq::NRowDispatcher {
8+
9+
namespace {
10+
11+
class TPureCalcProgramFactory : public IPureCalcProgramFactory {
12+
public:
13+
TPureCalcProgramFactory() {
14+
CreateFactory({.EnabledLLVM = false});
15+
CreateFactory({.EnabledLLVM = true});
16+
}
17+
18+
NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override {
19+
const auto it = ProgramFactories.find(settings);
20+
Y_ENSURE(it != ProgramFactories.end());
21+
return it->second;
22+
}
23+
24+
TGuard<TMutex> LockFactory() const override {
25+
return Guard(FactoryMutex);
26+
}
27+
28+
private:
29+
void CreateFactory(const TSettings& settings) {
30+
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
31+
NYql::NPureCalc::TProgramFactoryOptions()
32+
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
33+
)});
34+
}
35+
36+
private:
37+
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
38+
TMutex FactoryMutex;
39+
};
40+
41+
} // anonymous namespace
42+
43+
//// TStatus
44+
45+
TStatus::TStatus()
46+
: Status(EId::SUCCESS)
47+
{}
48+
49+
TStatus::TStatus(TStatucCode status, NYql::TIssues issues)
50+
: Status(status)
51+
, Issues(std::move(issues))
52+
{}
53+
54+
TStatus::TStatus(TStatucCode status, TString message)
55+
: Status(status)
56+
, Issues({NYql::TIssue(std::move(message))})
57+
{}
58+
59+
bool TStatus::IsSuccess() const {
60+
return Status == EId::SUCCESS;
61+
}
62+
63+
TStatus::TStatucCode TStatus::GetStatus() const {
64+
return Status;
65+
}
66+
67+
const NYql::TIssues& TStatus::GetIssues() const {
68+
return Issues;
69+
}
70+
71+
TString TStatus::ToString() const {
72+
return TStringBuilder() << "Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(Status) << ", Issues: " << Issues.ToOneLineString();
73+
}
74+
75+
TStatus& TStatus::AddParentIssue(NYql::TIssue issue) {
76+
for (const auto& childIssue : Issues) {
77+
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(childIssue));
78+
}
79+
Issues = {std::move(issue)};
80+
return *this;
81+
}
82+
83+
TStatus& TStatus::AddParentIssue(TString message) {
84+
return AddParentIssue(NYql::TIssue(std::move(message)));
85+
}
86+
87+
TStatus& TStatus::AddIssue(NYql::TIssue issue) {
88+
Issues.AddIssue(std::move(issue));
89+
return *this;
90+
}
91+
92+
TStatus& TStatus::AddIssue(TString message) {
93+
return AddIssue(NYql::TIssue{std::move(message)});
94+
}
95+
96+
//// IPureCalcProgramFactory
97+
98+
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
99+
return MakeIntrusive<TPureCalcProgramFactory>();
100+
}
101+
102+
} // namespace NFq::NRowDispatcher
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#pragma once
2+
3+
#include <util/system/mutex.h>
4+
5+
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
6+
#include <ydb/library/yql/public/purecalc/common/fwd.h>
7+
8+
#include <yql/essentials/public/issue/yql_issue.h>
9+
10+
namespace NFq::NRowDispatcher {
11+
12+
struct TFormatHandlerException : public yexception {
13+
using yexception::yexception;
14+
};
15+
16+
class TStatus {
17+
public:
18+
using EId = NYql::NDqProto::StatusIds;
19+
using TStatucCode = EId::StatusCode;
20+
21+
public:
22+
TStatus();
23+
explicit TStatus(TStatucCode status, NYql::TIssues issues = {});
24+
TStatus(TStatucCode status, TString message);
25+
26+
virtual bool IsSuccess() const;
27+
28+
TStatucCode GetStatus() const;
29+
const NYql::TIssues& GetIssues() const;
30+
31+
TString ToString() const;
32+
33+
TStatus& AddParentIssue(NYql::TIssue issue);
34+
TStatus& AddParentIssue(TString message);
35+
36+
TStatus& AddIssue(NYql::TIssue issue);
37+
TStatus& AddIssue(TString message);
38+
39+
protected:
40+
TStatucCode Status;
41+
NYql::TIssues Issues;
42+
};
43+
44+
template <typename TValue>
45+
class TValueStatus : public TStatus {
46+
using TBase = TStatus;
47+
48+
public:
49+
TValueStatus(TValue value)
50+
: TBase(EId::SUCCESS)
51+
, Value(std::move(value))
52+
{}
53+
54+
TValueStatus(TStatus status)
55+
: TBase(std::move(status))
56+
{}
57+
58+
bool IsSuccess() const override {
59+
return TBase::IsSuccess() && Value;
60+
}
61+
62+
TValue& GetValue() {
63+
if (Y_UNLIKELY(!Value)) {
64+
throw TFormatHandlerException() << "Internal error, failed to get value, " << ToString();
65+
}
66+
return *Value;
67+
}
68+
69+
private:
70+
std::optional<TValue> Value;
71+
};
72+
73+
struct TSchemaColumn {
74+
TString Name;
75+
TString TypeYson;
76+
77+
bool operator==(const TSchemaColumn& other) const = default;
78+
};
79+
80+
class IPureCalcProgramFactory : public TThrRefBase {
81+
public:
82+
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;
83+
84+
struct TSettings {
85+
bool EnabledLLVM = false;
86+
87+
std::strong_ordering operator<=>(const TSettings& other) const = default;
88+
};
89+
90+
public:
91+
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;
92+
93+
// Before creating purecalc program factory should be locked
94+
virtual TGuard<TMutex> LockFactory() const = 0;
95+
};
96+
97+
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();
98+
99+
} // namespace NFq::NRowDispatcher
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
common.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper
9+
10+
ydb/library/yql/dq/actors
11+
ydb/library/yql/dq/actors/protos
12+
13+
yql/essentials/public/issue
14+
)
15+
16+
YQL_LAST_ABI_VERSION()
17+
18+
END()

0 commit comments

Comments
 (0)