diff --git a/CHANGES.md b/CHANGES.md index 749b83afb..bafd50508 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,8 @@ +# 2.1.6 (May 2, 2025) + +Fixes +* Only create fetch requests for ready nodes (#2607) + # 2.1.5 (Apr 4, 2025) Fixes diff --git a/docs/changelog.rst b/docs/changelog.rst index e77885af7..b2665eb78 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,6 +1,14 @@ Changelog ========= +2.1.6 (May 2, 2025) +################### + +Fixes +----- +* Only create fetch requests for ready nodes (#2607) + + 2.1.5 (Apr 4, 2025) ################### diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 508e35a0b..505272757 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -134,6 +134,7 @@ def send_fetches(self): future = self._client.send(node_id, request, wakeup=False) future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.time()) future.add_errback(self._handle_fetch_error, node_id) + future.add_both(self._clear_pending_fetch_request, node_id) futures.append(future) self._fetch_futures.extend(futures) self._clean_done_fetch_futures() @@ -610,36 +611,42 @@ def _create_fetch_requests(self): log.log(0, "Skipping fetch for partition %s because node %s is throttled", partition, node_id) + elif not self._client.ready(node_id): + # Until we support send request queues, any attempt to send to a not-ready node will be + # immediately failed with NodeNotReadyError. 
+ log.debug("Skipping fetch for partition %s because connection to leader node is not ready yet", partition) + elif node_id in self._nodes_with_pending_fetch_requests: log.log(0, "Skipping fetch for partition %s because there is a pending fetch request to node %s", partition, node_id) - continue - if version < 5: - partition_info = ( - partition.partition, - position.offset, - self.config['max_partition_fetch_bytes'] - ) - elif version <= 8: - partition_info = ( - partition.partition, - position.offset, - -1, # log_start_offset is used internally by brokers / replicas only - self.config['max_partition_fetch_bytes'], - ) else: - partition_info = ( - partition.partition, - position.leader_epoch, - position.offset, - -1, # log_start_offset is used internally by brokers / replicas only - self.config['max_partition_fetch_bytes'], - ) - - fetchable[node_id][partition] = partition_info - log.debug("Adding fetch request for partition %s at offset %d", - partition, position.offset) + # Leader is connected and does not have a pending fetch request + if version < 5: + partition_info = ( + partition.partition, + position.offset, + self.config['max_partition_fetch_bytes'] + ) + elif version <= 8: + partition_info = ( + partition.partition, + position.offset, + -1, # log_start_offset is used internally by brokers / replicas only + self.config['max_partition_fetch_bytes'], + ) + else: + partition_info = ( + partition.partition, + position.leader_epoch, + position.offset, + -1, # log_start_offset is used internally by brokers / replicas only + self.config['max_partition_fetch_bytes'], + ) + + fetchable[node_id][partition] = partition_info + log.debug("Adding fetch request for partition %s at offset %d", + partition, position.offset) requests = {} for node_id, next_partitions in six.iteritems(fetchable): @@ -728,14 +735,18 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response): if self._sensors: self._sensors.fetch_latency.record((time.time() - send_time) * 1000) - 
self._nodes_with_pending_fetch_requests.remove(node_id) def _handle_fetch_error(self, node_id, exception): level = logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR log.log(level, 'Fetch to node %s failed: %s', node_id, exception) if node_id in self._session_handlers: self._session_handlers[node_id].handle_error(exception) - self._nodes_with_pending_fetch_requests.remove(node_id) + + def _clear_pending_fetch_request(self, node_id, _): + try: + self._nodes_with_pending_fetch_requests.remove(node_id) + except KeyError: + pass def _parse_fetched_data(self, completed_fetch): tp = completed_fetch.topic_partition diff --git a/kafka/version.py b/kafka/version.py index cfb007cbc..da04cc33c 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '2.1.5' +__version__ = '2.1.6' diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 184acc9e1..7c0873846 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -108,6 +108,7 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): fetcher._client._api_versions = BROKER_API_VERSIONS[api_version] mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) mocker.patch.object(fetcher._client.cluster, "leader_epoch_for_partition", return_value=0) + mocker.patch.object(fetcher._client, "ready", return_value=True) by_node = fetcher._create_fetch_requests() requests_and_offsets = by_node.values() assert set([r.API_VERSION for (r, _offsets) in requests_and_offsets]) == set([fetch_version])