Skip to content

allow parsing out-of-range doubles in bulk-upsert #15376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions ydb/core/formats/arrow/converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

namespace NKikimr::NArrow {

static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryPool& memPool, TString& errorMessage) {
static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryPool& memPool, TString& errorMessage, const bool allowInfDouble) {
if (!cell.AsBuf()) {
cell = TCell();
return true;
Expand All @@ -34,7 +34,7 @@ static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryP
break;
}
case NScheme::NTypeIds::JsonDocument: {
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(cell.AsBuf());
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(cell.AsBuf(), allowInfDouble);
if (std::holds_alternative<TString>(binaryJson)) {
errorMessage = "Invalid JSON for JsonDocument provided: " + std::get<TString>(binaryJson);
return false;
Expand All @@ -50,7 +50,8 @@ static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryP
return true;
}

static arrow::Status ConvertColumn(const NScheme::TTypeInfo colType, std::shared_ptr<arrow::Array>& column, std::shared_ptr<arrow::Field>& field) {
static arrow::Status ConvertColumn(
const NScheme::TTypeInfo colType, std::shared_ptr<arrow::Array>& column, std::shared_ptr<arrow::Field>& field, const bool allowInfDouble) {
switch (colType.GetTypeId()) {
case NScheme::NTypeIds::Decimal:
return arrow::Status::OK();
Expand Down Expand Up @@ -102,7 +103,7 @@ static arrow::Status ConvertColumn(const NScheme::TTypeInfo colType, std::shared
return appendResult;
}
} else {
const auto maybeBinaryJson = NBinaryJson::SerializeToBinaryJson(valueBuf);
const auto maybeBinaryJson = NBinaryJson::SerializeToBinaryJson(valueBuf, allowInfDouble);
if (std::holds_alternative<TString>(maybeBinaryJson)) {
return arrow::Status::SerializationError("Cannot serialize json (", std::get<TString>(maybeBinaryJson),
"): ", valueBuf.SubStr(0, Min(valueBuf.Size(), size_t{1024})));
Expand Down Expand Up @@ -134,17 +135,16 @@ static arrow::Status ConvertColumn(const NScheme::TTypeInfo colType, std::shared
return arrow::Status::OK();
}

arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert)
{
arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConvertColumns(
const std::shared_ptr<arrow::RecordBatch>& batch, const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert, const bool allowInfDouble) {
std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns();
std::vector<std::shared_ptr<arrow::Field>> fields = batch->schema()->fields();
Y_ABORT_UNLESS(columns.size() == fields.size());
for (i32 i = 0; i < batch->num_columns(); ++i) {
auto& colName = batch->column_name(i);
auto it = columnsToConvert.find(TString(colName.data(), colName.size()));
if (it != columnsToConvert.end()) {
auto convertResult = ConvertColumn(it->second, columns[i], fields[i]);
auto convertResult = ConvertColumn(it->second, columns[i], fields[i], allowInfDouble);
if (!convertResult.ok()) {
return arrow::Status::FromArgs(convertResult.code(), "column ", colName, ": ", convertResult.ToString());
}
Expand Down Expand Up @@ -326,7 +326,7 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err

if (NeedDataConversion(colType)) {
for (i32 i = 0; i < unroll; ++i) {
if (!ConvertData(cells[i][col], colType, memPool, errorMessage)) {
if (!ConvertData(cells[i][col], colType, memPool, errorMessage, AllowInfDouble_)) {
return false;
}
}
Expand Down Expand Up @@ -371,7 +371,7 @@ bool TArrowToYdbConverter::Process(const arrow::RecordBatch& batch, TString& err
return false;
}

if (!ConvertData(curCell, colType, memPool, errorMessage)) {
if (!ConvertData(curCell, colType, memPool, errorMessage, AllowInfDouble_)) {
return false;
}
++col;
Expand Down
11 changes: 7 additions & 4 deletions ydb/core/formats/arrow/converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class TArrowToYdbConverter {
private:
std::vector<std::pair<TString, NScheme::TTypeInfo>> YdbSchema_; // Destination schema (allow shrink and reorder)
IRowWriter& RowWriter_;
bool AllowInfDouble_;

template <typename TArray>
TCell MakeCellFromValue(const std::shared_ptr<arrow::Array>& column, i64 row) {
Expand Down Expand Up @@ -63,17 +64,19 @@ class TArrowToYdbConverter {

static bool NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType);

TArrowToYdbConverter(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter)
TArrowToYdbConverter(
const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter, const bool allowInfDouble = false)
: YdbSchema_(ydbSchema)
, RowWriter_(rowWriter)
{}
, AllowInfDouble_(allowInfDouble) {
}

bool Process(const arrow::RecordBatch& batch, TString& errorMessage);
};

arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert);
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert, const bool allowInfDouble = false);
arrow::Result<std::shared_ptr<arrow::RecordBatch>> InplaceConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert);
const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert);

} // namespace NKikimr::NArrow
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_load_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T
}

// Fill rest of cells with non-key column members
if (!FillCellsFromProto(valueCells, ValueColumnPositions, r, errorMessage, valueDataPool)) {
if (!FillCellsFromProto(valueCells, ValueColumnPositions, r, errorMessage, valueDataPool, IsInfinityInJsonAllowed())) {
return false;
}

Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
AppConfig.MutableColumnShardConfig()->SetAlterObjectEnabled(enable);
return *this;
}
TKikimrSettings& SetColumnShardDoubleOutOfRangeHandling(const NKikimrConfig::TColumnShardConfig_EJsonDoubleOutOfRangeHandlingPolicy value) {
AppConfig.MutableColumnShardConfig()->SetDoubleOutOfRangeHandling(value);
return *this;
}
};

class TKikimrRunner {
Expand Down
50 changes: 49 additions & 1 deletion ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3206,6 +3206,54 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}

}
Y_UNIT_TEST(DoubleOutOfRangeInJson) {
auto settings = TKikimrSettings().SetWithSampleTables(false).SetColumnShardDoubleOutOfRangeHandling(
NKikimrConfig::TColumnShardConfig_EJsonDoubleOutOfRangeHandlingPolicy_CAST_TO_INFINITY);
TKikimrRunner kikimr(settings);
TLocalHelper(kikimr).CreateTestOlapTable();
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2);
auto client = kikimr.GetTableClient();
Tests::NCommon::TLoggerInit(kikimr).Initialize();

{
auto result = kikimr.GetQueryClient()
.ExecuteQuery(R"(
CREATE TABLE olapTable (
k Uint32 NOT NULL,
v JsonDocument NOT NULL,
PRIMARY KEY (k)
)
WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 4
)
)",
NQuery::TTxControl::NoTx())
.ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
}

{
TValueBuilder rowsBuilder;
rowsBuilder.BeginList();
for (ui32 i = 0; i < 10; ++i) {
rowsBuilder.AddListItem().BeginStruct().AddMember("k").Uint32(i * 4 + 0).AddMember("v").JsonDocument("-1.797693135e+308").EndStruct();
rowsBuilder.AddListItem().BeginStruct().AddMember("k").Uint32(i * 4 + 1).AddMember("v").JsonDocument("1.797693135e+308").EndStruct();
rowsBuilder.AddListItem().BeginStruct().AddMember("k").Uint32(i * 4 + 2).AddMember("v").JsonDocument("1e1000000000000").EndStruct();
rowsBuilder.AddListItem().BeginStruct().AddMember("k").Uint32(i * 4 + 3).AddMember("v").JsonDocument("-1e1000000000000").EndStruct();
}
rowsBuilder.EndList();
auto result = client.BulkUpsert("/Root/olapTable", rowsBuilder.Build()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
}

{
auto it = client.StreamExecuteScanQuery("SELECT * FROM olapTable WHERE k < 4 ORDER BY k").ExtractValueSync();
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << result << Endl;
CompareYson(result, R"([[0u;"\"-inf\""];[1u;"\"inf\""];[2u;"\"inf\""];[3u;"\"-inf\""]])");
}
}
}
}
6 changes: 6 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,11 @@ message TColumnShardConfig {
}
repeated TRepairInfo Repairs = 15;

enum EJsonDoubleOutOfRangeHandlingPolicy {
REJECT = 0;
CAST_TO_INFINITY = 1;
}

optional uint32 MaxInFlightIntervalsOnRequest = 16;
optional uint32 MaxReadStaleness_ms = 18 [default = 300000];
optional uint32 GCIntervalMs = 19 [default = 30000];
Expand All @@ -1845,6 +1850,7 @@ message TColumnShardConfig {
optional int32 DefaultCompressionLevel = 37;
optional uint64 MemoryLimitMergeOnCompactionRawData = 38 [default = 512000000];
optional bool AlterObjectEnabled = 39 [default = false];
optional EJsonDoubleOutOfRangeHandlingPolicy DoubleOutOfRangeHandling = 40 [default = REJECT];
}

message TSchemeShardConfig {
Expand Down
25 changes: 19 additions & 6 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/base/path.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/scheme/scheme_type_info.h>
#include <ydb/core/scheme/scheme_types_proto.h>
Expand Down Expand Up @@ -267,7 +268,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit

ui32 keySize = KeyColumnPositions.size(); // YdbSchema contains keys first
TRowWriter writer(out, keySize);
NArrow::TArrowToYdbConverter batchConverter(YdbSchema, writer);
NArrow::TArrowToYdbConverter batchConverter(YdbSchema, writer, IsInfinityInJsonAllowed());
if (!batchConverter.Process(*batch, errorMessage)) {
return {};
}
Expand All @@ -276,6 +277,18 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
return out;
}

bool IsInfinityInJsonAllowed() const {
if (TableKind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
return false;
}
switch (AppDataVerified().ColumnShardConfig.GetDoubleOutOfRangeHandling()) {
case NKikimrConfig::TColumnShardConfig_EJsonDoubleOutOfRangeHandlingPolicy_REJECT:
return false;
case NKikimrConfig::TColumnShardConfig_EJsonDoubleOutOfRangeHandlingPolicy_CAST_TO_INFINITY:
return true;
}
}

private:
virtual void OnBeforeStart(const TActorContext&) {
// nothing by default
Expand Down Expand Up @@ -658,7 +671,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}
// Explicit types conversion
if (!ColumnsToConvert.empty()) {
auto convertResult = NArrow::ConvertColumns(Batch, ColumnsToConvert);
auto convertResult = NArrow::ConvertColumns(Batch, ColumnsToConvert, IsInfinityInJsonAllowed());
if (!convertResult.ok()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST,
TStringBuilder() << "Cannot convert arrow batch:" << convertResult.status().ToString(), ctx);
Expand Down Expand Up @@ -1238,9 +1251,8 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
using TFieldDescription = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>::TFieldDescription;

template <class TProto>
inline bool FillCellsFromProto(TVector<TCell>& cells, const TVector<TFieldDescription>& descr, const TProto& proto,
TString& err, TMemoryPool& valueDataPool)
{
inline bool FillCellsFromProto(TVector<TCell>& cells, const TVector<TFieldDescription>& descr, const TProto& proto, TString& err,
TMemoryPool& valueDataPool, const bool allowInfDouble = false) {
cells.clear();
cells.reserve(descr.size());

Expand All @@ -1250,7 +1262,8 @@ inline bool FillCellsFromProto(TVector<TCell>& cells, const TVector<TFieldDescri
return false;
}
cells.push_back({});
if (!CellFromProtoVal(fd.Type, fd.Typmod, &proto.Getitems(fd.PositionInStruct), false, cells.back(), err, valueDataPool)) {
if (!CellFromProtoVal(
fd.Type, fd.Typmod, &proto.Getitems(fd.PositionInStruct), false, cells.back(), err, valueDataPool, allowInfDouble)) {
return false;
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/ydb_convert/ydb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ bool CheckValueData(NScheme::TTypeInfo type, const TCell& cell, TString& err) {
}

bool CellFromProtoVal(const NScheme::TTypeInfo& type, i32 typmod, const Ydb::Value* vp, bool allowCastFromString,
TCell& c, TString& err, TMemoryPool& valueDataPool)
TCell& c, TString& err, TMemoryPool& valueDataPool, bool allowInfDouble)
{
if (vp->Hasnull_flag_value()) {
c = TCell();
Expand Down Expand Up @@ -1256,7 +1256,7 @@ bool CellFromProtoVal(const NScheme::TTypeInfo& type, i32 typmod, const Ydb::Val
break;
}
case NScheme::NTypeIds::JsonDocument : {
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(val.Gettext_value());
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(val.Gettext_value(), allowInfDouble);
if (std::holds_alternative<TString>(binaryJson)) {
err = "Invalid JSON for JsonDocument provided: " + std::get<TString>(binaryJson);
return false;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/ydb_convert/ydb_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void ConvertDirectoryEntry(const NKikimrSchemeOp::TDirEntry& from, Ydb::Scheme::
void ConvertDirectoryEntry(const NKikimrSchemeOp::TPathDescription& from, Ydb::Scheme::Entry* to, bool processAcl);

bool CellFromProtoVal(const NScheme::TTypeInfo& type, i32 typmod, const Ydb::Value* vp, bool allowCastFromString,
TCell& c, TString& err, TMemoryPool& valueDataPool);
TCell& c, TString& err, TMemoryPool& valueDataPool, bool allowInfDouble = false);

void ProtoValueFromCell(NYdb::TValueBuilder& vb, const NScheme::TTypeInfo& typeInfo, const TCell& cell);

Expand Down
Loading