Skip to content

Commit c0dd9da

Browse files
authored
Make use of poller actor straight in http subsystem (#13316)
1 parent 315b99e commit c0dd9da

File tree

6 files changed

+18
-26
lines changed

6 files changed

+18
-26
lines changed

ydb/core/blobstorage/ut_vdisk/lib/astest.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/library/actors/core/executor_pool_basic.h>
77
#include <ydb/library/actors/core/executor_pool_io.h>
88
#include <ydb/library/actors/core/scheduler_basic.h>
9+
#include <ydb/library/actors/interconnect/poller_actor.h>
910
#include <ydb/core/mon/mon.h>
1011
#include <ydb/library/actors/interconnect/interconnect.h>
1112
#include <ydb/library/actors/protos/services_common.pb.h>
@@ -64,7 +65,7 @@ inline void TTestWithActorSystem::Run(NActors::IActor *testActor) {
6465
const TActorId nameserviceId = GetNameserviceActorId();
6566
TActorSetupCmd nameserviceSetup(CreateNameserverTable(nameserverTable), TMailboxType::Simple, 0);
6667
setup1->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(nameserviceId, std::move(nameserviceSetup)));
67-
68+
setup1->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::Simple, 0));
6869

6970
///////////////////////// LOGGER ///////////////////////////////////////////////
7071
NActors::TActorId loggerActorId = NActors::TActorId(1, "logger");

ydb/library/actors/http/http_proxy.cpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,22 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
99
using TBase = NActors::TActorBootstrapped<THttpProxy>;
1010

1111
IActor* AddListeningPort(TEvHttpProxy::TEvAddListeningPort::TPtr& event) {
12-
IActor* listeningSocket = CreateHttpAcceptorActor(SelfId(), Poller);
12+
IActor* listeningSocket = CreateHttpAcceptorActor(SelfId());
1313
TActorId acceptorId = Register(listeningSocket);
1414
Send(event->Forward(acceptorId));
1515
Acceptors.emplace_back(acceptorId);
1616
return listeningSocket;
1717
}
1818

1919
IActor* AddOutgoingConnection(bool secure) {
20-
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), secure, Poller);
20+
IActor* connectionSocket = CreateOutgoingConnectionActor(SelfId(), secure);
2121
TActorId connectionId = Register(connectionSocket);
2222
ALOG_DEBUG(HttpLog, "Connection created " << connectionId);
2323
Connections.emplace(connectionId);
2424
return connectionSocket;
2525
}
2626

2727
void Bootstrap() {
28-
Poller = Register(NActors::CreatePollerActor());
2928
Become(&THttpProxy::StateWork);
3029
}
3130

@@ -54,7 +53,6 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
5453
}
5554

5655
void PassAway() override {
57-
Send(Poller, new NActors::TEvents::TEvPoisonPill());
5856
for (const NActors::TActorId& connection : Connections) {
5957
Send(connection, new NActors::TEvents::TEvPoisonPill());
6058
}
@@ -273,7 +271,6 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
273271
PassAway();
274272
}
275273

276-
NActors::TActorId Poller;
277274
TVector<TActorId> Acceptors;
278275

279276
struct THostEntry {

ydb/library/actors/http/http_proxy.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,8 @@ struct TPrivateEndpointInfo : THttpEndpointInfo {
293293
};
294294

295295
NActors::IActor* CreateHttpProxy(std::weak_ptr<NMonitoring::IMetricFactory> registry = NMonitoring::TMetricRegistry::SharedInstance());
296-
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller);
297-
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure, const TActorId& poller);
296+
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner);
297+
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure);
298298
NActors::IActor* CreateIncomingConnectionActor(
299299
std::shared_ptr<TPrivateEndpointInfo> endpoint,
300300
TIntrusivePtr<TSocketDescriptor> socket,

ydb/library/actors/http/http_proxy_acceptor.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,16 @@ class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfi
1010
using TBase::Schedule;
1111

1212
const TActorId Owner;
13-
const TActorId Poller;
1413
TIntrusivePtr<TSocketDescriptor> Socket;
1514
NActors::TPollerToken::TPtr PollerToken;
1615
THashSet<TActorId> Connections;
1716
TDeque<THttpIncomingRequestPtr> RecycledRequests;
1817
ui32 MaxRecycledRequestsCount = 0;
1918
std::shared_ptr<TPrivateEndpointInfo> Endpoint;
2019

21-
TAcceptorActor(const TActorId& owner, const TActorId& poller)
20+
TAcceptorActor(const TActorId& owner)
2221
: NActors::TActor<TAcceptorActor>(&TAcceptorActor::StateInit)
2322
, Owner(owner)
24-
, Poller(poller)
2523
{
2624
}
2725

@@ -89,7 +87,7 @@ class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfi
8987
if (err == 0) {
9088
ALOG_INFO(HttpLog, "Listening on " << schema << bindAddress->ToString());
9189
SetNonBlock(Socket->Socket);
92-
Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
90+
Send(NActors::MakePollerActorId(), new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
9391
TBase::Become(&TAcceptorActor::StateListening);
9492
Send(event->Sender, new TEvHttpProxy::TEvConfirmListen(bindAddress, Endpoint), 0, event->Cookie);
9593
return;
@@ -138,7 +136,7 @@ class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfi
138136
RecycledRequests.pop_front();
139137
}
140138
NActors::TActorId connectionId = Register(connectionSocket);
141-
Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId));
139+
Send(NActors::MakePollerActorId(), new NActors::TEvPollerRegister(socket, connectionId, connectionId));
142140
Connections.emplace(connectionId);
143141
}
144142
}
@@ -159,8 +157,8 @@ class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfi
159157
}
160158
};
161159

162-
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner, const TActorId& poller) {
163-
return new TAcceptorActor(owner, poller);
160+
NActors::IActor* CreateHttpAcceptorActor(const TActorId& owner) {
161+
return new TAcceptorActor(owner);
164162
}
165163

166164
}

ydb/library/actors/http/http_proxy_outgoing.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
1313
using TBase::SelfId;
1414

1515
const TActorId Owner;
16-
const TActorId Poller;
1716
SocketAddressType Address;
1817
TString Destination;
1918
TActorId RequestOwner;
@@ -24,10 +23,9 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
2423
bool AllowConnectionReuse = false;
2524
NActors::TPollerToken::TPtr PollerToken;
2625

27-
TOutgoingConnectionActor(const TActorId& owner, const TActorId& poller)
26+
TOutgoingConnectionActor(const TActorId& owner)
2827
: TBase(&TSelf::StateWaiting)
2928
, Owner(owner)
30-
, Poller(poller)
3129
{
3230
}
3331

@@ -173,7 +171,7 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
173171
}
174172

175173
void RegisterPoller() {
176-
Send(Poller, new NActors::TEvPollerRegister(TSocketImpl::Socket, SelfId(), SelfId()));
174+
Send(NActors::MakePollerActorId(), new NActors::TEvPollerRegister(TSocketImpl::Socket, SelfId(), SelfId()));
177175
}
178176

179177
void OnConnect() {
@@ -351,11 +349,11 @@ class TOutgoingConnectionActor : public NActors::TActor<TOutgoingConnectionActor
351349
}
352350
};
353351

354-
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure, const TActorId& poller) {
352+
NActors::IActor* CreateOutgoingConnectionActor(const TActorId& owner, bool secure) {
355353
if (secure) {
356-
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner, poller);
354+
return new TOutgoingConnectionActor<TSecureSocketImpl>(owner);
357355
} else {
358-
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner, poller);
356+
return new TOutgoingConnectionActor<TPlainSocketImpl>(owner);
359357
}
360358
}
361359

ydb/library/actors/testlib/test_runtime.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,10 +1799,8 @@ namespace NActors {
17991799

18001800
setup->Interconnect.ProxyWrapperFactory = CreateProxyWrapperFactory(common, InterconnectPoolId(), &InterconnectMock);
18011801

1802-
if (UseRealInterconnect) {
1803-
setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(),
1804-
NActors::TMailboxType::Simple, InterconnectPoolId()));
1805-
}
1802+
setup->LocalServices.emplace_back(MakePollerActorId(), NActors::TActorSetupCmd(CreatePollerActor(),
1803+
NActors::TMailboxType::Simple, InterconnectPoolId()));
18061804

18071805
if (!SingleSysEnv) { // Single system env should do this self
18081806
if (LogBackendFactory) {

0 commit comments

Comments
 (0)