Skip to content

Commit f3d014c

Browse files
authored
Prevent unhandled background error on SPM shutdown (#8111)
1 parent 56da68f commit f3d014c

File tree

2 files changed

+58
-1
lines changed

2 files changed

+58
-1
lines changed

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,12 +428,23 @@ def close(self, reason=None):
428428
_LOGGER.debug("Stopping scheduler.")
429429
self._scheduler.shutdown()
430430
self._scheduler = None
431+
432+
# Leaser and dispatcher reference each other through the shared
433+
# StreamingPullManager instance, i.e. "self", thus do not set their
434+
# references to None until both have been shut down.
435+
#
436+
# NOTE: Even if the dispatcher operates on an inactive leaser using
437+
# the latter's add() and remove() methods, these have no impact on
438+
# the stopped leaser (the leaser is never again re-started). Ditto
439+
# for the manager's maybe_resume_consumer() / maybe_pause_consumer(),
440+
# because the consumer gets shut down first.
431441
_LOGGER.debug("Stopping leaser.")
432442
self._leaser.stop()
433-
self._leaser = None
434443
_LOGGER.debug("Stopping dispatcher.")
435444
self._dispatcher.stop()
436445
self._dispatcher = None
446+
# dispatcher terminated, OK to dispose the leaser reference now
447+
self._leaser = None
437448
_LOGGER.debug("Stopping heartbeater.")
438449
self._heartbeater.stop()
439450
self._heartbeater = None

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# limitations under the License.
1414

1515
import logging
16+
import threading
17+
import time
1618
import types as stdlib_types
1719

1820
import mock
@@ -511,6 +513,50 @@ def test_close_idempotent():
511513
assert scheduler.shutdown.call_count == 1
512514

513515

516+
class FakeDispatcher(object):
517+
def __init__(self, manager, error_callback):
518+
self._manager = manager
519+
self._error_callback = error_callback
520+
self._thread = None
521+
self._stop = False
522+
523+
def start(self):
524+
self._thread = threading.Thread(target=self._do_work)
525+
self._thread.daemon = True
526+
self._thread.start()
527+
528+
def stop(self):
529+
self._stop = True
530+
self._thread.join()
531+
self._thread = None
532+
533+
def _do_work(self):
534+
while not self._stop:
535+
try:
536+
self._manager.leaser.add([mock.Mock()])
537+
except Exception as exc:
538+
self._error_callback(exc)
539+
time.sleep(0.1)
540+
541+
# also try to interact with the leaser after the stop flag has been set
542+
try:
543+
self._manager.leaser.remove([mock.Mock()])
544+
except Exception as exc:
545+
self._error_callback(exc)
546+
547+
548+
def test_close_no_dispatcher_error():
549+
manager, _, _, _, _, _ = make_running_manager()
550+
error_callback = mock.Mock(name="error_callback")
551+
dispatcher = FakeDispatcher(manager=manager, error_callback=error_callback)
552+
manager._dispatcher = dispatcher
553+
dispatcher.start()
554+
555+
manager.close()
556+
557+
error_callback.assert_not_called()
558+
559+
514560
def test_close_callbacks():
515561
manager, _, _, _, _, _ = make_running_manager()
516562

0 commit comments

Comments
 (0)