Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 6 additions & 103 deletions ydb/library/yql/core/type_ann/type_ann_pg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,38 +394,14 @@ IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode
}

const TTypeAnnotationNode* FromPgImpl(TPositionHandle pos, const TTypeAnnotationNode* type, TExprContext& ctx) {
auto name = type->Cast<TPgExprType>()->GetName();
const TDataExprType* dataType;
if (name == "bool") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Bool);
} else if (name == "int2") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Int16);
} else if (name == "int4") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Int32);
} else if (name == "int8") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Int64);
} else if (name == "float4") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Float);
} else if (name == "float8") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Double);
} else if (name == "text" || name == "varchar" || name == "cstring") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Utf8);
} else if (name == "bytea") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::String);
} else if (name == "unknown") {
return ctx.MakeType<TNullExprType>();
} else if (name == "date") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Date32);
} else if (name == "timestamp") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Timestamp64);
} else if (name == "uuid") {
dataType = ctx.MakeType<TDataExprType>(EDataSlot::Uuid);
} else {
auto res = ConvertFromPgType(type->Cast<TPgExprType>()->GetId());
if (!res) {
ctx.AddError(TIssue(ctx.GetPosition(pos),
TStringBuilder() << "Unsupported type: " << name));
TStringBuilder() << "Unsupported type: " << *type));
return nullptr;
}

auto dataType = ctx.MakeType<TDataExprType>(*res);
return ctx.MakeType<TOptionalExprType>(dataType);
}

Expand Down Expand Up @@ -464,81 +440,8 @@ const TTypeAnnotationNode* ToPgImpl(TPositionHandle pos, const TTypeAnnotationNo
return nullptr;
}

TString pgType;
switch (dataType->GetSlot()) {
case NUdf::EDataSlot::Bool:
pgType = "bool";
break;
case NUdf::EDataSlot::Int16:
case NUdf::EDataSlot::Int8:
case NUdf::EDataSlot::Uint8:
pgType = "int2";
break;
case NUdf::EDataSlot::Int32:
case NUdf::EDataSlot::Uint16:
pgType = "int4";
break;
case NUdf::EDataSlot::Int64:
case NUdf::EDataSlot::Uint32:
pgType = "int8";
break;
case NUdf::EDataSlot::Uint64:
case NUdf::EDataSlot::Decimal:
case NUdf::EDataSlot::DyNumber:
pgType = "numeric";
break;
case NUdf::EDataSlot::Float:
pgType = "float4";
break;
case NUdf::EDataSlot::Double:
pgType = "float8";
break;
case NUdf::EDataSlot::String:
case NUdf::EDataSlot::Yson:
pgType = "bytea";
break;
case NUdf::EDataSlot::Utf8:
case NUdf::EDataSlot::TzDate:
case NUdf::EDataSlot::TzDatetime:
case NUdf::EDataSlot::TzTimestamp:
pgType = "text";
break;
case NUdf::EDataSlot::Date:
case NUdf::EDataSlot::Date32:
pgType = "date";
break;
case NUdf::EDataSlot::Datetime:
case NUdf::EDataSlot::Datetime64:
case NUdf::EDataSlot::Timestamp:
case NUdf::EDataSlot::Timestamp64:
pgType = "timestamp";
break;
case NUdf::EDataSlot::Interval:
case NUdf::EDataSlot::Interval64:
pgType = "interval";
break;
case NUdf::EDataSlot::Json:
pgType = "json";
break;
case NUdf::EDataSlot::JsonDocument:
pgType = "jsonb";
break;
case NUdf::EDataSlot::Uuid:
pgType = "uuid";
break;
default:
ctx.AddError(TIssue(ctx.GetPosition(pos),
TStringBuilder() << "Unsupported type: " << dataType->GetName()));
return nullptr;
}

try {
auto result = ctx.MakeType<TPgExprType>(NPg::LookupType(pgType).TypeId);
return result;
} catch (const yexception& e) {
ctx.AddError(TIssue(ctx.GetPosition(pos), e.what()));
return nullptr;
}
auto pgTypeId = ConvertToPgType(dataType->GetSlot());
return ctx.MakeType<TPgExprType>(pgTypeId);
}

IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Expand Down
9 changes: 1 addition & 8 deletions ydb/library/yql/core/yql_expr_type_annotation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6013,14 +6013,7 @@ bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertT
}

auto slot = unpacked->Cast<TDataExprType>()->GetSlot();
auto convertedTypeId = ConvertToPgType(slot);
if (!convertedTypeId) {
ctx.AddError(TIssue(ctx.GetPosition(pos),
TStringBuilder() << "Type is not compatible to PG: " << slot));
return false;
}

pgType = *convertedTypeId;
pgType = ConvertToPgType(slot);
convertToPg = true;
return true;
} else if (type->GetKind() != ETypeAnnotationKind::Pg) {
Expand Down
80 changes: 65 additions & 15 deletions ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ class TPgTableContent : public TMutableComputationNode<TPgTableContent> {
ApplyFillers(AllPgTablesFillers, Y_ARRAY_SIZE(AllPgTablesFillers), PgTablesFillers_);
} else if (Table_ == "pg_roles") {
static const std::pair<const char*, TPgRolesFiller> AllPgRolesFillers[] = {
{"rolname", [](ui32 index) {
return PointerDatumToPod((Datum)MakeFixedString(index == 1 ? "postgres" : *PGGetGUCSetting("ydb_user"), NAMEDATALEN));
{"rolname", [](ui32 index) {
return PointerDatumToPod((Datum)MakeFixedString(index == 1 ? "postgres" : *PGGetGUCSetting("ydb_user"), NAMEDATALEN));
}},
{"oid", [](ui32) { return ScalarDatumToPod(ObjectIdGetDatum(1)); }},
{"rolbypassrls", [](ui32) { return ScalarDatumToPod(BoolGetDatum(true)); }},
Expand Down Expand Up @@ -489,7 +489,7 @@ class TPgTableContent : public TMutableComputationNode<TPgTableContent> {
};

ApplyFillers(AllPgLanguageFillers, Y_ARRAY_SIZE(AllPgLanguageFillers), PgLanguageFillers_);
}
}
} else {
if (Table_ == "tables") {
static const std::pair<const char*, TTablesFiller> AllTablesFillers[] = {
Expand Down Expand Up @@ -799,7 +799,7 @@ class TPgTableContent : public TMutableComputationNode<TPgTableContent> {
});

for (const auto& t : tables) {
const ui32 amOid = (t.Kind == NPg::ERelKind::Relation) ? btreeAmOid : 0;
const ui32 amOid = (t.Kind == NPg::ERelKind::Relation) ? btreeAmOid : 0;
NUdf::TUnboxedValue* items;
auto row = compCtx.HolderFactory.CreateDirectArrayHolder(PgClassFillers_.size(), items);
for (ui32 i = 0; i < PgClassFillers_.size(); ++i) {
Expand Down Expand Up @@ -1168,7 +1168,7 @@ class TPgResolvedCallBase : public TMutableComputationNode<TDerived> {
const TVector<TType*> ArgTypes;
const TStructType* StructType;
TVector<NPg::TTypeDesc> ArgDesc;

TPgArgsExprBuilder ArgsExprBuilder;
};

Expand Down Expand Up @@ -1312,7 +1312,7 @@ class TPgResolvedMultiCall : public TPgResolvedCallBase<TPgResolvedMultiCall> {

rsInfo.expectedDesc = BlessTupleDesc(rsInfo.expectedDesc);
}

TupleSlot = MakeSingleTupleTableSlot(rsInfo.expectedDesc, &TTSOpsMinimalTuple);
for (ui32 i = 0; i < args.size(); ++i) {
const auto& value = args[i];
Expand Down Expand Up @@ -1383,7 +1383,7 @@ class TPgResolvedMultiCall : public TPgResolvedCallBase<TPgResolvedMultiCall> {
FinishAndFree();
return false;
}

slot_getallattrs(TupleSlot);
if (RetTypeDesc.TypeId == RECORDOID) {
if (StructType) {
Expand Down Expand Up @@ -2003,7 +2003,7 @@ NUdf::TUnboxedValuePod ConvertToPgValue(NUdf::TUnboxedValuePod value, TMaybe<NUd
auto res = Timestamp2Pg(value.Get<ui64>());
return ScalarDatumToPod(res);
}
case NUdf::EDataSlot::Interval:
case NUdf::EDataSlot::Interval:
case NUdf::EDataSlot::Interval64: {
auto res = Interval2Pg(value.Get<i64>());
return PointerDatumToPod(PointerGetDatum(res));
Expand Down Expand Up @@ -2244,7 +2244,7 @@ class TToPg : public TMutableComputationNode<TToPg<Slot>> {

if constexpr (Slot == NUdf::EDataSlot::Decimal) {
auto decimalType = static_cast<TDataDecimalType*>(ArgType);
return PointerDatumToPod(NumericGetDatum(DecimalToPgNumeric(value,
return PointerDatumToPod(NumericGetDatum(DecimalToPgNumeric(value,
decimalType->GetParams().first, decimalType->GetParams().second)));
} else {
return ConvertToPgValue<Slot>(value);
Expand Down Expand Up @@ -2941,7 +2941,7 @@ struct TToPgExec {
*res = builder.Build(true);
break;
}
case NUdf::EDataSlot::Json:
case NUdf::EDataSlot::Json:
{
NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length);
Expand All @@ -2964,7 +2964,7 @@ struct TToPgExec {
*res = builder.Build(true);
break;
}
case NUdf::EDataSlot::JsonDocument:
case NUdf::EDataSlot::JsonDocument:
{
NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::binary(), *ctx->memory_pool(), length);
Expand All @@ -2985,7 +2985,7 @@ struct TToPgExec {

*res = builder.Build(true);
break;
}
}
default:
ythrow yexception() << "Unsupported type: " << NUdf::GetDataTypeInfo(SourceDataSlot).Name;
}
Expand Down Expand Up @@ -4013,16 +4013,26 @@ arrow::Datum MakePgScalar(NKikimr::NMiniKQL::TPgType* type, const NUdf::TBlockIt
);
}

TMaybe<ui32> ConvertToPgType(NUdf::EDataSlot slot) {
ui32 ConvertToPgType(NUdf::EDataSlot slot) {
switch (slot) {
case NUdf::EDataSlot::Bool:
return BOOLOID;
case NUdf::EDataSlot::Int8:
return INT2OID;
case NUdf::EDataSlot::Uint8:
return INT2OID;
case NUdf::EDataSlot::Int16:
return INT2OID;
case NUdf::EDataSlot::Uint16:
return INT4OID;
case NUdf::EDataSlot::Int32:
return INT4OID;
case NUdf::EDataSlot::Uint32:
return INT8OID;
case NUdf::EDataSlot::Int64:
return INT8OID;
case NUdf::EDataSlot::Uint64:
return NUMERICOID;
case NUdf::EDataSlot::Float:
return FLOAT4OID;
case NUdf::EDataSlot::Double:
Expand All @@ -4031,8 +4041,40 @@ TMaybe<ui32> ConvertToPgType(NUdf::EDataSlot slot) {
return BYTEAOID;
case NUdf::EDataSlot::Utf8:
return TEXTOID;
default:
return Nothing();
case NUdf::EDataSlot::Yson:
return BYTEAOID;
case NUdf::EDataSlot::Json:
return JSONOID;
case NUdf::EDataSlot::Uuid:
return UUIDOID;
case NUdf::EDataSlot::Date:
return DATEOID;
case NUdf::EDataSlot::Datetime:
return TIMESTAMPOID;
case NUdf::EDataSlot::Timestamp:
return TIMESTAMPOID;
case NUdf::EDataSlot::Interval:
return INTERVALOID;
case NUdf::EDataSlot::TzDate:
return TEXTOID;
case NUdf::EDataSlot::TzDatetime:
return TEXTOID;
case NUdf::EDataSlot::TzTimestamp:
return TEXTOID;
case NUdf::EDataSlot::Decimal:
return NUMERICOID;
case NUdf::EDataSlot::DyNumber:
return NUMERICOID;
case NUdf::EDataSlot::JsonDocument:
return JSONBOID;
case NUdf::EDataSlot::Date32:
return DATEOID;
case NUdf::EDataSlot::Datetime64:
return TIMESTAMPOID;
case NUdf::EDataSlot::Timestamp64:
return TIMESTAMPOID;
case NUdf::EDataSlot::Interval64:
return INTERVALOID;
}
}

Expand All @@ -4053,7 +4095,15 @@ TMaybe<NUdf::EDataSlot> ConvertFromPgType(ui32 typeId) {
case BYTEAOID:
return NUdf::EDataSlot::String;
case TEXTOID:
case VARCHAROID:
case CSTRINGOID:
return NUdf::EDataSlot::Utf8;
case DATEOID:
return NUdf::EDataSlot::Date32;
case TIMESTAMPOID:
return NUdf::EDataSlot::Timestamp64;
case UUIDOID:
return NUdf::EDataSlot::Uuid;
}

return Nothing();
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/parser/pg_wrapper/interface/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace NYql {

TMaybe<ui32> ConvertToPgType(NKikimr::NUdf::EDataSlot slot);
ui32 ConvertToPgType(NKikimr::NUdf::EDataSlot slot);
TMaybe<NKikimr::NUdf::EDataSlot> ConvertFromPgType(ui32 typeId);

bool ParsePgIntervalModifier(const TString& str, i32& ret);
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,9 @@ TColumnConverter BuildPgColumnConverter(const std::shared_ptr<arrow::DataType>&
return {};
}

TMaybe<ui32> ConvertToPgType(NKikimr::NUdf::EDataSlot slot) {
ui32 ConvertToPgType(NKikimr::NUdf::EDataSlot slot) {
Y_UNUSED(slot);
return Nothing();
throw yexception() << "PG types are not supported";
}

TMaybe<NKikimr::NUdf::EDataSlot> ConvertFromPgType(ui32 typeId) {
Expand Down
22 changes: 22 additions & 0 deletions ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -1932,6 +1932,28 @@
}
],
"test.test[order_by-sort_with_take_limit--Results]": [],
"test.test[pg-all_data--Analyze]": [
{
"checksum": "c5803b9f3080a48130c30f8602733de3",
"size": 4575,
"uri": "https://{canondata_backend}/1599023/6add8cb499cc3b1dca20f22c9b17ae29fbfe727d/resource.tar.gz#test.test_pg-all_data--Analyze_/plan.txt"
}
],
"test.test[pg-all_data--Debug]": [
{
"checksum": "8fabd9ef07df1dc7f87bf4f852b9e0bf",
"size": 5441,
"uri": "https://{canondata_backend}/1599023/6add8cb499cc3b1dca20f22c9b17ae29fbfe727d/resource.tar.gz#test.test_pg-all_data--Debug_/opt.yql_patched"
}
],
"test.test[pg-all_data--Plan]": [
{
"checksum": "c5803b9f3080a48130c30f8602733de3",
"size": 4575,
"uri": "https://{canondata_backend}/1599023/6add8cb499cc3b1dca20f22c9b17ae29fbfe727d/resource.tar.gz#test.test_pg-all_data--Plan_/plan.txt"
}
],
"test.test[pg-all_data--Results]": [],
"test.test[pg-drop_table--Analyze]": [
{
"checksum": "5ba98022094d906a6acb7cf1b61f14ed",
Expand Down
Loading