@@ -3571,12 +3571,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
3571
3571
lists :any (fun (Id ) -> Id =:= SubscriptionId end , SubscriptionIds ).
3572
3572
3573
3573
send_file_callback (? VERSION_1 ,
3574
- Transport ,
3575
3574
_Log ,
3576
3575
# consumer {configuration =
3577
- # consumer_configuration {socket = S ,
3578
- subscription_id =
3579
- SubscriptionId ,
3576
+ # consumer_configuration {subscription_id = SubId ,
3580
3577
counters = Counters }},
3581
3578
Counter ) ->
3582
3579
fun (#{chunk_id := FirstOffsetInChunk , num_entries := NumEntries },
@@ -3587,19 +3584,16 @@ send_file_callback(?VERSION_1,
3587
3584
? REQUEST :1 ,
3588
3585
? COMMAND_DELIVER :15 ,
3589
3586
? VERSION_1 :16 ,
3590
- SubscriptionId :8 /unsigned >>,
3591
- Transport :send (S , FrameBeginning ),
3587
+ SubId :8 /unsigned >>,
3592
3588
atomics :add (Counter , 1 , Size ),
3593
3589
increase_messages_consumed (Counters , NumEntries ),
3594
- set_consumer_offset (Counters , FirstOffsetInChunk )
3590
+ set_consumer_offset (Counters , FirstOffsetInChunk ),
3591
+ FrameBeginning
3595
3592
end ;
3596
3593
send_file_callback (? VERSION_2 ,
3597
- Transport ,
3598
3594
Log ,
3599
3595
# consumer {configuration =
3600
- # consumer_configuration {socket = S ,
3601
- subscription_id =
3602
- SubscriptionId ,
3596
+ # consumer_configuration {subscription_id = SubId ,
3603
3597
counters = Counters }},
3604
3598
Counter ) ->
3605
3599
fun (#{chunk_id := FirstOffsetInChunk , num_entries := NumEntries },
@@ -3611,12 +3605,12 @@ send_file_callback(?VERSION_2,
3611
3605
? REQUEST :1 ,
3612
3606
? COMMAND_DELIVER :15 ,
3613
3607
? VERSION_2 :16 ,
3614
- SubscriptionId :8 /unsigned ,
3608
+ SubId :8 /unsigned ,
3615
3609
CommittedChunkId :64 >>,
3616
- Transport :send (S , FrameBeginning ),
3617
3610
atomics :add (Counter , 1 , Size ),
3618
3611
increase_messages_consumed (Counters , NumEntries ),
3619
- set_consumer_offset (Counters , FirstOffsetInChunk )
3612
+ set_consumer_offset (Counters , FirstOffsetInChunk ),
3613
+ FrameBeginning
3620
3614
end .
3621
3615
3622
3616
send_chunks (DeliverVersion ,
@@ -3686,9 +3680,7 @@ send_chunks(DeliverVersion,
3686
3680
Retry ,
3687
3681
Counter ) ->
3688
3682
case osiris_log :send_file (Socket , Log ,
3689
- send_file_callback (DeliverVersion ,
3690
- Transport ,
3691
- Log ,
3683
+ send_file_callback (DeliverVersion , Log ,
3692
3684
Consumer ,
3693
3685
Counter ))
3694
3686
of
0 commit comments