PYTHON-5846 Consolidate CMAP, heartbeat, and SDAM telemetry into _telemetry.py #2907
blink1073 wants to merge 20 commits into
…emetry.py Add _CmapTelemetry, _HeartbeatTelemetry, and _SdamTelemetry classes to eliminate the repetitive if-enabled_for_cmap / if-logger.isEnabledFor boilerplate spread across pool.py, monitor.py, topology.py, and server.py.
- Rename _CmapTelemetry._log -> _emit_log for consistency with _CommandTelemetry
- Rename _HeartbeatTelemetry.apm_started -> started to match started/succeeded/failed lifecycle
- Rename _HeartbeatTelemetry.log_started -> emit_started_log to signal it is the deferred log-only half
- Replace is_sdam flag with separate publish/log bool parameters on _CmapTelemetry
_CmapTelemetry now owns connection-creation and checkout durations: connection_created() starts the clock, connection_ready() computes it; checkout_started() starts the clock, checkout_succeeded/failed() compute it. _HeartbeatTelemetry.started() starts the clock; failed() computes its own duration instead of receiving it as a parameter. Removes checkout_started_time from Pool._get_conn, _raise_if_not_ready, and _raise_wait_queue_timeout, and removes AsyncConnection.creation_time.
Use response.awaitable (ground truth from wire) rather than self._awaited (pre-computed before response) for ServerHeartbeatSucceededEvent.awaited. Also fix stale method references in _HeartbeatTelemetry docstring.
_CmapTelemetry, _HeartbeatTelemetry, and _SdamTelemetry now compute _should_publish and _should_log as properties that check listener state and logger level at call time rather than caching derived booleans in the constructor.
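As a rough illustration of the call-time check described in this commit, the sketch below uses a hypothetical _SketchTelemetry class. The plain listener list, the logger name, and the attribute layout are assumptions for illustration only; they are not the code in this PR.

```python
import logging


class _SketchTelemetry:
    """Hypothetical stand-in for a long-lived telemetry object."""

    def __init__(self, listeners: list, logger: logging.Logger) -> None:
        # Keep references only; do not cache derived booleans here.
        self._listeners = listeners
        self._logger = logger

    @property
    def _should_publish(self) -> bool:
        # Evaluated on every access, so listeners added later are seen.
        return bool(self._listeners)

    @property
    def _should_log(self) -> bool:
        # Evaluated on every access, so a later level change is seen.
        return self._logger.isEnabledFor(logging.DEBUG)


logger = logging.getLogger("sketch.telemetry")
logger.setLevel(logging.WARNING)
listeners: list = []
telemetry = _SketchTelemetry(listeners, logger)
before = (telemetry._should_publish, telemetry._should_log)

# Enable both after construction; the properties pick up the change.
listeners.append(object())
logger.setLevel(logging.DEBUG)
after = (telemetry._should_publish, telemetry._should_log)
```

Had the constructor stored the two booleans instead, `after` would still report the values captured at construction.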
…to _telemetry.py
…y classes; document why long-lived classes use properties
…ion computation in _CmapTelemetry and _HeartbeatTelemetry
…g in _CmapTelemetry
| clientId=self._client._topology_id, | ||
| commandName=self._operation, | ||
| operationId=self._operation_id, | ||
| log_command_retry( |
What do you think about a helper method here to avoid duplicating the entire log_command_retry call except for is_write between this and the next usage?
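A minimal sketch of what such a helper might look like. The log_command_retry below is a hypothetical stand-in that just returns its keyword arguments, and _RetryableSketch is an invented class; only the keyword names clientId, commandName, operationId, and is_write come from the diff and comment above.

```python
from typing import Any


def log_command_retry(**fields: Any) -> dict[str, Any]:
    """Hypothetical stand-in that returns the fields it would log."""
    return dict(fields)


class _RetryableSketch:
    """Hypothetical owner of the two retry-logging call sites."""

    def __init__(self, topology_id: str, operation: str, operation_id: int) -> None:
        self._topology_id = topology_id
        self._operation = operation
        self._operation_id = operation_id

    def _log_retry(self, is_write: bool) -> dict[str, Any]:
        # Single place that builds the shared arguments; only is_write varies.
        return log_command_retry(
            clientId=self._topology_id,
            commandName=self._operation,
            operationId=self._operation_id,
            is_write=is_write,
        )


retryable = _RetryableSketch("topology-1", "find", 7)
read_entry = retryable._log_retry(is_write=False)
write_entry = retryable._log_retry(is_write=True)
```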
| from pymongo.hello import Hello | ||
| from pymongo.lock import _async_create_lock | ||
| from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage | ||
| from pymongo.logger import _SDAM_LOGGER, _debug_log |
They were used; I missed one direct usage, which has now moved to _telemetry.py.
| should_publish = self._should_publish | ||
| # Always record start time: logging or publishing may be enabled by the time | ||
| # checkout_succeeded or checkout_failed is called to compute the duration. | ||
| self._checkout_start = time.monotonic() |
This is shared across all connections in the pool and is called outside of a lock, introducing a possible race condition that can cause timing inconsistencies.
Updated to keep the timing info in the pool class as it was before.
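To make the race concrete, here is a sketch of the caller-held alternative: the start time is returned to the caller and passed back in, so overlapping checkouts cannot overwrite each other. The class name, the injectable clock, and the fake tick values are assumptions added so the example is deterministic; they are not part of the PR.

```python
import time
from typing import Callable


class _CmapTelemetrySketch:
    """Hypothetical telemetry object shared by every checkout on a pool."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock

    def checkout_started(self) -> float:
        # Return the start time instead of storing it on the shared object.
        return self._clock()

    def checkout_succeeded(self, checkout_start: float) -> float:
        # Duration is computed from the caller's own start time.
        return self._clock() - checkout_start


# Fake clock: two checkouts start at 0.0 and 1.0, then finish at 5.0 and 6.0.
ticks = iter([0.0, 1.0, 5.0, 6.0])
telemetry = _CmapTelemetrySketch(clock=lambda: next(ticks))
start_a = telemetry.checkout_started()
start_b = telemetry.checkout_started()
duration_a = telemetry.checkout_succeeded(start_a)
duration_b = telemetry.checkout_succeeded(start_b)
```

With a single shared start slot, the second checkout_started call would have overwritten the first, and checkout A would have been reported as 4.0 rather than 5.0.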
| def connection_created(self, conn_id: int) -> None: | ||
| # Always record start time: logging or publishing may be enabled by the time | ||
| # connection_ready is called to compute the duration. | ||
| self._conn_created_start = time.monotonic() |
Same race condition here as above.
| clientId=self.description._topology_settings._topology_id, | ||
| failure=self._error_message(selector), | ||
| ) | ||
| ss.failed(self._error_message(selector)) |
This will log self.description at the time of creation instead of at the time of failure. If SDAM updates self.description while this loop is waiting and before it calls ss.failed, we log the old description instead of the new.
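The stale-snapshot concern can be sketched as follows. The classes here are hypothetical simplifications (descriptions are plain strings, and failed returns the entry it would log), written only to show the difference between a description captured at construction and one supplied at failure time.

```python
from dataclasses import dataclass


@dataclass
class _TopologySketch:
    """Hypothetical topology whose description SDAM may replace at any time."""

    description: str


class _ServerSelectionTelemetrySketch:
    def __init__(self, topology: _TopologySketch) -> None:
        # Snapshot taken at construction; may be stale by the time of failure.
        self._description_at_start = topology.description

    def failed(self, failure: str, topology_description: str) -> dict[str, str]:
        # Log the description handed in by the caller at failure time.
        return {"failure": failure, "topologyDescription": topology_description}


topology = _TopologySketch(description="Unknown")
ss = _ServerSelectionTelemetrySketch(topology)
topology.description = "ReplicaSetNoPrimary"  # simulated SDAM update mid-wait
entry = ss.failed("No servers match selector", topology.description)
```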
Co-authored-by: Noah Stapp <noah@noahstapp.com>
- Fix race condition in _CmapTelemetry: remove shared _conn_created_start and _checkout_start slots; connection_ready now takes creation_time from AsyncConnection.creation_time, checkout_started returns the start time and checkout_succeeded/failed accept it as a parameter
- Move SRV monitor failure log into _telemetry.py as log_srv_monitor_failure
- Add _log_retry helper to _ClientConnectionRetryable to deduplicate log_command_retry call sites
- Fix _ServerSelectionTelemetry.failed to accept live topology_description at failure time instead of using the stale snapshot from construction
| await self.checkin(conn) | ||
| def _raise_if_not_ready(self, checkout_started_time: float, emit_event: bool) -> None: | ||
| def _raise_if_not_ready(self, emit_event: bool, checkout_start: float) -> None: |
intentional re-ordering + naming of checkout_started_time?
Restored to original signature
| async def _get_conn( | ||
| self, checkout_started_time: float, handler: Optional[_MongoClientErrorHandler] = None | ||
| self, handler: Optional[_MongoClientErrorHandler] = None, checkout_start: float = 0.0 |
Same here. Also, does a default of 0 make sense here?
Restored to original signature
| self._raise_wait_queue_timeout(checkout_started_time) | ||
| self._raise_if_not_ready(checkout_started_time, emit_event=False) | ||
| self._raise_wait_queue_timeout(checkout_start) | ||
| self._raise_if_not_ready(emit_event=False, checkout_start=checkout_start) |
| **extra, | ||
| ) | ||
| def pool_created(self, non_default_options: dict[str, Any]) -> None: |
Need docstrings on these public methods for consistency with the other telemetry classes.
Added :allthedocstrings:
| topologyDescription=self.description, | ||
| clientId=self.description._topology_settings._topology_id, | ||
| ) | ||
| ss = _ServerSelectionTelemetry( |
Can we pass this through as an argument from select_server so we don't have to make two _ServerSelectionTelemetry per call?
| @@ -74,6 +69,7 @@ def __init__( | |||
| self._events = None | |||
Can we cut all three of these now that _SdamTelemetry exists?
| remainingTimeMS=remaining_time_ms, | ||
| ) | ||
| def failed(self, failure: str, topology_description: Any = None) -> None: |
topology_description appears to always be passed
- Restore original _raise_if_not_ready and _get_conn parameter names/order
- Add docstrings to all public methods on _CmapTelemetry and _SdamTelemetry
- Remove redundant _publish/_listener/_events attrs from Server.__init__
- Avoid creating two _ServerSelectionTelemetry per select_server call by threading ss through select_servers and _select_server
- Make _ServerSelectionTelemetry.failed topology_description a required arg
- Move SRV monitor failure log to _telemetry.py (log_srv_monitor_failure)
…t_servers/_select_server
PYTHON-5846
Changes in this PR

Consolidates all APM event publishing and structured logging into pymongo/_telemetry.py, giving OpenTelemetry support a single place to hook into. Adds four new telemetry classes and one helper following the _CommandTelemetry pattern from #2891:
- _CmapTelemetry: connection pool and connection lifecycle events, owned by Pool
- _HeartbeatTelemetry: server heartbeat events, owned per-check by Monitor
- _SdamTelemetry: topology and server description change events, owned by Topology and Server
- _ServerSelectionTelemetry: server selection log entries (log-only per spec), constructed per select_server call
- log_command_retry: retry log entries for retryable reads and writes

Test Plan

Covered by existing test suites (test_connection_monitoring.py, test_heartbeat_monitoring.py, test_sdam_monitoring_spec.py, test_connection_logging.py, test_server_selection_logging.py, unified format spec tests). No new public API.

Checklist
Checklist for Author
Checklist for Reviewer