Skip to content

Fixed ToBytes/FromBytes for decimal & big tz dates #10348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions ydb/library/yql/core/type_ann/type_ann_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3973,9 +3973,9 @@ namespace NTypeAnnImpl {
return IGraphTransformer::TStatus::Ok;
}

IGraphTransformer::TStatus FromBytesWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
IGraphTransformer::TStatus FromBytesWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
Y_UNUSED(output);
if (!EnsureMinArgsCount(*input, 2, ctx.Expr)) {
if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

Expand All @@ -3993,25 +3993,33 @@ namespace NTypeAnnImpl {
return IGraphTransformer::TStatus::Error;
}

auto dataTypeName = input->Child(1)->Content();
auto slot = NKikimr::NUdf::FindDataSlot(dataTypeName);
if (!slot) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(1)->Pos()), TStringBuilder() << "Unknown datatype: " << dataTypeName));
return IGraphTransformer::TStatus::Error;
}
auto targetDataTypeName = input->Child(1)->Content();
const TDataExprType* targetDataType = nullptr;
auto targetSlot = NKikimr::NUdf::FindDataSlot(targetDataTypeName);
if (!targetSlot) {
auto typeExpr = ctx.Expr.Builder(input->Child(1)->Pos()).Callable("ParseType")
.Add(0, input->ChildPtr(1))
.Seal().Build();
auto parseTypeResult = ParseTypeWrapper(typeExpr, typeExpr, ctx);
if (parseTypeResult == IGraphTransformer::TStatus::Error) {
return parseTypeResult;
}

const bool isDecimal = IsDataTypeDecimal(*slot);
if (!EnsureArgsCount(*input, isDecimal ? 4 : 2, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
if (!EnsureType(*typeExpr, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

auto type = typeExpr->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
if (!EnsureDataType(input->Child(1)->Pos(), *type, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

targetDataType = type->Cast<TDataExprType>();
targetSlot = targetDataType->GetSlot();
}

auto dataTypeAnn = isDecimal ?
ctx.Expr.MakeType<TDataExprParamsType>(*slot, input->Child(2)->Content(), input->Child(3)->Content()):
ctx.Expr.MakeType<TDataExprType>(*slot);
auto dataTypeAnn = targetDataType ? targetDataType : ctx.Expr.MakeType<TDataExprType>(*targetSlot);
input->SetTypeAnn(ctx.Expr.MakeType<TOptionalExprType>(dataTypeAnn));
if (isDecimal && !input->GetTypeAnn()->Cast<TDataExprParamsType>()->Validate(input->Pos(), ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
return IGraphTransformer::TStatus::Ok;
}

Expand Down Expand Up @@ -5441,11 +5449,15 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
}

IGraphTransformer::TStatus ToBytesWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}

if (IsNull(input->Head())) {
output = input->HeadPtr();
return IGraphTransformer::TStatus::Repeat;
}

bool isOptional;
const TDataExprType* dataType;
if (!EnsureDataOrOptionalOfData(input->Head(), isOptional, dataType, ctx.Expr)) {
Expand Down Expand Up @@ -12209,7 +12221,6 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["SessionWindowTraits"] = &SessionWindowTraitsWrapper;
Functions["FromString"] = &FromStringWrapper;
Functions["StrictFromString"] = &StrictFromStringWrapper;
Functions["FromBytes"] = &FromBytesWrapper;
Functions["Convert"] = &ConvertWrapper;
Functions["AlterTo"] = &AlterToWrapper;
Functions["ToIntegral"] = &ToIntegralWrapper;
Expand Down Expand Up @@ -12725,6 +12736,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
ExtFunctions["SafeCast"] = &CastWrapper<false>;
ExtFunctions["StrictCast"] = &CastWrapper<true>;
ExtFunctions["Version"] = &VersionWrapper;
ExtFunctions["FromBytes"] = &FromBytesWrapper;

ExtFunctions["Aggregate"] = &AggregateWrapper;
ExtFunctions["AggregateCombine"] = &AggregateWrapper;
Expand Down
123 changes: 108 additions & 15 deletions ydb/library/yql/minikql/comp_nodes/mkql_frombytes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,22 @@ template <bool IsOptional>
class TFromBytesWrapper : public TMutableComputationNode<TFromBytesWrapper<IsOptional>> {
typedef TMutableComputationNode<TFromBytesWrapper<IsOptional>> TBaseComputation;
public:
TFromBytesWrapper(TComputationMutables& mutables, IComputationNode* data, NUdf::TDataTypeId schemeType)
TFromBytesWrapper(TComputationMutables& mutables, IComputationNode* data, NUdf::TDataTypeId schemeType, ui32 param1, ui32 param2)
: TBaseComputation(mutables)
, Data(data)
, SchemeType(NUdf::GetDataSlot(schemeType))
, Param1(param1)
, Param2(param2)
{
if (SchemeType == NUdf::EDataSlot::Decimal) {
DecimalBound = NYql::NDecimal::TInt128(1);
NYql::NDecimal::TInt128 ten(10U);
for (ui32 i = 0; i < Param1; ++i) {
DecimalBound *= ten;
}

NegDecimalBound = -DecimalBound;
}
}

NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
Expand All @@ -36,10 +47,6 @@ class TFromBytesWrapper : public TMutableComputationNode<TFromBytesWrapper<IsOpt

switch (SchemeType) {
case NUdf::EDataSlot::TzDate: {
if (!data) {
return NUdf::TUnboxedValuePod();
}

const auto& ref = data.AsStringRef();
if (ref.Size() != 4) {
return NUdf::TUnboxedValuePod();
Expand All @@ -57,10 +64,6 @@ class TFromBytesWrapper : public TMutableComputationNode<TFromBytesWrapper<IsOpt
}

case NUdf::EDataSlot::TzDatetime: {
if (!data) {
return NUdf::TUnboxedValuePod();
}

const auto& ref = data.AsStringRef();
if (ref.Size() != 6) {
return NUdf::TUnboxedValuePod();
Expand All @@ -78,18 +81,65 @@ class TFromBytesWrapper : public TMutableComputationNode<TFromBytesWrapper<IsOpt
}

case NUdf::EDataSlot::TzTimestamp: {
if (!data) {
const auto& ref = data.AsStringRef();
if (ref.Size() != 10) {
return NUdf::TUnboxedValuePod();
}

auto tzId = SwapBytes(ReadUnaligned<ui16>(ref.Data() + ref.Size() - sizeof(ui16)));
auto value = SwapBytes(data.Get<ui64>());
if (value < NUdf::MAX_TIMESTAMP && tzId < NUdf::GetTimezones().size()) {
auto ret = NUdf::TUnboxedValuePod(value);
ret.SetTimezoneId(tzId);
return ret;
}

return NUdf::TUnboxedValuePod();
}

case NUdf::EDataSlot::TzDate32: {
const auto& ref = data.AsStringRef();
if (ref.Size() != 6) {
return NUdf::TUnboxedValuePod();
}

auto tzId = SwapBytes(ReadUnaligned<ui16>(ref.Data() + ref.Size() - sizeof(ui16)));
auto value = SwapBytes(data.Get<i32>());
if (value >= NUdf::MIN_DATE32 && value <= NUdf::MAX_DATE32 && tzId < NUdf::GetTimezones().size()) {
auto ret = NUdf::TUnboxedValuePod(value);
ret.SetTimezoneId(tzId);
return ret;
}

return NUdf::TUnboxedValuePod();
}

case NUdf::EDataSlot::TzDatetime64: {
const auto& ref = data.AsStringRef();
if (ref.Size() != 10) {
return NUdf::TUnboxedValuePod();
}

auto tzId = SwapBytes(ReadUnaligned<ui16>(ref.Data() + ref.Size() - sizeof(ui16)));
auto value = SwapBytes(data.Get<ui64>());
if (value < NUdf::MAX_TIMESTAMP && tzId < NUdf::GetTimezones().size()) {
auto value = SwapBytes(data.Get<i64>());
if (value >= NUdf::MIN_DATETIME64 && value <= NUdf::MAX_DATETIME64 && tzId < NUdf::GetTimezones().size()) {
auto ret = NUdf::TUnboxedValuePod(value);
ret.SetTimezoneId(tzId);
return ret;
}

return NUdf::TUnboxedValuePod();
}

case NUdf::EDataSlot::TzTimestamp64: {
const auto& ref = data.AsStringRef();
if (ref.Size() != 10) {
return NUdf::TUnboxedValuePod();
}

auto tzId = SwapBytes(ReadUnaligned<ui16>(ref.Data() + ref.Size() - sizeof(ui16)));
auto value = SwapBytes(data.Get<i64>());
if (value >= NUdf::MIN_TIMESTAMP64 && value <= NUdf::MAX_TIMESTAMP64 && tzId < NUdf::GetTimezones().size()) {
auto ret = NUdf::TUnboxedValuePod(value);
ret.SetTimezoneId(tzId);
return ret;
Expand All @@ -105,6 +155,35 @@ class TFromBytesWrapper : public TMutableComputationNode<TFromBytesWrapper<IsOpt
return data.Release();
}

case NUdf::EDataSlot::Decimal: {
const auto& ref = data.AsStringRef();
if (ref.Size() != 15) {
return NUdf::TUnboxedValuePod();
}

NYql::NDecimal::TInt128 v = 0;
ui8* p = (ui8*)&v;
memcpy(p, ref.Data(), 15);
p[0xF] = (p[0xE] & 0x80) ? 0xFF : 0x00;
if (NYql::NDecimal::IsError(v)) {
return NUdf::TUnboxedValuePod();
}

if (!NYql::NDecimal::IsNormal(v)) {
return NUdf::TUnboxedValuePod(v);
}

if (v >= DecimalBound) {
return NUdf::TUnboxedValuePod(NYql::NDecimal::Inf());
}

if (v <= NegDecimalBound) {
return NUdf::TUnboxedValuePod(-NYql::NDecimal::Inf());
}

return NUdf::TUnboxedValuePod(v);
}

default:
if (IsValidValue(SchemeType, data)) {
return data.Release();
Expand All @@ -121,25 +200,39 @@ class TFromBytesWrapper : public TMutableComputationNode<TFromBytesWrapper<IsOpt

IComputationNode* const Data;
const NUdf::EDataSlot SchemeType;
const ui32 Param1;
const ui32 Param2;
NYql::NDecimal::TInt128 DecimalBound, NegDecimalBound;
};

}

IComputationNode* WrapFromBytes(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args");
MKQL_ENSURE(callable.GetInputsCount() == 2 || callable.GetInputsCount() == 4, "Expected 2 or 4 args");

bool isOptional;
const auto dataType = UnpackOptionalData(callable.GetInput(0), isOptional);
MKQL_ENSURE(dataType->GetSchemeType() == NUdf::TDataType<char*>::Id, "Expected String");

const auto schemeTypeData = AS_VALUE(TDataLiteral, callable.GetInput(1));
const auto schemeType = schemeTypeData->AsValue().Get<ui32>();
ui32 param1 = 0;
ui32 param2 = 0;
if (schemeType == NUdf::TDataType<NUdf::TDecimal>::Id) {
MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
const auto param1Data = AS_VALUE(TDataLiteral, callable.GetInput(2));
param1 = param1Data->AsValue().Get<ui32>();
const auto param2Data = AS_VALUE(TDataLiteral, callable.GetInput(3));
param2 = param2Data->AsValue().Get<ui32>();
} else {
MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args");
}

const auto data = LocateNode(ctx.NodeLocator, callable, 0);
if (isOptional) {
return new TFromBytesWrapper<true>(ctx.Mutables, data, static_cast<NUdf::TDataTypeId>(schemeType));
return new TFromBytesWrapper<true>(ctx.Mutables, data, static_cast<NUdf::TDataTypeId>(schemeType), param1, param2);
} else {
return new TFromBytesWrapper<false>(ctx.Mutables, data, static_cast<NUdf::TDataTypeId>(schemeType));
return new TFromBytesWrapper<false>(ctx.Mutables, data, static_cast<NUdf::TDataTypeId>(schemeType), param1, param2);
}
}

Expand Down
34 changes: 34 additions & 0 deletions ydb/library/yql/minikql/comp_nodes/mkql_tobytes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/utils/swap_bytes.h>

namespace NKikimr {
Expand Down Expand Up @@ -38,6 +39,30 @@ using TBaseComputation = TDecoratorCodegeneratorNode<TToBytesPrimitiveTypeWrappe
#endif
};

template<bool IsOptional>
class TToBytesDecimalWrapper : public TMutableComputationNode<TToBytesDecimalWrapper<IsOptional>> {
using TBaseComputation = TMutableComputationNode<TToBytesDecimalWrapper<IsOptional>>;
public:
TToBytesDecimalWrapper(TComputationMutables& mutables, IComputationNode* data)
: TBaseComputation(mutables, EValueRepresentation::String)
, Data(data)
{}

NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
const auto& value = this->Data->GetValue(ctx);
if (IsOptional && !value)
return NUdf::TUnboxedValuePod();

return MakeString(NUdf::TStringRef(reinterpret_cast<const char*>(&value), 15));
}

void RegisterDependencies() const final {
this->DependsOn(Data);
}

IComputationNode* const Data;
};

template<bool IsOptional, typename Type>
class TToBytesTzTypeWrapper : public TDecoratorComputationNode<TToBytesTzTypeWrapper<IsOptional, Type>> {
using TBaseComputation = TDecoratorComputationNode<TToBytesTzTypeWrapper<IsOptional, Type>>;
Expand Down Expand Up @@ -100,7 +125,16 @@ IComputationNode* WrapToBytes(TCallable& callable, const TComputationNodeFactory
MAKE_TZ_TYPE_BYTES(NUdf::TTzDate, ui16);
MAKE_TZ_TYPE_BYTES(NUdf::TTzDatetime, ui32);
MAKE_TZ_TYPE_BYTES(NUdf::TTzTimestamp, ui64);
MAKE_TZ_TYPE_BYTES(NUdf::TTzDate32, i32);
MAKE_TZ_TYPE_BYTES(NUdf::TTzDatetime64, i64);
MAKE_TZ_TYPE_BYTES(NUdf::TTzTimestamp64, i64);
#undef MAKE_TZ_TYPE_BYTES
case NUdf::TDataType<NUdf::TDecimal>::Id: {
if (isOptional) \
return new TToBytesDecimalWrapper<true>(ctx.Mutables, data);
else
return new TToBytesDecimalWrapper<false>(ctx.Mutables, data);
}
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4414,9 +4414,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLComputationNodeTest) {

TVector<TRuntimeNode> tupleItems;
const auto data1 = pb.NewDataLiteral<NUdf::EDataSlot::String>(TString("\xEA\x00\x00\x00", 4));
tupleItems.push_back(pb.FromBytes(data1, NUdf::TDataType<ui32>::Id));
tupleItems.push_back(pb.FromBytes(data1, pb.NewDataType(NUdf::TDataType<ui32>::Id)));
const auto data2 = pb.NewEmptyOptionalDataLiteral(NUdf::TDataType<const char*>::Id);
tupleItems.push_back(pb.FromBytes(data2, NUdf::TDataType<ui32>::Id));
tupleItems.push_back(pb.FromBytes(data2, pb.NewDataType(NUdf::TDataType<ui32>::Id)));
const auto pgmReturn = pb.NewTuple(tupleItems);

const auto graph = setup.BuildGraph(pgmReturn);
Expand Down
Loading
Loading