Skip to content

Commit 8f3bc35

Browse files
authored
[core] Make Unsubscribe Idempotent (#57546)
cherrypick #57234 Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
1 parent c96f330 commit 8f3bc35

File tree

8 files changed

+33
-36
lines changed

8 files changed

+33
-36
lines changed

src/ray/core_worker/reference_counter.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,8 +1177,8 @@ void ReferenceCounter::WaitForRefRemoved(const ReferenceTable::iterator &ref_it,
11771177

11781178
CleanupBorrowersOnRefRemoved(new_borrower_refs, object_id, addr);
11791179
// Unsubscribe the object once the message is published.
1180-
RAY_CHECK(object_info_subscriber_->Unsubscribe(
1181-
rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL, addr, object_id.Binary()));
1180+
object_info_subscriber_->Unsubscribe(
1181+
rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL, addr, object_id.Binary());
11821182
};
11831183

11841184
// If the borrower is failed, this callback will be called.

src/ray/core_worker/tests/reference_counter_test.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,9 @@ class MockDistributedSubscriber : public pubsub::SubscriberInterface {
190190
failure_callback_it->second.emplace(oid, subscription_failure_callback);
191191
}
192192

193-
bool Unsubscribe(rpc::ChannelType channel_type,
193+
void Unsubscribe(rpc::ChannelType channel_type,
194194
const rpc::Address &publisher_address,
195-
const std::optional<std::string> &key_id_binary) override {
196-
return true;
197-
}
195+
const std::optional<std::string> &key_id_binary) override {}
198196

199197
bool IsSubscribed(rpc::ChannelType channel_type,
200198
const rpc::Address &publisher_address,

src/ray/pubsub/fake_subscriber.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,9 @@ class FakeSubscriber : public SubscriberInterface {
4545
pubsub::SubscriptionItemCallback subscription_callback,
4646
pubsub::SubscriptionFailureCallback subscription_failure_callback) override {}
4747

48-
bool Unsubscribe(rpc::ChannelType channel_type,
48+
void Unsubscribe(rpc::ChannelType channel_type,
4949
const rpc::Address &publisher_address,
50-
const std::optional<std::string> &key_id) override {
51-
return true;
52-
}
50+
const std::optional<std::string> &key_id) override {}
5351

5452
bool IsSubscribed(rpc::ChannelType channel_type,
5553
const rpc::Address &publisher_address,

src/ray/pubsub/subscriber.cc

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,24 @@ void SubscriberChannel::Subscribe(
5353
}
5454
}
5555

56-
bool SubscriberChannel::Unsubscribe(const rpc::Address &publisher_address,
56+
void SubscriberChannel::Unsubscribe(const rpc::Address &publisher_address,
5757
const std::optional<std::string> &key_id) {
5858
cum_unsubscribe_requests_++;
5959
const auto publisher_id = UniqueID::FromBinary(publisher_address.worker_id());
6060

6161
// Find subscription info.
6262
auto subscription_it = subscription_map_.find(publisher_id);
6363
if (subscription_it == subscription_map_.end()) {
64-
return false;
64+
return;
6565
}
6666
auto &subscription_index = subscription_it->second;
6767

6868
// Unsubscribing from the channel.
6969
if (!key_id) {
7070
RAY_CHECK(subscription_index.per_entity_subscription.empty());
71-
const bool unsubscribed = subscription_index.all_entities_subscription != nullptr;
7271
subscription_index.all_entities_subscription.reset();
7372
subscription_map_.erase(subscription_it);
74-
return unsubscribed;
73+
return;
7574
}
7675

7776
// Unsubscribing from a single key.
@@ -80,13 +79,12 @@ bool SubscriberChannel::Unsubscribe(const rpc::Address &publisher_address,
8079

8180
auto subscription_callback_it = per_entity_subscription.find(*key_id);
8281
if (subscription_callback_it == per_entity_subscription.end()) {
83-
return false;
82+
return;
8483
}
8584
per_entity_subscription.erase(subscription_callback_it);
8685
if (per_entity_subscription.empty()) {
8786
subscription_map_.erase(subscription_it);
8887
}
89-
return true;
9088
}
9189

9290
bool SubscriberChannel::IsSubscribed(const rpc::Address &publisher_address,
@@ -171,10 +169,8 @@ void SubscriberChannel::HandlePublisherFailure(const rpc::Address &publisher_add
171169

172170
for (const auto &key_id : key_ids_to_unsubscribe) {
173171
// If the publisher is failed, we automatically unsubscribe objects from this
174-
// publishers. If the failure callback called UnsubscribeObject, this will raise
175-
// check failures.
176-
RAY_CHECK(Unsubscribe(publisher_address, key_id))
177-
<< "Calling UnsubscribeObject inside a failure callback is not allowed.";
172+
// publishers.
173+
Unsubscribe(publisher_address, key_id);
178174
}
179175
}
180176

@@ -189,8 +185,7 @@ void SubscriberChannel::HandlePublisherFailure(const rpc::Address &publisher_add
189185
auto unsubscribe_needed =
190186
HandlePublisherFailureInternal(publisher_address, key_id, Status::OK());
191187
if (unsubscribe_needed) {
192-
RAY_CHECK(Unsubscribe(publisher_address, key_id))
193-
<< "Calling UnsubscribeObject inside a failure callback is not allowed.";
188+
Unsubscribe(publisher_address, key_id);
194189
}
195190
}
196191

@@ -229,7 +224,7 @@ std::string SubscriberChannel::DebugString() const {
229224
/// Subscriber
230225
///////////////////////////////////////////////////////////////////////////////
231226

232-
bool Subscriber::Unsubscribe(rpc::ChannelType channel_type,
227+
void Subscriber::Unsubscribe(rpc::ChannelType channel_type,
233228
const rpc::Address &publisher_address,
234229
const std::optional<std::string> &key_id) {
235230
// Batch the unsubscribe command.
@@ -245,7 +240,7 @@ bool Subscriber::Unsubscribe(rpc::ChannelType channel_type,
245240
commands_[publisher_id].emplace(std::move(command));
246241
SendCommandBatchIfPossible(publisher_address);
247242

248-
return Channel(channel_type)->Unsubscribe(publisher_address, key_id);
243+
Channel(channel_type)->Unsubscribe(publisher_address, key_id);
249244
}
250245

251246
bool Subscriber::IsSubscribed(rpc::ChannelType channel_type,

src/ray/pubsub/subscriber.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class SubscriberChannel {
8585
/// \param publisher_address The publisher address that it will unsubscribe to.
8686
/// \param key_id The entity id to unsubscribe.
8787
/// \return True if the publisher is unsubscribed.
88-
bool Unsubscribe(const rpc::Address &publisher_address,
88+
void Unsubscribe(const rpc::Address &publisher_address,
8989
const std::optional<std::string> &key_id);
9090

9191
/// Test only.
@@ -238,7 +238,7 @@ class Subscriber : public SubscriberInterface {
238238
SubscriptionItemCallback subscription_callback,
239239
SubscriptionFailureCallback subscription_failure_callback) override;
240240

241-
bool Unsubscribe(rpc::ChannelType channel_type,
241+
void Unsubscribe(rpc::ChannelType channel_type,
242242
const rpc::Address &publisher_address,
243243
const std::optional<std::string> &key_id) override;
244244

src/ray/pubsub/subscriber_interface.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ class SubscriberInterface {
7272
/// \param publisher_address The publisher address that it will unsubscribe from.
7373
/// \param key_id The entity id to unsubscribe. Unsubscribes from all entities if
7474
/// nullopt.
75-
/// \return Returns whether the entity key_id has been subscribed before.
76-
virtual bool Unsubscribe(rpc::ChannelType channel_type,
75+
virtual void Unsubscribe(rpc::ChannelType channel_type,
7776
const rpc::Address &publisher_address,
7877
const std::optional<std::string> &key_id) = 0;
7978

src/ray/pubsub/tests/subscriber_test.cc

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,8 @@ TEST_F(SubscriberTest, TestBasicSubscription) {
228228

229229
const auto owner_addr = GenerateOwnerAddress();
230230
const auto object_id = ObjectID::FromRandom();
231-
ASSERT_FALSE(subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary()));
231+
subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary());
232+
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
232233
ASSERT_TRUE(owner_client->ReplyCommandBatch());
233234
subscriber_->Subscribe(GenerateSubMessage(object_id),
234235
channel,
@@ -254,7 +255,8 @@ TEST_F(SubscriberTest, TestBasicSubscription) {
254255
ASSERT_EQ(object_subscribed_[oid], 2);
255256
}
256257

257-
ASSERT_TRUE(subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary()));
258+
subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary());
259+
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
258260
ASSERT_TRUE(owner_client->ReplyCommandBatch());
259261
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
260262

@@ -504,7 +506,8 @@ TEST_F(SubscriberTest, TestSubscribeAllEntities) {
504506
}
505507

506508
// Unsubscribe from the channel.
507-
ASSERT_TRUE(subscriber_->Unsubscribe(channel, owner_addr, /*key_id=*/std::nullopt));
509+
subscriber_->Unsubscribe(channel, owner_addr, /*key_id=*/std::nullopt);
510+
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, /*key_id=*/""));
508511
}
509512

510513
TEST_F(SubscriberTest, TestIgnoreBatchAfterUnsubscription) {
@@ -527,7 +530,8 @@ TEST_F(SubscriberTest, TestIgnoreBatchAfterUnsubscription) {
527530
subscription_callback,
528531
failure_callback);
529532
ASSERT_TRUE(owner_client->ReplyCommandBatch());
530-
ASSERT_TRUE(subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary()));
533+
subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary());
534+
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
531535
ASSERT_TRUE(owner_client->ReplyCommandBatch());
532536
std::vector<ObjectID> objects_batched;
533537
objects_batched.push_back(object_id);
@@ -560,7 +564,8 @@ TEST_F(SubscriberTest, TestIgnoreBatchAfterUnsubscribeFromAll) {
560564
subscription_callback,
561565
failure_callback);
562566
ASSERT_TRUE(owner_client->ReplyCommandBatch());
563-
ASSERT_TRUE(subscriber_->Unsubscribe(channel, owner_addr, /*key_id=*/std::nullopt));
567+
subscriber_->Unsubscribe(channel, owner_addr, /*key_id=*/std::nullopt);
568+
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, /*key_id=*/""));
564569
ASSERT_TRUE(owner_client->ReplyCommandBatch());
565570

566571
const auto object_id = ObjectID::FromRandom();
@@ -614,6 +619,7 @@ TEST_F(SubscriberTest, TestUnsubscribeInSubscriptionCallback) {
614619
auto subscription_callback = [this, owner_addr](const rpc::PubMessage &msg) {
615620
const auto object_id = ObjectID::FromBinary(msg.key_id());
616621
subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary());
622+
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
617623
ASSERT_TRUE(owner_client->ReplyCommandBatch());
618624
object_subscribed_[object_id]++;
619625
};
@@ -705,6 +711,7 @@ TEST_F(SubscriberTest, TestSubUnsubCommandBatchMultiEntries) {
705711

706712
// Test multiple entries in the batch before new reply is coming.
707713
subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary());
714+
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
708715
subscriber_->Subscribe(GenerateSubMessage(object_id),
709716
channel,
710717
owner_addr,
@@ -967,7 +974,7 @@ TEST_F(SubscriberTest, TestIsSubscribed) {
967974
const auto owner_addr = GenerateOwnerAddress();
968975
const auto object_id = ObjectID::FromRandom();
969976

970-
ASSERT_FALSE(subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary()));
977+
subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary());
971978
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
972979

973980
subscriber_->Subscribe(GenerateSubMessage(object_id),
@@ -979,7 +986,7 @@ TEST_F(SubscriberTest, TestIsSubscribed) {
979986
failure_callback);
980987
ASSERT_TRUE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
981988

982-
ASSERT_TRUE(subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary()));
989+
subscriber_->Unsubscribe(channel, owner_addr, object_id.Binary());
983990
ASSERT_FALSE(subscriber_->IsSubscribed(channel, owner_addr, object_id.Binary()));
984991
}
985992

src/ray/raylet/tests/local_object_manager_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class MockSubscriber : public pubsub::SubscriberInterface {
8787
}
8888

8989
MOCK_METHOD3(Unsubscribe,
90-
bool(rpc::ChannelType channel_type,
90+
void(rpc::ChannelType channel_type,
9191
const rpc::Address &publisher_address,
9292
const std::optional<std::string> &key_id_binary));
9393

0 commit comments

Comments
 (0)