Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8c0f314
PYTHON-5846 Consolidate CMAP, heartbeat, and SDAM telemetry into _tel…
blink1073 Jun 30, 2026
265971c
PYTHON-5846 Refine _CmapTelemetry and _HeartbeatTelemetry API
blink1073 Jun 30, 2026
55b21ec
PYTHON-5846 Move duration tracking into telemetry classes
blink1073 Jun 30, 2026
a30d380
PYTHON-5846 Fix awaited value in heartbeat succeeded APM event
blink1073 Jun 30, 2026
577b594
PYTHON-5846 Add _emit_log helpers to _HeartbeatTelemetry and _SdamTel…
blink1073 Jun 30, 2026
fc143ff
PYTHON-5846 Fix durationMS unit in CMAP log entries (seconds -> milli…
blink1073 Jun 30, 2026
4ea5205
PYTHON-5846 Replace cached telemetry flags with dynamic properties
blink1073 Jul 1, 2026
553dc31
PYTHON-5846 Drop publish param from _HeartbeatTelemetry, derive from …
blink1073 Jul 1, 2026
0f33a1c
PYTHON-5846 Decompose Hello response in _HeartbeatTelemetry.succeeded
blink1073 Jul 1, 2026
3eefcf7
PYTHON-5846 Fix typing: use Hello type in _HeartbeatTelemetry.succeeded
blink1073 Jul 1, 2026
d844557
PYTHON-5846 Consolidate server selection logging and retry logging in…
blink1073 Jul 1, 2026
57f4e50
PYTHON-5846 Move _should_log check to call site in all telemetry classes
blink1073 Jul 1, 2026
d51f38b
PYTHON-5846 Cache _should_log/_should_publish in short-lived telemetr…
blink1073 Jul 1, 2026
de14677
PYTHON-5846 Cache _should_log/_should_publish locally and guard durat…
blink1073 Jul 1, 2026
488c750
PYTHON-5846 Add comments explaining unconditional start time recordin…
blink1073 Jul 1, 2026
58e966b
PYTHON-5846 Only record heartbeat start time when logging or publishi…
blink1073 Jul 1, 2026
2746386
Update pymongo/asynchronous/mongo_client.py
blink1073 Jul 1, 2026
012c143
PYTHON-5846 Address PR review comments on telemetry refactor
blink1073 Jul 1, 2026
9cca32b
PYTHON-5846 Address further PR review comments
blink1073 Jul 1, 2026
9fab1db
PYTHON-5846 Fix test callers to unpack (servers, ss) tuple from selec…
blink1073 Jul 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
532 changes: 530 additions & 2 deletions pymongo/_telemetry.py

Large diffs are not rendered by default.

28 changes: 12 additions & 16 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from bson.codec_options import DEFAULT_CODEC_OPTIONS, CodecOptions, TypeRegistry
from bson.timestamp import Timestamp
from pymongo import _csot, common, helpers_shared, periodic_executor
from pymongo._telemetry import log_command_retry
from pymongo.asynchronous import client_session, database, uri_parser
from pymongo.asynchronous.change_stream import AsyncChangeStream, AsyncClusterChangeStream
from pymongo.asynchronous.client_bulk import _AsyncClientBulk
Expand Down Expand Up @@ -90,8 +91,6 @@
)
from pymongo.logger import (
_CLIENT_LOGGER,
_COMMAND_LOGGER,
_debug_log,
_log_client_error,
_log_or_warn,
)
Expand Down Expand Up @@ -2991,6 +2990,15 @@ async def _get_server(self) -> Server:
operation_id=self._operation_id,
)

def _log_retry(self, is_write: bool) -> None:
log_command_retry(
self._client._topology_id,
self._operation,
self._operation_id,
self._attempt_number,
is_write,
)

async def _write(self) -> T:
"""Wrapper method for write-type retryable client executions

Expand All @@ -3014,13 +3022,7 @@ async def _write(self) -> T:
self._check_last_error()
self._retryable = False
if self._retrying:
_debug_log(
_COMMAND_LOGGER,
message=f"Retrying write attempt number {self._attempt_number}",
clientId=self._client._topology_id,
commandName=self._operation,
operationId=self._operation_id,
)
self._log_retry(is_write=True)
return await self._func(self._session, conn, self._retryable) # type: ignore
except PyMongoError as exc:
if not self._retryable:
Expand All @@ -3043,13 +3045,7 @@ async def _read(self) -> T:
if self._retrying and not self._retryable and not self._always_retryable:
self._check_last_error()
if self._retrying:
_debug_log(
_COMMAND_LOGGER,
message=f"Retrying read attempt number {self._attempt_number}",
clientId=self._client._topology_settings._topology_id,
commandName=self._operation,
operationId=self._operation_id,
)
self._log_retry(is_write=False)
return await self._func(self._session, self._server, conn, read_pref) # type: ignore


Expand Down
69 changes: 12 additions & 57 deletions pymongo/asynchronous/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@

import asyncio
import atexit
import logging
import time
import weakref
from typing import TYPE_CHECKING, Any, Optional

from pymongo import common, periodic_executor
from pymongo._csot import MovingMinimum
from pymongo._telemetry import _HeartbeatTelemetry, log_srv_monitor_failure
from pymongo.asynchronous.srv_resolver import _SrvResolver
from pymongo.errors import NetworkTimeout, _OperationCancelled
from pymongo.hello import Hello
from pymongo.lock import _async_create_lock
from pymongo.logger import _SDAM_LOGGER, _debug_log, _SDAMStatusMessage
from pymongo.periodic_executor import _shutdown_executors
from pymongo.pool_options import _is_faas
from pymongo.read_preferences import MovingAverage
Expand Down Expand Up @@ -151,9 +150,9 @@ def __init__(
self._pool = pool
self._settings = topology_settings
self._listeners = self._settings._pool_options._event_listeners
self._publish = self._listeners is not None and self._listeners.enabled_for_server_heartbeat
self._cancel_context: Optional[_CancellationContext] = None
self._conn_id: Optional[int] = None
self._current_hb: Optional[_HeartbeatTelemetry] = None
self._rtt_monitor = _RttMonitor(
topology,
topology_settings,
Expand Down Expand Up @@ -257,32 +256,16 @@ async def _check_server(self) -> ServerDescription:
Returns a ServerDescription.
"""
self._conn_id = None
start = time.monotonic()
self._current_hb = None
try:
return await self._check_once()
except ReferenceError:
raise
except Exception as error:
_sanitize(error)
sd = self._server_description
address = sd.address
duration = _monotonic_duration(start)
awaited = bool(self._stream and sd.is_server_type_known and sd.topology_version)
if self._publish:
assert self._listeners is not None
self._listeners.publish_server_heartbeat_failed(address, duration, error, awaited)
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_SDAM_LOGGER,
message=_SDAMStatusMessage.HEARTBEAT_FAIL,
topologyId=self._topology._topology_id,
serverHost=address[0],
serverPort=address[1],
awaited=awaited,
durationMS=duration * 1000,
failure=error,
driverConnectionId=self._conn_id,
)
address = self._server_description.address
if self._current_hb is not None:
self._current_hb.failed(error, self._conn_id)
await self._reset_connection()
if isinstance(error, _OperationCancelled):
raise
Expand All @@ -303,25 +286,14 @@ async def _check_once(self) -> ServerDescription:
awaited = bool(
self._pool.conns and self._stream and sd.is_server_type_known and sd.topology_version
)
if self._publish:
assert self._listeners is not None
self._listeners.publish_server_heartbeat_started(address, awaited)
hb = _HeartbeatTelemetry(self._topology._topology_id, address, self._listeners, awaited)
self._current_hb = hb
hb.started()

if self._cancel_context and self._cancel_context.cancelled:
await self._reset_connection()
async with self._pool.checkout() as conn:
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_SDAM_LOGGER,
message=_SDAMStatusMessage.HEARTBEAT_START,
topologyId=self._topology._topology_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=address[0],
serverPort=address[1],
awaited=awaited,
)

hb.emit_started_log(conn.id, conn.server_connection_id)
self._cancel_context = conn.cancel_context
# Record the connection id so we can later attach it to the failed log message.
self._conn_id = conn.id
Expand All @@ -331,24 +303,7 @@ async def _check_once(self) -> ServerDescription:

avg_rtt, min_rtt = await self._rtt_monitor.get()
sd = ServerDescription(address, response, avg_rtt, min_round_trip_time=min_rtt)
if self._publish:
assert self._listeners is not None
self._listeners.publish_server_heartbeat_succeeded(
address, round_trip_time, response, response.awaitable
)
if _SDAM_LOGGER.isEnabledFor(logging.DEBUG):
_debug_log(
_SDAM_LOGGER,
message=_SDAMStatusMessage.HEARTBEAT_SUCCESS,
topologyId=self._topology._topology_id,
driverConnectionId=conn.id,
serverConnectionId=conn.server_connection_id,
serverHost=address[0],
serverPort=address[1],
awaited=awaited,
durationMS=round_trip_time * 1000,
reply=response.document,
)
hb.succeeded(round_trip_time, response, conn.id, conn.server_connection_id)
return sd

async def _check_with_socket(self, conn: AsyncConnection) -> tuple[Hello, float]: # type: ignore[type-arg]
Expand Down Expand Up @@ -429,7 +384,7 @@ async def _get_seedlist(self) -> Optional[list[tuple[str, Any]]]:
# - SRV records must be rescanned every heartbeatFrequencyMS
# - Topology must be left unchanged
self.request_check()
_debug_log(_SDAM_LOGGER, message="SRV monitor check failed", failure=repr(exc))
log_srv_monitor_failure(exc)
return None
else:
self._executor.update_interval(max(ttl, common.MIN_SRV_RESCAN_INTERVAL))
Expand Down
Loading
Loading