diff --git a/pymongo/_telemetry.py b/pymongo/_telemetry.py index 471a013eb9..67cd6e4987 100644 --- a/pymongo/_telemetry.py +++ b/pymongo/_telemetry.py @@ -18,16 +18,30 @@ import datetime import logging +import queue +import time from collections.abc import MutableMapping from typing import TYPE_CHECKING, Any, Optional -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log +from pymongo.logger import ( + _COMMAND_LOGGER, + _CONNECTION_LOGGER, + _SDAM_LOGGER, + _SERVER_SELECTION_LOGGER, + _CommandStatusMessage, + _ConnectionStatusMessage, + _debug_log, + _SDAMStatusMessage, + _ServerSelectionStatusMessage, + _verbose_connection_error_reason, +) from pymongo.pool_shared import _ConnectionTelemetryInfo if TYPE_CHECKING: from bson.objectid import ObjectId + from pymongo.hello import Hello from pymongo.monitoring import _EventListeners - from pymongo.typings import _DocumentOut + from pymongo.typings import _Address, _DocumentOut class _CommandTelemetry: @@ -183,3 +197,517 @@ def failed( service_id=self._conn.service_id, database_name=self._dbname, ) + + +class _CmapTelemetry: + """Combines CMAP structured logging and APM event publishing for pool and connection events.""" + + __slots__ = ( + "_address", + "_client_id", + "_listeners", + "_log", + "_publish", + ) + + def __init__( + self, + client_id: Optional[ObjectId], + address: _Address, + listeners: Optional[_EventListeners], + publish: bool, + log: bool, + ) -> None: + self._client_id = client_id + self._address = address + self._listeners = listeners + self._publish = publish + self._log = log + + @property + def _should_publish(self) -> bool: + """Computed per-call because listener registration can change while the pool is open.""" + return self._publish and self._listeners is not None and self._listeners.enabled_for_cmap + + @property + def _should_log(self) -> bool: + """Computed per-call because logging level can be reconfigured at runtime.""" + return self._log and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG) + + def _emit_log(self, message: _ConnectionStatusMessage, **extra: Any) -> None: + _debug_log( + _CONNECTION_LOGGER, + message=message, + clientId=self._client_id, + serverHost=self._address[0], + serverPort=self._address[1], + **extra, + ) + + def pool_created(self, non_default_options: dict[str, Any]) -> None: + """Emit the pool created log entry and APM event.""" + # Log before publishing to prevent potential listener preemption in tests. + if self._should_log: + self._emit_log(_ConnectionStatusMessage.POOL_CREATED, **non_default_options) + if self._should_publish: + assert self._listeners is not None + self._listeners.publish_pool_created(self._address, non_default_options) + + def pool_ready(self) -> None: + """Emit the pool ready log entry and APM event.""" + # Log before publishing to prevent potential listener preemption in tests. + if self._should_log: + self._emit_log(_ConnectionStatusMessage.POOL_READY) + if self._should_publish: + assert self._listeners is not None + self._listeners.publish_pool_ready(self._address) + + def pool_cleared(self, service_id: Optional[ObjectId], interrupt_connections: bool) -> None: + """Emit the pool cleared log entry and APM event.""" + # Log before publishing to prevent potential listener preemption in tests. + if self._should_log: + self._emit_log(_ConnectionStatusMessage.POOL_CLEARED, serviceId=service_id) + if self._should_publish: + assert self._listeners is not None + self._listeners.publish_pool_cleared( + self._address, + service_id=service_id, + interrupt_connections=interrupt_connections, + ) + + def pool_closed(self) -> None: + """Emit the pool closed log entry and APM event.""" + # Log before publishing to prevent potential listener preemption in tests. + if self._should_log: + self._emit_log(_ConnectionStatusMessage.POOL_CLOSED) + if self._should_publish: + assert self._listeners is not None + self._listeners.publish_pool_closed(self._address) + + def connection_created(self, conn_id: int) -> None: + """Emit the connection created log entry and APM event.""" + # Log before publishing to prevent potential listener preemption in tests. + if self._should_log: + self._emit_log(_ConnectionStatusMessage.CONN_CREATED, driverConnectionId=conn_id) + if self._should_publish: + assert self._listeners is not None + self._listeners.publish_connection_created(self._address, conn_id) + + def connection_ready(self, conn_id: int, creation_time: float) -> None: + """Emit the connection ready log entry and APM event.""" + should_log = self._should_log + should_publish = self._should_publish + if not should_log and not should_publish: + return + duration = max(0.0, time.monotonic() - creation_time) + # Log before publishing to prevent potential listener preemption in tests. + if should_log: + self._emit_log( + _ConnectionStatusMessage.CONN_READY, + driverConnectionId=conn_id, + durationMS=duration * 1000, + ) + if should_publish: + assert self._listeners is not None + self._listeners.publish_connection_ready(self._address, conn_id, duration) + + def connection_closed(self, conn_id: int, reason: str) -> None: + """Emit the connection closed log entry and APM event.""" + should_log = self._should_log + should_publish = self._should_publish + if should_publish: + assert self._listeners is not None + self._listeners.publish_connection_closed(self._address, conn_id, reason) + if should_log: + self._emit_log( + _ConnectionStatusMessage.CONN_CLOSED, + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(reason), + error=reason, + ) + + def checkout_started(self) -> float: + """Emit the checkout started event/log and return the start time for duration tracking.""" + start = time.monotonic() + if self._should_publish: + assert self._listeners is not None + self._listeners.publish_connection_check_out_started(self._address) + if self._should_log: + self._emit_log(_ConnectionStatusMessage.CHECKOUT_STARTED) + return start + + def checkout_succeeded(self, conn_id: int, start: float) -> None: + """Emit the checkout succeeded log entry and APM event.""" + should_log = self._should_log + should_publish = self._should_publish + if not should_log and not should_publish: + return + duration = max(0.0, time.monotonic() - start) + if should_publish: + assert self._listeners is not None + self._listeners.publish_connection_checked_out(self._address, conn_id, duration) + if should_log: + self._emit_log( + _ConnectionStatusMessage.CHECKOUT_SUCCEEDED, + driverConnectionId=conn_id, + durationMS=duration * 1000, + ) + + def checkout_failed(self, reason: str, error: str, start: float) -> None: + """Emit the checkout failed log entry and APM event.""" + should_log = self._should_log + should_publish = self._should_publish + if not should_log and not should_publish: + return + duration = max(0.0, time.monotonic() - start) + if should_publish: + assert self._listeners is not None + self._listeners.publish_connection_check_out_failed(self._address, error, duration) + if should_log: + self._emit_log( + _ConnectionStatusMessage.CHECKOUT_FAILED, + reason=reason, + error=error, + durationMS=duration * 1000, + ) + + def checked_in(self, conn_id: int) -> None: + """Emit the connection checked-in log entry and APM event.""" + if self._should_publish: + assert self._listeners is not None + self._listeners.publish_connection_checked_in(self._address, conn_id) + if self._should_log: + self._emit_log(_ConnectionStatusMessage.CHECKEDIN, driverConnectionId=conn_id) + + +class _HeartbeatTelemetry: + """Combines SDAM structured logging and APM event publishing for server heartbeats. + + The APM started event is published before connection checkout (no conn_id yet); + the log entry for started is emitted after checkout once the conn_id is known. + Call :meth:`started` first, then :meth:`emit_started_log` inside the checkout + context, then :meth:`succeeded` or :meth:`failed` when the outcome is known. + """ + + __slots__ = ( + "_address", + "_awaited", + "_listeners", + "_should_log", + "_should_publish", + "_start", + "_topology_id", + ) + + def __init__( + self, + topology_id: ObjectId, + address: _Address, + listeners: Optional[_EventListeners], + awaited: bool, + ) -> None: + self._topology_id = topology_id + self._address = address + self._listeners = listeners + self._awaited = awaited + # Cached at construction: this object is short-lived (one heartbeat check) so + # listener registration and logging level are stable for its lifetime. + self._should_publish = listeners is not None and listeners.enabled_for_server_heartbeat + self._should_log = _SDAM_LOGGER.isEnabledFor(logging.DEBUG) + + def _emit_log(self, message: _SDAMStatusMessage, **extra: Any) -> None: + _debug_log( + _SDAM_LOGGER, + message=message, + topologyId=self._topology_id, + serverHost=self._address[0], + serverPort=self._address[1], + awaited=self._awaited, + **extra, + ) + + def started(self) -> None: + """Publish the APM heartbeat-started event (before connection checkout).""" + if self._should_publish or self._should_log: + self._start = time.monotonic() + if self._should_publish: + assert self._listeners is not None + self._listeners.publish_server_heartbeat_started(self._address, self._awaited) + + def emit_started_log(self, conn_id: int, server_conn_id: Optional[int]) -> None: + """Emit the log entry for heartbeat started (after connection checkout).""" + if self._should_log: + self._emit_log( + _SDAMStatusMessage.HEARTBEAT_START, + driverConnectionId=conn_id, + serverConnectionId=server_conn_id, + ) + + def succeeded( + self, + round_trip_time: float, + response: Hello[Any], + conn_id: int, + server_conn_id: Optional[int], + ) -> None: + """Emit the SUCCEEDED log entry and APM event.""" + if self._should_publish: + assert self._listeners is not None + self._listeners.publish_server_heartbeat_succeeded( + self._address, round_trip_time, response, response.awaitable + ) + if self._should_log: + self._emit_log( + _SDAMStatusMessage.HEARTBEAT_SUCCESS, + driverConnectionId=conn_id, + serverConnectionId=server_conn_id, + durationMS=round_trip_time * 1000, + reply=response.document, + ) + + def failed(self, error: Exception, conn_id: Optional[int]) -> None: + """Emit the FAILED log entry and APM event.""" + should_publish = self._should_publish + should_log = self._should_log + if not should_publish and not should_log: + return + duration = max(0.0, time.monotonic() - self._start) + if should_publish: + assert self._listeners is not None + self._listeners.publish_server_heartbeat_failed( + self._address, duration, error, self._awaited + ) + if should_log: + self._emit_log( + _SDAMStatusMessage.HEARTBEAT_FAIL, + durationMS=duration * 1000, + failure=error, + driverConnectionId=conn_id, + ) + + +class _SdamTelemetry: + """Combines SDAM structured logging and APM event publishing for topology and server events. + + Topology events are queued for asynchronous delivery; log entries are emitted inline. + """ + + __slots__ = ("_events", "_listeners", "_topology_id") + + def __init__( + self, + topology_id: ObjectId, + listeners: Optional[_EventListeners], + events: Optional[queue.Queue[Any]], + ) -> None: + self._topology_id = topology_id + self._listeners = listeners + self._events = events + + @property + def _publish_server(self) -> bool: + """Computed per-call because listener registration can change while the topology is open.""" + return self._listeners is not None and self._listeners.enabled_for_server + + @property + def _publish_tp(self) -> bool: + """Computed per-call because listener registration can change while the topology is open.""" + return self._listeners is not None and self._listeners.enabled_for_topology + + @property + def _should_log(self) -> bool: + """Computed per-call because logging level can be reconfigured at runtime.""" + return _SDAM_LOGGER.isEnabledFor(logging.DEBUG) + + def _enqueue(self, fn: Any, args: tuple[Any, ...]) -> None: + if self._events is not None: + self._events.put((fn, args)) + + def _emit_log(self, message: _SDAMStatusMessage, **extra: Any) -> None: + _debug_log( + _SDAM_LOGGER, + message=message, + topologyId=self._topology_id, + **extra, + ) + + def topology_opened(self) -> None: + """Emit the topology opened log entry and APM event.""" + if self._should_log: + self._emit_log(_SDAMStatusMessage.START_TOPOLOGY) + if self._publish_tp: + assert self._listeners is not None + self._enqueue(self._listeners.publish_topology_opened, (self._topology_id,)) + + def topology_description_changed(self, old_td: Any, new_td: Any) -> None: + """Emit the topology description changed APM event and log entry.""" + if self._publish_tp: + assert self._listeners is not None + self._enqueue( + self._listeners.publish_topology_description_changed, + (old_td, new_td, self._topology_id), + ) + if self._should_log: + self._emit_log( + _SDAMStatusMessage.TOPOLOGY_CHANGE, + previousDescription=repr(old_td), + newDescription=repr(new_td), + ) + + def topology_closed(self, old_td: Any, new_td: Any) -> None: + """Emit APM and log events for topology description change + topology closed.""" + if self._publish_tp: + assert self._listeners is not None + self._enqueue( + self._listeners.publish_topology_description_changed, + (old_td, new_td, self._topology_id), + ) + self._enqueue(self._listeners.publish_topology_closed, (self._topology_id,)) + if self._should_log: + self._emit_log( + _SDAMStatusMessage.TOPOLOGY_CHANGE, + previousDescription=repr(old_td), + newDescription=repr(new_td), + ) + self._emit_log(_SDAMStatusMessage.STOP_TOPOLOGY) + + def server_opened(self, address: _Address) -> None: + """Emit the server opened log entry and APM event.""" + if self._publish_server: + assert self._listeners is not None + self._enqueue(self._listeners.publish_server_opened, (address, self._topology_id)) + if self._should_log: + self._emit_log( + _SDAMStatusMessage.START_SERVER, + serverHost=address[0], + serverPort=address[1], + ) + + def server_description_changed(self, sd_old: Any, sd_new: Any, address: _Address) -> None: + """Emit the server description changed APM event.""" + if self._publish_server: + assert self._listeners is not None + self._enqueue( + self._listeners.publish_server_description_changed, + (sd_old, sd_new, address, self._topology_id), + ) + + def server_closed(self, address: _Address) -> None: + """Emit the server closed log entry and APM event.""" + if self._publish_server: + assert self._listeners is not None + self._enqueue(self._listeners.publish_server_closed, (address, self._topology_id)) + if self._should_log: + self._emit_log( + _SDAMStatusMessage.STOP_SERVER, + serverHost=address[0], + serverPort=address[1], + ) + + +class _ServerSelectionTelemetry: + """Structured logging for server selection events. + + The server selection spec defines only log entries, not APM events, so this + class has no publish methods. + + Construct once per :meth:`select_server` call. + """ + + __slots__ = ( + "_operation", + "_operation_id", + "_selector", + "_should_log", + "_topology_description", + "_topology_id", + ) + + def __init__( + self, + topology_id: Any, + selector: Any, + operation: str, + operation_id: Optional[int], + topology_description: Any, + ) -> None: + self._topology_id = topology_id + self._selector = selector + self._operation = operation + self._operation_id = operation_id + self._topology_description = topology_description + # Cached at construction: this object is short-lived (one select_server call) so + # logging level is stable for its lifetime. + self._should_log = _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG) + + def _emit_log( + self, message: _ServerSelectionStatusMessage, topology_description: Any, **extra: Any + ) -> None: + _debug_log( + _SERVER_SELECTION_LOGGER, + message=message, + clientId=self._topology_id, + selector=self._selector, + operation=self._operation, + operationId=self._operation_id, + topologyDescription=topology_description, + **extra, + ) + + def started(self) -> None: + """Emit the server selection STARTED log entry.""" + if self._should_log: + self._emit_log(_ServerSelectionStatusMessage.STARTED, self._topology_description) + + def waiting(self, remaining_time_ms: int) -> None: + """Emit the server selection WAITING log entry.""" + if self._should_log: + self._emit_log( + _ServerSelectionStatusMessage.WAITING, + self._topology_description, + remainingTimeMS=remaining_time_ms, + ) + + def failed(self, failure: str, topology_description: Any) -> None: + """Emit the server selection FAILED log entry with the current topology description.""" + if self._should_log: + self._emit_log( + _ServerSelectionStatusMessage.FAILED, + topology_description, + failure=failure, + ) + + def succeeded(self, server_host: str, server_port: Optional[int]) -> None: + """Emit the server selection SUCCEEDED log entry.""" + if self._should_log: + self._emit_log( + _ServerSelectionStatusMessage.SUCCEEDED, + self._topology_description, + serverHost=server_host, + serverPort=server_port, + ) + + +def log_srv_monitor_failure(failure: Exception) -> None: + """Emit a log entry when the SRV monitor fails to poll DNS records.""" + if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log(_SDAM_LOGGER, message="SRV monitor check failed", failure=repr(failure)) + + +def log_command_retry( + topology_id: Any, + command_name: str, + operation_id: Optional[int], + attempt_number: int, + is_write: bool, +) -> None: + """Emit a command-retry log entry.""" + if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): + op = "write" if is_write else "read" + _debug_log( + _COMMAND_LOGGER, + message=f"Retrying {op} attempt number {attempt_number}", + clientId=topology_id, + commandName=command_name, + operationId=operation_id, + ) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index a558f96356..3e811d3c0e 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -57,6 +57,7 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry from bson.timestamp import Timestamp from pymongo import _csot, common, helpers_shared, periodic_executor +from pymongo._telemetry import log_command_retry from pymongo.asynchronous import client_session, database, uri_parser from pymongo.asynchronous.change_stream import AsyncChangeStream, AsyncClusterChangeStream from pymongo.asynchronous.client_bulk import _AsyncClientBulk @@ -90,8 +91,6 @@ ) from pymongo.logger import ( _CLIENT_LOGGER, - _COMMAND_LOGGER, - _debug_log, _log_client_error, _log_or_warn, ) @@ -2991,6 +2990,15 @@ async def _get_server(self) -> Server: operation_id=self._operation_id, ) + def _log_retry(self, is_write: bool) -> None: + log_command_retry( + self._client._topology_id, + self._operation, + self._operation_id, + self._attempt_number, + is_write, + ) + async def _write(self) -> T: """Wrapper method for write-type retryable client executions @@ -3014,13 +3022,7 @@ async def _write(self) -> T: self._check_last_error() self._retryable = False if self._retrying: - _debug_log( - _COMMAND_LOGGER, - message=f"Retrying write attempt number {self._attempt_number}", - clientId=self._client._topology_id, - commandName=self._operation, - operationId=self._operation_id, - ) + self._log_retry(is_write=True) return await self._func(self._session, conn, self._retryable) # type: ignore except PyMongoError as exc: if not self._retryable: @@ -3043,13 +3045,7 @@ async def _read(self) -> T: if self._retrying and not self._retryable and not self._always_retryable: self._check_last_error() if self._retrying: - _debug_log( - _COMMAND_LOGGER, - message=f"Retrying read attempt number {self._attempt_number}", - clientId=self._client._topology_settings._topology_id, - commandName=self._operation, - operationId=self._operation_id, - ) + self._log_retry(is_write=False) return await self._func(self._session, self._server, conn, read_pref) # type: ignore diff --git a/pymongo/asynchronous/monitor.py b/pymongo/asynchronous/monitor.py index 45c12b219f..f601f54a2d 100644 --- a/pymongo/asynchronous/monitor.py +++ b/pymongo/asynchronous/monitor.py @@ -18,18 +18,17 @@ import asyncio import atexit -import logging import time import weakref from typing import TYPE_CHECKING, Any, Optional from pymongo import common, periodic_executor from pymongo._csot import MovingMinimum +from pymongo._telemetry import _HeartbeatTelemetry, log_srv_monitor_failure from pymongo.asynchronous.srv_resolver import _SrvResolver from pymongo.errors import NetworkTimeout, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _async_create_lock -from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage from pymongo.periodic_executor import _shutdown_executors from pymongo.pool_options import _is_faas from pymongo.read_preferences import MovingAverage @@ -151,9 +150,9 @@ def __init__( self._pool = pool self._settings = topology_settings self._listeners = self._settings._pool_options._event_listeners - self._publish = self._listeners is not None and self._listeners.enabled_for_server_heartbeat self._cancel_context: Optional[_CancellationContext] = None self._conn_id: Optional[int] = None + self._current_hb: Optional[_HeartbeatTelemetry] = None self._rtt_monitor = _RttMonitor( topology, topology_settings, @@ -257,32 +256,16 @@ async def _check_server(self) -> ServerDescription: Returns a ServerDescription. """ self._conn_id = None - start = time.monotonic() + self._current_hb = None try: return await self._check_once() except ReferenceError: raise except Exception as error: _sanitize(error) - sd = self._server_description - address = sd.address - duration = _monotonic_duration(start) - awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version) - if self._publish: - assert self._listeners is not None - self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.HEARTBEAT_FAIL, - topologyId=self._topology._topology_id, - serverHost=address[0], - serverPort=address[1], - awaited=awaited, - durationMS=duration * 1000, - failure=error, - driverConnectionId=self._conn_id, - ) + address = self._server_description.address + if self._current_hb is not None: + self._current_hb.failed(error, self._conn_id) await self._reset_connection() if isinstance(error, _OperationCancelled): raise @@ -303,25 +286,14 @@ async def _check_once(self) -> ServerDescription: awaited = bool( self._pool.conns and self._stream and sd.is_server_type_known and sd.topology_version ) - if self._publish: - assert self._listeners is not None - self._listeners.publish_server_heartbeat_started(address, awaited) + hb = _HeartbeatTelemetry(self._topology._topology_id, address, self._listeners, awaited) + self._current_hb = hb + hb.started() if self._cancel_context and self._cancel_context.cancelled: await self._reset_connection() async with self._pool.checkout() as conn: - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.HEARTBEAT_START, - topologyId=self._topology._topology_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=address[0], - serverPort=address[1], - awaited=awaited, - ) - + hb.emit_started_log(conn.id, conn.server_connection_id) self._cancel_context = conn.cancel_context # Record the connection id so we can later attach it to the failed log message. self._conn_id = conn.id @@ -331,24 +303,7 @@ async def _check_once(self) -> ServerDescription: avg_rtt, min_rtt = await self._rtt_monitor.get() sd = ServerDescription(address, response, avg_rtt, min_round_trip_time=min_rtt) - if self._publish: - assert self._listeners is not None - self._listeners.publish_server_heartbeat_succeeded( - address, round_trip_time, response, response.awaitable - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.HEARTBEAT_SUCCESS, - topologyId=self._topology._topology_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=address[0], - serverPort=address[1], - awaited=awaited, - durationMS=round_trip_time * 1000, - reply=response.document, - ) + hb.succeeded(round_trip_time, response, conn.id, conn.server_connection_id) return sd async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]: # type: ignore[type-arg] @@ -429,7 +384,7 @@ async def _get_seedlist(self) -> Optional[list[tuple[str, Any]]]: # - SRV records must be rescanned every heartbeatFrequencyMS # - Topology must be left unchanged self.request_check() - _debug_log(_SDAM_LOGGER, message="SRV monitor check failed", failure=repr(exc)) + log_srv_monitor_failure(exc) return None else: self._executor.update_interval(max(ttl, common.MIN_SRV_RESCAN_INTERVAL)) diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 4ed3b85dbf..b3cfae8a45 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -17,7 +17,6 @@ import asyncio import collections import contextlib -import logging import os import socket import ssl @@ -35,6 +34,7 @@ from bson import DEFAULT_CODEC_OPTIONS from pymongo import _csot, helpers_shared +from pymongo._telemetry import _CmapTelemetry from pymongo.asynchronous.client_session import _validate_session_write_concern from pymongo.asynchronous.command_runner import run_command from pymongo.asynchronous.helpers import _handle_reauth @@ -65,12 +65,6 @@ _async_create_condition, _async_create_lock, ) -from pymongo.logger import ( - _CONNECTION_LOGGER, - _ConnectionStatusMessage, - _debug_log, - _verbose_connection_error_reason, -) from pymongo.monitoring import ( ConnectionCheckOutFailedReason, ConnectionClosedReason, @@ -134,6 +128,7 @@ def __init__( self.id = id self.is_sdam = is_sdam self.closed = False + self.creation_time = time.monotonic() self.last_checkin_time = time.monotonic() self.performed_handshake = False self.is_writable: bool = False @@ -145,8 +140,7 @@ def __init__( self.hello_ok: bool = False self.is_mongos = False self.listeners = pool.opts._event_listeners - self.enabled_for_cmap = pool.enabled_for_cmap - self.enabled_for_logging = pool.enabled_for_logging + self._telemetry = pool._telemetry self.compression_settings = pool.opts._compression_settings self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None self.socket_checker: SocketChecker = SocketChecker() @@ -173,8 +167,6 @@ def __init__( self.active = False self.last_timeout = self.opts.socket_timeout self.connect_rtt = 0.0 - self._client_id = pool._client_id - self.creation_time = time.monotonic() # For gossiping $clusterTime from the connection handshake to the client. self._cluster_time = None @@ -475,21 +467,7 @@ async def authenticate(self, reauthenticate: bool = False) -> None: await auth.authenticate(creds, self, reauthenticate=reauthenticate) self.ready = True - duration = time.monotonic() - self.creation_time - # Log before publishing event to prevent potential listener preemption in tests - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_READY, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=self.id, - durationMS=duration, - ) - if self.enabled_for_cmap: - assert self.listeners is not None - self.listeners.publish_connection_ready(self.address, self.id, duration) + self._telemetry.connection_ready(self.id, self.creation_time) def validate_session( self, client: Optional[AsyncMongoClient[Any]], session: Optional[AsyncClientSession] @@ -510,20 +488,7 @@ async def close_conn(self, reason: Optional[str]) -> None: return await self._close_conn() if reason: - if self.enabled_for_cmap: - assert self.listeners is not None - self.listeners.publish_connection_closed(self.address, self.id, reason) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_CLOSED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=self.id, - reason=_verbose_connection_error_reason(reason), - error=reason, - ) + self._telemetry.connection_closed(self.id, reason) async def _close_conn(self) -> None: """Close this connection.""" @@ -699,13 +664,6 @@ def __init__( self.address = address self.opts = options self.is_sdam = is_sdam - # Don't publish events or logs in Monitor pools. - self.enabled_for_cmap = ( - not self.is_sdam - and self.opts._event_listeners is not None - and self.opts._event_listeners.enabled_for_cmap - ) - self.enabled_for_logging = not self.is_sdam # The first portion of the wait queue. # Enforces: maxPoolSize @@ -721,25 +679,13 @@ def __init__( self._max_connecting_cond = _async_create_condition(self.lock) self._pending = 0 self._max_connecting = self.opts.max_connecting - self._client_id = client_id self._ssl_session_cache: Optional[list[Any]] = ( [None] if self.opts._ssl_context is not None else None ) - # Log before publishing event to prevent potential listener preemption in tests - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_CREATED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - **self.opts.non_default_options, - ) - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_pool_created( - self.address, self.opts.non_default_options - ) + self._telemetry = _CmapTelemetry( + client_id, address, options._event_listeners, publish=not is_sdam, log=not is_sdam + ) + self._telemetry.pool_created(self.opts.non_default_options) # Similar to active_sockets but includes threads in the wait queue. self.operation_count: int = 0 # Retain references to pinned connections to prevent the CPython GC @@ -754,17 +700,7 @@ async def ready(self) -> None: async with self.lock: if self.state != PoolState.READY: self.state = PoolState.READY - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_READY, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - ) - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_pool_ready(self.address) + self._telemetry.pool_ready() @property def closed(self) -> bool: @@ -812,23 +748,7 @@ async def _reset( # and free-threaded Python causes ConnectionCheckOutFailedEvent to # arrive before PoolClearedEvent (PYTHON-3519). if not close and old_state != PoolState.PAUSED: - _listeners = self.opts._event_listeners - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_CLEARED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - serviceId=service_id, - ) - if self.enabled_for_cmap: - assert _listeners is not None - _listeners.publish_pool_cleared( - self.address, - service_id=service_id, - interrupt_connections=interrupt_connections, - ) + self._telemetry.pool_cleared(service_id, interrupt_connections) # Clear the wait queue self._max_connecting_cond.notify_all() @@ -838,7 +758,6 @@ async def _reset( for context in self.active_contexts: context.cancel() - listeners = self.opts._event_listeners # CMAP spec says that close() MUST close sockets before publishing the # PoolClosedEvent but that reset() SHOULD close sockets *after* # publishing the PoolClearedEvent. @@ -851,17 +770,7 @@ async def _reset( else: for conn in sockets: await conn.close_conn(ConnectionClosedReason.POOL_CLOSED) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_CLOSED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - ) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_pool_closed(self.address) + self._telemetry.pool_closed() else: if not _IS_SYNC: await asyncio.gather( @@ -996,20 +905,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A tmp_context = _CancellationContext() self.active_contexts.add(tmp_context) - listeners = self.opts._event_listeners - # Log before publishing event to prevent potential listener preemption in tests - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_CREATED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - ) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_created(self.address, conn_id) + self._telemetry.connection_created(conn_id) try: networking_interface = await _configured_protocol_interface( @@ -1019,22 +915,7 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A except BaseException as error: async with self.lock: self.active_contexts.discard(tmp_context) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_closed( - self.address, conn_id, ConnectionClosedReason.ERROR - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_CLOSED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + self._telemetry.connection_closed(conn_id, ConnectionClosedReason.ERROR) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) # Wrap to AutoReconnect/NetworkTimeout BEFORE labeling so the @@ -1096,36 +977,11 @@ async def checkout( :param handler: A _MongoClientErrorHandler. """ - listeners = self.opts._event_listeners - checkout_started_time = time.monotonic() - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_check_out_started(self.address) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_STARTED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - ) + checkout_started_time = self._telemetry.checkout_started() conn = await self._get_conn(checkout_started_time, handler=handler) - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_checked_out(self.address, conn.id, duration) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) + self._telemetry.checkout_succeeded(conn.id, checkout_started_time) try: async with self.lock: self.active_contexts.add(conn.cancel_context) @@ -1159,23 +1015,11 @@ async def checkout( def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: if self.state != PoolState.READY: if emit_event: - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - reason="An error occurred while trying to establish a new connection", - error=ConnectionCheckOutFailedReason.CONN_ERROR, - durationMS=duration, - ) + self._telemetry.checkout_failed( + "An error occurred while trying to establish a new connection", + ConnectionCheckOutFailedReason.CONN_ERROR, + checkout_started_time, + ) details = _get_timeout_details(self.opts) _raise_connection_failure( @@ -1193,23 +1037,11 @@ async def _get_conn( await self.reset_without_pause() if self.closed: - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Connection pool was closed", - error=ConnectionCheckOutFailedReason.POOL_CLOSED, - durationMS=duration, - ) + self._telemetry.checkout_failed( + "Connection pool was closed", + ConnectionCheckOutFailedReason.POOL_CLOSED, + checkout_started_time, + ) raise _PoolClosedError( "Attempted to check out a connection from closed connection pool" ) @@ -1289,23 +1121,11 @@ async def _get_conn( self.size_cond.notify() if not emitted_event: - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - reason="An error occurred while trying to establish a new connection", - error=ConnectionCheckOutFailedReason.CONN_ERROR, - durationMS=duration, - ) + self._telemetry.checkout_failed( + "An error occurred while trying to establish a new connection", + ConnectionCheckOutFailedReason.CONN_ERROR, + checkout_started_time, + ) raise conn.active = True @@ -1322,21 +1142,9 @@ async def checkin(self, conn: AsyncConnection) -> None: conn.pinned_txn = False conn.pinned_cursor = False self.__pinned_sockets.discard(conn) - listeners = self.opts._event_listeners async with self.lock: self.active_contexts.discard(conn.cancel_context) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_checked_in(self.address, conn.id) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKEDIN, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - ) + self._telemetry.checked_in(conn.id) if self.pid != os.getpid(): await self.reset_without_pause() else: @@ -1344,22 +1152,7 @@ async def checkin(self, conn: AsyncConnection) -> None: await conn.close_conn(ConnectionClosedReason.POOL_CLOSED) elif conn.closed: # CMAP requires the closed event be emitted after the check in. - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_closed( - self.address, conn.id, ConnectionClosedReason.ERROR - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_CLOSED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + self._telemetry.connection_closed(conn.id, ConnectionClosedReason.ERROR) else: close_conn = False async with self.lock: @@ -1424,24 +1217,11 @@ async def _perished(self, conn: AsyncConnection) -> bool: return False def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn: - listeners = self.opts._event_listeners - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Wait queue timeout elapsed without a connection becoming available", - error=ConnectionCheckOutFailedReason.TIMEOUT, - durationMS=duration, - ) + self._telemetry.checkout_failed( + "Wait queue timeout elapsed without a connection becoming available", + ConnectionCheckOutFailedReason.TIMEOUT, + checkout_started_time, + ) timeout = _csot.get_timeout() or self.opts.wait_queue_timeout if self.opts.load_balanced: other_ops = self.active_sockets - self.ncursors - self.ntxns diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 57158dfc44..85b7fa1ad8 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -16,7 +16,6 @@ from __future__ import annotations -import logging from contextlib import AbstractAsyncContextManager from typing import ( TYPE_CHECKING, @@ -26,13 +25,9 @@ Union, ) +from pymongo._telemetry import _SdamTelemetry from pymongo.asynchronous.command_runner import run_cursor_command from pymongo.asynchronous.helpers import _handle_reauth -from pymongo.logger import ( - _SDAM_LOGGER, - _debug_log, - _SDAMStatusMessage, -) from pymongo.message import _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response @@ -69,11 +64,8 @@ def __init__( self._pool = pool self._monitor = monitor self._topology_id = topology_id - self._publish = listeners is not None and listeners.enabled_for_server - self._listener = listeners - self._events = None - if self._publish: - self._events = events() # type: ignore[misc] + _events = events() if listeners is not None and listeners.enabled_for_server else None # type: ignore[misc] + self._sdam = _SdamTelemetry(topology_id, listeners, _events) # type: ignore[arg-type] async def open(self) -> None: """Start monitoring, or restart after a fork. @@ -92,23 +84,7 @@ async def close(self) -> None: Reconnect with open(). """ - if self._publish: - assert self._listener is not None - assert self._events is not None - self._events.put( - ( - self._listener.publish_server_closed, - (self._description.address, self._topology_id), - ) - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.STOP_SERVER, - topologyId=self._topology_id, - serverHost=self._description.address[0], - serverPort=self._description.address[1], - ) + self._sdam.server_closed(self._description.address) await self._monitor.close() await self._pool.close() diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 6af075cdc2..f3032a6775 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -17,7 +17,6 @@ from __future__ import annotations import asyncio -import logging import os import queue import random @@ -30,6 +29,7 @@ from typing import TYPE_CHECKING, Any, Callable, Optional, cast from pymongo import _csot, common, helpers_shared, periodic_executor +from pymongo._telemetry import _SdamTelemetry, _ServerSelectionTelemetry from pymongo.asynchronous.client_session import _ServerSession, _ServerSessionPool from pymongo.asynchronous.monitor import MonitorBase, SrvMonitor from pymongo.asynchronous.pool import Pool @@ -51,13 +51,6 @@ _async_create_condition, _async_create_lock, ) -from pymongo.logger import ( - _SDAM_LOGGER, - _SERVER_SELECTION_LOGGER, - _debug_log, - _SDAMStatusMessage, - _ServerSelectionStatusMessage, -) from pymongo.pool_options import PoolOptions from pymongo.server_description import ServerDescription from pymongo.server_selectors import ( @@ -108,27 +101,19 @@ class Topology: def __init__(self, topology_settings: TopologySettings): self._topology_id = topology_settings._topology_id self._listeners = topology_settings._pool_options._event_listeners - self._publish_server = self._listeners is not None and self._listeners.enabled_for_server - self._publish_tp = self._listeners is not None and self._listeners.enabled_for_topology # Create events queue if there are publishers. self._events: queue.Queue[Any] | None = None self.__events_executor: Any = None - if self._publish_server or self._publish_tp: + publish_server = self._listeners is not None and self._listeners.enabled_for_server + publish_tp = self._listeners is not None and self._listeners.enabled_for_topology + if publish_server or publish_tp: self._events = queue.Queue(maxsize=100) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.START_TOPOLOGY, - topologyId=self._topology_id, - ) + self._sdam = _SdamTelemetry(self._topology_id, self._listeners, self._events) + self._sdam.topology_opened() - if self._publish_tp: - assert self._events is not None - assert self._listeners is not None - self._events.put((self._listeners.publish_topology_opened, (self._topology_id,))) self._settings = topology_settings topology_description = TopologyDescription( topology_settings.get_topology_type(), @@ -143,37 +128,10 @@ def __init__(self, topology_settings: TopologySettings): initial_td = TopologyDescription( TOPOLOGY_TYPE.Unknown, {}, None, None, None, self._settings ) - if self._publish_tp: - assert self._events is not None - assert self._listeners is not None - self._events.put( - ( - self._listeners.publish_topology_description_changed, - (initial_td, self._description, self._topology_id), - ) - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.TOPOLOGY_CHANGE, - topologyId=self._topology_id, - previousDescription=repr(initial_td), - newDescription=repr(self._description), - ) + self._sdam.topology_description_changed(initial_td, self._description) for seed in topology_settings.seeds: - if self._publish_server: - assert self._events is not None - assert self._listeners is not None - self._events.put((self._listeners.publish_server_opened, (seed, self._topology_id))) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.START_SERVER, - topologyId=self._topology_id, - serverHost=seed[0], - serverPort=seed[1], - ) + self._sdam.server_opened(seed) # Store the seed list to help diagnose errors in _error_message(). self._seed_addresses = list(topology_description.server_descriptions()) @@ -188,7 +146,7 @@ def __init__(self, topology_settings: TopologySettings): self._max_cluster_time: Optional[ClusterTime] = None self._session_pool = _ServerSessionPool() - if self._publish_server or self._publish_tp: + if self._sdam._publish_server or self._sdam._publish_tp: assert self._events is not None weak: weakref.ReferenceType[queue.Queue[Any]] @@ -269,7 +227,7 @@ async def select_servers( address: Optional[_Address] = None, operation_id: Optional[int] = None, deprioritized_servers: Optional[list[Server]] = None, - ) -> list[Server]: + ) -> tuple[list[Server], _ServerSelectionTelemetry]: """Return a list of Servers matching selector, or time out. :param selector: function that takes a list of Servers and returns @@ -295,7 +253,7 @@ async def select_servers( await self.cleanup_monitors() async with self._lock: - server_descriptions = await self._select_servers_loop( + server_descriptions, ss = await self._select_servers_loop( selector, server_timeout, operation, @@ -306,7 +264,7 @@ async def select_servers( return [ cast(Server, self.get_server_by_address(sd.address)) for sd in server_descriptions - ] + ], ss async def _select_servers_loop( self, @@ -316,22 +274,15 @@ async def _select_servers_loop( operation_id: Optional[int], address: Optional[_Address], deprioritized_servers: Optional[list[Server]] = None, - ) -> list[ServerDescription]: + ) -> tuple[list[ServerDescription], _ServerSelectionTelemetry]: """select_servers() guts. Hold the lock when calling this.""" now = time.monotonic() end_time = now + timeout logged_waiting = False - - if _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SERVER_SELECTION_LOGGER, - message=_ServerSelectionStatusMessage.STARTED, - selector=selector, - operation=operation, - operationId=operation_id, - topologyDescription=self.description, - clientId=self.description._topology_settings._topology_id, - ) + ss = _ServerSelectionTelemetry( + self._topology_id, selector, operation, operation_id, self.description + ) + ss.started() server_descriptions = self._description.apply_selector( selector, @@ -345,32 +296,13 @@ async def _select_servers_loop( while not server_descriptions: # No suitable servers. if timeout == 0 or now > end_time: - if _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SERVER_SELECTION_LOGGER, - message=_ServerSelectionStatusMessage.FAILED, - selector=selector, - operation=operation, - operationId=operation_id, - topologyDescription=self.description, - clientId=self.description._topology_settings._topology_id, - failure=self._error_message(selector), - ) + ss.failed(self._error_message(selector), self.description) raise ServerSelectionTimeoutError( f"{self._error_message(selector)}, Timeout: {timeout}s, Topology Description: {self.description!r}" ) if not logged_waiting: - _debug_log( - _SERVER_SELECTION_LOGGER, - message=_ServerSelectionStatusMessage.WAITING, - selector=selector, - operation=operation, - operationId=operation_id, - topologyDescription=self.description, - clientId=self.description._topology_settings._topology_id, - remainingTimeMS=int(1000 * (end_time - time.monotonic())), - ) + ss.waiting(int(1000 * (end_time - time.monotonic()))) logged_waiting = True await self._ensure_opened() @@ -388,7 +320,7 @@ async def _select_servers_loop( ) self._description.check_compatible() - return server_descriptions + return server_descriptions, ss async def _select_server( self, @@ -398,8 +330,8 @@ async def _select_server( address: Optional[_Address] = None, deprioritized_servers: Optional[list[Server]] = None, operation_id: Optional[int] = None, - ) -> Server: - servers = await self.select_servers( + ) -> tuple[Server, _ServerSelectionTelemetry]: + servers, ss = await self.select_servers( selector, operation, server_selection_timeout, @@ -408,12 +340,12 @@ async def _select_server( deprioritized_servers, ) if len(servers) == 1: - return servers[0] + return servers[0], ss server1, server2 = random.sample(servers, 2) if server1.pool.operation_count <= server2.pool.operation_count: - return server1 + return server1, ss else: - return server2 + return server2, ss async def select_server( self, @@ -425,7 +357,7 @@ async def select_server( operation_id: Optional[int] = None, ) -> Server: """Like select_servers, but choose a random server if several match.""" - server = await self._select_server( + server, ss = await self._select_server( selector, operation, server_selection_timeout, @@ -435,18 +367,7 @@ async def select_server( ) if _csot.get_timeout(): _csot.set_rtt(server.description.min_round_trip_time) - if _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SERVER_SELECTION_LOGGER, - message=_ServerSelectionStatusMessage.SUCCEEDED, - selector=selector, - operation=operation, - operationId=operation_id, - topologyDescription=self.description, - clientId=self.description._topology_settings._topology_id, - serverHost=server.description.address[0], - serverPort=server.description.address[1], - ) + ss.succeeded(server.description.address[0], server.description.address[1]) return server async def select_server_by_address( @@ -508,36 +429,16 @@ async def _process_change( await server.pool.ready() suppress_event = sd_old == server_description - if self._publish_server and not suppress_event: - assert self._events is not None - assert self._listeners is not None - self._events.put( - ( - self._listeners.publish_server_description_changed, - (sd_old, server_description, server_description.address, self._topology_id), - ) + if not suppress_event: + self._sdam.server_description_changed( + sd_old, server_description, server_description.address ) self._description = new_td await self._update_servers() - if self._publish_tp and not suppress_event: - assert self._events is not None - assert self._listeners is not None - self._events.put( - ( - self._listeners.publish_topology_description_changed, - (td_old, self._description, self._topology_id), - ) - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG) and not suppress_event: - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.TOPOLOGY_CHANGE, - topologyId=self._topology_id, - previousDescription=repr(td_old), - newDescription=repr(self._description), - ) + if not suppress_event: + self._sdam.topology_description_changed(td_old, self._description) # Shutdown SRV polling for unsupported cluster types. # This is only applicable if the old topology was Unknown, and the @@ -588,24 +489,7 @@ async def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: self._description = _updated_topology_description_srv_polling(self._description, seedlist) await self._update_servers() - - if self._publish_tp: - assert self._events is not None - assert self._listeners is not None - self._events.put( - ( - self._listeners.publish_topology_description_changed, - (td_old, self._description, self._topology_id), - ) - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.TOPOLOGY_CHANGE, - topologyId=self._topology_id, - previousDescription=repr(td_old), - newDescription=repr(self._description), - ) + self._sdam.topology_description_changed(td_old, self._description) async def on_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: """Process a new list of nodes obtained from scanning SRV records.""" @@ -743,9 +627,7 @@ async def close(self) -> None: self._closed = True # Publish only after releasing the lock. - if self._publish_tp: - assert self._events is not None - assert self._listeners is not None + if self._sdam._publish_tp: self._description = TopologyDescription( TOPOLOGY_TYPE.Unknown, {}, @@ -754,30 +636,9 @@ async def close(self) -> None: self._description.max_election_id, self._description._topology_settings, ) - self._events.put( - ( - self._listeners.publish_topology_description_changed, - ( - old_td, - self._description, - self._topology_id, - ), - ) - ) - self._events.put((self._listeners.publish_topology_closed, (self._topology_id,))) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.TOPOLOGY_CHANGE, - topologyId=self._topology_id, - previousDescription=repr(old_td), - newDescription=repr(self._description), - ) - _debug_log( - _SDAM_LOGGER, message=_SDAMStatusMessage.STOP_TOPOLOGY, topologyId=self._topology_id - ) + self._sdam.topology_closed(old_td, self._description) - if self._publish_server or self._publish_tp: + if self._sdam._publish_server or self._sdam._publish_tp: # Make sure the events executor thread is fully closed before publishing the remaining events self.__events_executor.close() await self.__events_executor.join(1) @@ -818,7 +679,7 @@ async def _ensure_opened(self) -> None: await self._update_servers() # Start or restart the events publishing thread. - if self._publish_tp or self._publish_server: + if self._sdam._publish_tp or self._sdam._publish_server: self.__events_executor.open() # Start the SRV polling thread. @@ -957,7 +818,7 @@ async def _update_servers(self) -> None: ) weak = None - if self._publish_server and self._events is not None: + if self._sdam._publish_server and self._events is not None: weak = weakref.ref(self._events) server = Server( server_description=sd, diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 5f321afe5c..8a3c969242 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -57,6 +57,7 @@ from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry from bson.timestamp import Timestamp from pymongo import _csot, common, helpers_shared, periodic_executor +from pymongo._telemetry import log_command_retry from pymongo.client_options import ClientOptions from pymongo.driver_info import DriverInfo from pymongo.errors import ( @@ -80,8 +81,6 @@ ) from pymongo.logger import ( _CLIENT_LOGGER, - _COMMAND_LOGGER, - _debug_log, _log_client_error, _log_or_warn, ) @@ -2982,6 +2981,15 @@ def _get_server(self) -> Server: operation_id=self._operation_id, ) + def _log_retry(self, is_write: bool) -> None: + log_command_retry( + self._client._topology_id, + self._operation, + self._operation_id, + self._attempt_number, + is_write, + ) + def _write(self) -> T: """Wrapper method for write-type retryable client executions @@ -3005,13 +3013,7 @@ def _write(self) -> T: self._check_last_error() self._retryable = False if self._retrying: - _debug_log( - _COMMAND_LOGGER, - message=f"Retrying write attempt number {self._attempt_number}", - clientId=self._client._topology_id, - commandName=self._operation, - operationId=self._operation_id, - ) + self._log_retry(is_write=True) return self._func(self._session, conn, self._retryable) # type: ignore except PyMongoError as exc: if not self._retryable: @@ -3034,13 +3036,7 @@ def _read(self) -> T: if self._retrying and not self._retryable and not self._always_retryable: self._check_last_error() if self._retrying: - _debug_log( - _COMMAND_LOGGER, - message=f"Retrying read attempt number {self._attempt_number}", - clientId=self._client._topology_settings._topology_id, - commandName=self._operation, - operationId=self._operation_id, - ) + self._log_retry(is_write=False) return self._func(self._session, self._server, conn, read_pref) # type: ignore diff --git a/pymongo/synchronous/monitor.py b/pymongo/synchronous/monitor.py index f395588814..694e1f8a09 100644 --- a/pymongo/synchronous/monitor.py +++ b/pymongo/synchronous/monitor.py @@ -18,17 +18,16 @@ import asyncio import atexit -import logging import time import weakref from typing import TYPE_CHECKING, Any, Optional from pymongo import common, periodic_executor from pymongo._csot import MovingMinimum +from pymongo._telemetry import _HeartbeatTelemetry, log_srv_monitor_failure from pymongo.errors import NetworkTimeout, _OperationCancelled from pymongo.hello import Hello from pymongo.lock import _create_lock -from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage from pymongo.periodic_executor import _shutdown_executors from pymongo.pool_options import _is_faas from pymongo.read_preferences import MovingAverage @@ -151,9 +150,9 @@ def __init__( self._pool = pool self._settings = topology_settings self._listeners = self._settings._pool_options._event_listeners - self._publish = self._listeners is not None and self._listeners.enabled_for_server_heartbeat self._cancel_context: Optional[_CancellationContext] = None self._conn_id: Optional[int] = None + self._current_hb: Optional[_HeartbeatTelemetry] = None self._rtt_monitor = _RttMonitor( topology, topology_settings, @@ -255,32 +254,16 @@ def _check_server(self) -> ServerDescription: Returns a ServerDescription. """ self._conn_id = None - start = time.monotonic() + self._current_hb = None try: return self._check_once() except ReferenceError: raise except Exception as error: _sanitize(error) - sd = self._server_description - address = sd.address - duration = _monotonic_duration(start) - awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version) - if self._publish: - assert self._listeners is not None - self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.HEARTBEAT_FAIL, - topologyId=self._topology._topology_id, - serverHost=address[0], - serverPort=address[1], - awaited=awaited, - durationMS=duration * 1000, - failure=error, - driverConnectionId=self._conn_id, - ) + address = self._server_description.address + if self._current_hb is not None: + self._current_hb.failed(error, self._conn_id) self._reset_connection() if isinstance(error, _OperationCancelled): raise @@ -301,25 +284,14 @@ def _check_once(self) -> ServerDescription: awaited = bool( self._pool.conns and self._stream and sd.is_server_type_known and sd.topology_version ) - if self._publish: - assert self._listeners is not None - self._listeners.publish_server_heartbeat_started(address, awaited) + hb = _HeartbeatTelemetry(self._topology._topology_id, address, self._listeners, awaited) + self._current_hb = hb + hb.started() if self._cancel_context and self._cancel_context.cancelled: self._reset_connection() with self._pool.checkout() as conn: - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.HEARTBEAT_START, - topologyId=self._topology._topology_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=address[0], - serverPort=address[1], - awaited=awaited, - ) - + hb.emit_started_log(conn.id, conn.server_connection_id) self._cancel_context = conn.cancel_context # Record the connection id so we can later attach it to the failed log message. self._conn_id = conn.id @@ -329,24 +301,7 @@ def _check_once(self) -> ServerDescription: avg_rtt, min_rtt = self._rtt_monitor.get() sd = ServerDescription(address, response, avg_rtt, min_round_trip_time=min_rtt) - if self._publish: - assert self._listeners is not None - self._listeners.publish_server_heartbeat_succeeded( - address, round_trip_time, response, response.awaitable - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.HEARTBEAT_SUCCESS, - topologyId=self._topology._topology_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=address[0], - serverPort=address[1], - awaited=awaited, - durationMS=round_trip_time * 1000, - reply=response.document, - ) + hb.succeeded(round_trip_time, response, conn.id, conn.server_connection_id) return sd def _check_with_socket(self, conn: Connection) -> tuple[Hello, float]: # type: ignore[type-arg] @@ -427,7 +382,7 @@ def _get_seedlist(self) -> Optional[list[tuple[str, Any]]]: # - SRV records must be rescanned every heartbeatFrequencyMS # - Topology must be left unchanged self.request_check() - _debug_log(_SDAM_LOGGER, message="SRV monitor check failed", failure=repr(exc)) + log_srv_monitor_failure(exc) return None else: self._executor.update_interval(max(ttl, common.MIN_SRV_RESCAN_INTERVAL)) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 1006735444..9159ff704a 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -17,7 +17,6 @@ import asyncio import collections import contextlib -import logging import os import socket import ssl @@ -35,6 +34,7 @@ from bson import DEFAULT_CODEC_OPTIONS from pymongo import _csot, helpers_shared +from pymongo._telemetry import _CmapTelemetry from pymongo.common import ( MAX_BSON_SIZE, MAX_MESSAGE_SIZE, @@ -62,12 +62,6 @@ _create_condition, _create_lock, ) -from pymongo.logger import ( - _CONNECTION_LOGGER, - _ConnectionStatusMessage, - _debug_log, - _verbose_connection_error_reason, -) from pymongo.monitoring import ( ConnectionCheckOutFailedReason, ConnectionClosedReason, @@ -134,6 +128,7 @@ def __init__( self.id = id self.is_sdam = is_sdam self.closed = False + self.creation_time = time.monotonic() self.last_checkin_time = time.monotonic() self.performed_handshake = False self.is_writable: bool = False @@ -145,8 +140,7 @@ def __init__( self.hello_ok: bool = False self.is_mongos = False self.listeners = pool.opts._event_listeners - self.enabled_for_cmap = pool.enabled_for_cmap - self.enabled_for_logging = pool.enabled_for_logging + self._telemetry = pool._telemetry self.compression_settings = pool.opts._compression_settings self.compression_context: Union[SnappyContext, ZlibContext, ZstdContext, None] = None self.socket_checker: SocketChecker = SocketChecker() @@ -173,8 +167,6 @@ def __init__( self.active = False self.last_timeout = self.opts.socket_timeout self.connect_rtt = 0.0 - self._client_id = pool._client_id - self.creation_time = time.monotonic() # For gossiping $clusterTime from the connection handshake to the client. self._cluster_time = None @@ -475,21 +467,7 @@ def authenticate(self, reauthenticate: bool = False) -> None: auth.authenticate(creds, self, reauthenticate=reauthenticate) self.ready = True - duration = time.monotonic() - self.creation_time - # Log before publishing event to prevent potential listener preemption in tests - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_READY, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=self.id, - durationMS=duration, - ) - if self.enabled_for_cmap: - assert self.listeners is not None - self.listeners.publish_connection_ready(self.address, self.id, duration) + self._telemetry.connection_ready(self.id, self.creation_time) def validate_session( self, client: Optional[MongoClient[Any]], session: Optional[ClientSession] @@ -508,20 +486,7 @@ def close_conn(self, reason: Optional[str]) -> None: return self._close_conn() if reason: - if self.enabled_for_cmap: - assert self.listeners is not None - self.listeners.publish_connection_closed(self.address, self.id, reason) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_CLOSED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=self.id, - reason=_verbose_connection_error_reason(reason), - error=reason, - ) + self._telemetry.connection_closed(self.id, reason) def _close_conn(self) -> None: """Close this connection.""" @@ -697,13 +662,6 @@ def __init__( self.address = address self.opts = options self.is_sdam = is_sdam - # Don't publish events or logs in Monitor pools. - self.enabled_for_cmap = ( - not self.is_sdam - and self.opts._event_listeners is not None - and self.opts._event_listeners.enabled_for_cmap - ) - self.enabled_for_logging = not self.is_sdam # The first portion of the wait queue. # Enforces: maxPoolSize @@ -719,25 +677,13 @@ def __init__( self._max_connecting_cond = _create_condition(self.lock) self._pending = 0 self._max_connecting = self.opts.max_connecting - self._client_id = client_id self._ssl_session_cache: Optional[list[Any]] = ( [None] if self.opts._ssl_context is not None else None ) - # Log before publishing event to prevent potential listener preemption in tests - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_CREATED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - **self.opts.non_default_options, - ) - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_pool_created( - self.address, self.opts.non_default_options - ) + self._telemetry = _CmapTelemetry( + client_id, address, options._event_listeners, publish=not is_sdam, log=not is_sdam + ) + self._telemetry.pool_created(self.opts.non_default_options) # Similar to active_sockets but includes threads in the wait queue. self.operation_count: int = 0 # Retain references to pinned connections to prevent the CPython GC @@ -752,17 +698,7 @@ def ready(self) -> None: with self.lock: if self.state != PoolState.READY: self.state = PoolState.READY - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_READY, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - ) - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_pool_ready(self.address) + self._telemetry.pool_ready() @property def closed(self) -> bool: @@ -810,23 +746,7 @@ def _reset( # and free-threaded Python causes ConnectionCheckOutFailedEvent to # arrive before PoolClearedEvent (PYTHON-3519). if not close and old_state != PoolState.PAUSED: - _listeners = self.opts._event_listeners - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_CLEARED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - serviceId=service_id, - ) - if self.enabled_for_cmap: - assert _listeners is not None - _listeners.publish_pool_cleared( - self.address, - service_id=service_id, - interrupt_connections=interrupt_connections, - ) + self._telemetry.pool_cleared(service_id, interrupt_connections) # Clear the wait queue self._max_connecting_cond.notify_all() @@ -836,7 +756,6 @@ def _reset( for context in self.active_contexts: context.cancel() - listeners = self.opts._event_listeners # CMAP spec says that close() MUST close sockets before publishing the # PoolClosedEvent but that reset() SHOULD close sockets *after* # publishing the PoolClearedEvent. @@ -849,17 +768,7 @@ def _reset( else: for conn in sockets: conn.close_conn(ConnectionClosedReason.POOL_CLOSED) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.POOL_CLOSED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - ) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_pool_closed(self.address) + self._telemetry.pool_closed() else: if not _IS_SYNC: asyncio.gather( @@ -992,20 +901,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect tmp_context = _CancellationContext() self.active_contexts.add(tmp_context) - listeners = self.opts._event_listeners - # Log before publishing event to prevent potential listener preemption in tests - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_CREATED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - ) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_created(self.address, conn_id) + self._telemetry.connection_created(conn_id) try: networking_interface = _configured_socket_interface( @@ -1015,22 +911,7 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect except BaseException as error: with self.lock: self.active_contexts.discard(tmp_context) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_closed( - self.address, conn_id, ConnectionClosedReason.ERROR - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_CLOSED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn_id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + self._telemetry.connection_closed(conn_id, ConnectionClosedReason.ERROR) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) # Wrap to AutoReconnect/NetworkTimeout BEFORE labeling so the @@ -1092,36 +973,11 @@ def checkout( :param handler: A _MongoClientErrorHandler. """ - listeners = self.opts._event_listeners - checkout_started_time = time.monotonic() - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_check_out_started(self.address) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_STARTED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - ) + checkout_started_time = self._telemetry.checkout_started() conn = self._get_conn(checkout_started_time, handler=handler) - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_checked_out(self.address, conn.id, duration) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_SUCCEEDED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - durationMS=duration, - ) + self._telemetry.checkout_succeeded(conn.id, checkout_started_time) try: with self.lock: self.active_contexts.add(conn.cancel_context) @@ -1155,23 +1011,11 @@ def checkout( def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: if self.state != PoolState.READY: if emit_event: - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - reason="An error occurred while trying to establish a new connection", - error=ConnectionCheckOutFailedReason.CONN_ERROR, - durationMS=duration, - ) + self._telemetry.checkout_failed( + "An error occurred while trying to establish a new connection", + ConnectionCheckOutFailedReason.CONN_ERROR, + checkout_started_time, + ) details = _get_timeout_details(self.opts) _raise_connection_failure( @@ -1189,23 +1033,11 @@ def _get_conn( self.reset_without_pause() if self.closed: - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.POOL_CLOSED, duration - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Connection pool was closed", - error=ConnectionCheckOutFailedReason.POOL_CLOSED, - durationMS=duration, - ) + self._telemetry.checkout_failed( + "Connection pool was closed", + ConnectionCheckOutFailedReason.POOL_CLOSED, + checkout_started_time, + ) raise _PoolClosedError( "Attempted to check out a connection from closed connection pool" ) @@ -1285,23 +1117,11 @@ def _get_conn( self.size_cond.notify() if not emitted_event: - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert self.opts._event_listeners is not None - self.opts._event_listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.CONN_ERROR, duration - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - reason="An error occurred while trying to establish a new connection", - error=ConnectionCheckOutFailedReason.CONN_ERROR, - durationMS=duration, - ) + self._telemetry.checkout_failed( + "An error occurred while trying to establish a new connection", + ConnectionCheckOutFailedReason.CONN_ERROR, + checkout_started_time, + ) raise conn.active = True @@ -1318,21 +1138,9 @@ def checkin(self, conn: Connection) -> None: conn.pinned_txn = False conn.pinned_cursor = False self.__pinned_sockets.discard(conn) - listeners = self.opts._event_listeners with self.lock: self.active_contexts.discard(conn.cancel_context) - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_checked_in(self.address, conn.id) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKEDIN, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - ) + self._telemetry.checked_in(conn.id) if self.pid != os.getpid(): self.reset_without_pause() else: @@ -1340,22 +1148,7 @@ def checkin(self, conn: Connection) -> None: conn.close_conn(ConnectionClosedReason.POOL_CLOSED) elif conn.closed: # CMAP requires the closed event be emitted after the check in. - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_closed( - self.address, conn.id, ConnectionClosedReason.ERROR - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CONN_CLOSED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - driverConnectionId=conn.id, - reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), - error=ConnectionClosedReason.ERROR, - ) + self._telemetry.connection_closed(conn.id, ConnectionClosedReason.ERROR) else: close_conn = False with self.lock: @@ -1420,24 +1213,11 @@ def _perished(self, conn: Connection) -> bool: return False def _raise_wait_queue_timeout(self, checkout_started_time: float) -> NoReturn: - listeners = self.opts._event_listeners - duration = time.monotonic() - checkout_started_time - if self.enabled_for_cmap: - assert listeners is not None - listeners.publish_connection_check_out_failed( - self.address, ConnectionCheckOutFailedReason.TIMEOUT, duration - ) - if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _CONNECTION_LOGGER, - message=_ConnectionStatusMessage.CHECKOUT_FAILED, - clientId=self._client_id, - serverHost=self.address[0], - serverPort=self.address[1], - reason="Wait queue timeout elapsed without a connection becoming available", - error=ConnectionCheckOutFailedReason.TIMEOUT, - durationMS=duration, - ) + self._telemetry.checkout_failed( + "Wait queue timeout elapsed without a connection becoming available", + ConnectionCheckOutFailedReason.TIMEOUT, + checkout_started_time, + ) timeout = _csot.get_timeout() or self.opts.wait_queue_timeout if self.opts.load_balanced: other_ops = self.active_sockets - self.ncursors - self.ntxns diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 09d8fb75e1..9424b9794a 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -16,7 +16,6 @@ from __future__ import annotations -import logging from contextlib import AbstractContextManager from typing import ( TYPE_CHECKING, @@ -26,11 +25,7 @@ Union, ) -from pymongo.logger import ( - _SDAM_LOGGER, - _debug_log, - _SDAMStatusMessage, -) +from pymongo._telemetry import _SdamTelemetry from pymongo.message import _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response from pymongo.synchronous.command_runner import run_cursor_command @@ -69,11 +64,8 @@ def __init__( self._pool = pool self._monitor = monitor self._topology_id = topology_id - self._publish = listeners is not None and listeners.enabled_for_server - self._listener = listeners - self._events = None - if self._publish: - self._events = events() # type: ignore[misc] + _events = events() if listeners is not None and listeners.enabled_for_server else None # type: ignore[misc] + self._sdam = _SdamTelemetry(topology_id, listeners, _events) # type: ignore[arg-type] def open(self) -> None: """Start monitoring, or restart after a fork. @@ -92,23 +84,7 @@ def close(self) -> None: Reconnect with open(). """ - if self._publish: - assert self._listener is not None - assert self._events is not None - self._events.put( - ( - self._listener.publish_server_closed, - (self._description.address, self._topology_id), - ) - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.STOP_SERVER, - topologyId=self._topology_id, - serverHost=self._description.address[0], - serverPort=self._description.address[1], - ) + self._sdam.server_closed(self._description.address) self._monitor.close() self._pool.close() diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index b419833256..5e868046c3 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -17,7 +17,6 @@ from __future__ import annotations import asyncio -import logging import os import queue import random @@ -30,6 +29,7 @@ from typing import TYPE_CHECKING, Any, Callable, Optional, cast from pymongo import _csot, common, helpers_shared, periodic_executor +from pymongo._telemetry import _SdamTelemetry, _ServerSelectionTelemetry from pymongo.errors import ( ConnectionFailure, InvalidOperation, @@ -47,13 +47,6 @@ _create_condition, _create_lock, ) -from pymongo.logger import ( - _SDAM_LOGGER, - _SERVER_SELECTION_LOGGER, - _debug_log, - _SDAMStatusMessage, - _ServerSelectionStatusMessage, -) from pymongo.pool_options import PoolOptions from pymongo.server_description import ServerDescription from pymongo.server_selectors import ( @@ -108,27 +101,19 @@ class Topology: def __init__(self, topology_settings: TopologySettings): self._topology_id = topology_settings._topology_id self._listeners = topology_settings._pool_options._event_listeners - self._publish_server = self._listeners is not None and self._listeners.enabled_for_server - self._publish_tp = self._listeners is not None and self._listeners.enabled_for_topology # Create events queue if there are publishers. self._events: queue.Queue[Any] | None = None self.__events_executor: Any = None - if self._publish_server or self._publish_tp: + publish_server = self._listeners is not None and self._listeners.enabled_for_server + publish_tp = self._listeners is not None and self._listeners.enabled_for_topology + if publish_server or publish_tp: self._events = queue.Queue(maxsize=100) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.START_TOPOLOGY, - topologyId=self._topology_id, - ) + self._sdam = _SdamTelemetry(self._topology_id, self._listeners, self._events) + self._sdam.topology_opened() - if self._publish_tp: - assert self._events is not None - assert self._listeners is not None - self._events.put((self._listeners.publish_topology_opened, (self._topology_id,))) self._settings = topology_settings topology_description = TopologyDescription( topology_settings.get_topology_type(), @@ -143,37 +128,10 @@ def __init__(self, topology_settings: TopologySettings): initial_td = TopologyDescription( TOPOLOGY_TYPE.Unknown, {}, None, None, None, self._settings ) - if self._publish_tp: - assert self._events is not None - assert self._listeners is not None - self._events.put( - ( - self._listeners.publish_topology_description_changed, - (initial_td, self._description, self._topology_id), - ) - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.TOPOLOGY_CHANGE, - topologyId=self._topology_id, - previousDescription=repr(initial_td), - newDescription=repr(self._description), - ) + self._sdam.topology_description_changed(initial_td, self._description) for seed in topology_settings.seeds: - if self._publish_server: - assert self._events is not None - assert self._listeners is not None - self._events.put((self._listeners.publish_server_opened, (seed, self._topology_id))) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.START_SERVER, - topologyId=self._topology_id, - serverHost=seed[0], - serverPort=seed[1], - ) + self._sdam.server_opened(seed) # Store the seed list to help diagnose errors in _error_message(). self._seed_addresses = list(topology_description.server_descriptions()) @@ -188,7 +146,7 @@ def __init__(self, topology_settings: TopologySettings): self._max_cluster_time: Optional[ClusterTime] = None self._session_pool = _ServerSessionPool() - if self._publish_server or self._publish_tp: + if self._sdam._publish_server or self._sdam._publish_tp: assert self._events is not None weak: weakref.ReferenceType[queue.Queue[Any]] @@ -269,7 +227,7 @@ def select_servers( address: Optional[_Address] = None, operation_id: Optional[int] = None, deprioritized_servers: Optional[list[Server]] = None, - ) -> list[Server]: + ) -> tuple[list[Server], _ServerSelectionTelemetry]: """Return a list of Servers matching selector, or time out. :param selector: function that takes a list of Servers and returns @@ -295,7 +253,7 @@ def select_servers( self.cleanup_monitors() with self._lock: - server_descriptions = self._select_servers_loop( + server_descriptions, ss = self._select_servers_loop( selector, server_timeout, operation, @@ -306,7 +264,7 @@ def select_servers( return [ cast(Server, self.get_server_by_address(sd.address)) for sd in server_descriptions - ] + ], ss def _select_servers_loop( self, @@ -316,22 +274,15 @@ def _select_servers_loop( operation_id: Optional[int], address: Optional[_Address], deprioritized_servers: Optional[list[Server]] = None, - ) -> list[ServerDescription]: + ) -> tuple[list[ServerDescription], _ServerSelectionTelemetry]: """select_servers() guts. Hold the lock when calling this.""" now = time.monotonic() end_time = now + timeout logged_waiting = False - - if _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SERVER_SELECTION_LOGGER, - message=_ServerSelectionStatusMessage.STARTED, - selector=selector, - operation=operation, - operationId=operation_id, - topologyDescription=self.description, - clientId=self.description._topology_settings._topology_id, - ) + ss = _ServerSelectionTelemetry( + self._topology_id, selector, operation, operation_id, self.description + ) + ss.started() server_descriptions = self._description.apply_selector( selector, @@ -345,32 +296,13 @@ def _select_servers_loop( while not server_descriptions: # No suitable servers. if timeout == 0 or now > end_time: - if _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SERVER_SELECTION_LOGGER, - message=_ServerSelectionStatusMessage.FAILED, - selector=selector, - operation=operation, - operationId=operation_id, - topologyDescription=self.description, - clientId=self.description._topology_settings._topology_id, - failure=self._error_message(selector), - ) + ss.failed(self._error_message(selector), self.description) raise ServerSelectionTimeoutError( f"{self._error_message(selector)}, Timeout: {timeout}s, Topology Description: {self.description!r}" ) if not logged_waiting: - _debug_log( - _SERVER_SELECTION_LOGGER, - message=_ServerSelectionStatusMessage.WAITING, - selector=selector, - operation=operation, - operationId=operation_id, - topologyDescription=self.description, - clientId=self.description._topology_settings._topology_id, - remainingTimeMS=int(1000 * (end_time - time.monotonic())), - ) + ss.waiting(int(1000 * (end_time - time.monotonic()))) logged_waiting = True self._ensure_opened() @@ -388,7 +320,7 @@ def _select_servers_loop( ) self._description.check_compatible() - return server_descriptions + return server_descriptions, ss def _select_server( self, @@ -398,8 +330,8 @@ def _select_server( address: Optional[_Address] = None, deprioritized_servers: Optional[list[Server]] = None, operation_id: Optional[int] = None, - ) -> Server: - servers = self.select_servers( + ) -> tuple[Server, _ServerSelectionTelemetry]: + servers, ss = self.select_servers( selector, operation, server_selection_timeout, @@ -408,12 +340,12 @@ def _select_server( deprioritized_servers, ) if len(servers) == 1: - return servers[0] + return servers[0], ss server1, server2 = random.sample(servers, 2) if server1.pool.operation_count <= server2.pool.operation_count: - return server1 + return server1, ss else: - return server2 + return server2, ss def select_server( self, @@ -425,7 +357,7 @@ def select_server( operation_id: Optional[int] = None, ) -> Server: """Like select_servers, but choose a random server if several match.""" - server = self._select_server( + server, ss = self._select_server( selector, operation, server_selection_timeout, @@ -435,18 +367,7 @@ def select_server( ) if _csot.get_timeout(): _csot.set_rtt(server.description.min_round_trip_time) - if _SERVER_SELECTION_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SERVER_SELECTION_LOGGER, - message=_ServerSelectionStatusMessage.SUCCEEDED, - selector=selector, - operation=operation, - operationId=operation_id, - topologyDescription=self.description, - clientId=self.description._topology_settings._topology_id, - serverHost=server.description.address[0], - serverPort=server.description.address[1], - ) + ss.succeeded(server.description.address[0], server.description.address[1]) return server def select_server_by_address( @@ -508,36 +429,16 @@ def _process_change( server.pool.ready() suppress_event = sd_old == server_description - if self._publish_server and not suppress_event: - assert self._events is not None - assert self._listeners is not None - self._events.put( - ( - self._listeners.publish_server_description_changed, - (sd_old, server_description, server_description.address, self._topology_id), - ) + if not suppress_event: + self._sdam.server_description_changed( + sd_old, server_description, server_description.address ) self._description = new_td self._update_servers() - if self._publish_tp and not suppress_event: - assert self._events is not None - assert self._listeners is not None - self._events.put( - ( - self._listeners.publish_topology_description_changed, - (td_old, self._description, self._topology_id), - ) - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG) and not suppress_event: - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.TOPOLOGY_CHANGE, - topologyId=self._topology_id, - previousDescription=repr(td_old), - newDescription=repr(self._description), - ) + if not suppress_event: + self._sdam.topology_description_changed(td_old, self._description) # Shutdown SRV polling for unsupported cluster types. # This is only applicable if the old topology was Unknown, and the @@ -588,24 +489,7 @@ def _process_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: self._description = _updated_topology_description_srv_polling(self._description, seedlist) self._update_servers() - - if self._publish_tp: - assert self._events is not None - assert self._listeners is not None - self._events.put( - ( - self._listeners.publish_topology_description_changed, - (td_old, self._description, self._topology_id), - ) - ) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.TOPOLOGY_CHANGE, - topologyId=self._topology_id, - previousDescription=repr(td_old), - newDescription=repr(self._description), - ) + self._sdam.topology_description_changed(td_old, self._description) def on_srv_update(self, seedlist: list[tuple[str, Any]]) -> None: """Process a new list of nodes obtained from scanning SRV records.""" @@ -741,9 +625,7 @@ def close(self) -> None: self._closed = True # Publish only after releasing the lock. - if self._publish_tp: - assert self._events is not None - assert self._listeners is not None + if self._sdam._publish_tp: self._description = TopologyDescription( TOPOLOGY_TYPE.Unknown, {}, @@ -752,30 +634,9 @@ def close(self) -> None: self._description.max_election_id, self._description._topology_settings, ) - self._events.put( - ( - self._listeners.publish_topology_description_changed, - ( - old_td, - self._description, - self._topology_id, - ), - ) - ) - self._events.put((self._listeners.publish_topology_closed, (self._topology_id,))) - if _SDAM_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _SDAM_LOGGER, - message=_SDAMStatusMessage.TOPOLOGY_CHANGE, - topologyId=self._topology_id, - previousDescription=repr(old_td), - newDescription=repr(self._description), - ) - _debug_log( - _SDAM_LOGGER, message=_SDAMStatusMessage.STOP_TOPOLOGY, topologyId=self._topology_id - ) + self._sdam.topology_closed(old_td, self._description) - if self._publish_server or self._publish_tp: + if self._sdam._publish_server or self._sdam._publish_tp: # Make sure the events executor thread is fully closed before publishing the remaining events self.__events_executor.close() self.__events_executor.join(1) @@ -816,7 +677,7 @@ def _ensure_opened(self) -> None: self._update_servers() # Start or restart the events publishing thread. - if self._publish_tp or self._publish_server: + if self._sdam._publish_tp or self._sdam._publish_server: self.__events_executor.open() # Start the SRV polling thread. @@ -955,7 +816,7 @@ def _update_servers(self) -> None: ) weak = None - if self._publish_server and self._events is not None: + if self._sdam._publish_server and self._events is not None: weak = weakref.ref(self._events) server = Server( server_description=sd, diff --git a/test/asynchronous/test_mongos_load_balancing.py b/test/asynchronous/test_mongos_load_balancing.py index 028c9a5cdb..bb89e25842 100644 --- a/test/asynchronous/test_mongos_load_balancing.py +++ b/test/asynchronous/test_mongos_load_balancing.py @@ -61,7 +61,7 @@ async def do_simple_op(client, ntasks): async def writable_addresses(topology): return { server.description.address - for server in await topology.select_servers(writable_server_selector, _Op.TEST) + for server in (await topology.select_servers(writable_server_selector, _Op.TEST))[0] } diff --git a/test/asynchronous/test_read_preferences.py b/test/asynchronous/test_read_preferences.py index 9f92a39920..17c7a4a996 100644 --- a/test/asynchronous/test_read_preferences.py +++ b/test/asynchronous/test_read_preferences.py @@ -310,9 +310,9 @@ async def test_nearest(self): not_used = data_members.difference(used) latencies = ", ".join( "%s: %sms" % (server.description.address, server.description.round_trip_time) - for server in await (await c._get_topology()).select_servers( - readable_server_selector, _Op.TEST - ) + for server in ( + await (await c._get_topology()).select_servers(readable_server_selector, _Op.TEST) + )[0] ) self.assertFalse( diff --git a/test/asynchronous/utils.py b/test/asynchronous/utils.py index 5842224220..1e8ece6c25 100644 --- a/test/asynchronous/utils.py +++ b/test/asynchronous/utils.py @@ -43,18 +43,14 @@ async def async_get_pool(client: AsyncMongoClient) -> Pool: """Get the standalone, primary, or mongos pool.""" topology = await client._get_topology() - server = await topology._select_server(writable_server_selector, _Op.TEST) + server, _ = await topology._select_server(writable_server_selector, _Op.TEST) return server.pool async def async_get_pools(client: AsyncMongoClient) -> list[Pool]: """Get all pools.""" - return [ - server.pool - for server in await (await client._get_topology()).select_servers( - any_server_selector, _Op.TEST - ) - ] + servers, _ = await (await client._get_topology()).select_servers(any_server_selector, _Op.TEST) + return [server.pool for server in servers] async def async_wait_until(predicate, success_description, timeout=10): diff --git a/test/asynchronous/utils_selection_tests.py b/test/asynchronous/utils_selection_tests.py index eec6dc9719..81df672fb6 100644 --- a/test/asynchronous/utils_selection_tests.py +++ b/test/asynchronous/utils_selection_tests.py @@ -150,13 +150,13 @@ async def run_scenario(self): return - actual_suitable_s = await top_suitable.select_servers( + actual_suitable_s, _ = await top_suitable.select_servers( pref, _Op.TEST, server_selection_timeout=0, deprioritized_servers=top_suitable_deprioritized_servers, ) - actual_latency_s = await top_latency.select_servers( + actual_latency_s, _ = await top_latency.select_servers( pref, _Op.TEST, server_selection_timeout=0, diff --git a/test/test_mongos_load_balancing.py b/test/test_mongos_load_balancing.py index 362f67925c..e7d284f805 100644 --- a/test/test_mongos_load_balancing.py +++ b/test/test_mongos_load_balancing.py @@ -61,7 +61,7 @@ def do_simple_op(client, ntasks): def writable_addresses(topology): return { server.description.address - for server in topology.select_servers(writable_server_selector, _Op.TEST) + for server in (topology.select_servers(writable_server_selector, _Op.TEST))[0] } diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index 27c8d0704a..9e1b0fde25 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -292,7 +292,9 @@ def test_nearest(self): not_used = data_members.difference(used) latencies = ", ".join( "%s: %sms" % (server.description.address, server.description.round_trip_time) - for server in (c._get_topology()).select_servers(readable_server_selector, _Op.TEST) + for server in ((c._get_topology()).select_servers(readable_server_selector, _Op.TEST))[ + 0 + ] ) self.assertFalse( diff --git a/test/utils.py b/test/utils.py index 54b7e351e3..77d8549d8f 100644 --- a/test/utils.py +++ b/test/utils.py @@ -43,16 +43,14 @@ def get_pool(client: MongoClient) -> Pool: """Get the standalone, primary, or mongos pool.""" topology = client._get_topology() - server = topology._select_server(writable_server_selector, _Op.TEST) + server, _ = topology._select_server(writable_server_selector, _Op.TEST) return server.pool def get_pools(client: MongoClient) -> list[Pool]: """Get all pools.""" - return [ - server.pool - for server in (client._get_topology()).select_servers(any_server_selector, _Op.TEST) - ] + servers, _ = (client._get_topology()).select_servers(any_server_selector, _Op.TEST) + return [server.pool for server in servers] def wait_until(predicate, success_description, timeout=10): diff --git a/test/utils_selection_tests.py b/test/utils_selection_tests.py index 3754c74674..18e84c103a 100644 --- a/test/utils_selection_tests.py +++ b/test/utils_selection_tests.py @@ -150,13 +150,13 @@ def run_scenario(self): return - actual_suitable_s = top_suitable.select_servers( + actual_suitable_s, _ = top_suitable.select_servers( pref, _Op.TEST, server_selection_timeout=0, deprioritized_servers=top_suitable_deprioritized_servers, ) - actual_latency_s = top_latency.select_servers( + actual_latency_s, _ = top_latency.select_servers( pref, _Op.TEST, server_selection_timeout=0,