@@ -199,33 +199,28 @@ async def _run_notifier_loop(self) -> None:
199
199
# The token has advanced but there is no data to
200
200
# send, so we send a `POSITION` to inform other
201
201
# workers of the updated position.
202
- if stream . NAME == EventsStream . NAME :
203
- # XXX: We only do this for the EventStream as it
204
- # turns out that e.g. account data streams share
205
- # their "current token" with each other, meaning
206
- # that it is *not* safe to send a POSITION .
207
-
208
- # Note: `last_token` may not *actually* be the
209
- # last token we sent out in a RDATA or POSITION.
210
- # This can happen if we sent out an RDATA for
211
- # position X when our current token was say X+1.
212
- # Other workers will see RDATA for X and then a
213
- # POSITION with last token of X+1, which will
214
- # cause them to check if there were any missing
215
- # updates between X and X+1.
216
- logger . info (
217
- "Sending position: %s -> %s" ,
202
+
203
+ # Note: `last_token` may not *actually* be the
204
+ # last token we sent out in a RDATA or POSITION.
205
+ # This can happen if we sent out an RDATA for
206
+ # position X when our current token was say X+1 .
207
+ # Other workers will see RDATA for X and then a
208
+ # POSITION with last token of X+1, which will
209
+ # cause them to check if there were any missing
210
+ # updates between X and X+1.
211
+ logger . info (
212
+ "Sending position: %s -> %s" ,
213
+ stream . NAME ,
214
+ current_token ,
215
+ )
216
+ self . command_handler . send_command (
217
+ PositionCommand (
218
218
stream .NAME ,
219
+ self ._instance_name ,
220
+ last_token ,
219
221
current_token ,
220
222
)
221
- self .command_handler .send_command (
222
- PositionCommand (
223
- stream .NAME ,
224
- self ._instance_name ,
225
- last_token ,
226
- current_token ,
227
- )
228
- )
223
+ )
229
224
continue
230
225
231
226
# Some streams return multiple rows with the same stream IDs,
0 commit comments