Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/testlib/actors/block_events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "block_events.h"
82 changes: 82 additions & 0 deletions ydb/core/testlib/actors/block_events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include "test_runtime.h"

#include <deque>
#include <functional>

namespace NActors {

/**
* Easy blocking for events under the test actor runtime
*
* Matching events are blocked just before they are processed and stashed
* into a deque.
*/
template<class TEvType>
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
public:
TBlockEvents(TTestActorRuntime& runtime, std::function<bool(typename TEvType::TPtr&)> condition = {})
: Runtime(runtime)
, Condition(std::move(condition))
, Holder(Runtime.AddObserver<TEvType>(
[this](typename TEvType::TPtr& ev) {
this->Process(ev);
}))
{}

/**
* Unblocks up to count events at the front of the deque, allowing them
* to be handled by the destination actor.
*/
TBlockEvents& Unblock(size_t count = -1) {
while (!this->empty() && count > 0) {
auto& ev = this->front();
if (!Stopped) {
IEventHandle* ptr = ev.Get();
UnblockedOnce.insert(ptr);
}
ui32 nodeId = ev->GetRecipientRewrite().NodeId();
ui32 nodeIdx = nodeId - Runtime.GetFirstNodeId();
Runtime.Send(ev.Release(), nodeIdx, /* viaActorSystem */ true);
this->pop_front();
--count;
}
return *this;
}

/**
* Stops blocking any new events. Events currently in the deque are
* not unblocked, but may be unblocked at a later time if needed.
*/
TBlockEvents& Stop() {
UnblockedOnce.clear();
Holder.Remove();
Stopped = true;
return *this;
}

private:
void Process(typename TEvType::TPtr& ev) {
IEventHandle* ptr = ev.Get();
auto it = UnblockedOnce.find(ptr);
if (it != UnblockedOnce.end()) {
UnblockedOnce.erase(it);
return;
}

if (Condition && !Condition(ev)) {
return;
}

this->emplace_back(std::move(ev));
}

private:
TTestActorRuntime& Runtime;
std::function<bool(typename TEvType::TPtr&)> Condition;
TTestActorRuntime::TEventObserverHolder Holder;
THashSet<IEventHandle*> UnblockedOnce;
bool Stopped = false;
};


} // namespace NActors
111 changes: 110 additions & 1 deletion ydb/core/testlib/actors/test_runtime_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/base/appdata.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/library/actors/core/events.h>
Expand Down Expand Up @@ -717,6 +718,114 @@ Y_UNIT_TEST_SUITE(TActorTest) {
runtime.WaitFor("value = 42", [&]{ return value == 42; });
UNIT_ASSERT_VALUES_EQUAL(value, 42);
}
};

Y_UNIT_TEST(TestBlockEvents) {
enum EEv {
EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE)
};

struct TEvTrigger : public TEventLocal<TEvTrigger, EvTrigger> {
int Value;

TEvTrigger(int value)
: Value(value)
{}
};

class TTargetActor : public TActorBootstrapped<TTargetActor> {
public:
TTargetActor(std::vector<int>* ptr)
: Ptr(ptr)
{}

void Bootstrap() {
Become(&TThis::StateWork);
}

private:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTrigger, Handle);
}
}

void Handle(TEvTrigger::TPtr& ev) {
Ptr->push_back(ev->Get()->Value);
}

private:
std::vector<int>* Ptr;
};

class TSourceActor : public TActorBootstrapped<TSourceActor> {
public:
TSourceActor(const TActorId& target)
: Target(target)
{}

void Bootstrap() {
Become(&TThis::StateWork);
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
}

private:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvWakeup, Handle);
}
}

void Handle(TEvents::TEvWakeup::TPtr&) {
Send(Target, new TEvTrigger(++Counter));
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
}

private:
TActorId Target;
int Counter = 0;
};

TTestActorRuntime runtime(2);
runtime.Initialize(MakeEgg());

std::vector<int> values;
auto target = runtime.Register(new TTargetActor(&values), /* nodeIdx */ 1);
auto source = runtime.Register(new TSourceActor(target), /* nodeIdx */ 1);
runtime.EnableScheduleForActor(source);

TBlockEvents<TEvTrigger> block(runtime, [&](TEvTrigger::TPtr& ev){ return ev->GetRecipientRewrite() == target; });
runtime.WaitFor("blocked 3 events", [&]{ return block.size() >= 3; });
UNIT_ASSERT_VALUES_EQUAL(block.size(), 3u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);

block.Unblock(2);
UNIT_ASSERT_VALUES_EQUAL(block.size(), 1u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);

runtime.WaitFor("blocked 1 more event", [&]{ return block.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 1);
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 2);
values.clear();

block.Stop();
runtime.WaitFor("processed 2 more events", [&]{ return values.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 5);
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 6);
values.clear();

block.Unblock();
UNIT_ASSERT_VALUES_EQUAL(block.size(), 0u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);
runtime.WaitFor("processed 3 more events", [&]{ return values.size() >= 3; });
UNIT_ASSERT_VALUES_EQUAL(values.size(), 3u);
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 3);
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 4);
UNIT_ASSERT_VALUES_EQUAL(values.at(2), 7);
}
}

}
3 changes: 3 additions & 0 deletions ydb/core/testlib/actors/ya.make
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
LIBRARY()

SRCS(
block_events.cpp
block_events.h
test_runtime.cpp
test_runtime.h
)

PEERDIR(
Expand Down
53 changes: 1 addition & 52 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "read_iterator.h"

#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/converter.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
Expand Down Expand Up @@ -4627,58 +4628,6 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
"result2: " << result2);
}

template<class TEvType>
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
public:
TBlockEvents(TTestActorRuntime& runtime, std::function<bool(typename TEvType::TPtr&)> condition = {})
: Runtime(runtime)
, Condition(std::move(condition))
, Holder(Runtime.AddObserver<TEvType>(
[this](typename TEvType::TPtr& ev) {
this->Process(ev);
}))
{}

TBlockEvents& Unblock(size_t count = -1) {
while (!this->empty() && count > 0) {
auto& ev = this->front();
IEventHandle* ptr = ev.Get();
UnblockedOnce.insert(ptr);
Runtime.Send(ev.Release(), 0, /* viaActorSystem */ true);
this->pop_front();
--count;
}
return *this;
}

void Stop() {
UnblockedOnce.clear();
Holder.Remove();
}

private:
void Process(typename TEvType::TPtr& ev) {
IEventHandle* ptr = ev.Get();
auto it = UnblockedOnce.find(ptr);
if (it != UnblockedOnce.end()) {
UnblockedOnce.erase(it);
return;
}

if (Condition && !Condition(ev)) {
return;
}

this->emplace_back(std::move(ev));
}

private:
TTestActorRuntime& Runtime;
std::function<bool(typename TEvType::TPtr&)> Condition;
TTestActorRuntime::TEventObserverHolder Holder;
THashSet<IEventHandle*> UnblockedOnce;
};

Y_UNIT_TEST(Bug_7674_IteratorDuplicateRows) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
Expand Down