Skip to content

Commit 8703fb9

Browse files
authored
allow reading from followers for vector index tables (#18175)
1 parent adfe657 commit 8703fb9

File tree

11 files changed

+100
-13
lines changed

11 files changed

+100
-13
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
967967
} else {
968968
settings = *stageInfo.Meta.ResolvedSinkSettings;
969969
}
970-
970+
971971
auto& lockTxId = TasksGraph.GetMeta().LockTxId;
972972
if (lockTxId) {
973973
settings.SetLockTxId(*lockTxId);
@@ -1178,6 +1178,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
11781178
NKikimrTxDataShard::TKqpReadRangesSourceSettings* settings = input.Meta.SourceSettings;
11791179
FillTableMeta(stageInfo, settings->MutableTable());
11801180

1181+
settings->SetIsTableImmutable(source.GetIsTableImmutable());
1182+
11811183
for (auto& keyColumn : keyTypes) {
11821184
auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(keyColumn, "");
11831185
*settings->AddKeyColumnTypeInfos() = columnType.TypeInfo ?

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,11 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
367367
streamLookupTransform.InputType = streamLookup.GetLookupKeysType();
368368
streamLookupTransform.OutputType = streamLookup.GetResultType();
369369

370+
if (streamLookup.GetIsTableImmutable()) {
371+
settings->SetAllowUseFollowers(true);
372+
settings->SetIsTableImmutable(true);
373+
}
374+
370375
for (ui32 taskId = 0; taskId < inputStageInfo.Tasks.size(); ++taskId) {
371376
auto& originTask = graph.GetTask(inputStageInfo.Tasks[taskId]);
372377
auto& targetTask = graph.GetTask(stageInfo.Tasks[taskId]);
@@ -1159,18 +1164,26 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput&
11591164
inputDesc.MutableSource()->SetWatermarksMode(input.WatermarksMode);
11601165
if (Y_LIKELY(input.Meta.SourceSettings)) {
11611166
enableMetering = true;
1162-
if (snapshot.IsValid()) {
1167+
YQL_ENSURE(input.Meta.SourceSettings->HasTable());
1168+
bool isTableImmutable = input.Meta.SourceSettings->GetIsTableImmutable();
1169+
1170+
if (snapshot.IsValid() && !isTableImmutable) {
11631171
input.Meta.SourceSettings->MutableSnapshot()->SetStep(snapshot.Step);
11641172
input.Meta.SourceSettings->MutableSnapshot()->SetTxId(snapshot.TxId);
11651173
}
11661174

1167-
if (tasksGraph.GetMeta().UseFollowers) {
1168-
input.Meta.SourceSettings->SetUseFollowers(tasksGraph.GetMeta().UseFollowers);
1175+
if (tasksGraph.GetMeta().UseFollowers || isTableImmutable) {
1176+
input.Meta.SourceSettings->SetUseFollowers(tasksGraph.GetMeta().UseFollowers || isTableImmutable);
11691177
}
11701178

11711179
if (serializeAsyncIoSettings) {
11721180
inputDesc.MutableSource()->MutableSettings()->PackFrom(*input.Meta.SourceSettings);
11731181
}
1182+
1183+
if (isTableImmutable) {
1184+
input.Meta.SourceSettings->SetAllowInconsistentReads(true);
1185+
}
1186+
11741187
} else {
11751188
YQL_ENSURE(input.SourceSettings);
11761189
inputDesc.MutableSource()->MutableSettings()->CopyFrom(*input.SourceSettings);
@@ -1209,21 +1222,25 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput&
12091222
if (input.Meta.StreamLookupSettings) {
12101223
enableMetering = true;
12111224
YQL_ENSURE(input.Meta.StreamLookupSettings);
1212-
if (snapshot.IsValid()) {
1225+
bool isTableImmutable = input.Meta.StreamLookupSettings->GetIsTableImmutable();
1226+
1227+
if (snapshot.IsValid() && !isTableImmutable) {
12131228
input.Meta.StreamLookupSettings->MutableSnapshot()->SetStep(snapshot.Step);
12141229
input.Meta.StreamLookupSettings->MutableSnapshot()->SetTxId(snapshot.TxId);
12151230
} else {
1216-
YQL_ENSURE(tasksGraph.GetMeta().AllowInconsistentReads, "Expected valid snapshot or enabled inconsistent read mode");
1231+
YQL_ENSURE(tasksGraph.GetMeta().AllowInconsistentReads || isTableImmutable, "Expected valid snapshot or enabled inconsistent read mode");
12171232
input.Meta.StreamLookupSettings->SetAllowInconsistentReads(true);
12181233
}
12191234

1220-
if (lockTxId) {
1235+
if (lockTxId && !isTableImmutable) {
12211236
input.Meta.StreamLookupSettings->SetLockTxId(*lockTxId);
12221237
input.Meta.StreamLookupSettings->SetLockNodeId(tasksGraph.GetMeta().LockNodeId);
12231238
}
1224-
if (tasksGraph.GetMeta().LockMode) {
1239+
1240+
if (tasksGraph.GetMeta().LockMode && !isTableImmutable) {
12251241
input.Meta.StreamLookupSettings->SetLockMode(*tasksGraph.GetMeta().LockMode);
12261242
}
1243+
12271244
transformProto->MutableSettings()->PackFrom(*input.Meta.StreamLookupSettings);
12281245
} else if (input.Meta.SequencerSettings) {
12291246
transformProto->MutableSettings()->PackFrom(*input.Meta.SequencerSettings);

ydb/core/kqp/provider/yql_kikimr_datasource.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase {
383383
do {
384384
auto nextImplTable = implTable->Next;
385385
auto& desc = SessionCtx->Tables().GetOrAddTable(implTable->Cluster, SessionCtx->GetDatabase(), implTable->Name);
386+
SessionCtx->Tables().AddIndexImplTableToMainTableMapping(tablePath, implTable->Name);
386387
desc.Metadata = std::move(implTable);
387388
desc.Load(ctx, sysColumnsEnabled);
388389
implTable = std::move(nextImplTable);

ydb/core/kqp/provider/yql_kikimr_provider.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster
204204
return Tables[std::make_pair(cluster, tablePath)];
205205
}
206206

207+
208+
207209
TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, const TString& table) {
208210
auto tablePath = table;
209211
if (TempTablesState) {
@@ -220,6 +222,26 @@ TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, con
220222
return *desc;
221223
}
222224

225+
bool TKikimrTablesData::IsTableImmutable(const TStringBuf& cluster, const TStringBuf& path) {
226+
auto mainTableImpl = GetMainTableIfTableIsImplTableOfIndex(cluster, path);
227+
if (mainTableImpl) {
228+
229+
for(const auto& index: mainTableImpl->Metadata->Indexes) {
230+
if (index.Type != TIndexDescription::EType::GlobalSyncVectorKMeansTree) {
231+
continue;
232+
}
233+
234+
for(const auto& implTable: index.GetImplTables()) {
235+
TString implTablePath = TStringBuilder() << mainTableImpl->Metadata->Name << "/" << index.Name << "/" << implTable;
236+
if (path == implTablePath)
237+
return true;
238+
}
239+
}
240+
}
241+
242+
return false;
243+
}
244+
223245
const TKikimrTableDescription& TKikimrTablesData::ExistingTable(const TStringBuf& cluster,
224246
const TStringBuf& table) const
225247
{

ydb/core/kqp/provider/yql_kikimr_provider.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,23 @@ class TKikimrTablesData : public TThrRefBase {
207207
return Tables;
208208
}
209209

210+
void AddIndexImplTableToMainTableMapping(const TString& mainTable, const TString& indexTable) {
211+
auto [it, success] = IndexTableToMainTable.emplace(indexTable, mainTable);
212+
if (!success) {
213+
YQL_ENSURE(it->second == mainTable);
214+
}
215+
}
216+
217+
const TKikimrTableDescription* GetMainTableIfTableIsImplTableOfIndex(const TStringBuf& cluster, const TStringBuf& id) {
218+
auto it = IndexTableToMainTable.find(id);
219+
if (it == IndexTableToMainTable.end()) {
220+
return nullptr;
221+
}
222+
return &ExistingTable(cluster, it->second);
223+
}
224+
225+
bool IsTableImmutable(const TStringBuf& cluster, const TStringBuf& path);
226+
210227
std::optional<TString> GetTempTablePath(const TStringBuf& table) const;
211228

212229
void Reset() {
@@ -220,6 +237,8 @@ class TKikimrTablesData : public TThrRefBase {
220237
private:
221238
THashMap<std::pair<TString, TString>, TKikimrTableDescription> Tables;
222239
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
240+
241+
THashMap<TString, TString> IndexTableToMainTable;
223242
};
224243

225244
enum class TYdbOperation : ui64 {

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
10291029
auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata;
10301030
YQL_ENSURE(tableMeta);
10311031

1032+
readProto.SetIsTableImmutable(TablesData->IsTableImmutable(Cluster, settings.Table().Cast().Path()));
10321033
{
10331034

10341035
THashMap<TString, const TExprNode*> columnsMap;
@@ -1473,6 +1474,8 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
14731474
auto tableMeta = TablesData->ExistingTable(Cluster, streamLookup.Table().Path()).Metadata;
14741475
YQL_ENSURE(tableMeta);
14751476

1477+
streamLookupProto.SetIsTableImmutable(TablesData->IsTableImmutable(Cluster, streamLookup.Table().Path()));
1478+
14761479
FillTablesMap(streamLookup.Table(), streamLookup.Columns(), tablesMap);
14771480
FillTableId(streamLookup.Table(), *streamLookupProto.MutableTable());
14781481

ydb/core/kqp/runtime/kqp_read_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,6 +1044,10 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
10441044
BrokenLocks.push_back(lock);
10451045
}
10461046

1047+
if (UseFollowers) {
1048+
YQL_ENSURE(Locks.empty());
1049+
}
1050+
10471051
CA_LOG_D("Taken " << Locks.size() << " locks");
10481052
Reads[id].SerializedContinuationToken = record.GetContinuationToken();
10491053

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ namespace {
2424

2525
static constexpr TDuration SCHEME_CACHE_REQUEST_TIMEOUT = TDuration::Seconds(10);
2626
NActors::TActorId MainPipeCacheId = NKikimr::MakePipePerNodeCacheID(false);
27+
NActors::TActorId FollowersPipeCacheId = NKikimr::MakePipePerNodeCacheID(true);
2728

2829
class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
2930
public:
@@ -37,6 +38,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
3738
, Alloc(args.Alloc)
3839
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
3940
, AllowInconsistentReads(settings.GetAllowInconsistentReads())
41+
, UseFollowers(settings.GetAllowUseFollowers())
42+
, PipeCacheId(UseFollowers ? FollowersPipeCacheId : MainPipeCacheId)
4043
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
4144
, NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
4245
, LockMode(settings.HasLockMode() ? settings.GetLockMode() : TMaybe<NKikimrDataEvents::ELockMode>())
@@ -207,11 +210,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
207210
Counters->SentIteratorCancels->Inc();
208211
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
209212
cancel->Record.SetReadId(id);
210-
Send(MainPipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId, false));
213+
Send(PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state.ShardId, false));
211214
}
212215
}
213216

214-
Send(MainPipeCacheId, new TEvPipeCache::TEvUnlink(0));
217+
Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
215218
TActorBootstrapped<TKqpStreamLookupActor>::PassAway();
216219

217220
LookupActorSpan.End();
@@ -344,6 +347,14 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
344347
Locks.push_back(lock);
345348
}
346349

350+
if (UseFollowers) {
351+
YQL_ENSURE(Locks.empty());
352+
if (!record.GetFinished()) {
353+
RuntimeError("read from follower returned partial data.", NYql::NDqProto::StatusIds::INTERNAL_ERROR);
354+
return;
355+
}
356+
}
357+
347358
if (!Snapshot.IsValid()) {
348359
Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
349360
}
@@ -429,7 +440,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
429440
request->Record.SetMaxRows(defaultSettings.GetMaxRows());
430441
request->Record.SetMaxBytes(defaultSettings.GetMaxBytes());
431442

432-
Send(MainPipeCacheId, new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true),
443+
Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true),
433444
IEventHandle::FlagTrackDelivery);
434445

435446
CA_LOG_D("TEvReadAck was sent to shard: " << read.ShardId);
@@ -562,7 +573,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
562573
<< ", lockTxId=" << record.GetLockTxId()
563574
<< ", lockNodeId=" << record.GetLockNodeId());
564575

565-
Send(MainPipeCacheId, new TEvPipeCache::TEvForward(request.Release(), shardId, true),
576+
Send(PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), shardId, true),
566577
IEventHandle::FlagTrackDelivery, 0, LookupActorSpan.GetTraceId());
567578

568579
read.State = EReadState::Running;
@@ -699,6 +710,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
699710
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
700711
IKqpGateway::TKqpSnapshot Snapshot;
701712
const bool AllowInconsistentReads;
713+
const bool UseFollowers;
714+
const TActorId PipeCacheId;
702715
const TMaybe<ui64> LockTxId;
703716
const TMaybe<ui32> NodeLockId;
704717
const TMaybe<NKikimrDataEvents::ELockMode> LockMode;

ydb/core/protos/kqp.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,8 @@ message TKqpStreamLookupSettings {
823823
optional bool AllowNullKeys = 12;
824824
optional uint32 AllowNullKeysPrefixSize = 13;
825825
optional NKikimrDataEvents.ELockMode LockMode = 14;
826+
optional bool AllowUseFollowers = 15;
827+
optional bool IsTableImmutable = 16;
826828
}
827829

828830
message TKqpSequencerSettings {

ydb/core/protos/kqp_physical.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ message TKqpPhyCnStreamLookup {
304304
bool KeepRowsOrder = 7;
305305
bool AllowNullKeys = 8;
306306
uint32 AllowNullKeysPrefixSize = 9;
307+
bool IsTableImmutable = 10;
307308
}
308309

309310
message TKqpPhyCnSequencer {
@@ -348,6 +349,7 @@ message TKqpReadRangesSource {
348349

349350
repeated string SkipNullKeys = 8;
350351
uint64 SequentialInFlightShards = 9;
352+
bool IsTableImmutable = 10;
351353
}
352354

353355
message TKqpExternalSource {

0 commit comments

Comments
 (0)