diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index c6dbf067f..b921c4fe0 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -29,6 +29,7 @@ from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber._protocol import requests from google.cloud.pubsub_v1.subscriber.exceptions import ( + AcknowledgeError, AcknowledgeStatus, ) @@ -86,6 +87,7 @@ def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"): self._queue = queue self._thread: Optional[threading.Thread] = None self._operational_lock = threading.Lock() + self._shutdown_mode = False def start(self) -> None: """Start a thread to dispatch requests queued up by callbacks. @@ -112,6 +114,7 @@ def start(self) -> None: def stop(self) -> None: with self._operational_lock: if self._thread is not None: + self.enter_shutdown_mode() # Signal the worker to stop by queueing a "poison pill" self._queue.put(helper_threads.STOP) self._thread.join() @@ -125,6 +128,25 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None: items: Queued requests to dispatch. """ + exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled() + if self._shutdown_mode: + for item in items: + if ( + isinstance(item, requests.ModAckRequest) + or isinstance(item, requests.AckRequest) + or isinstance(item, requests.NackRequest) + ) and item.future is not None: + if exactly_once_delivery_enabled: + item.future.set_exception( + AcknowledgeError( + AcknowledgeStatus.OTHER, + "Stream is being shutdown, request was not sent.", + ) + ) + else: + item.future.set_result(AcknowledgeStatus.SUCCESS) + return + lease_requests: List[requests.LeaseRequest] = [] modack_requests: List[requests.ModAckRequest] = [] ack_requests: List[requests.AckRequest] = [] @@ -136,7 +158,6 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None: ack_ids = set() nack_ids = set() drop_ids = set() - exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled() for item in items: if isinstance(item, requests.LeaseRequest): @@ -412,3 +433,6 @@ def nack(self, items: Sequence[requests.NackRequest]) -> None: for item in items ] ) + + def enter_shutdown_mode(self) -> None: + self._shutdown_mode = True diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 932699261..75d99e6dc 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -647,7 +647,7 @@ def send_unary_ack( # Futures may be present even with exactly-once delivery # disabled, in transition periods after the setting is changed on # the subscription. - if req.future: + if req.future and req.future.running: if exactly_once_delivery_enabled: e = AcknowledgeError( AcknowledgeStatus.OTHER, "RetryError while sending ack RPC." @@ -679,7 +679,7 @@ def send_unary_ack( # Futures may be present even with exactly-once delivery # disabled, in transition periods after the setting is changed on # the subscription. - if req.future: + if req.future and req.future.running: req.future.set_result(AcknowledgeStatus.SUCCESS) requests_completed.append(req) @@ -726,7 +726,7 @@ def send_unary_modack( # Futures may be present even with exactly-once delivery # disabled, in transition periods after the setting is changed on # the subscription. - if req.future: + if req.future and req.future.running: if exactly_once_delivery_enabled: e = AcknowledgeError( AcknowledgeStatus.OTHER, @@ -759,7 +759,7 @@ def send_unary_modack( # Futures may be present even with exactly-once delivery # disabled, in transition periods after the setting is changed on # the subscription. - if req.future: + if req.future and req.future.running: req.future.set_result(AcknowledgeStatus.SUCCESS) requests_completed.append(req) diff --git a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index c6902da69..bd6e874b7 100644 --- a/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -25,6 +25,7 @@ import mock import pytest from google.cloud.pubsub_v1.subscriber.exceptions import ( + AcknowledgeError, AcknowledgeStatus, ) @@ -195,6 +196,182 @@ def test_dispatch_duplicate_items_callback_active_manager_with_futures_no_eod( assert items[1].future.result() == AcknowledgeStatus.SUCCESS +@pytest.mark.parametrize( + "items,method_name", + [ + ( + [ + requests.AckRequest("0", 0, 0, "", None), + requests.AckRequest("0", 0, 1, "", None), + ], + "ack", + ), + ( + [ + requests.DropRequest("0", 0, ""), + requests.DropRequest("0", 1, ""), + ], + "drop", + ), + ( + [ + requests.LeaseRequest("0", 0, ""), + requests.LeaseRequest("0", 1, ""), + ], + "lease", + ), + ( + [ + requests.ModAckRequest("0", 0, None), + requests.ModAckRequest("0", 1, None), + ], + "modify_ack_deadline", + ), + ( + [ + requests.NackRequest("0", 0, "", None), + requests.NackRequest("0", 1, "", None), + ], + "nack", + ), + ], +) +def test_dispatch_in_shutdown_mode_no_futures(items, method_name): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + dispatcher_._shutdown_mode = True + + manager._exactly_once_delivery_enabled.return_value = False + with mock.patch.object(dispatcher_, method_name) as method: + dispatcher_.dispatch_callback(items) + + method.assert_not_called() + manager._exactly_once_delivery_enabled.assert_called() + + +@pytest.mark.parametrize( + "items,method_name", + [ + ( + [ + requests.AckRequest("0", 0, 0, "", None), + requests.AckRequest("0", 0, 1, "", futures.Future()), + ], + "ack", + ), + ( + [ + requests.DropRequest("0", 0, ""), + requests.DropRequest("0", 1, ""), + ], + "drop", + ), + ( + [ + requests.LeaseRequest("0", 0, ""), + requests.LeaseRequest("0", 1, ""), + ], + "lease", + ), + ( + [ + requests.ModAckRequest("0", 0, None), + requests.ModAckRequest("0", 1, futures.Future()), + ], + "modify_ack_deadline", + ), + ( + [ + requests.NackRequest("0", 0, "", None), + requests.NackRequest("0", 1, "", futures.Future()), + ], + "nack", + ), + ], +) +def test_dispatch_in_shutdown_mode_no_eod(items, method_name): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + dispatcher_._shutdown_mode = True + + manager._exactly_once_delivery_enabled.return_value = False + with mock.patch.object(dispatcher_, method_name) as method: + dispatcher_.dispatch_callback(items) + + method.assert_not_called() + manager._exactly_once_delivery_enabled.assert_called() + + if method_name != "drop" and method_name != "lease": + assert items[1].future.result() == AcknowledgeStatus.SUCCESS + + +@pytest.mark.parametrize( + "items,method_name", + [ + ( + [ + requests.AckRequest("0", 0, 0, "", None), + requests.AckRequest("0", 0, 1, "", futures.Future()), + ], + "ack", + ), + ( + [ + requests.DropRequest("0", 0, ""), + requests.DropRequest("0", 1, ""), + ], + "drop", + ), + ( + [ + requests.LeaseRequest("0", 0, ""), + requests.LeaseRequest("0", 1, ""), + ], + "lease", + ), + ( + [ + requests.ModAckRequest("0", 0, None), + requests.ModAckRequest("0", 1, futures.Future()), + ], + "modify_ack_deadline", + ), + ( + [ + requests.NackRequest("0", 0, "", None), + requests.NackRequest("0", 1, "", futures.Future()), + ], + "nack", + ), + ], +) +def test_dispatch_in_shutdown_mode_with_eod(items, method_name): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + dispatcher_._shutdown_mode = True + + manager._exactly_once_delivery_enabled.return_value = True + with mock.patch.object(dispatcher_, method_name) as method: + dispatcher_.dispatch_callback(items) + + method.assert_not_called() + manager._exactly_once_delivery_enabled.assert_called() + + if method_name != "drop" and method_name != "lease": + with pytest.raises(AcknowledgeError) as e: + items[1].future.result() + assert e.value.error_code == AcknowledgeStatus.OTHER + + @pytest.mark.parametrize( "items,method_name", [ @@ -400,6 +577,7 @@ def test_ack_no_time(): future=None, ) ] + manager.send_unary_ack.return_value = (items, []) dispatcher_.ack(items)