Fix thread not waking up when there is still data to be sent #2670
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
When producing messages quickly without waiting for the future of previous requests, there could be some situations when the last batch was not sent.
That seemed to be more frequent with larger messages (~100KiB), but apparently it could happen to any message when
linger_ms
is 0. Not sure if it could happen when it is non-zero though.The reason is that
BrokerConnection.send_pending_requests_v2
would fill the internal buffer with the bytes from a request and try to send it.kafka-python/kafka/conn.py
Line 1071 in 512d0a0
If it couldn't send it completely for some reason, it would try to send again in the next call to
send_pending_requests_v2
.But if between those 2 calls,
BrokerConnection.send
was called, new data would be appended to self._protocol: KafkaProtocol:kafka-python/kafka/conn.py
Line 1015 in 512d0a0
but the second call to
send_pending_requests_v2
wouldn't check if any new data was available and would return False:kafka-python/kafka/conn.py
Line 1070 in 512d0a0
This would tell
KafkaClient._poll
that all pending data was sent, which would make the client not listen to socked write readiness anymore:kafka-python/kafka/client_async.py
Lines 744 to 748 in 512d0a0