Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ TSerializedTableRange MakeKeyRange(const TVector<NScheme::TTypeInfo>& keyColumnT
{
YQL_ENSURE(range.HasFrom());
YQL_ENSURE(range.HasTo());
auto guard = typeEnv.BindAllocator();

auto fromValues = FillKeyValues(keyColumnTypes, range.GetFrom(), stageInfo, holderFactory, typeEnv);
if (range.GetFrom().GetIsInclusive()) {
Expand Down
47 changes: 47 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,53 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TK
auto settings = NCommon::ParseCommitSettings(commit, ctx);
auto kiDataSink = commit.DataSink().Cast<TKiDataSink>();

TNodeOnNodeOwnedMap replaces;
VisitExpr(node.Ptr(), [&replaces](const TExprNode::TPtr& input) -> bool {
if (input->IsCallable("PgTableContent")) {
TPgTableContent content(input);
if (content.Table() == "pg_tables") {
replaces[input.Get()] = nullptr;
}
}
return true;
});
if (!replaces.empty()) {
TExprNode::TPtr path = ctx.NewCallable(node.Pos(), "String", { ctx.NewAtom(node.Pos(), "/Root/.sys/pg_tables") });
auto table = ctx.NewList(node.Pos(), {ctx.NewAtom(node.Pos(), "table"), path});
auto newKey = ctx.NewCallable(node.Pos(), "Key", {table});

for (auto& [key, _] : replaces) {
auto ydbSysTableRead = Build<TCoRead>(ctx, node.Pos())
.World<TCoWorld>().Build()
.DataSource<TCoDataSource>()
.Category(ctx.NewAtom(node.Pos(), KikimrProviderName))
.FreeArgs()
.Add(ctx.NewAtom(node.Pos(), "db"))
.Build()
.Build()
.FreeArgs()
.Add(newKey)
.Add(ctx.NewCallable(node.Pos(), "Void", {}))
.Add(ctx.NewList(node.Pos(), {}))
.Build()
.Done().Ptr();

auto readData = Build<TCoRight>(ctx, node.Pos())
.Input(ydbSysTableRead)
.Done().Ptr();
replaces[key] = readData;
}
ctx.Step
.Repeat(TExprStep::ExprEval)
.Repeat(TExprStep::DiscoveryIO)
.Repeat(TExprStep::Epochs)
.Repeat(TExprStep::Intents)
.Repeat(TExprStep::LoadTablesMetadata)
.Repeat(TExprStep::RewriteIO);
auto res = ctx.ReplaceNodes(std::move(node.Ptr()), replaces);
return res;
}

TKiExploreTxResults txExplore;
txExplore.ConcurrentResults = concurrentResults;
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types) || txExplore.HasErrors) {
Expand Down
59 changes: 59 additions & 0 deletions ydb/core/kqp/ut/pg/pg_catalog_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,65 @@ Y_UNIT_TEST_SUITE(PgCatalog) {
])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(PgTables) {
bool experimentalPg = false;
if (auto* p = std::getenv("YDB_EXPERIMENTAL_PG")) {
experimentalPg = true;
}
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
auto db = kikimr.GetQueryClient();
auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
{
auto result = db.ExecuteQuery(R"(
CREATE TABLE table1 (
id int4 primary key
);
CREATE TABLE table2 (
id varchar primary key
);
)", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
{
auto result = db.ExecuteQuery(R"(
SELECT * FROM pg_tables WHERE schemaname = 'public' AND hasindexes = true ORDER BY tablename;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets");
CompareYson(R"([
["t";"f";"f";"f";"public";"table1";"root@builtin";#];
["t";"f";"f";"f";"public";"table2";"root@builtin";#]
])", FormatResultSetYson(result.GetResultSet(0)));
}
{
auto result = db.ExecuteQuery(R"(
--!syntax_pg
select
min(schemaname) min_s,
min(tablename) min_t,
max(schemaname) max_s,
max(tablename) max_t
from pg_catalog.pg_tables;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets");
CompareYson(R"([
["information_schema";"_pg_foreign_data_wrappers";"public";"views"]
])", FormatResultSetYson(result.GetResultSet(0)));
}
{
auto result = db.ExecuteQuery(R"(
select count(*)
from pg_catalog.pg_tables;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_C(!result.GetResultSets().empty(), "no result sets");
CompareYson(
Sprintf("[[\"%u\"]]", experimentalPg ? 208 : 205),
FormatResultSetYson(result.GetResultSet(0)));
}
}
}

} // namespace NKqp
Expand Down
32 changes: 11 additions & 21 deletions ydb/core/sys_view/common/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
namespace NKikimr {
namespace NSysView {

TVector<Schema::PgColumn> Schema::PgTables::Columns = { //lexicographical order
Schema::PgColumn(5, "pgbool", "hasindexes"),
Schema::PgColumn(6, "pgbool", "hasrules"),
Schema::PgColumn(7, "pgbool", "hastriggers"),
Schema::PgColumn(8, "pgbool", "rowsecurity"),
const TVector<Schema::PgColumn> Schema::PgTables::Columns = {
Schema::PgColumn(1, "pgname", "schemaname"),
Schema::PgColumn(2, "pgname", "tablename"),
Schema::PgColumn(3, "pgname", "tableowner"),
Schema::PgColumn(4, "pgname", "tablespace")
Schema::PgColumn(4, "pgname", "tablespace"),
Schema::PgColumn(5, "pgbool", "hasindexes"),
Schema::PgColumn(6, "pgbool", "hasrules"),
Schema::PgColumn(7, "pgbool", "hastriggers"),
Schema::PgColumn(8, "pgbool", "rowsecurity")
};

bool MaybeSystemViewPath(const TVector<TString>& path) {
Expand Down Expand Up @@ -182,24 +182,14 @@ class TSystemViewResolver : public ISystemViewResolver {
void RegisterPgTablesSystemView() {
auto& dsv = DomainSystemViews[PgTablesName];
auto& sdsv = SubDomainSystemViews[PgTablesName];
auto PgTablesSchema = Schema::PgTables();
for (const auto& column : PgTablesSchema.Columns) {
dsv.Columns[column._ColumnId - 1] = TSysTables::TTableColumnInfo(
column.GetColumnName(), column._ColumnId, column._ColumnTypeInfo, "", -1
for (const auto& column : Schema::PgTables::Columns) {
dsv.Columns[column._ColumnId] = TSysTables::TTableColumnInfo(
column._ColumnName, column._ColumnId, column._ColumnTypeInfo, "", -1
);
sdsv.Columns[column._ColumnId - 1] = TSysTables::TTableColumnInfo(
column.GetColumnName(), column._ColumnId, column._ColumnTypeInfo, "", -1
sdsv.Columns[column._ColumnId] = TSysTables::TTableColumnInfo(
column._ColumnName, column._ColumnId, column._ColumnTypeInfo, "", -1
);
}
auto fillKey = [&](TSchema& schema, i32 index) -> void {
auto& column = schema.Columns[index];
column.KeyOrder = index;
schema.KeyColumnTypes.push_back(column.PType);
};
for (size_t i = 0; i < 2; i++) {
fillKey(dsv, i);
fillKey(sdsv, i);
}
}

template <typename Table>
Expand Down
9 changes: 3 additions & 6 deletions ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -596,15 +596,12 @@ struct Schema : NIceDb::Schema {
NScheme::TTypeInfo _ColumnTypeInfo;
TString _ColumnName;
PgColumn(NIceDb::TColumnId columnId, TStringBuf columnTypeName, TStringBuf columnName)
: _ColumnId(columnId), _ColumnTypeInfo(NScheme::TTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeName(columnTypeName))), _ColumnName(columnName) {}

TString GetColumnName() const {
return _ColumnName;
}
: _ColumnId(columnId), _ColumnTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeName(columnTypeName)), _ColumnName(columnName)
{}
};

struct PgTables {
static TVector<PgColumn> Columns;
const static TVector<PgColumn> Columns;
};
};

Expand Down
97 changes: 83 additions & 14 deletions ydb/core/sys_view/pg_tables/pg_tables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,85 @@ class TPgTablesScan : public NKikimr::NSysView::TScanActorBase<TPgTablesScan> {
private:
TCell MakePgCell(const Schema::PgColumn& column, const TString& value, TVector<TString>& cellData) {
NYql::NUdf::TStringRef ref;
auto typeDesc = column._ColumnTypeInfo.GetTypeDesc();
auto convert = NPg::PgNativeBinaryFromNativeText(value, NPg::PgTypeIdFromTypeDesc(typeDesc));
auto convert = NPg::PgNativeBinaryFromNativeText(value, NPg::PgTypeIdFromTypeDesc(column._ColumnTypeInfo.GetTypeDesc()));
if (convert.Error) {
ConvertError_ = *convert.Error;
return TCell();
}
cellData.emplace_back(convert.Str);
ref = NYql::NUdf::TStringRef(cellData.back());

Y_ENSURE(ref.Size() > 0);
return TCell(ref.Data(), ref.Size());
}

TVector<TCell> MakePgTablesRow(const TString& tableName, const TString& tableOwner, TVector<TString>& cellData) {
const auto &Columns = Schema::PgTables::Columns;
return {
MakePgCell(Columns[0], "true", cellData), //hasindexes
MakePgCell(Columns[1], "false", cellData), //hasrules
MakePgCell(Columns[2], "false", cellData), //hastriggers
MakePgCell(Columns[3], "false", cellData), //rowsecurity
MakePgCell(Columns[4], "public", cellData), //schemaname
MakePgCell(Columns[5], tableName, cellData), //tablename
MakePgCell(Columns[6], tableOwner, cellData), //tableowner
TCell() //tablespace
};
TVector<TCell> res;
res.reserve(Columns.size());
for (const auto& column : Columns) {
TCell cell;
switch (column.Tag) {
case 1: {
cell = MakePgCell(Schema::PgTables::Columns[0], "public", cellData);
break;
}
case 2: {
cell = MakePgCell(Schema::PgTables::Columns[1], tableName, cellData);
break;
}
case 3: {
cell = MakePgCell(Schema::PgTables::Columns[2], tableOwner, cellData);
break;
}
case 4: {
cell = TCell();
break;
}
case 5: {
cell = MakePgCell(Schema::PgTables::Columns[4], "true", cellData);
break;
}
case 6: {
cell = MakePgCell(Schema::PgTables::Columns[5], "false", cellData);
break;
}
case 7: {
cell = MakePgCell(Schema::PgTables::Columns[6], "false", cellData);
break;
}
case 8: {
cell = MakePgCell(Schema::PgTables::Columns[7], "false", cellData);
break;
}

}
res.emplace_back(std::move(cell));
}
return res;
}

TVector<TCell> MakePgTablesStaticRow(const NYql::NPg::TTableInfo& tableInfo, TVector<TString>& cellData) {
TVector<TCell> res;
res.reserve(Columns.size());
for (const auto& column : Columns) {
TCell cell;
switch (column.Tag) {
case 1: {
cell = MakePgCell(Schema::PgTables::Columns[0], tableInfo.Schema, cellData);
break;
}
case 2: {
cell = MakePgCell(Schema::PgTables::Columns[1], tableInfo.Name, cellData);
break;
}
default: {
cell = TCell();
break;
}

}
res.emplace_back(std::move(cell));
}
return res;
}
public:
using TBase = NKikimr::NSysView::TScanActorBase<TPgTablesScan>;
Expand All @@ -71,6 +126,19 @@ class TPgTablesScan : public NKikimr::NSysView::TScanActorBase<TPgTablesScan> {
Become(&TPgTablesScan::StateWork);
}

void ExpandBatchWithStaticTables(const THolder<NKqp::TEvKqpCompute::TEvScanData>& batch) {
for (const auto& tableDesc : NYql::NPg::GetStaticTables()) {
TVector<TString> cellData;
TVector<TCell> cells = MakePgTablesStaticRow(tableDesc, cellData);
if (!ConvertError_.Empty()) {
ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, ConvertError_);
return;
}
TArrayRef<const TCell> ref(cells);
batch->Rows.emplace_back(TOwnedCellVec::Make(ref));
}
}

void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ctx);
const auto& record = ev->Get()->GetRecord();
Expand Down Expand Up @@ -102,6 +170,7 @@ class TPgTablesScan : public NKikimr::NSysView::TScanActorBase<TPgTablesScan> {

auto batch = MakeHolder<NKqp::TEvKqpCompute::TEvScanData>(ScanId);

ExpandBatchWithStaticTables(batch);

for (size_t i = 0; i < record.GetPathDescription().ChildrenSize(); ++i) {
TVector<TString> cellData;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/sys_view/ut_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ Y_UNIT_TEST_SUITE(SystemView) {
auto session = client.CreateSession().GetValueSync().GetSession();
{
auto result = session.ExecuteDataQuery(R"(
SELECT schemaname, tablename, tableowner, tablespace, hasindexes, hasrules, hastriggers, rowsecurity FROM `Root/.sys/pg_tables`;
SELECT schemaname, tablename, tableowner, tablespace, hasindexes, hasrules, hastriggers, rowsecurity FROM `Root/.sys/pg_tables` WHERE tablename = PgName("Table0") OR tablename = PgName("Table1") ORDER BY tablename;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();

UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ TDataProviderInitializer GetPgDataProviderInitializer() {
};
}

}
}