Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideMemoryLimitForPortionReading(1e+10);
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());

TLocalHelper(kikimr).CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
Expand Down Expand Up @@ -103,6 +104,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());

TLocalHelper(kikimr).CreateTestOlapTableWithoutStore();
auto tableClient = kikimr.GetTableClient();
Expand Down Expand Up @@ -347,6 +349,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideMemoryLimitForPortionReading(1e+10);
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());
TLocalHelper(*Kikimr).CreateTestOlapTable();
auto tableClient = Kikimr->GetTableClient();

Expand All @@ -367,7 +370,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto alterQuery =
TStringBuilder() << Sprintf(
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_uid, TYPE=BLOOM_NGRAMM_FILTER,
FEATURES=`{"column_name" : "resource_id", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 64024}`);
FEATURES=`{"column_name" : "resource_id", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 512, "records_count" : 1024}`);
)",
StorageId.data());
auto session = tableClient.CreateSession().GetValueSync().GetSession();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/olap/sys_view_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 bytesPK1;
{
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Expand All @@ -127,6 +128,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
}

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());
ui64 rawBytesUnpack1PK = 0;
ui64 bytesUnpack1PK = 0;
ui64 rawBytesPackAndUnpack2PK;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ message TRequestedBloomNGrammFilter {
optional uint32 FilterSizeBytes = 2;
optional uint32 HashesCount = 3;
optional string ColumnName = 4;
optional uint32 RecordsCount = 5;
}

message TRequestedMaxIndex {
Expand Down Expand Up @@ -432,6 +433,7 @@ message TBloomNGrammFilter {
optional uint32 FilterSizeBytes = 2;
optional uint32 HashesCount = 3;
optional uint32 ColumnId = 4;
optional uint32 RecordsCount = 5;
}

message TMaxIndex {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/abstract/storage.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "storage.h"

#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>

namespace NKikimr::NOlap {

bool TCommonBlobsTracker::IsBlobInUsage(const NOlap::TUnifiedBlobId& blobId) const {
Expand Down Expand Up @@ -42,4 +44,8 @@ void IBlobsStorageOperator::Stop() {
Stopped = true;
}

const NSplitter::TSplitSettings& IBlobsStorageOperator::GetBlobSplitSettings() const {
return NYDBTest::TControllers::GetColumnShardController()->GetBlobSplitSettings(DoGetBlobSplitSettings());
}

} // namespace NKikimr::NOlap
26 changes: 14 additions & 12 deletions ydb/core/tx/columnshard/blobs_action/abstract/storage.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
#pragma once
#include "gc.h"
#include "read.h"
#include "remove.h"
#include "write.h"
#include "read.h"
#include "gc.h"

#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/blobs_action/counters/storage.h>
#include <ydb/core/tx/columnshard/blobs_action/counters/remove_gc.h>
#include <ydb/core/tx/columnshard/blobs_action/counters/storage.h>
#include <ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h>
#include <ydb/core/tx/tiering/abstract/manager.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/core/tx/tiering/abstract/manager.h>

namespace NKikimr::NOlap {

class TCommonBlobsTracker: public IBlobInUseTracker {
private:
// List of blobs that are used by in-flight requests
THashMap<TUnifiedBlobId, i64> BlobsUseCount;

protected:
virtual bool DoUseBlob(const TUnifiedBlobId& blobId) override;
virtual bool DoFreeBlob(const TUnifiedBlobId& blobId) override;

public:
virtual bool IsBlobInUsage(const NOlap::TUnifiedBlobId& blobId) const override;
virtual void OnBlobFree(const TUnifiedBlobId& blobId) = 0;
Expand All @@ -34,8 +36,10 @@ class IBlobsStorageOperator {
YDB_READONLY(bool, Stopped, false);
std::shared_ptr<NBlobOperations::TStorageCounters> Counters;
YDB_ACCESSOR_DEF(std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>, SharedBlobs);

protected:
virtual std::shared_ptr<IBlobsDeclareRemovingAction> DoStartDeclareRemovingAction(const std::shared_ptr<NBlobOperations::TRemoveDeclareCounters>& counters) = 0;
virtual std::shared_ptr<IBlobsDeclareRemovingAction> DoStartDeclareRemovingAction(
const std::shared_ptr<NBlobOperations::TRemoveDeclareCounters>& counters) = 0;
virtual std::shared_ptr<IBlobsWritingAction> DoStartWritingAction() = 0;
virtual std::shared_ptr<IBlobsReadingAction> DoStartReadingAction() = 0;
virtual bool DoLoad(IBlobManagerDb& dbBlobs) = 0;
Expand Down Expand Up @@ -67,16 +71,13 @@ class IBlobsStorageOperator {
IBlobsStorageOperator(const TString& storageId, const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& sharedBlobs)
: SelfTabletId(sharedBlobs->GetSelfTabletId())
, StorageId(storageId)
, SharedBlobs(sharedBlobs)
{
, SharedBlobs(sharedBlobs) {
Counters = std::make_shared<NBlobOperations::TStorageCounters>(storageId);
}

void Stop();

const NSplitter::TSplitSettings& GetBlobSplitSettings() const {
return DoGetBlobSplitSettings();
}
const NSplitter::TSplitSettings& GetBlobSplitSettings() const;

virtual TTabletsByBlob GetBlobsToDelete() const = 0;
virtual bool HasToDelete(const TUnifiedBlobId& blobId, const TTabletId initiatorTabletId) const = 0;
Expand Down Expand Up @@ -120,7 +121,8 @@ class IBlobsStorageOperator {
}

[[nodiscard]] std::shared_ptr<IBlobsGCAction> CreateGC() {
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("storage_id", GetStorageId())("tablet_id", GetSelfTabletId());
NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)(
"storage_id", GetStorageId())("tablet_id", GetSelfTabletId());
if (CurrentGCAction && CurrentGCAction->IsInProgress()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_BLOBS)("event", "gc_in_progress");
return nullptr;
Expand All @@ -137,4 +139,4 @@ class IBlobsStorageOperator {
virtual bool IsReady() const = 0;
};

}
} // namespace NKikimr::NOlap
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/blobs_action/local/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ class TOperator: public IBlobsStorageOperator {
}
};

}
} // namespace NKikimr::NOlap::NBlobOperations::NLocal
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ NKikimr::TConclusionStatus TIndexInfo::AppendIndex(const THashMap<ui32, std::vec
std::shared_ptr<IPortionDataChunk> chunk = index->BuildIndex(originalData, recordsCount, *this);
auto opStorage = operators->GetOperatorVerified(index->GetStorageId());
if ((i64)chunk->GetPackedSize() > opStorage->GetBlobSplitSettings().GetMaxBlobSize()) {
return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ") bigger than limit (" +
return TConclusionStatus::Fail("blob size for secondary data (" + ::ToString(indexId) + ":" + ::ToString(chunk->GetPackedSize()) + ":" +
::ToString(recordsCount) + ") bigger than limit (" +
::ToString(opStorage->GetBlobSplitSettings().GetMaxBlobSize()) + ")");
}
if (index->GetStorageId() == IStoragesManager::LocalMetadataStorageId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/engines/storage/chunks/column.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/splitter/batch_slice.h>

#include <ydb/library/formats/arrow/simple_arrays_cache.h>
Expand Down Expand Up @@ -60,7 +61,7 @@ TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> ISnapshotSchema::Normali
}
if (restoreColumnIds.contains(columnId)) {
AFL_VERIFY(!!GetExternalDefaultValueVerified(columnId) || GetIndexInfo().IsNullableVerified(columnId))("column_name",
GetIndexInfo().GetColumnName(columnId, false))("id", columnId);
GetIndexInfo().GetColumnName(columnId, false))("id", columnId);
result->AddField(resultField, GetColumnLoaderVerified(columnId)->BuildDefaultAccessor(batch->num_rows())).Validate();
}
}
Expand Down Expand Up @@ -324,7 +325,8 @@ TConclusion<TWritePortionInfoWithBlobsResult> ISnapshotSchema::PrepareForWrite(c

TGeneralSerializedSlice slice(chunks, schemaDetails, splitterCounters);
std::vector<TSplittedBlob> blobs;
if (!slice.GroupBlobs(blobs, NSplitter::TEntityGroups(NSplitter::TSplitSettings(), NBlobOperations::TGlobal::DefaultStorageId))) {
if (!slice.GroupBlobs(blobs, NSplitter::TEntityGroups(NYDBTest::TControllers::GetColumnShardController()->GetBlobSplitSettings(),
NBlobOperations::TGlobal::DefaultStorageId))) {
return TConclusionStatus::Fail("cannot split data for appropriate blobs size");
}
auto constructor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,32 @@ class TFixStringBitsStorage {
return (bytesCount + ((bitsCount % 8) ? 1 : 0)) * 8;
}

TString DebugString() const {
TStringBuilder sb;
ui32 count1 = 0;
ui32 count0 = 0;
for (ui32 i = 0; i < GetSizeBits(); ++i) {
if (Get(i)) {
// sb << 1 << " ";
++count1;
} else {
// sb << 0 << " ";
++count0;
}
// if (i % 20 == 0) {
// sb << i << " ";
// }
}
sb << GetSizeBits() << "=" << count0 << "[0]+" << count1 << "[1]";
return sb;
}

template <class TBitsVector>
TFixStringBitsStorage(const TBitsVector& bitsVector)
: TFixStringBitsStorage(TSizeDetector<TBitsVector>::GetSize(bitsVector)) {
ui32 byteIdx = 0;
ui8 byteCurrent = 0;
ui8 shiftCurrent = 0;
ui8 shiftCurrent = 1;
for (ui32 i = 0; i < TSizeDetector<TBitsVector>::GetSize(bitsVector); ++i) {
if (i && i % 8 == 0) {
Data[byteIdx] = (char)byteCurrent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ bool TFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const {
for (auto&& blob : blobs) {
TFixStringBitsStorage bits(blob);
bool found = true;
TStringBuilder sb;
for (auto&& i : HashValues) {
sb << i % bits.GetSizeBits() << ",";
if (!bits.Get(i % bits.GetSizeBits())) {
found = false;
break;
}
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("size", bits.GetSizeBits())("found", found)("hashes", sb)("details", bits.DebugString());
if (found) {
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("size", bArray.length())("data", bArray.ToString())("index_id", GetIndexId());
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

namespace NKikimr::NOlap::NIndexes::NBloomNGramm {

TString TConstants::GetRecordsCountIntervalString() {
return TStringBuilder() << "[" << MinRecordsCount << ", " << MaxRecordsCount << "]";
}

TString TConstants::GetHashesCountIntervalString() {
return TStringBuilder() << "[" << MinHashesCount << ", " << MaxHashesCount << "]";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ class TConstants {
static constexpr ui32 MaxHashesCount = 8;
static constexpr ui32 MinFilterSizeBytes = 128;
static constexpr ui32 MaxFilterSizeBytes = 1 << 20;
static constexpr ui32 MinRecordsCount = 128;
static constexpr ui32 MaxRecordsCount = 1000000;

static bool CheckRecordsCount(const ui32 value) {
return MinRecordsCount <= value && value <= MaxRecordsCount;
}

static bool CheckNGrammSize(const ui32 value) {
return MinNGrammSize <= value && value <= MaxNGrammSize;
Expand All @@ -26,6 +32,7 @@ class TConstants {
static TString GetHashesCountIntervalString();
static TString GetFilterSizeBytesIntervalString();
static TString GetNGrammSizeIntervalString();
static TString GetRecordsCountIntervalString();
};

} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ std::shared_ptr<IIndexMeta> TIndexConstructor::DoCreateIndexMeta(
}
const ui32 columnId = columnInfo->GetId();
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnId,
HashesCount, FilterSizeBytes, NGrammSize);
HashesCount, FilterSizeBytes, NGrammSize, RecordsCount);
}

TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
Expand All @@ -29,6 +29,14 @@ TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonVal
return TConclusionStatus::Fail("empty column_name in bloom ngramm filter features");
}

if (!jsonInfo["records_count"].IsUInteger()) {
return TConclusionStatus::Fail("records_count have to be in bloom filter features as uint field");
}
RecordsCount = jsonInfo["records_count"].GetUInteger();
if (!TConstants::CheckRecordsCount(RecordsCount)) {
return TConclusionStatus::Fail("records_count have to be in bloom ngramm filter in interval " + TConstants::GetRecordsCountIntervalString());
}

if (!jsonInfo["ngramm_size"].IsUInteger()) {
return TConclusionStatus::Fail("ngramm_size have to be in bloom filter features as uint field");
}
Expand Down Expand Up @@ -64,6 +72,10 @@ NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKiki
return TConclusionStatus::Fail(errorMessage);
}
auto& bFilter = proto.GetBloomNGrammFilter();
RecordsCount = bFilter.GetRecordsCount();
if (!TConstants::CheckRecordsCount(RecordsCount)) {
return TConclusionStatus::Fail("RecordsCount have to be in " + TConstants::GetRecordsCountIntervalString());
}
NGrammSize = bFilter.GetNGrammSize();
if (!TConstants::CheckNGrammSize(NGrammSize)) {
return TConclusionStatus::Fail("NGrammSize have to be in " + TConstants::GetNGrammSizeIntervalString());
Expand All @@ -86,6 +98,7 @@ NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKiki
void TIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const {
auto* filterProto = proto.MutableBloomNGrammFilter();
filterProto->SetColumnName(ColumnName);
filterProto->SetRecordsCount(RecordsCount);
filterProto->SetNGrammSize(NGrammSize);
filterProto->SetFilterSizeBytes(FilterSizeBytes);
filterProto->SetHashesCount(HashesCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class TIndexConstructor: public IIndexMetaConstructor {
ui32 NGrammSize = 3;
ui32 FilterSizeBytes = 512;
ui32 HashesCount = 2;
ui32 RecordsCount = 10000;
static inline auto Registrator = TFactory::TRegistrator<TIndexConstructor>(GetClassNameStatic());

protected:
Expand Down
Loading
Loading