Skip to content

Commit 7258f69

Browse files
authored
Fix incorrect actor->GetActivityType() usages in tests (#4620)
1 parent fd86a40 commit 7258f69

File tree

10 files changed

+34
-47
lines changed

10 files changed

+34
-47
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ ydb/core/persqueue/ut TPQTest.*DirectRead*
3131
ydb/core/persqueue/ut TopicAutoscaling.PartitionSplit_ManySession_NewSDK
3232
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
3333
ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium
34-
ydb/core/tx/datashard/ut_build_index TTxDataShardBuildIndexScan.ShadowBorrowCompaction
3534
ydb/core/tx/schemeshard/ut_restore TImportTests.ShouldSucceedOnManyTables
3635
ydb/core/tx/schemeshard/ut_split_merge TSchemeShardSplitBySizeTest.Merge1KShards
3736
ydb/core/tx/tx_proxy/ut_ext_tenant TExtSubDomainTest.CreateTableInsideAndAlterDomainAndTable-AlterDatabaseCreateHiveFirst*

ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,8 @@ Y_UNIT_TEST_SUITE(KqpProxy) {
185185

186186
auto scheduledEvs = [&](TTestActorRuntimeBase& run, TAutoPtr<IEventHandle> &event, TDuration delay, TInstant &deadline) {
187187
if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) {
188-
TActorId actorId = event->GetRecipientRewrite();
189-
IActor *actor = runtime->FindActor(actorId);
190-
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_COMPILE_ACTOR) {
188+
Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime->FindActorName(event->GetRecipientRewrite()) << Endl;
189+
if (runtime->FindActorName(event->GetRecipientRewrite()) == "KQP_COMPILE_ACTOR") {
191190
Cerr << "Captured scheduled event for compile actor " << event->Recipient << Endl;
192191
scheduled.push_back(event.Release());
193192
return true;

ydb/core/kqp/ut/scan/kqp_scan_ut.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2364,9 +2364,9 @@ Y_UNIT_TEST_SUITE(KqpScan) {
23642364
bool firstAttemptToGetData = false;
23652365

23662366
auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
2367-
if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType) {
2368-
IActor* actor = runtime->FindActor(ev->GetRecipientRewrite());
2369-
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
2367+
if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvResolveKeySetResult::EventType) {
2368+
Cerr << "Captured TEvTxProxySchemeCache::TEvResolveKeySetResult from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
2369+
if (runtime->FindActorName(ev->GetRecipientRewrite()) == "KQP_STREAM_LOOKUP_ACTOR") {
23702370
if (!firstAttemptToGetData) {
23712371
// capture response from scheme cache until CA calls GetAsyncInputData()
23722372
captured.push_back(ev.Release());
@@ -2428,13 +2428,13 @@ Y_UNIT_TEST_SUITE(KqpScan) {
24282428

24292429
createTable(createSession(), R"(
24302430
--!syntax_v1
2431-
CREATE TABLE `/Root/Table` (Key int32, Value int32, PRIMARY KEY(Key));
2431+
CREATE TABLE `/Root/Table` (Key int32, Fk int32, Value int32, PRIMARY KEY(Key), INDEX Index GLOBAL ON (Fk));
24322432
)");
24332433

24342434
server->GetRuntime()->SetEventFilter(captureEvents);
24352435

24362436
sendQuery(R"(
2437-
SELECT Value FROM `/Root/Table` WHERE Key IN AsList(1, 2, 3);
2437+
SELECT Value FROM `/Root/Table` VIEW Index WHERE Fk IN AsList(1, 2, 3);
24382438
)");
24392439
}
24402440

ydb/core/persqueue/ut/common/pq_ut_common.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,8 @@ struct TTestContext {
127127

128128
static bool RequestTimeoutFilter(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration duration, TInstant& deadline) {
129129
if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) {
130-
TActorId actorId = event->GetRecipientRewrite();
131-
IActor *actor = runtime.FindActor(actorId);
132-
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::PERSQUEUE_ANS_ACTOR) {
130+
Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
131+
if (runtime.FindActorName(event->GetRecipientRewrite()) == "PERSQUEUE_ANS_ACTOR") {
133132
return true;
134133
}
135134
}

ydb/core/tx/datashard/datashard_ut_build_index.cpp

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,8 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
168168
CreateShardedTableForIndex(server, sender, "/Root", "table-2", 1, false);
169169

170170
auto observer = runtime.AddObserver<TEvDataShard::TEvCompactBorrowed>([&](TEvDataShard::TEvCompactBorrowed::TPtr& event) {
171-
IActor *actor = runtime.FindActor(event->Sender);
172-
if (actor && actor->GetActivityType() == 186) {
173-
Cerr << "Ignore SchemeShard TEvCompactBorrowed from " << event->Sender << "(" << actor->GetActivityType() << ")" << " to " << event->Recipient << Endl;
171+
Cerr << "Captured TEvDataShard::TEvCompactBorrowed from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
172+
if (runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR") {
174173
event.Reset();
175174
}
176175
});
@@ -203,12 +202,9 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
203202

204203
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), shardIndex == 0 ? 2 : 3);
205204

206-
const auto& ownersProto = stats.GetUserTablePartOwners();
207-
THashSet<ui64> owners(ownersProto.begin(), ownersProto.end());
205+
THashSet<ui64> owners(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
208206
// Note: datashard always adds current shard to part owners, even if there are no parts
209-
UNIT_ASSERT_VALUES_EQUAL(owners.size(), 2u);
210-
UNIT_ASSERT(owners.contains(shards1.at(0)));
211-
UNIT_ASSERT(owners.contains(shards2.at(shardIndex)));
207+
UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards1.at(0), shards2.at(shardIndex)}));
212208

213209
auto tableId = ResolveTableId(server, sender, "/Root/table-2");
214210
auto result = CompactBorrowed(runtime, shards2.at(shardIndex), tableId);
@@ -217,23 +213,12 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
217213
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetOwnerId(), tableId.PathId.OwnerId);
218214
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetLocalId(), tableId.PathId.LocalPathId);
219215

220-
for (int i = 0; i < 5; ++i) {
216+
for (int i = 0; i < 5 && (owners.size() > 1 || owners.contains(shards1.at(0))); ++i) {
221217
auto stats = WaitTableStats(runtime, shards2.at(shardIndex));
222-
// Cerr << "Received shard stats:" << Endl << stats.DebugString() << Endl;
223-
const auto& ownersProto = stats.GetUserTablePartOwners();
224-
THashSet<ui64> owners(ownersProto.begin(), ownersProto.end());
225-
if (i < 4) {
226-
if (owners.size() > 1) {
227-
continue;
228-
}
229-
if (owners.contains(shards1.at(0))) {
230-
continue;
231-
}
232-
}
233-
UNIT_ASSERT_VALUES_EQUAL(owners.size(), 1u);
234-
UNIT_ASSERT(owners.contains(shards2.at(shardIndex)));
235-
Cerr << "OK " << shards2.at(shardIndex) << Endl;
218+
owners = THashSet<ui64>(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
236219
}
220+
221+
UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards2.at(shardIndex)}));
237222
}
238223

239224
// Alter table: disable shadow data and change compaction policy

ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ Y_UNIT_TEST_SUITE(KqpStreamLookup) {
4141
bool readReceived = false;
4242
auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr <IEventHandle> &ev) {
4343
if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
44-
IActor* actor = runtime->FindActor(ev->Sender);
45-
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
46-
44+
Cerr << "Captured TEvDataShard::TEvRead from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
45+
if (runtime->FindActorName(ev->Sender) == "KQP_STREAM_LOOKUP_ACTOR") {
4746
if (!readReceived) {
4847
auto senderSplit = runtime->AllocateEdgeActor();
4948
ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);
@@ -109,9 +108,8 @@ Y_UNIT_TEST_SUITE(KqpStreamLookup) {
109108
bool readReceived = false;
110109
auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr <IEventHandle> &ev) {
111110
if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
112-
IActor* actor = runtime->FindActor(ev->Sender);
113-
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
114-
111+
Cerr << "Captured TEvDataShard::TEvRead from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
112+
if (runtime->FindActorName(ev->Sender) == "KQP_STREAM_LOOKUP_ACTOR") {
115113
if (!readReceived) {
116114
auto senderSplit = runtime->AllocateEdgeActor();
117115
ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);

ydb/core/tx/datashard/datashard_ut_stats.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -435,11 +435,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
435435

436436
bool captured = false;
437437
auto observer = runtime.AddObserver<NSharedCache::TEvResult>([&](NSharedCache::TEvResult::TPtr& event) {
438-
IActor *actor = runtime.FindActor(event->Recipient);
439-
440-
Cerr << "Got SchemeShard NSharedCache::TEvResult from " << event->Sender << " to " << event->Recipient << "(" << actor->GetActivityType() << ")"<< Endl;
441-
442-
if (actor && actor->GetActivityType() == 288) {
438+
Cerr << "Captured NSharedCache::TEvResult from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
439+
if (runtime.FindActorName(event->GetRecipientRewrite()) == "DATASHARD_STATS_BUILDER") {
443440
auto& message = *event->Get();
444441
event.Reset(static_cast<TEventHandle<NSharedCache::TEvResult> *>(
445442
new IEventHandle(event->Recipient, event->Sender,
@@ -455,6 +452,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
455452
options.CustomFinalCondition = [&]() { return captured; };
456453
runtime.DispatchEvents(options, TDuration::Seconds(5));
457454
}
455+
UNIT_ASSERT(captured);
458456
observer.Remove();
459457

460458
{

ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1754,7 +1754,7 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runt
17541754
auto observer = runtime.AddObserver<TEvDataShard::TEvPeriodicTableStats>([&](auto& ev) {
17551755
const auto& record = ev->Get()->Record;
17561756
if (record.GetDatashardId() == tabletId) {
1757-
Cout << "Got TEvPeriodicTableStats record: PartCount=" << record.GetTableStats().GetPartCount() << ", RowCount=" << record.GetTableStats().GetRowCount() << Endl;
1757+
Cerr << "Captured TEvDataShard::TEvPeriodicTableStats " << record.ShortDebugString() << Endl;
17581758
if (record.GetTableStats().GetPartCount() >= minPartCount && record.GetTableStats().GetRowCount() >= minRows) {
17591759
stats = record;
17601760
captured = true;

ydb/library/actors/testlib/test_runtime.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1558,6 +1558,14 @@ namespace NActors {
15581558
return FindActor(actorId, node);
15591559
}
15601560

1561+
TStringBuf TTestActorRuntimeBase::FindActorName(const TActorId& actorId, ui32 nodeIndex) const {
1562+
auto actor = FindActor(actorId, nodeIndex);
1563+
if (!actor) {
1564+
return {};
1565+
}
1566+
return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(actor->GetActivityType());
1567+
}
1568+
15611569
void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
15621570
TGuard<TMutex> guard(Mutex);
15631571
if (allow) {

ydb/library/actors/testlib/test_runtime.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ namespace NActors {
291291
TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo);
292292
void BlockOutputForActor(const TActorId& actorId);
293293
IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
294+
TStringBuf FindActorName(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
294295
void EnableScheduleForActor(const TActorId& actorId, bool allow = true);
295296
bool IsScheduleForActorEnabled(const TActorId& actorId) const;
296297
TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);

0 commit comments

Comments
 (0)