Skip to content

Commit 7d36dcf

Browse files
committed
A basic set of EventEngine listener tests
1 parent e1b6464 commit 7d36dcf

File tree

2 files changed

+255
-3
lines changed

2 files changed

+255
-3
lines changed

test/core/event_engine/test_suite/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ grpc_cc_test(
153153
deps = [
154154
":client",
155155
":oracle_event_engine_posix",
156+
":server",
156157
"//:grpc",
157158
"//test/core/util:grpc_test_util",
158159
],

test/core/event_engine/test_suite/server_test.cc

+254-3
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,263 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
#include <gtest/gtest.h>
15+
#include <algorithm>
16+
#include <chrono>
17+
#include <memory>
18+
#include <string>
19+
#include <thread>
20+
#include <tuple>
21+
#include <utility>
22+
#include <vector>
1623

24+
#include "absl/status/status.h"
25+
#include "absl/status/statusor.h"
26+
#include "absl/strings/str_cat.h"
27+
#include "absl/time/clock.h"
28+
#include "absl/time/time.h"
29+
#include "gtest/gtest.h"
30+
31+
#include <grpc/event_engine/event_engine.h>
32+
#include <grpc/event_engine/memory_allocator.h>
33+
#include <grpc/grpc.h>
34+
#include <grpc/support/log.h>
35+
36+
#include "src/core/lib/channel/channel_args.h"
37+
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
38+
#include "src/core/lib/gprpp/notification.h"
1739
#include "src/core/lib/iomgr/exec_ctx.h"
40+
#include "src/core/lib/resource_quota/memory_quota.h"
41+
#include "src/core/lib/resource_quota/resource_quota.h"
1842
#include "test/core/event_engine/test_suite/event_engine_test.h"
43+
#include "test/core/event_engine/test_suite/event_engine_test_utils.h"
44+
#include "test/core/util/port.h"
1945

2046
class EventEngineServerTest : public EventEngineTest {};
2147

22-
// TODO(hork): establish meaningful tests
23-
TEST_F(EventEngineServerTest, TODO) { grpc_core::ExecCtx exec_ctx; }
48+
using namespace std::chrono_literals;
49+
50+
namespace {
51+
52+
using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
53+
using ::grpc_event_engine::experimental::EventEngine;
54+
using ::grpc_event_engine::experimental::URIToResolvedAddress;
55+
using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint;
56+
using Listener = ::grpc_event_engine::experimental::EventEngine::Listener;
57+
using ::grpc_event_engine::experimental::GetNextSendMessage;
58+
using ::grpc_event_engine::experimental::WaitForSingleOwner;
59+
60+
constexpr int kNumExchangedMessages = 100;
61+
62+
} // namespace
63+
64+
// Create a connection using the oracle EventEngine to a listener created
65+
// by the Test EventEngine and exchange bi-di data over the connection.
66+
// For each data transfer, verify that data written at one end of the stream
67+
// equals data read at the other end of the stream.
68+
TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) {
69+
grpc_core::ExecCtx ctx;
70+
auto oracle_ee = this->NewOracleEventEngine();
71+
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
72+
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
73+
std::string target_addr = absl::StrCat(
74+
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
75+
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
76+
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
77+
grpc_core::Notification client_signal;
78+
grpc_core::Notification server_signal;
79+
80+
Listener::AcceptCallback accept_cb =
81+
[&server_endpoint, &server_signal](
82+
std::unique_ptr<Endpoint> ep,
83+
grpc_core::MemoryAllocator /*memory_allocator*/) {
84+
server_endpoint = std::move(ep);
85+
server_signal.Notify();
86+
};
87+
88+
grpc_core::ChannelArgs args;
89+
auto quota = grpc_core::ResourceQuota::Default();
90+
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
91+
ChannelArgsEndpointConfig config(args);
92+
auto status = test_ee->CreateListener(
93+
std::move(accept_cb), [](absl::Status /*status*/) {}, config,
94+
std::make_unique<grpc_core::MemoryQuota>("foo"));
95+
EXPECT_TRUE(status.ok());
96+
97+
std::unique_ptr<Listener> listener = std::move(*status);
98+
EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
99+
EXPECT_TRUE(listener->Start().ok());
100+
101+
oracle_ee->Connect(
102+
[&client_endpoint,
103+
&client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
104+
if (!status.ok()) {
105+
gpr_log(GPR_ERROR, "Connect failed: %s",
106+
status.status().ToString().c_str());
107+
client_endpoint = nullptr;
108+
} else {
109+
client_endpoint = std::move(*status);
110+
}
111+
client_signal.Notify();
112+
},
113+
URIToResolvedAddress(target_addr), config,
114+
memory_quota->CreateMemoryAllocator("conn-1"), 24h);
115+
116+
client_signal.WaitForNotification();
117+
server_signal.WaitForNotification();
118+
EXPECT_NE(client_endpoint.get(), nullptr);
119+
EXPECT_NE(server_endpoint.get(), nullptr);
120+
121+
// Alternate message exchanges between client -- server and server --
122+
// client.
123+
for (int i = 0; i < kNumExchangedMessages; i++) {
124+
// Send from client to server and verify data read at the server.
125+
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(),
126+
server_endpoint.get())
127+
.ok());
128+
129+
// Send from server to client and verify data read at the client.
130+
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(),
131+
client_endpoint.get())
132+
.ok());
133+
}
134+
client_endpoint.reset();
135+
server_endpoint.reset();
136+
listener.reset();
137+
WaitForSingleOwner(std::move(test_ee));
138+
}
139+
140+
// Create 1 listener bound to N IPv6 addresses and M connections where M > N and
141+
// exchange and verify random number of messages over each connection.
142+
TEST_F(EventEngineServerTest,
143+
ServerMultipleIPv6ConnectionsToOneOracleListenerTest) {
144+
grpc_core::ExecCtx ctx;
145+
static constexpr int kNumListenerAddresses = 10; // N
146+
static constexpr int kNumConnections = 10; // M
147+
auto oracle_ee = this->NewOracleEventEngine();
148+
std::shared_ptr<EventEngine> test_ee(this->NewEventEngine());
149+
auto memory_quota = std::make_unique<grpc_core::MemoryQuota>("bar");
150+
std::unique_ptr<EventEngine::Endpoint> server_endpoint;
151+
// Notifications can only be fired once, so they are newed every loop
152+
grpc_core::Notification* server_signal = new grpc_core::Notification();
153+
std::vector<std::string> target_addrs;
154+
std::vector<std::tuple<std::unique_ptr<Endpoint>, std::unique_ptr<Endpoint>>>
155+
connections;
156+
157+
Listener::AcceptCallback accept_cb =
158+
[&server_endpoint, &server_signal](
159+
std::unique_ptr<Endpoint> ep,
160+
grpc_core::MemoryAllocator /*memory_allocator*/) {
161+
server_endpoint = std::move(ep);
162+
server_signal->Notify();
163+
};
164+
grpc_core::ChannelArgs args;
165+
auto quota = grpc_core::ResourceQuota::Default();
166+
args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota);
167+
ChannelArgsEndpointConfig config(args);
168+
auto status = test_ee->CreateListener(
169+
std::move(accept_cb), [](absl::Status /*status*/) {}, config,
170+
std::make_unique<grpc_core::MemoryQuota>("foo"));
171+
EXPECT_TRUE(status.ok());
172+
std::unique_ptr<Listener> listener = std::move(*status);
173+
174+
target_addrs.reserve(kNumListenerAddresses);
175+
for (int i = 0; i < kNumListenerAddresses; i++) {
176+
std::string target_addr = absl::StrCat(
177+
"ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die()));
178+
EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok());
179+
target_addrs.push_back(target_addr);
180+
}
181+
EXPECT_TRUE(listener->Start().ok());
182+
absl::SleepFor(absl::Milliseconds(500));
183+
for (int i = 0; i < kNumConnections; i++) {
184+
std::unique_ptr<EventEngine::Endpoint> client_endpoint;
185+
grpc_core::Notification client_signal;
186+
// Create an oracle EventEngine client and connect to a one of the
187+
// addresses bound to the test EventEngine listener. Verify that the
188+
// connection succeeds.
189+
grpc_core::ChannelArgs client_args;
190+
auto client_quota = grpc_core::ResourceQuota::Default();
191+
client_args = client_args.Set(GRPC_ARG_RESOURCE_QUOTA, client_quota);
192+
ChannelArgsEndpointConfig client_config(client_args);
193+
oracle_ee->Connect(
194+
[&client_endpoint,
195+
&client_signal](absl::StatusOr<std::unique_ptr<Endpoint>> status) {
196+
if (!status.ok()) {
197+
gpr_log(GPR_ERROR, "Connect failed: %s",
198+
status.status().ToString().c_str());
199+
client_endpoint = nullptr;
200+
} else {
201+
client_endpoint = std::move(*status);
202+
}
203+
client_signal.Notify();
204+
},
205+
URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]),
206+
client_config,
207+
memory_quota->CreateMemoryAllocator(
208+
absl::StrCat("conn-", std::to_string(i))),
209+
24h);
210+
211+
client_signal.WaitForNotification();
212+
server_signal->WaitForNotification();
213+
EXPECT_NE(client_endpoint.get(), nullptr);
214+
EXPECT_NE(server_endpoint.get(), nullptr);
215+
connections.push_back(std::make_tuple(std::move(client_endpoint),
216+
std::move(server_endpoint)));
217+
delete server_signal;
218+
server_signal = new grpc_core::Notification();
219+
}
220+
delete server_signal;
221+
222+
std::vector<std::thread> threads;
223+
// Create one thread for each connection. For each connection, create
224+
// 2 more worker threads: to exchange and verify bi-directional data
225+
// transfer.
226+
threads.reserve(kNumConnections);
227+
for (int i = 0; i < kNumConnections; i++) {
228+
// For each connection, simulate a parallel bi-directional data transfer.
229+
// All bi-directional transfers are run in parallel across all
230+
// connections. Each bi-directional data transfer uses a random number of
231+
// messages.
232+
threads.emplace_back([client_endpoint =
233+
std::move(std::get<0>(connections[i])),
234+
server_endpoint =
235+
std::move(std::get<1>(connections[i]))]() {
236+
std::vector<std::thread> workers;
237+
workers.reserve(2);
238+
auto worker = [client_endpoint = client_endpoint.get(),
239+
server_endpoint =
240+
server_endpoint.get()](bool client_to_server) {
241+
grpc_core::ExecCtx ctx;
242+
for (int i = 0; i < kNumExchangedMessages; i++) {
243+
// If client_to_server is true, send from client to server and
244+
// verify data read at the server. Otherwise send data from server
245+
// to client and verify data read at client.
246+
if (client_to_server) {
247+
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
248+
client_endpoint, server_endpoint)
249+
.ok());
250+
} else {
251+
EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(),
252+
server_endpoint, client_endpoint)
253+
.ok());
254+
}
255+
}
256+
};
257+
// worker[0] simulates a flow from client to server endpoint
258+
workers.emplace_back([&worker]() { worker(true); });
259+
// worker[1] simulates a flow from server to client endpoint
260+
workers.emplace_back([&worker]() { worker(false); });
261+
workers[0].join();
262+
workers[1].join();
263+
});
264+
}
265+
for (auto& t : threads) {
266+
t.join();
267+
}
268+
server_endpoint.reset();
269+
listener.reset();
270+
WaitForSingleOwner(std::move(test_ee));
271+
}
272+
273+
// TODO(vigneshbabu): Add more tests which create listeners bound to a mix
274+
// Ipv6 and other type of addresses (UDS) in the same test.

0 commit comments

Comments
 (0)