Skip to content

[CDC] Do not lose presition during float/double to json serialization… #7737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions ydb/core/tx/datashard/change_record_cdc_serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,16 @@ class TJsonSerializer: public TBaseSerializer {
friend class TChangeRecord; // used in GetPartitionKey()

static NJson::TJsonWriterConfig DefaultJsonConfig() {
NJson::TJsonWriterConfig jsonConfig;
jsonConfig.ValidateUtf8 = false;
jsonConfig.WriteNanAsString = true;
return jsonConfig;
constexpr ui32 doubleNDigits = std::numeric_limits<double>::max_digits10;
constexpr ui32 floatNDigits = std::numeric_limits<float>::max_digits10;
constexpr EFloatToStringMode floatMode = EFloatToStringMode::PREC_NDIGITS;
return NJson::TJsonWriterConfig {
.DoubleNDigits = doubleNDigits,
.FloatNDigits = floatNDigits,
.FloatToStringMode = floatMode,
.ValidateUtf8 = false,
.WriteNanAsString = true,
};
}

protected:
Expand Down
113 changes: 113 additions & 0 deletions ydb/tests/functional/replication/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#include <util/system/env.h>
#include <library/cpp/testing/unittest/registar.h>

#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>

#include <library/cpp/threading/local_executor/local_executor.h>

using namespace NYdb;
using namespace NYdb::NTable;

namespace {

std::pair<ui64, Ydb::ResultSet> DoRead(TSession& s, const TString& table) {
auto res = s.ExecuteDataQuery(
Sprintf("SELECT * FROM `/local/%s`; SELECT COUNT(*) AS __count FROM `/local/%s`;",
table.data(), table.data()), TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
auto rs = NYdb::TResultSetParser(res.GetResultSet(1));
UNIT_ASSERT(rs.TryNextRow());
auto count = rs.ColumnParser("__count").GetUint64();

const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0));
return {count, proto};
}

} // namespace

Y_UNIT_TEST_SUITE(Replication)
{
Y_UNIT_TEST(Types)
{
TString connectionString = GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE");
auto config = TDriverConfig(connectionString);
auto driver = TDriver(config);
auto tableClient = TTableClient(driver);
auto session = tableClient.GetSession().GetValueSync().GetSession();

{
auto res = session.ExecuteSchemeQuery(R"(
CREATE TABLE `/local/ProducerUuidValue` (
Key Uint32,
v01 Uuid,
v02 Uuid NOT NULL,
v03 Double,
PRIMARY KEY (Key)
);
)").GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}

{
auto sessionResult = tableClient.GetSession().GetValueSync();
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());
auto s = sessionResult.GetSession();

{
const TString query = "UPSERT INTO ProducerUuidValue (Key,v01,v02,v03) VALUES"
"(1, "
"CAST(\"5b99a330-04ef-4f1a-9b64-ba6d5f44ea01\" as Uuid), "
"UNWRAP(CAST(\"5b99a330-04ef-4f1a-9b64-ba6d5f44ea02\" as Uuid)), "
"CAST(\"311111111113.222222223\" as Double) "
");";
auto res = s.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}

{
const TString query = Sprintf("CREATE ASYNC REPLICATION `replication` FOR"
"`ProducerUuidValue` AS `ConsumerUuidValue`"
"WITH ("
"CONNECTION_STRING = 'grpc://%s',"
"TOKEN = 'root@builtin'"
");", connectionString.data());
auto res = s.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
// TODO: Make CREATE ASYNC REPLICATION to be a sync call
Sleep(TDuration::Seconds(10));
}

NYdb::NTable::TExecDataQuerySettings execSettings;
execSettings.KeepInQueryCache(true);

auto sessionResult = tableClient.GetSession().GetValueSync();
UNIT_ASSERT_C(sessionResult.IsSuccess(), sessionResult.GetIssues().ToString());

auto s = sessionResult.GetSession();
TUuidValue expectedV1("5b99a330-04ef-4f1a-9b64-ba6d5f44ea01");
TUuidValue expectedV2("5b99a330-04ef-4f1a-9b64-ba6d5f44ea02");
double expectedV3 = 311111111113.222222223;
ui32 attempt = 10;
while (--attempt) {
auto res = DoRead(s, "ConsumerUuidValue");
if (res.first == 1) {
const Ydb::ResultSet& proto = res.second;
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(0).uint32_value(), 1);
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(1).low_128(), expectedV1.Buf_.Halfs[0]);
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(1).high_128(), expectedV1.Buf_.Halfs[1]);
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(2).low_128(), expectedV2.Buf_.Halfs[0]);
UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(2).high_128(), expectedV2.Buf_.Halfs[1]);
UNIT_ASSERT_DOUBLES_EQUAL(proto.rows(0).items(3).double_value(), expectedV3, 0.0001);
break;
}
Sleep(TDuration::Seconds(1));
}

UNIT_ASSERT_C(attempt, "Unable to wait replication result");
}
}

26 changes: 26 additions & 0 deletions ydb/tests/functional/replication/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
UNITTEST()

ENV(YDB_USE_IN_MEMORY_PDISKS=true)

ENV(YDB_ERASURE=block_4-2)

PEERDIR(
library/cpp/threading/local_executor
ydb/public/sdk/cpp/client/ydb_table
ydb/public/sdk/cpp/client/ydb_proto
ydb/public/sdk/cpp/client/draft
)

SRCS(
main.cpp
)

INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)

SIZE(MEDIUM)

IF (SANITIZER_TYPE)
REQUIREMENTS(ram:16)
ENDIF()

END()
1 change: 1 addition & 0 deletions ydb/tests/functional/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ RECURSE(
query_cache
rename
restarts
replication
scheme_shard
scheme_tests
script_execution
Expand Down
Loading