Skip to content

KIKIMR-20714 Удаляем LongTxService в тестах #970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace NKqp {
TTestHelper::TTestHelper(const TKikimrSettings& settings)
: Kikimr(settings)
, TableClient(Kikimr.GetTableClient())
, LongTxClient(Kikimr.GetDriver())
, Session(TableClient.CreateSession().GetValueSync().GetSession())
{}

Expand All @@ -31,26 +30,6 @@ namespace NKqp {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

void TTestHelper::InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function<void()> onBeforeCommit /*= {}*/, const EStatus opStatus /*= EStatus::SUCCESS*/) {
NLongTx::TLongTxBeginResult resBeginTx = LongTxClient.BeginWriteTx().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString());

auto txId = resBeginTx.GetResult().tx_id();
auto batch = updates.BuildArrow();
TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch);

NLongTx::TLongTxWriteResult resWrite =
LongTxClient.Write(txId, table.GetName(), txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), opStatus, resWrite.Status().GetIssues().ToString());

if (onBeforeCommit) {
onBeforeCommit();
}

NLongTx::TLongTxCommitResult resCommitTx = LongTxClient.CommitTx(txId).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
}

void TTestHelper::BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus /*= Ydb::StatusIds::SUCCESS*/) {
Y_UNUSED(opStatus);
NKikimr::Tests::NCS::THelper helper(Kikimr.GetTestServer());
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/kqp/ut/common/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "kqp_ut_common.h"
#include <ydb/library/accessor/accessor.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
Expand Down Expand Up @@ -64,7 +63,6 @@ namespace NKqp {
private:
TKikimrRunner Kikimr;
NYdb::NTable::TTableClient TableClient;
NYdb::NLongTx::TClient LongTxClient;
NYdb::NTable::TSession Session;

public:
Expand All @@ -73,7 +71,6 @@ namespace NKqp {
TTestActorRuntime& GetRuntime();
NYdb::NTable::TSession& GetSession();
void CreateTable(const TColumnTableBase& table);
void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const std::function<void()> onBeforeCommit = {}, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS);
void BulkUpsert(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void BulkUpsert(const TColumnTable& table, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& opStatus = Ydb::StatusIds::SUCCESS);
void ReadData(const TString& query, const TString& expected, const NYdb::EStatus opStatus = NYdb::EStatus::SUCCESS);
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/ut/olap/kqp_olap_stats_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) {
tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull();
}

testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}

Sleep(TDuration::Seconds(1));
Expand Down Expand Up @@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) {
for (size_t i = 0; i < inserted_rows; i++) {
tableInserter.AddRow().Add(i).Add("test_res_" + std::to_string(i)).AddNull();
}
testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}

Sleep(TDuration::Seconds(1));
Expand Down Expand Up @@ -135,7 +135,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) {
.Add("test_res_" + std::to_string(i + t * tables_in_store))
.AddNull();
}
testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}

Sleep(TDuration::Seconds(20));
Expand All @@ -155,4 +155,4 @@ Y_UNIT_TEST_SUITE(KqpOlapStats) {
}

} // namespace NKqp
} // namespace NKikimr
} // namespace NKikimr
51 changes: 4 additions & 47 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/ut/common/columnshard.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h>

#include <ydb/core/sys_view/service/query_history.h>
#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
Expand Down Expand Up @@ -534,67 +533,25 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls = false) {
UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead

TLocalHelper lHelper(kikimr);
if (withSomeNulls)
lHelper.WithSomeNulls();
NYdb::NLongTx::TClient client(kikimr.GetDriver());

NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString());

auto txId = resBeginTx.GetResult().tx_id();
auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount);

TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch);

NLongTx::TLongTxWriteResult resWrite =
client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString());

NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
lHelper.SendDataViaActorSystem(testTable, batch);
}

void WriteTestDataForClickBench(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
UNIT_ASSERT(testTable == "/Root/benchTable"); // TODO: check schema instead

TClickHelper lHelper(kikimr.GetTestServer());
NYdb::NLongTx::TClient client(kikimr.GetDriver());

NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString());

auto txId = resBeginTx.GetResult().tx_id();
auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount);
TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch);

NLongTx::TLongTxWriteResult resWrite =
client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString());

NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
lHelper.SendDataViaActorSystem(testTable, batch);
}

void WriteTestDataForTableWithNulls(TKikimrRunner& kikimr, TString testTable) {
UNIT_ASSERT(testTable == "/Root/tableWithNulls"); // TODO: check schema instead
TTableWithNullsHelper lHelper(kikimr.GetTestServer());
NYdb::NLongTx::TClient client(kikimr.GetDriver());

NLongTx::TLongTxBeginResult resBeginTx = client.BeginWriteTx().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString());

auto txId = resBeginTx.GetResult().tx_id();
auto batch = lHelper.TestArrowBatch();
TString data = NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(batch);

NLongTx::TLongTxWriteResult resWrite =
client.Write(txId, testTable, txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString());

NLongTx::TLongTxCommitResult resCommitTx = client.CommitTx(txId).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
lHelper.SendDataViaActorSystem(testTable, batch);
}

void CreateTableOfAllTypes(TKikimrRunner& kikimr) {
Expand Down Expand Up @@ -5307,7 +5264,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
tableInserter.AddRow().Add(2).Add("test_res_2").Add("val1").AddNull();
tableInserter.AddRow().Add(3).Add("test_res_3").Add("val3").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add("val2").AddNull();
testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}
while (csController->GetIndexations().Val() == 0) {
Cout << "Wait indexation..." << Endl;
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h>
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/testlib/common_helper.h>
#include <ydb/core/formats/arrow/serializer/full.h>
Expand Down
50 changes: 11 additions & 39 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h>
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/testlib/common_helper.h>
#include <ydb/core/formats/arrow/serializer/full.h>
Expand Down Expand Up @@ -5378,7 +5377,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}

testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]");
Expand Down Expand Up @@ -5407,7 +5406,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
{
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add<uint64_t>(200);
testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}

testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]");
Expand Down Expand Up @@ -5444,7 +5443,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
}
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]");
}

/*
Y_UNIT_TEST(AddColumnOnSchemeChange) {
TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
Expand All @@ -5471,7 +5470,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
}
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]");
}

*/
Y_UNIT_TEST(AddColumnWithStore) {
TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
Expand All @@ -5494,7 +5493,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}

testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]");
Expand Down Expand Up @@ -5524,7 +5523,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
{
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add<uint64_t>(200);
testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}

testHelper.ReadData("SELECT * FROM `/Root/TableStoreTest/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]");
Expand Down Expand Up @@ -5580,7 +5579,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
testHelper.InsertData(testTable, tableInserter, {}, EStatus::GENERIC_ERROR);
testHelper.BulkUpsert(testTable, tableInserter, Ydb::StatusIds::GENERIC_ERROR);
}
{
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schemaWithNull));
Expand Down Expand Up @@ -5611,7 +5610,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]");
{
Expand All @@ -5628,7 +5627,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
}
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` ", "[[1];[2]]");
}

/*
Y_UNIT_TEST(DropColumnOnSchemeChange) {
TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
Expand All @@ -5654,34 +5653,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
}
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]");
}

Y_UNIT_TEST(DropColumnOldScheme) {
TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
TTestHelper testHelper(runnerSettings);

TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
};

TTestHelper::TColumnTable testTable;

testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
testHelper.CreateTable(testTable);
{
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`DROP COLUMN resource_id;";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
testHelper.InsertData(testTable, tableInserter, {}, EStatus::SUCCESS);
}
// testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#]]");
}
*/

Y_UNIT_TEST(DropColumnOldSchemeBulkUpsert) {
TKikimrSettings runnerSettings;
Expand Down Expand Up @@ -5729,7 +5701,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
testHelper.InsertData(testTable, tableInserter);
testHelper.BulkUpsert(testTable, tableInserter);
}
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]");
{
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/testlib/cs_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,15 +429,14 @@ std::shared_ptr<arrow::RecordBatch> TTableWithNullsHelper::TestArrowBatch(ui64,
Y_ABORT_UNLESS(bJsonDoc.AppendNull().ok());
}

auto maybeJsonDoc = NBinaryJson::SerializeToBinaryJson(R"({"col1": "val1", "obj": {"obj_col2_int": 16}})");
Y_ABORT_UNLESS(maybeJsonDoc.Defined());
const auto maybeJsonDoc = std::string(R"({"col1": "val1", "obj": {"obj_col2_int": 16}})");
for (size_t i = rowCount / 2 + 1; i <= rowCount; ++i) {
Y_ABORT_UNLESS(bId.Append(i).ok());
Y_ABORT_UNLESS(bResourceId.Append(std::to_string(i)).ok());
Y_ABORT_UNLESS(bLevel.AppendNull().ok());
Y_ABORT_UNLESS(bBinaryStr.Append(std::to_string(i)).ok());
Y_ABORT_UNLESS(bJsonVal.AppendNull().ok());
Y_ABORT_UNLESS(bJsonDoc.Append(maybeJsonDoc->Data(), maybeJsonDoc->Size()).ok());
Y_ABORT_UNLESS(bJsonDoc.Append(maybeJsonDoc.data(), maybeJsonDoc.length()).ok());
}

std::shared_ptr<arrow::Int32Array> aId;
Expand Down