File tree Expand file tree Collapse file tree 3 files changed +37
-9
lines changed
lib/action_cable/subscription_adapter
test/lib/action_cable/subscription_adapter Expand file tree Collapse file tree 3 files changed +37
-9
lines changed Original file line number Diff line number Diff line change @@ -7,7 +7,7 @@ class Message < SolidCable::Record
7
7
}
8
8
scope :broadcastable , lambda { |channels , last_id |
9
9
where ( channel_hash : channel_hashes_for ( channels ) ) .
10
- where ( id : ( last_id + 1 ) ..) . order ( :id )
10
+ where ( id : ( last_id . to_i + 1 ) ..) . order ( :id )
11
11
}
12
12
13
13
class << self
Original file line number Diff line number Diff line change @@ -89,7 +89,7 @@ def shutdown
89
89
end
90
90
91
91
def add_channel ( channel , on_success )
92
- channels . add ( channel )
92
+ channels [ channel ] = last_message_id
93
93
event_loop . post ( &on_success ) if on_success
94
94
end
95
95
@@ -103,21 +103,22 @@ def invoke_callback(*)
103
103
104
104
private
105
105
attr_reader :event_loop , :thread
106
- attr_writer :last_id
107
106
108
- def last_id
109
- @last_id ||= ::SolidCable ::Message . maximum ( :id ) || 0
107
+ def last_message_id
108
+ ::SolidCable ::Message . maximum ( :id ) || 0
110
109
end
111
110
112
111
def channels
113
- @channels ||= Set . new
112
+ @channels ||= Concurrent :: Hash . new
114
113
end
115
114
116
115
def broadcast_messages
117
- ::SolidCable ::Message . broadcastable ( channels , last_id ) .
116
+ ::SolidCable ::Message . broadcastable ( channels . keys , channels . values . min ) .
118
117
each do |message |
119
- broadcast ( message . channel , message . payload )
120
- self . last_id = message . id
118
+ if channels [ message . channel ] . present? && channels [ message . channel ] < message . id
119
+ broadcast ( message . channel , message . payload )
120
+ channels [ message . channel ] = message . id
121
+ end
121
122
end
122
123
end
123
124
Original file line number Diff line number Diff line change @@ -148,6 +148,33 @@ class ActionCable::SubscriptionAdapter::SolidCableTest < ActionCable::TestCase
148
148
end
149
149
end
150
150
151
+ test "does not send old messages" do
152
+ @tx_adapter . broadcast ( "channel" , "channel1" )
153
+ @tx_adapter . broadcast ( "channel" , "channel2" )
154
+
155
+ subscribe_as_queue ( "channel" ) do |queue |
156
+ assert_empty queue
157
+
158
+ @tx_adapter . broadcast ( "channel" , "channel3" )
159
+ @tx_adapter . broadcast ( "channel" , "channel4" )
160
+ @tx_adapter . broadcast ( "other" , "other1" )
161
+ @tx_adapter . broadcast ( "other" , "other2" )
162
+
163
+ subscribe_as_queue ( "other" ) do |other_queue |
164
+ assert_empty other_queue
165
+ end
166
+ assert_equal "channel3" , queue . pop
167
+ assert_equal "channel4" , queue . pop
168
+ end
169
+
170
+ @tx_adapter . broadcast ( "channel" , "channel5" )
171
+ @tx_adapter . broadcast ( "channel" , "channel6" )
172
+
173
+ subscribe_as_queue ( "channel" ) do |queue |
174
+ assert_empty queue
175
+ end
176
+ end
177
+
151
178
private
152
179
def cable_config
153
180
{ adapter : "solid_cable" , message_retention : "1.second" ,
You can’t perform that action at this time.
0 commit comments