Skip to content

Commit 92f9aaa

Browse files
authored
fix reading outdated schemas in simple reader (#16357)
1 parent 358728e commit 92f9aaa

File tree

3 files changed

+84
-51
lines changed

3 files changed

+84
-51
lines changed

ydb/core/formats/arrow/program/original.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "abstract.h"
33
#include "functions.h"
44
#include "kernel_logic.h"
5+
#include "execution.h"
56

67
namespace NKikimr::NArrow::NSSA {
78

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 73 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -10642,64 +10642,90 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
1064210642
}
1064310643
}
1064410644

10645-
Y_UNIT_TEST(AddColumn) {
10646-
TKikimrSettings runnerSettings;
10647-
runnerSettings.WithSampleTables = false;
10648-
TTestHelper testHelper(runnerSettings);
10645+
class TestAddColumn {
10646+
private:
10647+
TString ReaderPolicyName;
1064910648

10650-
TVector<TTestHelper::TColumnSchema> schema = {
10651-
TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
10652-
TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
10653-
TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
10654-
};
10649+
public:
10650+
TestAddColumn(const TString& reader)
10651+
: ReaderPolicyName(reader) {
10652+
}
1065510653

10656-
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
10657-
TTestHelper::TColumnTable testTable;
10654+
void Run() {
10655+
TKikimrSettings runnerSettings;
10656+
runnerSettings.WithSampleTables = false;
10657+
runnerSettings.SetColumnShardAlterObjectEnabled(true);
10658+
TTestHelper testHelper(runnerSettings);
1065810659

10659-
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
10660-
testHelper.CreateTable(testTable);
10660+
TVector<TTestHelper::TColumnSchema> schema = {
10661+
TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
10662+
TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
10663+
TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
10664+
};
1066110665

10662-
{
10663-
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
10664-
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
10665-
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
10666-
testHelper.BulkUpsert(testTable, tableInserter);
10667-
}
10666+
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
10667+
TTestHelper::TColumnTable testTable;
1066810668

10669-
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]");
10669+
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSharding({ "id" }).SetSchema(schema);
10670+
testHelper.CreateTable(testTable);
10671+
{
10672+
auto alterQuery = TStringBuilder()
10673+
<< "ALTER OBJECT `" << testTable.GetName()
10674+
<< "` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`" << ReaderPolicyName << "`)";
10675+
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
10676+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
10677+
}
1067010678

10671-
{
10672-
schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64));
10673-
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;";
10674-
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
10675-
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
10676-
}
10679+
{
10680+
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
10681+
tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
10682+
tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
10683+
testHelper.BulkUpsert(testTable, tableInserter);
10684+
}
1067710685

10678-
{
10679-
auto settings = TDescribeTableSettings().WithTableStatistics(true);
10680-
auto describeResult = testHelper.GetSession().DescribeTable("/Root/ColumnTableTest", settings).GetValueSync();
10681-
UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
10686+
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]");
1068210687

10683-
const auto& description = describeResult.GetTableDescription();
10684-
auto columns = description.GetTableColumns();
10685-
UNIT_ASSERT_VALUES_EQUAL(columns.size(), 4);
10686-
}
10688+
{
10689+
schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64));
10690+
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;";
10691+
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
10692+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
10693+
}
1068710694

10688-
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]");
10689-
testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=1", "[[#]]");
10690-
testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=1", "[[[\"test_res_1\"]]]");
10691-
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
10692-
{
10693-
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
10694-
tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add<uint64_t>(200);
10695-
testHelper.BulkUpsert(testTable, tableInserter);
10695+
{
10696+
auto settings = TDescribeTableSettings().WithTableStatistics(true);
10697+
auto describeResult = testHelper.GetSession().DescribeTable("/Root/ColumnTableTest", settings).GetValueSync();
10698+
UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
10699+
10700+
const auto& description = describeResult.GetTableDescription();
10701+
auto columns = description.GetTableColumns();
10702+
UNIT_ASSERT_VALUES_EQUAL(columns.size(), 4);
10703+
}
10704+
10705+
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]");
10706+
testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=1", "[[#]]");
10707+
testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=1", "[[[\"test_res_1\"]]]");
10708+
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).Initialize();
10709+
{
10710+
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
10711+
tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add<uint64_t>(200);
10712+
testHelper.BulkUpsert(testTable, tableInserter);
10713+
}
10714+
10715+
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]");
10716+
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE new_column=200", "[[3;[123];[200u];[\"test_res_3\"]]]");
10717+
testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=3", "[[[200u]]]");
10718+
testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]");
10719+
testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest`", "[[#];[#];[[200u]]]");
1069610720
}
10721+
};
10722+
10723+
Y_UNIT_TEST(AddColumn) {
10724+
TestAddColumn("PLAIN").Run();
10725+
}
1069710726

10698-
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]");
10699-
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE new_column=200", "[[3;[123];[200u];[\"test_res_3\"]]]");
10700-
testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=3", "[[[200u]]]");
10701-
testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]");
10702-
testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest`", "[[#];[#];[[200u]]]");
10727+
Y_UNIT_TEST(AddColumnSimpleReader) {
10728+
TestAddColumn("SIMPLE").Run();
1070310729
}
1070410730

1070510731
Y_UNIT_TEST(AddColumnOldSchemeBulkUpsert) {

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/default_fetching.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ class TDefaultFetchLogic: public IKernelFetchLogic {
77
private:
88
using TBase = IKernelFetchLogic;
99

10+
std::shared_ptr<NArrow::NAccessor::TColumnLoader> GetColumnLoader(const std::shared_ptr<NCommon::IDataSource>& source) const {
11+
if (auto loader = source->GetSourceSchema()->GetColumnLoaderOptional(GetEntityId())) {
12+
return loader;
13+
}
14+
return source->GetContext()->GetReadMetadata()->GetResultSchema()->GetColumnLoaderVerified(GetEntityId());
15+
}
16+
1017
class TChunkRestoreInfo {
1118
private:
1219
std::optional<TBlobRange> BlobRange;
@@ -50,8 +57,7 @@ class TDefaultFetchLogic: public IKernelFetchLogic {
5057
chunks.emplace_back(i.ExtractDataVerified());
5158
}
5259

53-
TPortionDataAccessor::TPreparedColumn column(
54-
std::move(chunks), context.GetSource()->GetSourceSchema()->GetColumnLoaderVerified(GetEntityId()));
60+
TPortionDataAccessor::TPreparedColumn column(std::move(chunks), GetColumnLoader(context.GetSource()));
5561
context.GetAccessors().AddVerified(GetEntityId(), column.AssembleAccessor().DetachResult(), true);
5662
}
5763

@@ -72,8 +78,8 @@ class TDefaultFetchLogic: public IKernelFetchLogic {
7278
auto source = context.GetSource();
7379
auto columnChunks = source->GetStageData().GetPortionAccessor().GetColumnChunksPointers(GetEntityId());
7480
if (columnChunks.empty()) {
75-
ColumnChunks.emplace_back(source->GetRecordsCount(), TPortionDataAccessor::TAssembleBlobInfo(source->GetRecordsCount(),
76-
source->GetSourceSchema()->GetExternalDefaultValueVerified(GetEntityId())));
81+
ColumnChunks.emplace_back(source->GetRecordsCount(),
82+
TPortionDataAccessor::TAssembleBlobInfo(source->GetRecordsCount(), GetColumnLoader(context.GetSource())->GetDefaultValue()));
7783
return;
7884
}
7985
StorageId = source->GetColumnStorageId(GetEntityId());

0 commit comments

Comments
 (0)