Skip to content

Commit 7760964

Browse files
corrections
1 parent 526f125 commit 7760964

35 files changed

+500
-157
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restart
2424
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64
2525
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN
2626
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest
27-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
28-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS
29-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata
30-
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesModificationError
3127
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
3228
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
3329
ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2719,6 +2719,64 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
27192719

27202720
}
27212721

2722+
Y_UNIT_TEST(MetadataMemoryManager) {
2723+
auto settings = TKikimrSettings().SetWithSampleTables(false);
2724+
TKikimrRunner kikimr(settings);
2725+
2726+
TLocalHelper(kikimr).CreateTestOlapTable();
2727+
auto tableClient = kikimr.GetTableClient();
2728+
2729+
// Tests::NCommon::TLoggerInit(kikimr).Initialize();
2730+
2731+
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
2732+
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
2733+
2734+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
2735+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
2736+
{
2737+
auto it = tableClient
2738+
.StreamExecuteScanQuery(R"(
2739+
--!syntax_v1
2740+
2741+
SELECT
2742+
COUNT(*)
2743+
FROM `/Root/olapStore/olapTable`
2744+
)")
2745+
.GetValueSync();
2746+
2747+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
2748+
TString result = StreamResultToYson(it);
2749+
Cout << result << Endl;
2750+
CompareYson(result, R"([[20000u;]])");
2751+
}
2752+
{
2753+
auto alterQuery =
2754+
TStringBuilder() <<
2755+
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `METADATA_MEMORY_MANAGER.CLASS_NAME`=`local_db`,
2756+
`METADATA_MEMORY_MANAGER.FEATURES`=`{"memory_cache_size" : 0}`);
2757+
)";
2758+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
2759+
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
2760+
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
2761+
}
2762+
{
2763+
auto it = tableClient
2764+
.StreamExecuteScanQuery(R"(
2765+
--!syntax_v1
2766+
2767+
SELECT
2768+
COUNT(*)
2769+
FROM `/Root/olapStore/olapTable`
2770+
)")
2771+
.GetValueSync();
2772+
2773+
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
2774+
TString result = StreamResultToYson(it);
2775+
Cout << result << Endl;
2776+
CompareYson(result, R"([[20000u;]])");
2777+
}
2778+
}
2779+
27222780
Y_UNIT_TEST(NormalizeAbsentColumn) {
27232781
auto settings = TKikimrSettings().SetWithSampleTables(false);
27242782
TKikimrRunner kikimr(settings);

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ message TMetadataManagerConstructorContainer {
472472
}
473473

474474
message TLocalDB {
475+
optional uint64 MemoryCacheSize = 1 [default = 128000000];
475476
}
476477

477478
oneof Implementation {

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
8181
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
8282
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
8383
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
84-
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(), info->TabletID)
84+
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), info->TabletID)
8585
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
8686
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
8787
, InsertTable(std::make_unique<NOlap::TInsertTable>())
@@ -1218,23 +1218,36 @@ void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvFinishedFromSource::T
12181218
class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12191219
private:
12201220
using TBase = TTransactionBase<TColumnShard>;
1221-
std::shared_ptr<NOlap::TDataAccessorsRequest> Request;
1221+
std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
12221222
THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
1223-
THashMap<ui64, std::vector<NOlap::TPortionDataAccessor>> Accessors;
1224-
NOlap::TDataAccessorsResult Result;
1223+
std::vector<NOlap::TPortionDataAccessor> FetchedAccessors;
12251224

12261225
public:
1227-
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::TDataAccessorsRequest>& request)
1226+
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
1227+
THashMap<ui64, NOlap::TPortionInfo::TConstPtr>&& portions)
12281228
: TBase(self)
1229-
, Request(request) {
1230-
for (auto&& i : Request->GetPathIds()) {
1231-
PortionsByPath.emplace(i, Request->StartFetching(i));
1229+
, FetchCallback(fetchCallback) {
1230+
for (auto&& i : portions) {
1231+
PortionsByPath[i.second->GetPathId()].emplace_back(i.second);
12321232
}
12331233
}
12341234

12351235
bool Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) override {
12361236
NIceDb::TNiceDb db(txc.DB);
12371237
TBlobGroupSelector selector(Self->Info());
1238+
bool reask = false;
1239+
for (auto&& i : PortionsByPath) {
1240+
for (auto&& p : i.second) {
1241+
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV1>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1242+
if (!rowset.IsReady()) {
1243+
reask = true;
1244+
}
1245+
}
1246+
}
1247+
if (reask) {
1248+
return false;
1249+
}
1250+
12381251
for (auto&& i : PortionsByPath) {
12391252
while (i.second.size()) {
12401253
auto p = i.second.back();
@@ -1264,26 +1277,23 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12641277
}
12651278
}
12661279
}
1267-
NOlap::TPortionDataAccessor accessor =
1268-
NOlap::TPortionAccessorConstructor::BuildForLoading(p, std::move(records), std::move(indexes));
1269-
Accessors[i.first].emplace_back(accessor);
1280+
FetchedAccessors.emplace_back(NOlap::TPortionAccessorConstructor::BuildForLoading(p, std::move(records), std::move(indexes)));
12701281
i.second.pop_back();
12711282
}
12721283
}
1284+
1285+
FetchCallback->OnAccessorsFetched(std::move(FetchedAccessors));
12731286
return true;
12741287
}
12751288
void Complete(const TActorContext& /*ctx*/) override {
1276-
for (auto&& i : Accessors) {
1277-
Request->AddData(i.first, std::move(i.second));
1278-
}
12791289
}
12801290
TTxType GetTxType() const override {
12811291
return TXTYPE_WRITE_INDEX;
12821292
}
12831293
};
12841294

1285-
void TColumnShard::Handle(NOlap::NDataAccessorControl::TEvAskDataAccessors::TPtr& ev, const TActorContext& /*ctx*/) {
1286-
Execute(new TTxAskPortionChunks(this, ev->Get()->GetRequest()));
1295+
void TColumnShard::Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& /*ctx*/) {
1296+
Execute(new TTxAskPortionChunks(this, ev->Get()->GetCallback(), std::move(ev->Get()->MutablePortions())));
12871297
}
12881298

12891299
void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx) {
@@ -1384,7 +1394,7 @@ void TColumnShard::ActivateTiering(const ui64 pathId, const TString& useTiering)
13841394

13851395
void TColumnShard::Enqueue(STFUNC_SIG) {
13861396
const TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())(
1387-
"self_id", SelfId())("process", "Enqueue")("ev", ev->ToString());
1397+
"self_id", SelfId())("process", "Enqueue")("ev", ev->GetTypeName());
13881398
switch (ev->GetTypeRewrite()) {
13891399
HFunc(TEvPrivate::TEvTieringModified, Handle);
13901400
HFunc(TEvPrivate::TEvNormalizerResult, Handle);

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
281281
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishToSource::TPtr& ev, const TActorContext& ctx);
282282
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx);
283283

284-
void Handle(NOlap::NDataAccessorControl::TEvAskDataAccessors::TPtr& ev, const TActorContext& ctx);
284+
void Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& ctx);
285285

286286
ITransaction* CreateTxInitSchema();
287287

@@ -385,7 +385,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
385385

386386
STFUNC(StateWork) {
387387
const TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())(
388-
"self_id", SelfId())("ev", ev->ToString());
388+
"self_id", SelfId())("ev", ev->GetTypeName());
389389
TRACE_EVENT(NKikimrServices::TX_COLUMNSHARD);
390390
switch (ev->GetTypeRewrite()) {
391391
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
@@ -441,7 +441,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
441441
HFunc(NOlap::NDataSharing::NEvents::TEvFinishedFromSource, Handle);
442442
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishToSource, Handle);
443443
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator, Handle);
444-
HFunc(NOlap::NDataAccessorControl::TEvAskDataAccessors, Handle);
444+
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
445445

446446
default:
447447
if (!HandleDefaultEvents(ev, SelfId())) {

ydb/core/tx/columnshard/columnshard_private_events.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ struct TEvPrivate {
6060

6161
EvRegisterGranuleDataAccessor,
6262
EvUnregisterGranuleDataAccessor,
63-
EvAskDataAccessors,
63+
EvAskTabletDataAccessors,
64+
EvAskServiceDataAccessors,
6465
EvAddPortionDataAccessor,
6566
EvRemovePortionDataAccessor,
6667

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
#include "collector.h"
22

3+
#include <ydb/core/tx/columnshard/data_accessor/events.h>
34
#include <ydb/core/tx/columnshard/data_accessor/request.h>
45

56
namespace NKikimr::NOlap::NDataAccessorControl {
67

7-
void IGranuleDataAccessor::AskData(const std::shared_ptr<TDataAccessorsRequest>& request) {
8-
AFL_VERIFY(request);
9-
AFL_VERIFY(request->HasSubscriber());
10-
return DoAskData(request);
8+
THashMap<ui64, TPortionDataAccessor> IGranuleDataAccessor::AskData(
9+
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) {
10+
AFL_VERIFY(portions.size());
11+
return DoAskData(portions, callback);
12+
}
13+
14+
void TActorAccessorsCallback::OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) {
15+
NActors::TActivationContext::Send(ActorId, std::make_unique<TEvAddPortion>(std::move(accessors)));
1116
}
1217

1318
} // namespace NKikimr::NOlap::NDataAccessorControl

ydb/core/tx/columnshard/data_accessor/abstract/collector.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,29 @@
33
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
44

55
namespace NKikimr::NOlap::NDataAccessorControl {
6+
class IAccessorCallback {
7+
public:
8+
virtual void OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) = 0;
9+
virtual ~IAccessorCallback() = default;
10+
};
11+
12+
class TActorAccessorsCallback: public IAccessorCallback {
13+
private:
14+
const NActors::TActorId ActorId;
15+
16+
public:
17+
virtual void OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) override;
18+
TActorAccessorsCallback(const NActors::TActorId& actorId)
19+
: ActorId(actorId) {
20+
}
21+
};
22+
623
class IGranuleDataAccessor {
724
private:
825
const ui64 PathId;
926

10-
virtual void DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) = 0;
27+
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
28+
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) = 0;
1129
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) = 0;
1230

1331
public:
@@ -21,7 +39,7 @@ class IGranuleDataAccessor {
2139
: PathId(pathId) {
2240
}
2341

24-
void AskData(const std::shared_ptr<TDataAccessorsRequest>& request);
42+
THashMap<ui64, TPortionDataAccessor> AskData(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback);
2543
void ModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
2644
return DoModifyPortions(add, remove);
2745
}
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
#include "constructor.h"
22

3+
#include <ydb/core/tx/columnshard/data_accessor/local_db/constructor.h>
4+
35
namespace NKikimr::NOlap::NDataAccessorControl {
46

57
std::shared_ptr<IManagerConstructor> IManagerConstructor::BuildDefault() {
6-
std::shared_ptr<IManagerConstructor> result(TFactory::Construct("in_mem"));
7-
AFL_VERIFY(!!result);
8-
return result;
8+
return NLocalDB::TManagerConstructor::BuildDefault();
99
}
1010

11-
}
11+
} // namespace NKikimr::NOlap::NDataAccessorControl

ydb/core/tx/columnshard/data_accessor/abstract/constructor.h

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,17 @@ namespace NKikimr::NOlap::NDataAccessorControl {
88
class TManagerConstructionContext {
99
private:
1010
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
11+
const bool IsUpdateFlag = false;
1112

1213
public:
13-
TManagerConstructionContext(const NActors::TActorId& tabletActorId)
14-
: TabletActorId(tabletActorId) {
14+
bool IsUpdate() const {
15+
return IsUpdateFlag;
16+
}
17+
18+
TManagerConstructionContext(const NActors::TActorId& tabletActorId, const bool isUpdate)
19+
: TabletActorId(tabletActorId)
20+
, IsUpdateFlag(isUpdate)
21+
{
1522
}
1623
};
1724

@@ -25,12 +32,21 @@ class IManagerConstructor {
2532
virtual bool DoDeserializeFromProto(const TProto& proto) = 0;
2633
virtual void DoSerializeToProto(TProto& proto) const = 0;
2734
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
35+
virtual bool IsEqualToWithSameClassName(const IManagerConstructor& /*item*/) const {
36+
return false;
37+
}
2838

2939
public:
3040
static std::shared_ptr<IManagerConstructor> BuildDefault();
3141

3242
virtual ~IManagerConstructor() = default;
3343

44+
bool IsEqualTo(const IManagerConstructor& item) const {
45+
if (GetClassName() != item.GetClassName()) {
46+
return false;
47+
}
48+
return IsEqualToWithSameClassName(item);
49+
}
3450
virtual TString GetClassName() const = 0;
3551

3652
TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
@@ -56,6 +72,16 @@ class TMetadataManagerConstructorContainer: public NBackgroundTasks::TInterfaceP
5672
public:
5773
using TBase::TBase;
5874

75+
bool IsEqualTo(const TMetadataManagerConstructorContainer& item) {
76+
if (TBase::HasObject() != item.HasObject()) {
77+
return false;
78+
}
79+
if (!TBase::HasObject()) {
80+
return true;
81+
}
82+
return TBase::GetObjectPtr()->IsEqualTo(*item.GetObjectPtr());
83+
}
84+
5985
static TConclusion<TMetadataManagerConstructorContainer> BuildFromProto(const NKikimrSchemeOp::TMetadataManagerConstructorContainer& proto) {
6086
TMetadataManagerConstructorContainer result;
6187
if (!result.DeserializeFromProto(proto)) {

0 commit comments

Comments
 (0)