Skip to content

Commit 84023c4

Browse files
committed
Return virtual timestamp for scan query
1 parent 5706f2d commit 84023c4

File tree

9 files changed

+97
-15
lines changed

9 files changed

+97
-15
lines changed

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,13 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
354354
response->set_result_set_index(ev->Get()->Record.GetQueryResultIndex());
355355
response->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
356356

357+
if (ev->Get()->Record.HasVirtualTimestamp()) {
358+
auto snap = response->mutable_snapshot_timestamp();
359+
auto& ts = ev->Get()->Record.GetVirtualTimestamp();
360+
snap->set_plan_step(ts.GetStep());
361+
snap->set_tx_id(ts.GetTxId());
362+
}
363+
357364
TString out;
358365
Y_PROTOBUF_SUPPRESS_NODISCARD response->SerializeToString(&out);
359366

ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,27 +358,40 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
358358
ExecuterActorId_ = ev->Sender;
359359
}
360360

361+
auto& evRecord = ev->Get()->Record;
362+
361363
Ydb::Table::ExecuteScanQueryPartialResponse response;
362-
response.set_status(StatusIds::SUCCESS);
363-
response.mutable_result()->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
364+
365+
{
366+
response.set_status(StatusIds::SUCCESS);
367+
auto result = response.mutable_result();
368+
result->mutable_result_set()->Swap(evRecord.MutableResultSet());
369+
370+
if (evRecord.HasVirtualTimestamp()) {
371+
auto snap = result->mutable_snapshot();
372+
auto ts = evRecord.GetVirtualTimestamp();
373+
snap->set_plan_step(ts.GetStep());
374+
snap->set_tx_id(ts.GetTxId());
375+
}
376+
}
364377

365378
TString out;
366379
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
367380

368381
FlowControl_.PushResponse(out.size());
369382
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
370-
LastSeqNo_ = ev->Get()->Record.GetSeqNo();
383+
LastSeqNo_ = evRecord.GetSeqNo();
371384
AckedFreeSpaceBytes_ = freeSpaceBytes;
372385

373386
Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS);
374387

375388
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
376-
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
389+
<< ", seqNo: " << evRecord.GetSeqNo()
377390
<< ", freeSpace: " << freeSpaceBytes
378391
<< ", to: " << ev->Sender
379392
<< ", queue: " << FlowControl_.QueueSize());
380393

381-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
394+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(evRecord.GetSeqNo(), evRecord.GetChannelId());
382395
resp->Record.SetFreeSpace(freeSpaceBytes);
383396

384397
ctx.Send(ev->Sender, resp.Release());

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
333333
streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
334334
streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex + StatementResultIndex);
335335
streamEv->Record.SetChannelId(channel.Id);
336+
const auto& snap = GetSnapshot();
337+
if (snap.IsValid()) {
338+
auto vt = streamEv->Record.MutableVirtualTimestamp();
339+
vt->SetStep(snap.Step);
340+
vt->SetTxId(snap.TxId);
341+
}
336342

337343
TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
338344
protoBuilder.BuildYdbResultSet(*streamEv->Record.MutableResultSet(), std::move(batches),

ydb/core/kqp/ut/scan/kqp_scan_ut.cpp

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -531,17 +531,52 @@ Y_UNIT_TEST_SUITE(KqpScan) {
531531
.Build()
532532
.Build();
533533

534-
auto it = db.StreamExecuteScanQuery(R"(
535-
DECLARE $key AS Uint64;
536534

537-
SELECT * FROM `/Root/EightShard` WHERE Key = $key;
538-
)", params).GetValueSync();
535+
{
536+
auto it = db.StreamExecuteScanQuery(R"(
537+
DECLARE $key AS Uint64;
539538
540-
UNIT_ASSERT(it.IsSuccess());
539+
SELECT * FROM `/Root/EightShard` WHERE Key = $key;
540+
)", params).GetValueSync();
541541

542-
CompareYson(R"([
543-
[[1];[202u];["Value2"]]
544-
])", StreamResultToYson(it));
542+
UNIT_ASSERT(it.IsSuccess());
543+
544+
CompareYson(R"([
545+
[[1];[202u];["Value2"]]
546+
])", StreamResultToYson(it));
547+
}
548+
549+
{
550+
auto it = db.StreamExecuteScanQuery(R"(
551+
DECLARE $key AS Uint64;
552+
553+
SELECT * FROM `/Root/EightShard` WHERE Key = $key;
554+
)", params).GetValueSync();
555+
556+
UNIT_ASSERT(it.IsSuccess());
557+
auto part = it.ReadNext().GetValueSync();
558+
UNIT_ASSERT(part.IsSuccess());
559+
560+
561+
UNIT_ASSERT(part.HasVirtualTimestamp());
562+
UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0);
563+
UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0);
564+
}
565+
566+
{
567+
auto it = db.StreamExecuteScanQuery(R"(
568+
SELECT * FROM `/Root/EightShard` WHERE Key = 9876554123;
569+
)", params).GetValueSync();
570+
571+
UNIT_ASSERT(it.IsSuccess());
572+
auto part = it.ReadNext().GetValueSync();
573+
UNIT_ASSERT(part.IsSuccess());
574+
575+
576+
UNIT_ASSERT(part.HasVirtualTimestamp());
577+
UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0);
578+
UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0);
579+
}
545580
}
546581

547582
Y_UNIT_TEST(AggregateByColumn) {

ydb/core/protos/kqp.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ message TEvExecuterStreamData {
512512
optional uint32 QueryResultIndex = 3;
513513
optional uint32 ChannelId = 4;
514514
optional NActorsProto.TActorId ChannelActorId = 5;
515+
optional TKqpSnapshot VirtualTimestamp = 6;
515516
};
516517

517518
message TEvExecuterStreamDataAck {

ydb/public/api/protos/ydb_query.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ option java_outer_classname = "QueryProtos";
88
import "google/protobuf/duration.proto";
99

1010
import "ydb/public/api/protos/annotations/validation.proto";
11+
import "ydb/public/api/protos/ydb_common.proto";
1112
import "ydb/public/api/protos/ydb_issue_message.proto";
1213
import "ydb/public/api/protos/ydb_operation.proto";
1314
import "ydb/public/api/protos/ydb_query_stats.proto";
@@ -200,6 +201,7 @@ message ExecuteQueryResponsePart {
200201
Ydb.TableStats.QueryStats exec_stats = 5;
201202

202203
TransactionMeta tx_meta = 6;
204+
VirtualTimestamp snapshot_timestamp = 7;
203205
}
204206

205207
message ExecuteScriptRequest {

ydb/public/api/protos/ydb_table.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,6 +1293,8 @@ message ExecuteScanQueryPartialResult {
12931293
// works only in mode: MODE_EXPLAIN,
12941294
// collects additional diagnostics about query compilation, including query plan and scheme
12951295
string query_full_diagnostics = 7 [deprecated = true];
1296+
// Optional snapshot that corresponds to the returned data
1297+
VirtualTimestamp snapshot = 8;
12961298
}
12971299

12981300
// Returns information about an external data source with a given path.

ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2063,6 +2063,8 @@ class TReadTableSnapshot {
20632063
uint64_t TxId_;
20642064
};
20652065

2066+
using TVirtualTimestamp = TReadTableSnapshot;
2067+
20662068
template<typename TPart>
20672069
class TSimpleStreamPart : public TStreamPartStatus {
20682070
public:
@@ -2117,6 +2119,10 @@ class TScanQueryPart : public TStreamPartStatus {
21172119
const std::string& GetDiagnostics() const { return *Diagnostics_; }
21182120
std::string&& ExtractDiagnostics() { return std::move(*Diagnostics_); }
21192121

2122+
bool HasVirtualTimestamp() const { return Vt_.has_value(); }
2123+
const TVirtualTimestamp& GetVirtualTimestamp() const { return *Vt_; }
2124+
TVirtualTimestamp&& ExtractVirtualTimestamp() { return std::move(*Vt_); }
2125+
21202126
TScanQueryPart(TStatus&& status)
21212127
: TStreamPartStatus(std::move(status))
21222128
{}
@@ -2127,17 +2133,20 @@ class TScanQueryPart : public TStreamPartStatus {
21272133
, Diagnostics_(diagnostics)
21282134
{}
21292135

2130-
TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const std::optional<TQueryStats>& queryStats, const std::optional<std::string>& diagnostics)
2136+
TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const std::optional<TQueryStats>& queryStats,
2137+
const std::optional<std::string>& diagnostics, std::optional<TVirtualTimestamp>&& vt)
21312138
: TStreamPartStatus(std::move(status))
21322139
, ResultSet_(std::move(resultSet))
21332140
, QueryStats_(queryStats)
21342141
, Diagnostics_(diagnostics)
2142+
, Vt_(std::move(vt))
21352143
{}
21362144

21372145
private:
21382146
std::optional<TResultSet> ResultSet_;
21392147
std::optional<TQueryStats> QueryStats_;
21402148
std::optional<std::string> Diagnostics_;
2149+
std::optional<TVirtualTimestamp> Vt_;
21412150
};
21422151

21432152
using TAsyncScanQueryPart = NThreading::TFuture<TScanQueryPart>;

ydb/public/sdk/cpp/src/client/table/impl/readers.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,16 @@ TAsyncScanQueryPart TScanQueryPartIterator::TReaderImpl::ReadNext(std::shared_pt
8989

9090
diagnostics = self->Response_.result().query_full_diagnostics();
9191

92+
std::optional<TVirtualTimestamp> vt;
93+
94+
if (self->Response_.result().has_snapshot()) {
95+
const auto& snap = self->Response_.result().snapshot();
96+
vt = TVirtualTimestamp(snap.plan_step(), snap.tx_id());
97+
}
98+
9299
if (self->Response_.result().has_result_set()) {
93100
promise.SetValue({std::move(status),
94-
TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics});
101+
TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics, std::move(vt)});
95102
} else {
96103
promise.SetValue({std::move(status), queryStats, diagnostics});
97104
}

0 commit comments

Comments
 (0)