
indexes construction on first compaction stage (from insert table to … #1343


Merged · 2 commits · Jan 26, 2024
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/arrow_filter.h
@@ -46,13 +46,13 @@ class TColumnFilter {

static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2);
class TMergerImpl;
void Add(const bool value, const ui32 count = 1);
void Reset(const ui32 count);
void ResetCaches() const {
FilterPlain.reset();
FilteredCount.reset();
}
public:
void Add(const bool value, const ui32 count = 1);
std::optional<ui32> GetFilteredCount() const;
const std::vector<bool>& BuildSimpleFilter() const;
std::shared_ptr<arrow::BooleanArray> BuildArrowFilter(const ui32 expectedSize, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
11 changes: 9 additions & 2 deletions ydb/core/formats/arrow/hash/calcer.cpp
@@ -18,9 +18,9 @@ void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TSt

auto& typedScalar = static_cast<const TScalar&>(*scalar);
if constexpr (arrow::has_string_view<T>()) {
hashCalcer.Update((const ui8*)typedScalar.value->data(), typedScalar.value->size());
hashCalcer.Update(reinterpret_cast<const ui8*>(typedScalar.value->data()), typedScalar.value->size());
} else if constexpr (arrow::has_c_type<T>()) {
hashCalcer.Update((const ui8*)(typedScalar.data()), sizeof(T));
hashCalcer.Update(reinterpret_cast<const ui8*>(typedScalar.data()), sizeof(typedScalar.value));
} else {
static_assert(arrow::is_decimal_type<T>());
}
@@ -130,4 +130,11 @@ std::vector<std::shared_ptr<arrow::Array>> TXX64::GetColumns(const std::shared_p
return columns;
}

ui64 TXX64::CalcHash(const std::shared_ptr<arrow::Scalar>& scalar) {
NXX64::TStreamStringHashCalcer calcer(0);
calcer.Start();
AppendField(scalar, calcer);
return calcer.Finish();
}

}
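The new TXX64::CalcHash above is a convenience wrapper around the Start/AppendField/Finish sequence of the streaming calcer. A minimal usage sketch (same pattern as the unit tests below; seed 0 is assumed to match CalcHash, and the snippet is illustrative, not part of this diff):

```cpp
#include <ydb/core/formats/arrow/hash/xx_hash.h>
#include <ydb/core/formats/arrow/hash/calcer.h>
#include <util/system/yassert.h>

using namespace NKikimr::NArrow;

ui64 HashScalarBothWays(const std::shared_ptr<arrow::Scalar>& scalar) {
    // One-shot helper added in this diff.
    const ui64 direct = NHash::TXX64::CalcHash(scalar);

    // Equivalent streaming form, as used in ut_hash.cpp.
    NHash::NXX64::TStreamStringHashCalcer calcer(0);
    calcer.Start();
    NHash::TXX64::AppendField(scalar, calcer);
    const ui64 streamed = calcer.Finish();

    Y_ABORT_UNLESS(direct == streamed);
    return direct;
}
```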
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/hash/calcer.h
@@ -31,6 +31,7 @@ class TXX64 {

static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer);
static void AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer);
static ui64 CalcHash(const std::shared_ptr<arrow::Scalar>& scalar);
std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const;
std::shared_ptr<arrow::Array> ExecuteToArray(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& hashFieldName) const;
};
60 changes: 60 additions & 0 deletions ydb/core/formats/arrow/ut/ut_hash.cpp
@@ -0,0 +1,60 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/hash/xx_hash.h>
#include <ydb/core/formats/arrow/hash/calcer.h>

Y_UNIT_TEST_SUITE(Hash) {

using namespace NKikimr::NArrow;

Y_UNIT_TEST(ScalarBinaryHash) {
std::shared_ptr<arrow::Scalar> s1 = std::make_shared<arrow::StringScalar>("abcde");
std::shared_ptr<arrow::Scalar> s2 = std::make_shared<arrow::StringScalar>("abcde");
NHash::NXX64::TStreamStringHashCalcer calcer1(0);
calcer1.Start();
NHash::TXX64::AppendField(s1, calcer1);

NHash::NXX64::TStreamStringHashCalcer calcer2(0);
calcer2.Start();
NHash::TXX64::AppendField(s2, calcer2);
const ui64 hash = calcer1.Finish();
Cerr << hash << Endl;
Y_ABORT_UNLESS(hash == calcer2.Finish());
}

Y_UNIT_TEST(ScalarCTypeHash) {
std::shared_ptr<arrow::Scalar> s1 = std::make_shared<arrow::UInt32Scalar>(52);
std::shared_ptr<arrow::Scalar> s2 = std::make_shared<arrow::UInt32Scalar>(52);
NHash::NXX64::TStreamStringHashCalcer calcer1(0);
calcer1.Start();
NHash::TXX64::AppendField(s1, calcer1);

NHash::NXX64::TStreamStringHashCalcer calcer2(0);
calcer2.Start();
NHash::TXX64::AppendField(s2, calcer2);
const ui64 hash = calcer1.Finish();
Cerr << hash << Endl;
Y_ABORT_UNLESS(hash == calcer2.Finish());
}

Y_UNIT_TEST(ScalarCompositeHash) {
std::shared_ptr<arrow::Scalar> s11 = std::make_shared<arrow::StringScalar>("abcde");
std::shared_ptr<arrow::Scalar> s12 = std::make_shared<arrow::UInt32Scalar>(52);
std::shared_ptr<arrow::Scalar> s21 = std::make_shared<arrow::StringScalar>("abcde");
std::shared_ptr<arrow::Scalar> s22 = std::make_shared<arrow::UInt32Scalar>(52);
NHash::NXX64::TStreamStringHashCalcer calcer1(0);
calcer1.Start();
NHash::TXX64::AppendField(s11, calcer1);
NHash::TXX64::AppendField(s12, calcer1);

NHash::NXX64::TStreamStringHashCalcer calcer2(0);
calcer2.Start();
NHash::TXX64::AppendField(s21, calcer2);
NHash::TXX64::AppendField(s22, calcer2);
const ui64 hash = calcer1.Finish();
Cerr << hash << Endl;
Y_ABORT_UNLESS(hash == calcer2.Finish());
}


};
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/ut/ya.make
@@ -28,6 +28,7 @@ SRCS(
ut_dictionary.cpp
ut_size_calcer.cpp
ut_column_filter.cpp
ut_hash.cpp
)

END()
125 changes: 105 additions & 20 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -1229,14 +1229,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();

Tests::NCommon::TLoggerInit(kikimr).Initialize();
// Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.1}`);
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
@@ -1245,13 +1245,17 @@
{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["resource_id", "uid"], "false_positive_probability" : 0.2}`);
FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

std::vector<TString> uids;
std::vector<TString> resourceIds;
std::vector<ui32> levels;

{
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
@@ -1260,6 +1264,23 @@
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);

const auto filler = [&](const ui32 startRes, const ui32 startUid, const ui32 count) {
for (ui32 i = 0; i < count; ++i) {
uids.emplace_back("uid_" + ::ToString(startUid + i));
resourceIds.emplace_back(::ToString(startRes + i));
levels.emplace_back(i % 5);
}
};

filler(1000000, 300000000, 10000);
filler(1100000, 300100000, 10000);
filler(1200000, 300200000, 10000);
filler(1300000, 300300000, 10000);
filler(1400000, 300400000, 10000);
filler(2000000, 200000000, 70000);
filler(3000000, 100000000, 110000);

}

{
@@ -1276,7 +1297,9 @@
Cout << result << Endl;
CompareYson(result, R"([[230000u;]])");
}

AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0);
TInstant start = Now();
ui32 compactionsStart = csController->GetCompactions().Val();
while (Now() - start < TDuration::Seconds(10)) {
@@ -1295,35 +1318,97 @@
SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
WHERE resource_id = '3000008' AND level = 3 AND uid = 'uid_100000008'
WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222'
)").GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << result << Endl;
Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << Endl;
CompareYson(result, R"([[0u;]])");
AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0);
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 17);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 4);
}
ui32 requestsCount = 100;
for (ui32 i = 0; i < requestsCount; ++i) {
const ui32 idx = RandomNumber<ui32>(uids.size());
const auto query = [](const TString& res, const TString& uid, const ui32 level) {
TStringBuilder sb;
sb << "SELECT" << Endl;
sb << "COUNT(*)" << Endl;
sb << "FROM `/Root/olapStore/olapTable`" << Endl;
sb << "WHERE(" << Endl;
sb << "resource_id = '" << res << "' AND" << Endl;
sb << "uid= '" << uid << "' AND" << Endl;
sb << "level= " << level << Endl;
sb << ")";
return sb;
};
auto it = tableClient.StreamExecuteScanQuery(query(resourceIds[idx], uids[idx], levels[idx])).GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << " / " << csController->GetIndexesSkippedNoData().Val() << Endl;
CompareYson(result, R"([[1u;]])");
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val());
AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
}

AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() / csController->GetIndexesSkippingOnSelect().Val() < 0.15);

}

Y_UNIT_TEST(IndexesModificationError) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();

// Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();

{
const i64 before = csController->GetIndexesSkippingOnSelect().Val();
auto it = tableClient.StreamExecuteScanQuery(R"(
--!syntax_v1
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222'
)").GetValueSync();
{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid", "resource_id"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), EStatus::SUCCESS);
}

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
AFL_VERIFY(before != csController->GetIndexesSkippingOnSelect().Val());
Cout << result << Endl;
CompareYson(result, R"([[0u;]])");
AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.005}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), EStatus::SUCCESS);
}

{
auto alterQuery = TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.01}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

}

Y_UNIT_TEST(PushdownFilter) {
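For context on the false_positive_probability values exercised above (0.05 in both index definitions here, with 0.01 accepted and 0.005 rejected in IndexesModificationError), the standard Bloom filter sizing relations tie the target false-positive rate p to the bits stored per key and the number of hash functions — general theory, not code from this PR:

$$\frac{m}{n} = -\frac{\ln p}{(\ln 2)^2}, \qquad k = -\log_2 p$$

For p = 0.05 this gives roughly 6.2 bits per key and k ≈ 4.3 hash functions; for p = 0.01 roughly 9.6 bits and k ≈ 6.6, so tightening p noticeably grows the per-portion index data.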
2 changes: 1 addition & 1 deletion ydb/core/protos/ssa.proto
@@ -41,7 +41,7 @@ message TProgram {
}

message TBloomFilterChecker {
repeated uint32 HashValues = 1;
repeated uint64 HashValues = 1;
}

message TOlapIndexChecker {
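HashValues is widened from uint32 to uint64, presumably so the checker can carry the full 64-bit output of the xxhash calcer (TStreamStringHashCalcer::Finish() and the new TXX64::CalcHash both yield ui64 values). A standalone, hypothetical illustration of what 32-bit truncation would lose — not code from this PR:

```cpp
#include <cstdint>

int main() {
    // Two distinct 64-bit hash values whose low 32 bits coincide:
    // truncated to uint32 they would be indistinguishable.
    const uint64_t a = 0x1234567800000001ULL;
    const uint64_t b = 0x9abcdef000000001ULL;
    static_assert(static_cast<uint32_t>(0x1234567800000001ULL) ==
                  static_cast<uint32_t>(0x9abcdef000000001ULL), "low 32 bits collide");
    return a == b ? 1 : 0; // 0: still distinct as full 64-bit values
}
```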
10 changes: 7 additions & 3 deletions ydb/core/tx/columnshard/columnshard_schema.h
@@ -298,9 +298,10 @@ struct Schema : NIceDb::Schema {
struct Blob: Column<5, NScheme::NTypeIds::String> {};
struct Offset: Column<6, NScheme::NTypeIds::Uint32> {};
struct Size: Column<7, NScheme::NTypeIds::Uint32> {};
struct RecordsCount: Column<8, NScheme::NTypeIds::Uint32> {};

using TKey = TableKey<PathId, PortionId, IndexId, ChunkIdx>;
using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size>;
using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size, RecordsCount>;
};

using TTables = SchemaTables<
@@ -618,14 +619,17 @@ class TIndexChunkLoadContext {
private:
YDB_READONLY_DEF(TBlobRange, BlobRange);
TChunkAddress Address;
const ui32 RecordsCount;
public:
TIndexChunk BuildIndexChunk() const {
return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), BlobRange);
return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, BlobRange);
}

template <class TSource>
TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
: Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>()) {
: Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
, RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>())
{
AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexIndexes::Blob>();
Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
36 changes: 25 additions & 11 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -98,26 +98,40 @@ void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
}

std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 granule, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const {
const ui64 pathId, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const {
Y_ABORT_UNLESS(batch->num_rows());

auto resultSchema = context.SchemaVersions.GetSchema(snapshot);
std::vector<TPortionInfoWithBlobs> out;

std::shared_ptr<NOlap::TSerializationStats> stats = std::make_shared<NOlap::TSerializationStats>();
if (granuleMeta) {
stats = granuleMeta->BuildSerializationStats(resultSchema);
}
auto schema = std::make_shared<TDefaultSchemaDetails>(resultSchema, SaverContext, stats);
TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch, SplitSettings);

std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunkByBlobs;
std::shared_ptr<arrow::RecordBatch> portionBatch;
while (limiter.Next(chunkByBlobs, portionBatch)) {
TPortionInfoWithBlobs infoWithBlob = TPortionInfoWithBlobs::BuildByBlobs(chunkByBlobs, nullptr, granule, snapshot, SaverContext.GetStorageOperator());
infoWithBlob.GetPortionInfo().AddMetadata(*resultSchema, portionBatch, SaverContext.GetTierName());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portion_appended", infoWithBlob.GetPortionInfo().DebugString());
out.emplace_back(std::move(infoWithBlob));
std::vector<TPortionInfoWithBlobs> out;
{
std::vector<TBatchSerializedSlice> pages = TRBSplitLimiter::BuildSimpleSlices(batch, SplitSettings, context.Counters.SplitterCounters, schema);
std::vector<TGeneralSerializedSlice> generalPages;
for (auto&& i : pages) {
std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> portionColumns = i.GetPortionChunks();
resultSchema->GetIndexInfo().AppendIndexes(portionColumns);
generalPages.emplace_back(portionColumns, schema, context.Counters.SplitterCounters, SplitSettings);
}

TSimilarSlicer slicer(SplitSettings.GetExpectedPortionSize());
auto packs = slicer.Split(generalPages);

ui32 recordIdx = 0;
for (auto&& i : packs) {
TGeneralSerializedSlice slice(std::move(i));
auto b = batch->Slice(recordIdx, slice.GetRecordsCount());
std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunksByBlobs = slice.GroupChunksByBlobs();
out.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, pathId, snapshot, SaverContext.GetStorageOperator()));
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
out.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName());
recordIdx += slice.GetRecordsCount();
}
}

return out;
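The rewritten MakeAppendedPortions now builds pages with TRBSplitLimiter::BuildSimpleSlices, appends the index chunks for each page via AppendIndexes, regroups the pages into similarly sized packs with TSimilarSlicer, and only then builds portions with first/last-PK and snapshot metadata. A simplified, standalone sketch of the size-based regrouping idea (hypothetical SplitBySimilarSize helper, not the actual TSimilarSlicer implementation):

```cpp
#include <cstdint>
#include <vector>

// Greedily packs consecutive page sizes into groups of roughly expectedPortionSize bytes;
// the tail group may come out smaller than the target.
std::vector<std::vector<uint64_t>> SplitBySimilarSize(const std::vector<uint64_t>& pageSizes,
                                                      const uint64_t expectedPortionSize) {
    std::vector<std::vector<uint64_t>> packs;
    std::vector<uint64_t> current;
    uint64_t currentBytes = 0;
    for (const uint64_t size : pageSizes) {
        current.push_back(size);
        currentBytes += size;
        if (currentBytes >= expectedPortionSize) {
            packs.push_back(std::move(current));
            current.clear();
            currentBytes = 0;
        }
    }
    if (!current.empty()) {
        packs.push_back(std::move(current));
    }
    return packs;
}
```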