Skip to content

Commit 9289583

Browse files
authored
Return virtual timestamp for scan query (#14001) (#18152)
Conflicts: ydb/core/kqp/executer_actor/kqp_executer_impl.h ydb/public/api/protos/ydb_table.proto ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h ydb/public/sdk/cpp/src/client/table/impl/readers.cpp
1 parent d29067d commit 9289583

File tree

9 files changed

+98
-15
lines changed

9 files changed

+98
-15
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,13 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
327327
response.set_result_set_index(ev->Get()->Record.GetQueryResultIndex());
328328
response.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
329329

330+
if (ev->Get()->Record.HasVirtualTimestamp()) {
331+
auto snap = response.mutable_snapshot_timestamp();
332+
auto& ts = ev->Get()->Record.GetVirtualTimestamp();
333+
snap->set_plan_step(ts.GetStep());
334+
snap->set_tx_id(ts.GetTxId());
335+
}
336+
330337
TString out;
331338
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
332339

ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -340,27 +340,40 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
340340
ExecuterActorId_ = ev->Sender;
341341
}
342342

343+
auto& evRecord = ev->Get()->Record;
344+
343345
Ydb::Table::ExecuteScanQueryPartialResponse response;
344-
response.set_status(StatusIds::SUCCESS);
345-
response.mutable_result()->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
346+
347+
{
348+
response.set_status(StatusIds::SUCCESS);
349+
auto result = response.mutable_result();
350+
result->mutable_result_set()->Swap(evRecord.MutableResultSet());
351+
352+
if (evRecord.HasVirtualTimestamp()) {
353+
auto snap = result->mutable_snapshot();
354+
auto ts = evRecord.GetVirtualTimestamp();
355+
snap->set_plan_step(ts.GetStep());
356+
snap->set_tx_id(ts.GetTxId());
357+
}
358+
}
346359

347360
TString out;
348361
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
349362

350363
FlowControl_.PushResponse(out.size());
351364
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
352-
LastSeqNo_ = ev->Get()->Record.GetSeqNo();
365+
LastSeqNo_ = evRecord.GetSeqNo();
353366
AckedFreeSpaceBytes_ = freeSpaceBytes;
354367

355368
Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS);
356369

357370
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
358-
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
371+
<< ", seqNo: " << evRecord.GetSeqNo()
359372
<< ", freeSpace: " << freeSpaceBytes
360373
<< ", to: " << ev->Sender
361374
<< ", queue: " << FlowControl_.QueueSize());
362375

363-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
376+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(evRecord.GetSeqNo(), evRecord.GetChannelId());
364377
resp->Record.SetFreeSpace(freeSpaceBytes);
365378

366379
ctx.Send(ev->Sender, resp.Release());

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
304304
streamEv->Record.SetChannelId(channel.Id);
305305
streamEv->Record.MutableResultSet()->Swap(&resultSet);
306306

307+
const auto& snap = GetSnapshot();
308+
if (snap.IsValid()) {
309+
auto vt = streamEv->Record.MutableVirtualTimestamp();
310+
vt->SetStep(snap.Step);
311+
vt->SetTxId(snap.TxId);
312+
}
313+
307314
LOG_D("Send TEvStreamData to " << Target << ", seqNo: " << streamEv->Record.GetSeqNo()
308315
<< ", nRows: " << streamEv->Record.GetResultSet().rows().size());
309316

ydb/core/kqp/ut/scan/kqp_scan_ut.cpp

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -530,17 +530,52 @@ Y_UNIT_TEST_SUITE(KqpScan) {
530530
.Build()
531531
.Build();
532532

533-
auto it = db.StreamExecuteScanQuery(R"(
534-
DECLARE $key AS Uint64;
535533

536-
SELECT * FROM `/Root/EightShard` WHERE Key = $key;
537-
)", params).GetValueSync();
534+
{
535+
auto it = db.StreamExecuteScanQuery(R"(
536+
DECLARE $key AS Uint64;
538537
539-
UNIT_ASSERT(it.IsSuccess());
538+
SELECT * FROM `/Root/EightShard` WHERE Key = $key;
539+
)", params).GetValueSync();
540540

541-
CompareYson(R"([
542-
[[1];[202u];["Value2"]]
543-
])", StreamResultToYson(it));
541+
UNIT_ASSERT(it.IsSuccess());
542+
543+
CompareYson(R"([
544+
[[1];[202u];["Value2"]]
545+
])", StreamResultToYson(it));
546+
}
547+
548+
{
549+
auto it = db.StreamExecuteScanQuery(R"(
550+
DECLARE $key AS Uint64;
551+
552+
SELECT * FROM `/Root/EightShard` WHERE Key = $key;
553+
)", params).GetValueSync();
554+
555+
UNIT_ASSERT(it.IsSuccess());
556+
auto part = it.ReadNext().GetValueSync();
557+
UNIT_ASSERT(part.IsSuccess());
558+
559+
560+
UNIT_ASSERT(part.HasVirtualTimestamp());
561+
UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0);
562+
UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0);
563+
}
564+
565+
{
566+
auto it = db.StreamExecuteScanQuery(R"(
567+
SELECT * FROM `/Root/EightShard` WHERE Key = 9876554123;
568+
)", params).GetValueSync();
569+
570+
UNIT_ASSERT(it.IsSuccess());
571+
auto part = it.ReadNext().GetValueSync();
572+
UNIT_ASSERT(part.IsSuccess());
573+
574+
575+
UNIT_ASSERT(part.HasVirtualTimestamp());
576+
UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0);
577+
UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0);
578+
}
544579
}
545580

546581
Y_UNIT_TEST(AggregateByColumn) {

ydb/core/protos/kqp.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ message TEvExecuterStreamData {
498498
optional uint32 QueryResultIndex = 3;
499499
optional uint32 ChannelId = 4;
500500
optional NActorsProto.TActorId ChannelActorId = 5;
501+
optional TKqpSnapshot VirtualTimestamp = 6;
501502
};
502503

503504
message TEvExecuterStreamDataAck {

ydb/public/api/protos/ydb_query.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ option java_outer_classname = "QueryProtos";
88
import "google/protobuf/duration.proto";
99

1010
import "ydb/public/api/protos/annotations/validation.proto";
11+
import "ydb/public/api/protos/ydb_common.proto";
1112
import "ydb/public/api/protos/ydb_issue_message.proto";
1213
import "ydb/public/api/protos/ydb_operation.proto";
1314
import "ydb/public/api/protos/ydb_query_stats.proto";
@@ -191,6 +192,7 @@ message ExecuteQueryResponsePart {
191192
Ydb.TableStats.QueryStats exec_stats = 5;
192193

193194
TransactionMeta tx_meta = 6;
195+
VirtualTimestamp snapshot_timestamp = 7;
194196
}
195197

196198
message ExecuteScriptRequest {

ydb/public/api/protos/ydb_table.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,4 +1223,6 @@ message ExecuteScanQueryPartialResult {
12231223
// works only in mode: MODE_EXPLAIN,
12241224
// collects additional diagnostics about query compilation, including query plan and scheme
12251225
string query_full_diagnostics = 7;
1226+
// Optional snapshot that corresponds to the returned data
1227+
VirtualTimestamp snapshot = 8;
12261228
}

ydb/public/sdk/cpp/client/ydb_table/impl/readers.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,16 @@ TAsyncScanQueryPart TScanQueryPartIterator::TReaderImpl::ReadNext(std::shared_pt
9090

9191
diagnostics = self->Response_.result().query_full_diagnostics();
9292

93+
std::optional<TVirtualTimestamp> vt;
94+
95+
if (self->Response_.result().has_snapshot()) {
96+
const auto& snap = self->Response_.result().snapshot();
97+
vt = TVirtualTimestamp(snap.plan_step(), snap.tx_id());
98+
}
99+
93100
if (self->Response_.result().has_result_set()) {
94101
promise.SetValue({std::move(status),
95-
TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics});
102+
TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics, std::move(vt)});
96103
} else {
97104
promise.SetValue({std::move(status), queryStats, diagnostics});
98105
}

ydb/public/sdk/cpp/client/ydb_table/table.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1927,6 +1927,8 @@ class TReadTableSnapshot {
19271927
ui64 TxId_;
19281928
};
19291929

1930+
using TVirtualTimestamp = TReadTableSnapshot;
1931+
19301932
template<typename TPart>
19311933
class TSimpleStreamPart : public TStreamPartStatus {
19321934
public:
@@ -1980,6 +1982,10 @@ class TScanQueryPart : public TStreamPartStatus {
19801982
const TString& GetDiagnostics() const { return *Diagnostics_; }
19811983
TString&& ExtractDiagnostics() { return std::move(*Diagnostics_); }
19821984

1985+
bool HasVirtualTimestamp() const { return Vt_.has_value(); }
1986+
const TVirtualTimestamp& GetVirtualTimestamp() const { return *Vt_; }
1987+
TVirtualTimestamp&& ExtractVirtualTimestamp() { return std::move(*Vt_); }
1988+
19831989
TScanQueryPart(TStatus&& status)
19841990
: TStreamPartStatus(std::move(status))
19851991
{}
@@ -1990,17 +1996,20 @@ class TScanQueryPart : public TStreamPartStatus {
19901996
, Diagnostics_(diagnostics)
19911997
{}
19921998

1993-
TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const TMaybe<TQueryStats>& queryStats, const TMaybe<TString>& diagnostics)
1999+
TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const TMaybe<TQueryStats>& queryStats,
2000+
const TMaybe<TString>& diagnostics, std::optional<TVirtualTimestamp>&& vt)
19942001
: TStreamPartStatus(std::move(status))
19952002
, ResultSet_(std::move(resultSet))
19962003
, QueryStats_(queryStats)
19972004
, Diagnostics_(diagnostics)
2005+
, Vt_(std::move(vt))
19982006
{}
19992007

20002008
private:
20012009
TMaybe<TResultSet> ResultSet_;
20022010
TMaybe<TQueryStats> QueryStats_;
20032011
TMaybe<TString> Diagnostics_;
2012+
std::optional<TVirtualTimestamp> Vt_;
20042013
};
20052014

20062015
using TAsyncScanQueryPart = NThreading::TFuture<TScanQueryPart>;

0 commit comments

Comments
 (0)