Skip to content

[CDC] Do not lose presition during float/double to json serialization #7625

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions ydb/core/tx/datashard/change_record_cdc_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,16 @@ class TJsonSerializer: public TBaseSerializer {
friend class TChangeRecord; // used in GetPartitionKey()

static NJson::TJsonWriterConfig DefaultJsonConfig() {
NJson::TJsonWriterConfig jsonConfig;
jsonConfig.ValidateUtf8 = false;
jsonConfig.WriteNanAsString = true;
return jsonConfig;
constexpr ui32 doubleNDigits = std::numeric_limits<double>::max_digits10;
constexpr ui32 floatNDigits = std::numeric_limits<float>::max_digits10;
constexpr EFloatToStringMode floatMode = EFloatToStringMode::PREC_NDIGITS;
return NJson::TJsonWriterConfig {
.DoubleNDigits = doubleNDigits,
.FloatNDigits = floatNDigits,
.FloatToStringMode = floatMode,
.ValidateUtf8 = false,
.WriteNanAsString = true,
};
}

protected:
Expand Down
57 changes: 27 additions & 30 deletions ydb/tests/functional/replication/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@

#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
#include <library/cpp/yson/writer.h>

#include <library/cpp/threading/local_executor/local_executor.h>

Expand All @@ -14,18 +13,7 @@ using namespace NYdb::NTable;

namespace {

TString ReformatYson(const TString& yson) {
TStringStream ysonInput(yson);
TStringStream output;
NYson::ReformatYsonStream(&ysonInput, &output, NYson::EYsonFormat::Text);
return output.Str();
}

void CompareYson(const TString& expected, const TString& actual) {
UNIT_ASSERT_NO_DIFF(ReformatYson(expected), ReformatYson(actual));
}

ui64 DoRead(TSession& s, const TString& table, ui64 expectedRows, const TString& expectedContent) {
std::pair<ui64, Ydb::ResultSet> DoRead(TSession& s, const TString& table) {
auto res = s.ExecuteDataQuery(
Sprintf("SELECT * FROM `/local/%s`; SELECT COUNT(*) AS __count FROM `/local/%s`;",
table.data(), table.data()), TTxControl::BeginTx().CommitTx()).GetValueSync();
Expand All @@ -34,23 +22,17 @@ ui64 DoRead(TSession& s, const TString& table, ui64 expectedRows, const TString&
UNIT_ASSERT(rs.TryNextRow());
auto count = rs.ColumnParser("__count").GetUint64();

if (count == expectedRows) {
auto yson = NYdb::FormatResultSetYson(res.GetResultSet(0));

CompareYson(expectedContent, yson);
}

return count;
const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0));
return {count, proto};
}

} // namespace

Y_UNIT_TEST_SUITE(Replication)
{
Y_UNIT_TEST(UuidValue)
Y_UNIT_TEST(Types)
{
TString connectionString = GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE");
Cerr << connectionString << Endl;
auto config = TDriverConfig(connectionString);
auto driver = TDriver(config);
auto tableClient = TTableClient(driver);
Expand All @@ -60,8 +42,9 @@ Y_UNIT_TEST_SUITE(Replication)
auto res = session.ExecuteSchemeQuery(R"(
CREATE TABLE `/local/ProducerUuidValue` (
Key Uint32,
Value1 Uuid,
Value2 Uuid NOT NULL,
v01 Uuid,
v02 Uuid NOT NULL,
v03 Double,
PRIMARY KEY (Key)
);
)").GetValueSync();
Expand All @@ -74,11 +57,12 @@ Y_UNIT_TEST_SUITE(Replication)
auto s = sessionResult.GetSession();

{
const TString query = "UPSERT INTO ProducerUuidValue (Key, Value1, Value2) VALUES"
const TString query = "UPSERT INTO ProducerUuidValue (Key,v01,v02,v03) VALUES"
"(1, "
"CAST(\"5b99a330-04ef-4f1a-9b64-ba6d5f44ea01\" as Uuid), "
"UNWRAP(CAST(\"5b99a330-04ef-4f1a-9b64-ba6d5f44ea02\" as Uuid)"
"));";
"UNWRAP(CAST(\"5b99a330-04ef-4f1a-9b64-ba6d5f44ea02\" as Uuid)), "
"CAST(\"311111111113.222222223\" as Double) "
");";
auto res = s.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
Expand All @@ -104,9 +88,22 @@ Y_UNIT_TEST_SUITE(Replication)
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());

auto s = sessionResult.GetSession();
const TString expected = R"([[[1u];["5b99a330-04ef-4f1a-9b64-ba6d5f44ea01"];"5b99a330-04ef-4f1a-9b64-ba6d5f44ea02"]])";
TUuidValue expectedV1("5b99a330-04ef-4f1a-9b64-ba6d5f44ea01");
TUuidValue expectedV2("5b99a330-04ef-4f1a-9b64-ba6d5f44ea02");
double expectedV3 = 311111111113.222222223;
ui32 attempt = 10;
while (1 != DoRead(s, "ConsumerUuidValue", 1, expected) && --attempt) {
while (--attempt) {
auto res = DoRead(s, "ConsumerUuidValue");
if (res.first == 1) {
const Ydb::ResultSet& proto = res.second;
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(0).uint32_value(), 1);
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(1).low_128(), expectedV1.Buf_.Halfs[0]);
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(1).high_128(), expectedV1.Buf_.Halfs[1]);
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(2).low_128(), expectedV2.Buf_.Halfs[0]);
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(2).high_128(), expectedV2.Buf_.Halfs[1]);
UNIT_ASSERT_DOUBLES_EQUAL(proto.rows(0).items(3).double_value(), expectedV3, 0.0001);
break;
}
Sleep(TDuration::Seconds(1));
}

Expand Down
3 changes: 1 addition & 2 deletions ydb/tests/functional/replication/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ ENV(YDB_ERASURE=block_4-2)

PEERDIR(
library/cpp/threading/local_executor
library/cpp/yson
ydb/public/sdk/cpp/client/ydb_table
ydb/public/sdk/cpp/client/ydb_proto
ydb/public/sdk/cpp/client/draft
ydb/public/lib/yson_value
)

SRCS(
Expand Down
Loading