@@ -3568,12 +3568,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
35683568 lists :any (fun (Id ) -> Id =:= SubscriptionId end , SubscriptionIds ).
35693569
35703570send_file_callback (? VERSION_1 ,
3571- Transport ,
35723571 _Log ,
35733572 # consumer {configuration =
3574- # consumer_configuration {socket = S ,
3575- subscription_id =
3576- SubscriptionId ,
3573+ # consumer_configuration {subscription_id = SubId ,
35773574 counters = Counters }},
35783575 Counter ) ->
35793576 fun (#{chunk_id := FirstOffsetInChunk , num_entries := NumEntries },
@@ -3584,19 +3581,16 @@ send_file_callback(?VERSION_1,
35843581 ? REQUEST :1 ,
35853582 ? COMMAND_DELIVER :15 ,
35863583 ? VERSION_1 :16 ,
3587- SubscriptionId :8 /unsigned >>,
3588- Transport :send (S , FrameBeginning ),
3584+ SubId :8 /unsigned >>,
35893585 atomics :add (Counter , 1 , Size ),
35903586 increase_messages_consumed (Counters , NumEntries ),
3591- set_consumer_offset (Counters , FirstOffsetInChunk )
3587+ set_consumer_offset (Counters , FirstOffsetInChunk ),
3588+ FrameBeginning
35923589 end ;
35933590send_file_callback (? VERSION_2 ,
3594- Transport ,
35953591 Log ,
35963592 # consumer {configuration =
3597- # consumer_configuration {socket = S ,
3598- subscription_id =
3599- SubscriptionId ,
3593+ # consumer_configuration {subscription_id = SubId ,
36003594 counters = Counters }},
36013595 Counter ) ->
36023596 fun (#{chunk_id := FirstOffsetInChunk , num_entries := NumEntries },
@@ -3608,12 +3602,12 @@ send_file_callback(?VERSION_2,
36083602 ? REQUEST :1 ,
36093603 ? COMMAND_DELIVER :15 ,
36103604 ? VERSION_2 :16 ,
3611- SubscriptionId :8 /unsigned ,
3605+ SubId :8 /unsigned ,
36123606 CommittedChunkId :64 >>,
3613- Transport :send (S , FrameBeginning ),
36143607 atomics :add (Counter , 1 , Size ),
36153608 increase_messages_consumed (Counters , NumEntries ),
3616- set_consumer_offset (Counters , FirstOffsetInChunk )
3609+ set_consumer_offset (Counters , FirstOffsetInChunk ),
3610+ FrameBeginning
36173611 end .
36183612
36193613send_chunks (DeliverVersion ,
@@ -3683,9 +3677,7 @@ send_chunks(DeliverVersion,
36833677 Retry ,
36843678 Counter ) ->
36853679 case osiris_log :send_file (Socket , Log ,
3686- send_file_callback (DeliverVersion ,
3687- Transport ,
3688- Log ,
3680+ send_file_callback (DeliverVersion , Log ,
36893681 Consumer ,
36903682 Counter ))
36913683 of
0 commit comments