diff --git a/doc/changelog.rst b/doc/changelog.rst index b3e1c75319..6a71ebfe57 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -1,13 +1,16 @@ Changelog ========= -Changes in Version 4.18.0 -------------------------- +Changes in Version 4.18.0 (2026/XX/XX) +-------------------------------------- + +PyMongo 4.18 brings a number of changes including: - Improved TLS connection performance by reusing TLS sessions across connections to the same server, avoiding a full handshake on each new connection. Session resumption is supported on all Python versions for synchronous clients and on Python 3.11+ for async clients. +- Improved performance for MongoDB's Intelligent Workload Management (IWM) by only retrying overload errors when doing so is expected to not worsen server conditions. Changes in Version 4.17.0 (2026/04/20) -------------------------------------- diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 0ee99a441a..18dbb88dd7 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -26,6 +26,7 @@ from typing import ( Any, Callable, + Optional, TypeVar, cast, ) @@ -82,10 +83,12 @@ async def inner(*args: Any, **kwargs: Any) -> Any: def _backoff( - attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX + attempt: int, + base_backoff: float, + max_delay: float = _BACKOFF_MAX, ) -> float: jitter = random.random() # noqa: S311 - return jitter * min(initial_delay * (2**attempt), max_delay) + return jitter * min(base_backoff * (2**attempt), max_delay) class _RetryPolicy: @@ -101,9 +104,13 @@ def __init__( self.backoff_initial = backoff_initial self.backoff_max = backoff_max - def backoff(self, attempt: int) -> float: - """Return the backoff duration for the given attempt.""" - return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max) + def backoff(self, attempt: int, base_backoff: Optional[float] = None) -> float: + """Return the actual backoff duration for the given attempt and base backoff.""" + return _backoff( + max(0, attempt - 1), + self.backoff_initial if base_backoff is None or base_backoff < 0 else base_backoff, + self.backoff_max, + ) async def should_retry(self, attempt: int, delay: float) -> bool: """Return if we have retry attempts remaining and the next backoff would not exceed a timeout.""" diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index a558f96356..36b69c7d84 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2799,6 +2799,7 @@ def __init__( self._attempt_number = 0 self._is_run_command = is_run_command self._is_aggregate_write = is_aggregate_write + self._retry_after_backoff_ms: Optional[float] = None async def run(self) -> T: """Runs the supplied func() and attempts a retry @@ -2848,26 +2849,29 @@ async def run(self) -> T: # Execute specialized catch on read if self._is_read: - if isinstance(exc, (ConnectionFailure, OperationFailure)): + if isinstance(exc_to_check, (ConnectionFailure, OperationFailure)): # ConnectionFailures do not supply a code property - exc_code = getattr(exc, "code", None) - overloaded = exc.has_error_label("SystemOverloadedError") + exc_code = getattr(exc_to_check, "code", None) + overloaded = exc_to_check.has_error_label("SystemOverloadedError") if overloaded: self._max_retries = self._client.options.max_adaptive_retries - always_retryable = exc.has_error_label("RetryableError") and overloaded + self._retry_after_backoff_ms = exc_to_check._retry_after_ms + always_retryable = ( + exc_to_check.has_error_label("RetryableError") and overloaded + ) if not self._client.options.retry_reads or ( not always_retryable and ( self._is_not_eligible_for_retry() or ( - isinstance(exc, OperationFailure) + isinstance(exc_to_check, OperationFailure) and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES ) ) ): raise self._retrying = True - self._last_error = exc + self._last_error = exc_to_check self._attempt_number += 1 # Revert back to starting state only if the first @@ -2894,6 +2898,7 @@ async def run(self) -> T: overloaded = exc_to_check.has_error_label("SystemOverloadedError") if overloaded: self._max_retries = self._client.options.max_adaptive_retries + self._retry_after_backoff_ms = exc_to_check._retry_after_ms always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded # Always retry abortTransaction and commitTransaction up to once @@ -2937,7 +2942,12 @@ async def run(self) -> T: self._always_retryable = always_retryable if overloaded: - delay = self._retry_policy.backoff(self._attempt_number) + delay = self._retry_policy.backoff( + self._attempt_number, + self._retry_after_backoff_ms / 1000 + if self._retry_after_backoff_ms + else None, + ) if not await self._retry_policy.should_retry(self._attempt_number, delay): if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: raise self._last_error from exc diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 4ed3b85dbf..edb99b0e52 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -249,7 +249,7 @@ async def _hello( cmd = self.hello_cmd() performing_handshake = not self.performed_handshake awaitable = False - cmd["backpressure"] = True + cmd["backpressure"] = "2" if performing_handshake: self.performed_handshake = True cmd["client"] = self.opts.metadata diff --git a/pymongo/errors.py b/pymongo/errors.py index 59d9c203d9..a4cf75d31e 100644 --- a/pymongo/errors.py +++ b/pymongo/errors.py @@ -33,10 +33,16 @@ class PyMongoError(Exception): """Base class for all PyMongo exceptions.""" - def __init__(self, message: str = "", error_labels: Optional[Iterable[str]] = None) -> None: + def __init__( + self, + message: str = "", + error_labels: Optional[Iterable[str]] = None, + retry_after_ms: Optional[int] = None, + ) -> None: super().__init__(message) self._message = message self._error_labels = set(error_labels or []) + self._retry_after_ms = retry_after_ms def has_error_label(self, label: str) -> bool: """Return True if this error contains the given label. @@ -190,9 +196,15 @@ def __init__( max_wire_version: Optional[int] = None, ) -> None: error_labels = None + retry_after_ms = None if details is not None: error_labels = details.get("errorLabels") - super().__init__(_format_detailed_error(error, details), error_labels=error_labels) + retry_after_ms = details.get("retryAfterMS") + super().__init__( + _format_detailed_error(error, details), + error_labels=error_labels, + retry_after_ms=retry_after_ms, + ) self.__code = code self.__details = details self.__max_wire_version = max_wire_version diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index f24d72aea9..213762fafd 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -26,6 +26,7 @@ from typing import ( Any, Callable, + Optional, TypeVar, cast, ) @@ -82,10 +83,12 @@ def inner(*args: Any, **kwargs: Any) -> Any: def _backoff( - attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX + attempt: int, + base_backoff: float, + max_delay: float = _BACKOFF_MAX, ) -> float: jitter = random.random() # noqa: S311 - return jitter * min(initial_delay * (2**attempt), max_delay) + return jitter * min(base_backoff * (2**attempt), max_delay) class _RetryPolicy: @@ -101,9 +104,13 @@ def __init__( self.backoff_initial = backoff_initial self.backoff_max = backoff_max - def backoff(self, attempt: int) -> float: - """Return the backoff duration for the given attempt.""" - return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max) + def backoff(self, attempt: int, base_backoff: Optional[float] = None) -> float: + """Return the actual backoff duration for the given attempt and base backoff.""" + return _backoff( + max(0, attempt - 1), + self.backoff_initial if base_backoff is None or base_backoff < 0 else base_backoff, + self.backoff_max, + ) def should_retry(self, attempt: int, delay: float) -> bool: """Return if we have retry attempts remaining and the next backoff would not exceed a timeout.""" diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 5f321afe5c..70aa23a3fb 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2790,6 +2790,7 @@ def __init__( self._attempt_number = 0 self._is_run_command = is_run_command self._is_aggregate_write = is_aggregate_write + self._retry_after_backoff_ms: Optional[float] = None def run(self) -> T: """Runs the supplied func() and attempts a retry @@ -2839,26 +2840,29 @@ def run(self) -> T: # Execute specialized catch on read if self._is_read: - if isinstance(exc, (ConnectionFailure, OperationFailure)): + if isinstance(exc_to_check, (ConnectionFailure, OperationFailure)): # ConnectionFailures do not supply a code property - exc_code = getattr(exc, "code", None) - overloaded = exc.has_error_label("SystemOverloadedError") + exc_code = getattr(exc_to_check, "code", None) + overloaded = exc_to_check.has_error_label("SystemOverloadedError") if overloaded: self._max_retries = self._client.options.max_adaptive_retries - always_retryable = exc.has_error_label("RetryableError") and overloaded + self._retry_after_backoff_ms = exc_to_check._retry_after_ms + always_retryable = ( + exc_to_check.has_error_label("RetryableError") and overloaded + ) if not self._client.options.retry_reads or ( not always_retryable and ( self._is_not_eligible_for_retry() or ( - isinstance(exc, OperationFailure) + isinstance(exc_to_check, OperationFailure) and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES ) ) ): raise self._retrying = True - self._last_error = exc + self._last_error = exc_to_check self._attempt_number += 1 # Revert back to starting state only if the first @@ -2885,6 +2889,7 @@ def run(self) -> T: overloaded = exc_to_check.has_error_label("SystemOverloadedError") if overloaded: self._max_retries = self._client.options.max_adaptive_retries + self._retry_after_backoff_ms = exc_to_check._retry_after_ms always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded # Always retry abortTransaction and commitTransaction up to once @@ -2928,7 +2933,12 @@ def run(self) -> T: self._always_retryable = always_retryable if overloaded: - delay = self._retry_policy.backoff(self._attempt_number) + delay = self._retry_policy.backoff( + self._attempt_number, + self._retry_after_backoff_ms / 1000 + if self._retry_after_backoff_ms + else None, + ) if not self._retry_policy.should_retry(self._attempt_number, delay): if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: raise self._last_error from exc diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 1006735444..d64b260ef1 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -249,7 +249,7 @@ def _hello( cmd = self.hello_cmd() performing_handshake = not self.performed_handshake awaitable = False - cmd["backpressure"] = True + cmd["backpressure"] = "2" if performing_handshake: self.performed_handshake = True cmd["client"] = self.opts.metadata diff --git a/test/asynchronous/test_client_backpressure.py b/test/asynchronous/test_client_backpressure.py index fc3c9b85d8..ba4e8aeeba 100644 --- a/test/asynchronous/test_client_backpressure.py +++ b/test/asynchronous/test_client_backpressure.py @@ -294,6 +294,60 @@ async def test_04_overload_retries_limited_configured(self): # 6. Assert that the total number of started commands is max_retries + 1. self.assertEqual(len(self.listener.started_events), max_retries + 1) + @patch("random.random") + @async_client_context.require_version_min(9, 0, 0, -1) + @async_client_context.require_failCommand_appName + async def test_05_overload_errors_with_retryafterms_override_backoff(self, random_func): + # Drivers should test that overload errors with `retryAfterMS` override the default backoff duration. + + # 1. Let `client` be a `MongoClient`. + client = self.client + + # 2. Let `coll` be a collection. + coll = client.test.test + + # 3. Configure the random number generator used for exponential backoff jitter to always return a number as + # close as possible to `1`. + random_func.return_value = 1 + + # 4. Configure the following failPoint: + fail_point = dict( + mode="alwaysOn", + data=dict( + failCommands=["insert"], + errorCode=462, + errorLabels=["SystemOverloadedError", "RetryableError"], + appName=self.app_name, + ), + ) + async with self.fail_point(fail_point): + # 5. Insert the document `{ a: 1 }`. Expect that the command errors. Measure the duration of the command + # execution. + start0 = perf_counter() + with self.assertRaises(OperationFailure): + await coll.insert_one({"a": 1}) + end0 = perf_counter() + exponential_backoff_time = end0 - start0 + + # 6. Run the following command to set up `retryAfterMS` on overload errors. + try: + await client.admin.command("setParameter", 1, overloadRetryAfterMS=50) + + # 7. Execute step 5 again. + start1 = perf_counter() + with self.assertRaises(OperationFailure): + await coll.insert_one({"a": 1}) + end1 = perf_counter() + with_retry_after_ms_time = end1 - start1 + finally: + # 8. Run the following command to disable `retryAfterMS` on overload errors. + await client.admin.command("setParameter", 1, overloadRetryAfterMS=0) + + # 9. Compare the time between the two runs. + # The difference in the backoffs is 0.2 seconds. There is a 0.2-second window to account for potential variance + # between the two runs. + self.assertTrue(abs(exponential_backoff_time - (with_retry_after_ms_time + 0.2)) < 0.2) + # Location of JSON test specifications. if _IS_SYNC: diff --git a/test/asynchronous/test_client_metadata.py b/test/asynchronous/test_client_metadata.py index 147e01c39b..1a07e835a8 100644 --- a/test/asynchronous/test_client_metadata.py +++ b/test/asynchronous/test_client_metadata.py @@ -229,8 +229,8 @@ async def test_handshake_documents_include_backpressure(self): await client.admin.command("ping") # Assert that for every handshake document intercepted: - # the document has a field `backpressure` whose value is `true`. - self.assertEqual(self.handshake_req["backpressure"], True) + # the document has a field `backpressure` whose value is `"2"`. + self.assertEqual(self.handshake_req["backpressure"], "2") if __name__ == "__main__": diff --git a/test/test_client_backpressure.py b/test/test_client_backpressure.py index c50767db66..f821eeb924 100644 --- a/test/test_client_backpressure.py +++ b/test/test_client_backpressure.py @@ -292,6 +292,60 @@ def test_04_overload_retries_limited_configured(self): # 6. Assert that the total number of started commands is max_retries + 1. self.assertEqual(len(self.listener.started_events), max_retries + 1) + @patch("random.random") + @client_context.require_version_min(9, 0, 0, -1) + @client_context.require_failCommand_appName + def test_05_overload_errors_with_retryafterms_override_backoff(self, random_func): + # Drivers should test that overload errors with `retryAfterMS` override the default backoff duration. + + # 1. Let `client` be a `MongoClient`. + client = self.client + + # 2. Let `coll` be a collection. + coll = client.test.test + + # 3. Configure the random number generator used for exponential backoff jitter to always return a number as + # close as possible to `1`. + random_func.return_value = 1 + + # 4. Configure the following failPoint: + fail_point = dict( + mode="alwaysOn", + data=dict( + failCommands=["insert"], + errorCode=462, + errorLabels=["SystemOverloadedError", "RetryableError"], + appName=self.app_name, + ), + ) + with self.fail_point(fail_point): + # 5. Insert the document `{ a: 1 }`. Expect that the command errors. Measure the duration of the command + # execution. + start0 = perf_counter() + with self.assertRaises(OperationFailure): + coll.insert_one({"a": 1}) + end0 = perf_counter() + exponential_backoff_time = end0 - start0 + + # 6. Run the following command to set up `retryAfterMS` on overload errors. + try: + client.admin.command("setParameter", 1, overloadRetryAfterMS=50) + + # 7. Execute step 5 again. + start1 = perf_counter() + with self.assertRaises(OperationFailure): + coll.insert_one({"a": 1}) + end1 = perf_counter() + with_retry_after_ms_time = end1 - start1 + finally: + # 8. Run the following command to disable `retryAfterMS` on overload errors. + client.admin.command("setParameter", 1, overloadRetryAfterMS=0) + + # 9. Compare the time between the two runs. + # The difference in the backoffs is 0.2 seconds. There is a 0.2-second window to account for potential variance + # between the two runs. + self.assertTrue(abs(exponential_backoff_time - (with_retry_after_ms_time + 0.2)) < 0.2) + # Location of JSON test specifications. if _IS_SYNC: diff --git a/test/test_client_metadata.py b/test/test_client_metadata.py index 47a37b2151..f5ec92f2f3 100644 --- a/test/test_client_metadata.py +++ b/test/test_client_metadata.py @@ -229,8 +229,8 @@ def test_handshake_documents_include_backpressure(self): client.admin.command("ping") # Assert that for every handshake document intercepted: - # the document has a field `backpressure` whose value is `true`. - self.assertEqual(self.handshake_req["backpressure"], True) + # the document has a field `backpressure` whose value is `"2"`. + self.assertEqual(self.handshake_req["backpressure"], "2") if __name__ == "__main__":