Skip to content

Commit 263288a

Browse files
authored
Merge c289a2c into 979cba7
2 parents 979cba7 + c289a2c commit 263288a

File tree

7 files changed

+88
-15
lines changed

7 files changed

+88
-15
lines changed

ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,27 +358,40 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
358358
ExecuterActorId_ = ev->Sender;
359359
}
360360

361+
auto& evRecord = ev->Get()->Record;
362+
361363
Ydb::Table::ExecuteScanQueryPartialResponse response;
362-
response.set_status(StatusIds::SUCCESS);
363-
response.mutable_result()->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
364+
365+
{
366+
response.set_status(StatusIds::SUCCESS);
367+
auto result = response.mutable_result();
368+
result->mutable_result_set()->Swap(evRecord.MutableResultSet());
369+
370+
if (evRecord.HasVirtualTimestamp()) {
371+
auto snap = result->mutable_snapshot();
372+
auto ts = evRecord.GetVirtualTimestamp();
373+
snap->set_plan_step(ts.GetStep());
374+
snap->set_tx_id(ts.GetTxId());
375+
}
376+
}
364377

365378
TString out;
366379
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
367380

368381
FlowControl_.PushResponse(out.size());
369382
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
370-
LastSeqNo_ = ev->Get()->Record.GetSeqNo();
383+
LastSeqNo_ = evRecord.GetSeqNo();
371384
AckedFreeSpaceBytes_ = freeSpaceBytes;
372385

373386
Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS);
374387

375388
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
376-
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
389+
<< ", seqNo: " << evRecord.GetSeqNo()
377390
<< ", freeSpace: " << freeSpaceBytes
378391
<< ", to: " << ev->Sender
379392
<< ", queue: " << FlowControl_.QueueSize());
380393

381-
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
394+
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(evRecord.GetSeqNo(), evRecord.GetChannelId());
382395
resp->Record.SetFreeSpace(freeSpaceBytes);
383396

384397
ctx.Send(ev->Sender, resp.Release());

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,12 @@ class TKqpExecuterBase : public TActor<TDerived> {
333333
streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
334334
streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex + StatementResultIndex);
335335
streamEv->Record.SetChannelId(channel.Id);
336+
const auto& snap = GetSnapshot();
337+
if (snap.IsValid()) {
338+
auto vt = streamEv->Record.MutableVirtualTimestamp();
339+
vt->SetStep(snap.Step);
340+
vt->SetTxId(snap.TxId);
341+
}
336342

337343
TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
338344
protoBuilder.BuildYdbResultSet(*streamEv->Record.MutableResultSet(), std::move(batches),

ydb/core/kqp/ut/scan/kqp_scan_ut.cpp

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -531,17 +531,52 @@ Y_UNIT_TEST_SUITE(KqpScan) {
531531
.Build()
532532
.Build();
533533

534-
auto it = db.StreamExecuteScanQuery(R"(
535-
DECLARE $key AS Uint64;
536534

537-
SELECT * FROM `/Root/EightShard` WHERE Key = $key;
538-
)", params).GetValueSync();
535+
{
536+
auto it = db.StreamExecuteScanQuery(R"(
537+
DECLARE $key AS Uint64;
539538
540-
UNIT_ASSERT(it.IsSuccess());
539+
SELECT * FROM `/Root/EightShard` WHERE Key = $key;
540+
)", params).GetValueSync();
541541

542-
CompareYson(R"([
543-
[[1];[202u];["Value2"]]
544-
])", StreamResultToYson(it));
542+
UNIT_ASSERT(it.IsSuccess());
543+
544+
CompareYson(R"([
545+
[[1];[202u];["Value2"]]
546+
])", StreamResultToYson(it));
547+
}
548+
549+
{
550+
auto it = db.StreamExecuteScanQuery(R"(
551+
DECLARE $key AS Uint64;
552+
553+
SELECT * FROM `/Root/EightShard` WHERE Key = $key;
554+
)", params).GetValueSync();
555+
556+
UNIT_ASSERT(it.IsSuccess());
557+
auto part = it.ReadNext().GetValueSync();
558+
UNIT_ASSERT(part.IsSuccess());
559+
560+
561+
UNIT_ASSERT(part.HasVirtualTimestamp());
562+
UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0);
563+
UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0);
564+
}
565+
566+
{
567+
auto it = db.StreamExecuteScanQuery(R"(
568+
SELECT * FROM `/Root/EightShard` WHERE Key = 9876554123;
569+
)", params).GetValueSync();
570+
571+
UNIT_ASSERT(it.IsSuccess());
572+
auto part = it.ReadNext().GetValueSync();
573+
UNIT_ASSERT(part.IsSuccess());
574+
575+
576+
UNIT_ASSERT(part.HasVirtualTimestamp());
577+
UNIT_ASSERT(part.GetVirtualTimestamp().GetStep() != 0);
578+
UNIT_ASSERT(part.GetVirtualTimestamp().GetTxId() != 0);
579+
}
545580
}
546581

547582
Y_UNIT_TEST(AggregateByColumn) {

ydb/core/protos/kqp.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@ message TEvExecuterStreamData {
492492
optional uint32 QueryResultIndex = 3;
493493
optional uint32 ChannelId = 4;
494494
optional NActorsProto.TActorId ChannelActorId = 5;
495+
optional TKqpSnapshot VirtualTimestamp = 6;
495496
};
496497

497498
message TEvExecuterStreamDataAck {

ydb/public/api/protos/ydb_table.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,6 +1293,8 @@ message ExecuteScanQueryPartialResult {
12931293
// works only in mode: MODE_EXPLAIN,
12941294
// collects additional diagnostics about query compilation, including query plan and scheme
12951295
string query_full_diagnostics = 7 [deprecated = true];
1296+
// Optional snapshot that corresponds to the returned data
1297+
VirtualTimestamp snapshot = 8;
12961298
}
12971299

12981300
// Returns information about an external data source with a given path.

ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2062,6 +2062,8 @@ class TReadTableSnapshot {
20622062
uint64_t TxId_;
20632063
};
20642064

2065+
using TVirtualTimestamp = TReadTableSnapshot;
2066+
20652067
template<typename TPart>
20662068
class TSimpleStreamPart : public TStreamPartStatus {
20672069
public:
@@ -2116,6 +2118,10 @@ class TScanQueryPart : public TStreamPartStatus {
21162118
const std::string& GetDiagnostics() const { return *Diagnostics_; }
21172119
std::string&& ExtractDiagnostics() { return std::move(*Diagnostics_); }
21182120

2121+
bool HasVirtualTimestamp() const { return Vt_.has_value(); }
2122+
const TVirtualTimestamp& GetVirtualTimestamp() const { return *Vt_; }
2123+
TVirtualTimestamp&& ExtractVirtualTimestamp() { return std::move(*Vt_); }
2124+
21192125
TScanQueryPart(TStatus&& status)
21202126
: TStreamPartStatus(std::move(status))
21212127
{}
@@ -2126,17 +2132,20 @@ class TScanQueryPart : public TStreamPartStatus {
21262132
, Diagnostics_(diagnostics)
21272133
{}
21282134

2129-
TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const std::optional<TQueryStats>& queryStats, const std::optional<std::string>& diagnostics)
2135+
TScanQueryPart(TStatus&& status, TResultSet&& resultSet, const std::optional<TQueryStats>& queryStats,
2136+
const std::optional<std::string>& diagnostics, std::optional<TVirtualTimestamp>&& vt)
21302137
: TStreamPartStatus(std::move(status))
21312138
, ResultSet_(std::move(resultSet))
21322139
, QueryStats_(queryStats)
21332140
, Diagnostics_(diagnostics)
2141+
, Vt_(std::move(vt))
21342142
{}
21352143

21362144
private:
21372145
std::optional<TResultSet> ResultSet_;
21382146
std::optional<TQueryStats> QueryStats_;
21392147
std::optional<std::string> Diagnostics_;
2148+
std::optional<TVirtualTimestamp> Vt_;
21402149
};
21412150

21422151
using TAsyncScanQueryPart = NThreading::TFuture<TScanQueryPart>;

ydb/public/sdk/cpp/src/client/table/impl/readers.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,16 @@ TAsyncScanQueryPart TScanQueryPartIterator::TReaderImpl::ReadNext(std::shared_pt
8989

9090
diagnostics = self->Response_.result().query_full_diagnostics();
9191

92+
std::optional<TVirtualTimestamp> vt;
93+
94+
if (self->Response_.result().has_snapshot()) {
95+
const auto& snap = self->Response_.result().snapshot();
96+
vt = TVirtualTimestamp(snap.plan_step(), snap.tx_id());
97+
}
98+
9299
if (self->Response_.result().has_result_set()) {
93100
promise.SetValue({std::move(status),
94-
TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics});
101+
TResultSet(std::move(*self->Response_.mutable_result()->mutable_result_set())), queryStats, diagnostics, std::move(vt)});
95102
} else {
96103
promise.SetValue({std::move(status), queryStats, diagnostics});
97104
}

0 commit comments

Comments
 (0)