From 0e1a5bc8243db0b131ee58a9257c9363e9c62c5d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Jun 2025 16:12:45 -0700 Subject: [PATCH 1/6] Fix coordinator lock contention during close() (#2652) --- kafka/coordinator/base.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 0eb7f0eec..5e1f72621 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -857,14 +857,12 @@ def _disable_heartbeat_thread(self): self._heartbeat_thread.disable() def _close_heartbeat_thread(self, timeout_ms=None): - with self._lock: - if self._heartbeat_thread is not None: - heartbeat_log.info('Stopping heartbeat thread') - try: - self._heartbeat_thread.close(timeout_ms=timeout_ms) - except ReferenceError: - pass - self._heartbeat_thread = None + if self._heartbeat_thread is not None: + try: + self._heartbeat_thread.close(timeout_ms=timeout_ms) + except ReferenceError: + pass + self._heartbeat_thread = None def __del__(self): try: @@ -1047,17 +1045,20 @@ def disable(self): self.enabled = False def close(self, timeout_ms=None): - if self.closed: - return - self.closed = True + with self.coordinator._lock: + if self.closed: + return - # Generally this should not happen - close() is triggered - # by the coordinator. But in some cases GC may close the coordinator - # from within the heartbeat thread. - if threading.current_thread() == self: - return + heartbeat_log.info('Stopping heartbeat thread') + self.closed = True - with self.coordinator._lock: + # Generally this should not happen - close() is triggered + # by the coordinator. But in some cases GC may close the coordinator + # from within the heartbeat thread. + if threading.current_thread() == self: + return + + # Notify coordinator lock to wake thread from sleep/lock.wait self.coordinator._lock.notify() if self.is_alive(): From 41966ddf53f4daab945d07a036122f0a170a29fb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 26 Jun 2025 16:40:11 -0700 Subject: [PATCH 2/6] python2 fixups (#2655) --- kafka/consumer/fetcher.py | 3 ++- kafka/sasl/gssapi.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1888d38bf..1689b23f1 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -613,7 +613,8 @@ def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() # do not fetch a partition if we have a pending fetch response to process # use copy.copy to avoid runtimeerror on mutation from different thread - discard = {fetch.topic_partition for fetch in self._completed_fetches.copy()} + # TODO: switch to deque.copy() with py3 + discard = {fetch.topic_partition for fetch in copy.copy(self._completed_fetches)} current = self._next_partition_records if current: discard.add(current.topic_partition) diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index c8e4f7cac..4785b1b75 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -68,10 +68,10 @@ def receive(self, auth_bytes): # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed # by the server client_flags = self.SASL_QOP_AUTH - server_flags = msg[0] + server_flags = struct.Struct('>b').unpack(msg[0:1])[0] message_parts = [ struct.Struct('>b').pack(client_flags & server_flags), - msg[1:], + msg[1:], # always agree to max message size from server self.auth_id.encode('utf-8'), ] # add authorization identity to the response, and GSS-wrap From 7c750cda0b62b0f43968e67119ae4b72e91de6d4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 27 Jun 2025 09:45:47 -0700 Subject: [PATCH 3/6] Patch Release 2.2.14 --- CHANGES.md | 6 ++++++ docs/changelog.rst | 8 ++++++++ kafka/version.py | 2 +- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 372aebfc6..bc3f6e837 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +# 2.2.14 (June 27, 2025) + +Fixes +* python2 fixups (#2655) +* Fix coordinator lock contention during close() (#2652) + # 2.2.13 (June 20, 2025) Fixes diff --git a/docs/changelog.rst b/docs/changelog.rst index 430f8a512..51a5f448c 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,14 @@ Changelog ========= +2.2.14 (June 27, 2025) +###################### + +Fixes +* python2 fixups (#2655) +* Fix coordinator lock contention during close() (#2652) + + 2.2.13 (June 20, 2025) ###################### diff --git a/kafka/version.py b/kafka/version.py index 298979870..efe92403b 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '2.2.13' +__version__ = '2.2.14' From 37ec9437f6fc6894be7120ed0a3d1183f9f72be8 Mon Sep 17 00:00:00 2001 From: Xeus-CC Date: Wed, 2 Jul 2025 05:30:08 +1200 Subject: [PATCH 4/6] Fix spelling mistake in KafkaConsumer docs (#2659) --- kafka/consumer/group.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 4eb9e2ab4..bc974ee14 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -123,7 +123,7 @@ class KafkaConsumer(six.Iterator): be disabled in cases seeking extreme performance. Default: True isolation_level (str): Configure KIP-98 transactional consumer by setting to 'read_committed'. This will cause the consumer to - skip records from aborted tranactions. Default: 'read_uncommitted' + skip records from aborted transactions. Default: 'read_uncommitted' allow_auto_create_topics (bool): Enable/disable auto topic creation on metadata request. Only available with api_version >= (0, 11). Default: True From 81c8ec36058ce8f89593649f95a18107e5dc935e Mon Sep 17 00:00:00 2001 From: llk89 Date: Wed, 2 Jul 2025 01:32:09 +0800 Subject: [PATCH 5/6] Fix KafkaProducer broken method names (#2660) --- kafka/producer/record_accumulator.py | 4 ++-- kafka/producer/transaction_manager.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 77d48d84f..3a4e60146 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -430,7 +430,7 @@ def ready(self, cluster, now=None): expired = bool(waited_time >= time_to_wait) sendable = (full or expired or self._closed or - self._flush_in_progress()) + self.flush_in_progress()) if sendable and not backing_off: ready_nodes.add(leader) @@ -563,7 +563,7 @@ def deallocate(self, batch): """Deallocate the record batch.""" self._incomplete.remove(batch) - def _flush_in_progress(self): + def flush_in_progress(self): """Are there any threads currently waiting on a flush?""" return self._flushes_in_progress.get() > 0 diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 7302eb00e..5d69ddc97 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -553,11 +553,11 @@ def producer_epoch(self): return self.transaction_manager.producer_id_and_epoch.epoch def fatal_error(self, exc): - self.transaction_manager._transition_to_fatal_error(exc) + self.transaction_manager.transition_to_fatal_error(exc) self._result.done(error=exc) def abortable_error(self, exc): - self.transaction_manager._transition_to_abortable_error(exc) + self.transaction_manager.transition_to_abortable_error(exc) self._result.done(error=exc) def fail(self, exc): From 94d620e23b83442fdd58c022478687d399c01b9a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 1 Jul 2025 10:35:31 -0700 Subject: [PATCH 6/6] Patch Release 2.2.15 --- CHANGES.md | 6 ++++++ docs/changelog.rst | 9 +++++++++ kafka/version.py | 2 +- 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index bc3f6e837..9cab2c837 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +# 2.2.15 (July 1, 2025) + +Fixes +* Fix KafkaProducer broken method names (llk89 / #2660) +* Fix spelling mistake in KafkaConsumer docs (Xeus-CC / #2659) + # 2.2.14 (June 27, 2025) Fixes diff --git a/docs/changelog.rst b/docs/changelog.rst index 51a5f448c..844bdd9b6 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,15 @@ Changelog ========= +2.2.15 (July 1, 2025) +##################### + +Fixes +----- +* Fix KafkaProducer broken method names (llk89 / #2660) +* Fix spelling mistake in KafkaConsumer docs (Xeus-CC / #2659) + + 2.2.14 (June 27, 2025) ###################### diff --git a/kafka/version.py b/kafka/version.py index efe92403b..9f4696f50 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '2.2.14' +__version__ = '2.2.15'