Skip to content

Commit d17e400

Browse files
authored
Merge 3c07741 into 5c3d5c3
2 parents 5c3d5c3 + 3c07741 commit d17e400

File tree

3 files changed

+77
-6
lines changed

3 files changed

+77
-6
lines changed

ydb/core/tx/datashard/cdc_stream_scan.cpp

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "datashard_impl.h"
44

55
#include <ydb/core/protos/datashard_config.pb.h>
6+
#include <ydb/core/protos/tx_datashard.pb.h>
67

78
#include <util/generic/maybe.h>
89
#include <util/string/builder.h>
@@ -17,6 +18,11 @@ using namespace NActors;
1718
using namespace NTable;
1819
using namespace NTabletFlatExecutor;
1920

21+
void TCdcStreamScanManager::TStats::Serialize(NKikimrTxDataShard::TEvCdcStreamScanResponse_TStats& proto) const {
22+
proto.SetRowsProcessed(RowsProcessed);
23+
proto.SetBytesProcessed(BytesProcessed);
24+
}
25+
2026
void TCdcStreamScanManager::Reset() {
2127
Scans.clear();
2228
TxIdToPathId.clear();
@@ -95,6 +101,7 @@ void TCdcStreamScanManager::Complete(const TPathId& streamPathId) {
95101
return;
96102
}
97103

104+
CompletedScans[streamPathId] = it->second.Stats;
98105
TxIdToPathId.erase(it->second.TxId);
99106
Scans.erase(it);
100107
}
@@ -104,6 +111,15 @@ void TCdcStreamScanManager::Complete(ui64 txId) {
104111
Complete(TxIdToPathId.at(txId));
105112
}
106113

114+
bool TCdcStreamScanManager::IsCompleted(const TPathId& streamPathId) const {
115+
return CompletedScans.contains(streamPathId);
116+
}
117+
118+
const TCdcStreamScanManager::TStats& TCdcStreamScanManager::GetCompletedStats(const TPathId& streamPathId) const {
119+
Y_ABORT_UNLESS(CompletedScans.contains(streamPathId));
120+
return CompletedScans.at(streamPathId);
121+
}
122+
107123
TCdcStreamScanManager::TScanInfo* TCdcStreamScanManager::Get(const TPathId& streamPathId) {
108124
return Scans.FindPtr(streamPathId);
109125
}
@@ -456,8 +472,7 @@ class TCdcStreamScan: public IActorCallback, public IScan {
456472
PathIdFromPathId(StreamPathId, response->Record.MutableStreamPathId());
457473
response->Record.SetStatus(status);
458474
response->Record.SetErrorDescription(error);
459-
response->Record.MutableStats()->SetRowsProcessed(Stats.RowsProcessed);
460-
response->Record.MutableStats()->SetBytesProcessed(Stats.BytesProcessed);
475+
Stats.Serialize(*response->Record.MutableStats());
461476

462477
Send(ReplyTo, std::move(response));
463478
}
@@ -572,11 +587,10 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
572587
TEvDataShard::TEvCdcStreamScanRequest::TPtr Request;
573588
THolder<IEventHandle> Response; // response to sender or forward to scanner
574589

575-
THolder<IEventHandle> MakeResponse(const TActorContext& ctx,
576-
NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status, const TString& error = {}) const
577-
{
590+
template <typename... Args>
591+
THolder<IEventHandle> MakeResponse(const TActorContext& ctx, Args&&... args) const {
578592
return MakeHolder<IEventHandle>(Request->Sender, ctx.SelfID, new TEvDataShard::TEvCdcStreamScanResponse(
579-
Request->Get()->Record, Self->TabletID(), status, error
593+
Request->Get()->Record, Self->TabletID(), std::forward<Args>(args)...
580594
));
581595
}
582596

@@ -643,6 +657,11 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
643657
} else if (info->ScanId) {
644658
return true; // nop, scan actor will report state when it starts
645659
}
660+
} else if (Self->CdcStreamScanManager.IsCompleted(streamPathId)) {
661+
Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE);
662+
Self->CdcStreamScanManager.GetCompletedStats(streamPathId).Serialize(
663+
*Response->Get<TEvDataShard::TEvCdcStreamScanResponse>()->Record.MutableStats());
664+
return true;
646665
} else if (Self->CdcStreamScanManager.Size()) {
647666
Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::OVERLOADED);
648667
return true;

ydb/core/tx/datashard/cdc_stream_scan.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,19 @@
99
#include <util/generic/hash.h>
1010
#include <util/generic/maybe.h>
1111

12+
namespace NKikimrTxDataShard {
13+
class TEvCdcStreamScanResponse_TStats;
14+
}
15+
1216
namespace NKikimr::NDataShard {
1317

1418
class TCdcStreamScanManager {
1519
public:
1620
struct TStats {
1721
ui64 RowsProcessed = 0;
1822
ui64 BytesProcessed = 0;
23+
24+
void Serialize(NKikimrTxDataShard::TEvCdcStreamScanResponse_TStats& proto) const;
1925
};
2026

2127
private:
@@ -39,6 +45,8 @@ class TCdcStreamScanManager {
3945

4046
void Complete(const TPathId& streamPathId);
4147
void Complete(ui64 txId);
48+
bool IsCompleted(const TPathId& streamPathId) const;
49+
const TStats& GetCompletedStats(const TPathId& streamPathId) const;
4250

4351
TScanInfo* Get(const TPathId& streamPathId);
4452
const TScanInfo* Get(const TPathId& streamPathId) const;
@@ -57,6 +65,7 @@ class TCdcStreamScanManager {
5765

5866
private:
5967
THashMap<TPathId, TScanInfo> Scans;
68+
THashMap<TPathId, TStats> CompletedScans;
6069
THashMap<ui64, TPathId> TxIdToPathId;
6170
};
6271

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2761,6 +2761,49 @@ Y_UNIT_TEST_SUITE(Cdc) {
27612761
});
27622762
}
27632763

2764+
Y_UNIT_TEST(InitialScanRacyCompleteAndRequest) {
2765+
TPortManager portManager;
2766+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
2767+
.SetUseRealThreads(false)
2768+
.SetDomainName("Root")
2769+
.SetEnableChangefeedInitialScan(true)
2770+
);
2771+
2772+
auto& runtime = *server->GetRuntime();
2773+
const auto edgeActor = runtime.AllocateEdgeActor();
2774+
2775+
SetupLogging(runtime);
2776+
InitRoot(server, edgeActor);
2777+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
2778+
2779+
std::unique_ptr<IEventHandle> doneResponse;
2780+
auto blockDone = runtime.AddObserver<TEvDataShard::TEvCdcStreamScanResponse>(
2781+
[&](TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev) {
2782+
if (ev->Get()->Record.GetStatus() == NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) {
2783+
doneResponse.reset(ev.Release());
2784+
}
2785+
}
2786+
);
2787+
2788+
WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
2789+
WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
2790+
WaitFor(runtime, [&]{ return bool(doneResponse); }, "doneResponse");
2791+
blockDone.Remove();
2792+
2793+
bool done = false;
2794+
auto waitDone = runtime.AddObserver<TEvDataShard::TEvCdcStreamScanResponse>(
2795+
[&](TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev) {
2796+
if (ev->Get()->Record.GetStatus() == NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) {
2797+
done = true;
2798+
}
2799+
}
2800+
);
2801+
2802+
const auto& record = doneResponse->Get<TEvDataShard::TEvCdcStreamScanResponse>()->Record;
2803+
RebootTablet(runtime, record.GetTablePathId().GetOwnerId(), edgeActor);
2804+
WaitFor(runtime, [&]{ return done; }, "done");
2805+
}
2806+
27642807
Y_UNIT_TEST(InitialScanUpdatedRows) {
27652808
TPortManager portManager;
27662809
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())

0 commit comments

Comments
 (0)