Skip to content

Commit e3d6c66

Browse files
committed
Return stream frame header binary in dispatch chunk callback
This saves a system call by sending the frame header and the chunk header at the same time. References rabbitmq/osiris#192
1 parent c7f6cad commit e3d6c66

File tree

2 files changed

+10
-18
lines changed

2 files changed

+10
-18
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3571,12 +3571,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
35713571
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
35723572

35733573
send_file_callback(?VERSION_1,
3574-
Transport,
35753574
_Log,
35763575
#consumer{configuration =
3577-
#consumer_configuration{socket = S,
3578-
subscription_id =
3579-
SubscriptionId,
3576+
#consumer_configuration{subscription_id = SubId,
35803577
counters = Counters}},
35813578
Counter) ->
35823579
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3587,19 +3584,16 @@ send_file_callback(?VERSION_1,
35873584
?REQUEST:1,
35883585
?COMMAND_DELIVER:15,
35893586
?VERSION_1:16,
3590-
SubscriptionId:8/unsigned>>,
3591-
Transport:send(S, FrameBeginning),
3587+
SubId:8/unsigned>>,
35923588
atomics:add(Counter, 1, Size),
35933589
increase_messages_consumed(Counters, NumEntries),
3594-
set_consumer_offset(Counters, FirstOffsetInChunk)
3590+
set_consumer_offset(Counters, FirstOffsetInChunk),
3591+
FrameBeginning
35953592
end;
35963593
send_file_callback(?VERSION_2,
3597-
Transport,
35983594
Log,
35993595
#consumer{configuration =
3600-
#consumer_configuration{socket = S,
3601-
subscription_id =
3602-
SubscriptionId,
3596+
#consumer_configuration{subscription_id = SubId,
36033597
counters = Counters}},
36043598
Counter) ->
36053599
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@@ -3611,12 +3605,12 @@ send_file_callback(?VERSION_2,
36113605
?REQUEST:1,
36123606
?COMMAND_DELIVER:15,
36133607
?VERSION_2:16,
3614-
SubscriptionId:8/unsigned,
3608+
SubId:8/unsigned,
36153609
CommittedChunkId:64>>,
3616-
Transport:send(S, FrameBeginning),
36173610
atomics:add(Counter, 1, Size),
36183611
increase_messages_consumed(Counters, NumEntries),
3619-
set_consumer_offset(Counters, FirstOffsetInChunk)
3612+
set_consumer_offset(Counters, FirstOffsetInChunk),
3613+
FrameBeginning
36203614
end.
36213615

36223616
send_chunks(DeliverVersion,
@@ -3686,9 +3680,7 @@ send_chunks(DeliverVersion,
36863680
Retry,
36873681
Counter) ->
36883682
case osiris_log:send_file(Socket, Log,
3689-
send_file_callback(DeliverVersion,
3690-
Transport,
3691-
Log,
3683+
send_file_callback(DeliverVersion, Log,
36923684
Consumer,
36933685
Counter))
36943686
of

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
4949
dep_khepri = hex 0.17.2
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
52-
dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0
52+
dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements
5353
dep_prometheus = hex 5.1.1
5454
dep_ra = hex 2.17.0
5555
dep_ranch = hex 2.2.0

0 commit comments

Comments
 (0)