Skip to content

Commit 3680543

Browse files
committed
Redirect pg_tables reads to system table. Resolves #1918
1 parent f486ded commit 3680543

File tree

10 files changed

+212
-53
lines changed

10 files changed

+212
-53
lines changed

ydb/core/kqp/executer_actor/kqp_partition_helper.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ TSerializedTableRange MakeKeyRange(const TVector<NScheme::TTypeInfo>& keyColumnT
441441
{
442442
YQL_ENSURE(range.HasFrom());
443443
YQL_ENSURE(range.HasTo());
444+
auto guard = typeEnv.BindAllocator();
444445

445446
auto fromValues = FillKeyValues(keyColumnTypes, range.GetFrom(), stageInfo, holderFactory, typeEnv);
446447
if (range.GetFrom().GetIsInclusive()) {

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1749,7 +1749,7 @@ class TKqpHost : public IKqpHost {
17491749
void InitPgProvider() {
17501750
auto state = MakeIntrusive<NYql::TPgState>();
17511751
state->Types = TypesCtx.Get();
1752-
1752+
state->InitializedInKqpHost = true;
17531753
TypesCtx->AddDataSource(NYql::PgProviderName, NYql::CreatePgDataSource(state));
17541754
TypesCtx->AddDataSink(NYql::PgProviderName, NYql::CreatePgDataSink(state));
17551755
}

ydb/core/kqp/ut/pg/pg_catalog_ut.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,60 @@ Y_UNIT_TEST_SUITE(PgCatalog) {
404404
])", FormatResultSetYson(result.GetResultSet(0)));
405405
}
406406
}
407+
408+
Y_UNIT_TEST(PgTables) {
409+
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
410+
auto db = kikimr.GetQueryClient();
411+
auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
412+
{
413+
auto result = db.ExecuteQuery(R"(
414+
CREATE TABLE table1 (
415+
id int4 primary key
416+
);
417+
CREATE TABLE table2 (
418+
id varchar primary key
419+
);
420+
)", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
421+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
422+
}
423+
{
424+
auto result = db.ExecuteQuery(R"(
425+
DROP TABLE table1;
426+
)", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
427+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
428+
}
429+
{
430+
auto result = db.ExecuteQuery(R"(
431+
SELECT * FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename;
432+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
433+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
434+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets");
435+
CompareYson(R"([
436+
["public";"config";#;#;#;#;#;#];
437+
["public";"dbversion";#;#;#;#;#;#];
438+
["public";"migration_log";#;#;#;#;#;#];
439+
["public";"table2";"root@builtin";#;"t";"f";"f";"f"]
440+
])", FormatResultSetYson(result.GetResultSet(0)));
441+
}
442+
{
443+
auto result = db.ExecuteQuery(R"(
444+
--!syntax_pg
445+
select
446+
count(*) n,
447+
min(schemaname) min_s,
448+
min(tablename) min_t,
449+
max(schemaname) max_s,
450+
max(tablename) max_t
451+
from pg_catalog.pg_tables;
452+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
453+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
454+
UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets");
455+
CompareYson(R"([
456+
["207";"information_schema";"_pg_foreign_data_wrappers";"public";"views"]
457+
])", FormatResultSetYson(result.GetResultSet(0)));
458+
459+
}
460+
}
407461
}
408462

409463
} // namespace NKqp

ydb/core/sys_view/common/schema.cpp

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
namespace NKikimr {
66
namespace NSysView {
77

8-
TVector<Schema::PgColumn> Schema::PgTables::Columns = { //lexicographical order
9-
Schema::PgColumn(5, "pgbool", "hasindexes"),
10-
Schema::PgColumn(6, "pgbool", "hasrules"),
11-
Schema::PgColumn(7, "pgbool", "hastriggers"),
12-
Schema::PgColumn(8, "pgbool", "rowsecurity"),
8+
const TVector<Schema::PgColumn> Schema::PgTables::Columns = {
139
Schema::PgColumn(1, "pgname", "schemaname"),
1410
Schema::PgColumn(2, "pgname", "tablename"),
1511
Schema::PgColumn(3, "pgname", "tableowner"),
16-
Schema::PgColumn(4, "pgname", "tablespace")
12+
Schema::PgColumn(4, "pgname", "tablespace"),
13+
Schema::PgColumn(5, "pgbool", "hasindexes"),
14+
Schema::PgColumn(6, "pgbool", "hasrules"),
15+
Schema::PgColumn(7, "pgbool", "hastriggers"),
16+
Schema::PgColumn(8, "pgbool", "rowsecurity")
1717
};
1818

1919
bool MaybeSystemViewPath(const TVector<TString>& path) {
@@ -182,24 +182,14 @@ class TSystemViewResolver : public ISystemViewResolver {
182182
void RegisterPgTablesSystemView() {
183183
auto& dsv = DomainSystemViews[PgTablesName];
184184
auto& sdsv = SubDomainSystemViews[PgTablesName];
185-
auto PgTablesSchema = Schema::PgTables();
186-
for (const auto& column : PgTablesSchema.Columns) {
187-
dsv.Columns[column._ColumnId - 1] = TSysTables::TTableColumnInfo(
188-
column.GetColumnName(), column._ColumnId, column._ColumnTypeInfo, "", -1
185+
for (const auto& column : Schema::PgTables::Columns) {
186+
dsv.Columns[column._ColumnId] = TSysTables::TTableColumnInfo(
187+
column._ColumnName, column._ColumnId, column._ColumnTypeInfo, "", -1
189188
);
190-
sdsv.Columns[column._ColumnId - 1] = TSysTables::TTableColumnInfo(
191-
column.GetColumnName(), column._ColumnId, column._ColumnTypeInfo, "", -1
189+
sdsv.Columns[column._ColumnId] = TSysTables::TTableColumnInfo(
190+
column._ColumnName, column._ColumnId, column._ColumnTypeInfo, "", -1
192191
);
193192
}
194-
auto fillKey = [&](TSchema& schema, i32 index) -> void {
195-
auto& column = schema.Columns[index];
196-
column.KeyOrder = index;
197-
schema.KeyColumnTypes.push_back(column.PType);
198-
};
199-
for (size_t i = 0; i < 2; i++) {
200-
fillKey(dsv, i);
201-
fillKey(sdsv, i);
202-
}
203193
}
204194

205195
template <typename Table>

ydb/core/sys_view/common/schema.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -596,15 +596,12 @@ struct Schema : NIceDb::Schema {
596596
NScheme::TTypeInfo _ColumnTypeInfo;
597597
TString _ColumnName;
598598
PgColumn(NIceDb::TColumnId columnId, TStringBuf columnTypeName, TStringBuf columnName)
599-
: _ColumnId(columnId), _ColumnTypeInfo(NScheme::TTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeName(columnTypeName))), _ColumnName(columnName) {}
600-
601-
TString GetColumnName() const {
602-
return _ColumnName;
603-
}
599+
: _ColumnId(columnId), _ColumnTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeName(columnTypeName)), _ColumnName(columnName)
600+
{}
604601
};
605602

606603
struct PgTables {
607-
static TVector<PgColumn> Columns;
604+
const static TVector<PgColumn> Columns;
608605
};
609606
};
610607

ydb/core/sys_view/pg_tables/pg_tables.cpp

Lines changed: 83 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,85 @@ class TPgTablesScan : public NKikimr::NSysView::TScanActorBase<TPgTablesScan> {
2222
private:
2323
TCell MakePgCell(const Schema::PgColumn& column, const TString& value, TVector<TString>& cellData) {
2424
NYql::NUdf::TStringRef ref;
25-
auto typeDesc = column._ColumnTypeInfo.GetTypeDesc();
26-
auto convert = NPg::PgNativeBinaryFromNativeText(value, NPg::PgTypeIdFromTypeDesc(typeDesc));
25+
auto convert = NPg::PgNativeBinaryFromNativeText(value, NPg::PgTypeIdFromTypeDesc(column._ColumnTypeInfo.GetTypeDesc()));
2726
if (convert.Error) {
2827
ConvertError_ = *convert.Error;
2928
return TCell();
3029
}
3130
cellData.emplace_back(convert.Str);
3231
ref = NYql::NUdf::TStringRef(cellData.back());
33-
32+
Y_ENSURE(ref.Size() > 0);
3433
return TCell(ref.Data(), ref.Size());
3534
}
3635

3736
TVector<TCell> MakePgTablesRow(const TString& tableName, const TString& tableOwner, TVector<TString>& cellData) {
38-
const auto &Columns = Schema::PgTables::Columns;
39-
return {
40-
MakePgCell(Columns[0], "true", cellData), //hasindexes
41-
MakePgCell(Columns[1], "false", cellData), //hasrules
42-
MakePgCell(Columns[2], "false", cellData), //hastriggers
43-
MakePgCell(Columns[3], "false", cellData), //rowsecurity
44-
MakePgCell(Columns[4], "public", cellData), //schemaname
45-
MakePgCell(Columns[5], tableName, cellData), //tablename
46-
MakePgCell(Columns[6], tableOwner, cellData), //tableowner
47-
TCell() //tablespace
48-
};
37+
TVector<TCell> res;
38+
res.reserve(Columns.size());
39+
for (const auto& column : Columns) {
40+
TCell cell;
41+
switch (column.Tag) {
42+
case 1: {
43+
cell = MakePgCell(Schema::PgTables::Columns[0], "public", cellData);
44+
break;
45+
}
46+
case 2: {
47+
cell = MakePgCell(Schema::PgTables::Columns[1], tableName, cellData);
48+
break;
49+
}
50+
case 3: {
51+
cell = MakePgCell(Schema::PgTables::Columns[2], tableOwner, cellData);
52+
break;
53+
}
54+
case 4: {
55+
cell = TCell();
56+
break;
57+
}
58+
case 5: {
59+
cell = MakePgCell(Schema::PgTables::Columns[4], "true", cellData);
60+
break;
61+
}
62+
case 6: {
63+
cell = MakePgCell(Schema::PgTables::Columns[5], "false", cellData);
64+
break;
65+
}
66+
case 7: {
67+
cell = MakePgCell(Schema::PgTables::Columns[6], "false", cellData);
68+
break;
69+
}
70+
case 8: {
71+
cell = MakePgCell(Schema::PgTables::Columns[7], "false", cellData);
72+
break;
73+
}
74+
75+
}
76+
res.emplace_back(std::move(cell));
77+
}
78+
return res;
79+
}
80+
81+
TVector<TCell> MakePgTablesStaticRow(const NYql::NPg::TTableInfo& tableInfo, TVector<TString>& cellData) {
82+
TVector<TCell> res;
83+
res.reserve(Columns.size());
84+
for (const auto& column : Columns) {
85+
TCell cell;
86+
switch (column.Tag) {
87+
case 1: {
88+
cell = MakePgCell(Schema::PgTables::Columns[0], tableInfo.Schema, cellData);
89+
break;
90+
}
91+
case 2: {
92+
cell = MakePgCell(Schema::PgTables::Columns[1], tableInfo.Name, cellData);
93+
break;
94+
}
95+
default: {
96+
cell = TCell();
97+
break;
98+
}
99+
100+
}
101+
res.emplace_back(std::move(cell));
102+
}
103+
return res;
49104
}
50105
public:
51106
using TBase = NKikimr::NSysView::TScanActorBase<TPgTablesScan>;
@@ -71,6 +126,19 @@ class TPgTablesScan : public NKikimr::NSysView::TScanActorBase<TPgTablesScan> {
71126
Become(&TPgTablesScan::StateWork);
72127
}
73128

129+
void ExpandBatchWithStaticTables(const THolder<NKqp::TEvKqpCompute::TEvScanData>& batch) {
130+
for (const auto& tableDesc : NYql::NPg::GetStaticTables()) {
131+
TVector<TString> cellData;
132+
TVector<TCell> cells = MakePgTablesStaticRow(tableDesc, cellData);
133+
if (!ConvertError_.Empty()) {
134+
ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, ConvertError_);
135+
return;
136+
}
137+
TArrayRef<const TCell> ref(cells);
138+
batch->Rows.emplace_back(TOwnedCellVec::Make(ref));
139+
}
140+
}
141+
74142
void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
75143
Y_UNUSED(ctx);
76144
const auto& record = ev->Get()->GetRecord();
@@ -102,6 +170,7 @@ class TPgTablesScan : public NKikimr::NSysView::TScanActorBase<TPgTablesScan> {
102170

103171
auto batch = MakeHolder<NKqp::TEvKqpCompute::TEvScanData>(ScanId);
104172

173+
ExpandBatchWithStaticTables(batch);
105174

106175
for (size_t i = 0; i < record.GetPathDescription().ChildrenSize(); ++i) {
107176
TVector<TString> cellData;

ydb/core/sys_view/ut_kqp.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ Y_UNIT_TEST_SUITE(SystemView) {
326326
auto session = client.CreateSession().GetValueSync().GetSession();
327327
{
328328
auto result = session.ExecuteDataQuery(R"(
329-
SELECT schemaname, tablename, tableowner, tablespace, hasindexes, hasrules, hastriggers, rowsecurity FROM `Root/.sys/pg_tables`;
329+
SELECT schemaname, tablename, tableowner, tablespace, hasindexes, hasrules, hastriggers, rowsecurity FROM `Root/.sys/pg_tables` WHERE tablename = PgName("Table0") OR tablename = PgName("Table1") ORDER BY tablename;
330330
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
331331

332332
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

ydb/library/yql/providers/pg/provider/yql_pg_datasource.cpp

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include "ydb/library/yql/providers/common/provider/yql_provider.h"
12
#include "yql_pg_provider_impl.h"
23

34
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
@@ -48,11 +49,16 @@ class TPgDataSourceImpl : public TDataProviderBase {
4849

4950
TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override {
5051
YQL_CLOG(INFO, ProviderPg) << "RewriteIO";
51-
if (auto left = TMaybeNode<TCoLeft>(node)) {
52-
return left.Input().Maybe<TPgRead>().World().Cast().Ptr();
52+
53+
TMaybeNode<TPgRead> maybeRead;
54+
auto left = TMaybeNode<TCoLeft>(node);
55+
if (left) {
56+
maybeRead = left.Input().Cast<TPgRead>();
57+
} else {
58+
maybeRead = TCoRight(node).Input().Cast<TPgRead>();
5359
}
5460

55-
auto read = TCoRight(node).Input().Cast<TPgRead>();
61+
auto read = maybeRead.Cast();
5662
auto keyNode = read.FreeArgs().Get(2).Ptr();
5763
if (keyNode->IsCallable("MrTableConcat")) {
5864
if (keyNode->ChildrenSize() != 1) {
@@ -77,13 +83,54 @@ class TPgDataSourceImpl : public TDataProviderBase {
7783
}
7884

7985
const auto tableName = TString(keyArg.Tail().Head().Content());
80-
auto childrenList = read.Ref().ChildrenList();
81-
childrenList[2] = ctx.NewAtom(childrenList[2]->Pos(), tableName);
82-
auto newRead = ctx.NewCallable(read.Ref().Pos(), TPgReadTable::CallableName(), std::move(childrenList));
83-
84-
return Build<TCoRight>(ctx, read.Pos())
85-
.Input(newRead)
86+
if (tableName == "pg_tables" && State_->InitializedInKqpHost) {
87+
TNodeOnNodeOwnedMap replaces;
88+
replaces[keyArg.TailPtr()->TailPtr().Get()] = ctx.NewAtom(read.Pos(), "/Root/.sys/pg_tables");
89+
90+
auto ydbSysTableRead = Build<TCoRead>(ctx, read.Pos())
91+
.InitFrom(read.Cast<TCoRead>())
92+
.DataSource<TCoDataSource>()
93+
.Category(ctx.NewAtom(read.Pos(), KikimrProviderName))
94+
.FreeArgs()
95+
.Add(ctx.NewAtom(read.Pos(), "db"))
96+
.Build()
97+
.Build()
98+
.FreeArgs()
99+
.Add(ctx.ReplaceNodes(maybeKey.Cast().Ptr(), replaces))
100+
.Add(ctx.NewCallable(read.Pos(), "Void", {}))
101+
.Add(ctx.NewList(read.Pos(), {}))
102+
.Build()
86103
.Done().Ptr();
104+
105+
ctx.Step
106+
.Repeat(TExprStep::ExprEval)
107+
.Repeat(TExprStep::DiscoveryIO)
108+
.Repeat(TExprStep::Epochs)
109+
.Repeat(TExprStep::Intents)
110+
.Repeat(TExprStep::LoadTablesMetadata)
111+
.Repeat(TExprStep::RewriteIO);
112+
113+
if (left) {
114+
return Build<TCoLeft>(ctx, ydbSysTableRead->Pos())
115+
.Input(ydbSysTableRead)
116+
.Done().Ptr();
117+
} else {
118+
return Build<TCoRight>(ctx, ydbSysTableRead->Pos())
119+
.Input(ydbSysTableRead)
120+
.Done().Ptr();
121+
}
122+
} else {
123+
if (left) {
124+
return left.Input().Maybe<TPgRead>().World().Cast().Ptr();
125+
} else {
126+
auto childrenList = read.Ref().ChildrenList();
127+
childrenList[2] = ctx.NewAtom(childrenList[2]->Pos(), tableName);
128+
auto newRead = ctx.NewCallable(read.Ref().Pos(), TPgReadTable::CallableName(), std::move(childrenList));
129+
return Build<TCoRight>(ctx, read.Pos())
130+
.Input(newRead)
131+
.Done().Ptr();
132+
}
133+
}
87134
}
88135

89136
bool ValidateParameters(TExprNode& node, TExprContext& ctx, TMaybe<TString>& cluster) override {

ydb/library/yql/providers/pg/provider/yql_pg_provider.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,4 @@ TDataProviderInitializer GetPgDataProviderInitializer() {
6262
};
6363
}
6464

65-
}
65+
}

ydb/library/yql/providers/pg/provider/yql_pg_provider.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ struct TPgState : public TThrRefBase
99
using TPtr = TIntrusivePtr<TPgState>;
1010

1111
TTypeAnnotationContext* Types = nullptr;
12+
bool InitializedInKqpHost = false;
1213
};
1314

1415
TDataProviderInitializer GetPgDataProviderInitializer();

0 commit comments

Comments
 (0)