Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pymongo/_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _emit_log(self, message: _CommandStatusMessage, **extra: Any) -> None:
commandName=self._name,
databaseName=self._dbname,
requestId=self._request_id,
operationId=self._request_id,
operationId=self._op_id if self._op_id is not None else self._request_id,
driverConnectionId=self._conn.id,
serverConnectionId=self._conn.server_connection_id,
serverHost=self._conn.address[0],
Expand Down
6 changes: 6 additions & 0 deletions pymongo/asynchronous/command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ async def run_cursor_command(
more_to_come: bool = False,
unpack_res: Optional[Callable[..., Any]] = None,
cursor_id: Optional[int] = None,
op_id: Optional[int] = None,
) -> tuple[list[dict[str, Any]], Optional[_OpMsg], datetime.timedelta]:
"""Run a cursor ``find``/``getMore`` operation over ``conn``.

Expand All @@ -304,6 +305,7 @@ async def run_cursor_command(
:param unpack_res: A callable decoding the wire response; when ``None`` the
reply's own ``unpack_response`` is used.
:param cursor_id: The cursor id passed to ``unpack_res``.
:param op_id: The APM operation id; defaults to ``request_id``.
"""
topology_id = client._topology_id if client is not None else None
return await _run_command(
Expand All @@ -325,6 +327,7 @@ async def run_cursor_command(
more_to_come=more_to_come,
unpack_res=unpack_res,
cursor_id=cursor_id,
op_id=op_id,
)


Expand All @@ -348,6 +351,7 @@ async def run_command(
user_fields: Optional[Mapping[str, Any]] = None,
exhaust_allowed: bool = False,
write_concern: Optional[WriteConcern] = None,
op_id: Optional[int] = None,
) -> _DocumentType:
"""Encode and execute a command over ``conn``, or raise socket.error.

Expand Down Expand Up @@ -376,6 +380,7 @@ async def run_command(
passed to ``bson._decode_all_selective``.
:param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed.
:param write_concern: The write concern for this command. Applied via CSOT.
:param op_id: The APM operation id; defaults to ``request_id``.
"""
name = next(iter(spec))

Expand Down Expand Up @@ -428,6 +433,7 @@ async def run_command(
codec_options=codec_options,
user_fields=user_fields,
orig=orig,
op_id=op_id,
check=check,
allowable_errors=allowable_errors,
parse_write_concern_error=parse_write_concern_error,
Expand Down
12 changes: 10 additions & 2 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
_log_client_error,
_log_or_warn,
)
from pymongo.message import _CursorAddress, _GetMore, _Query
from pymongo.message import _CursorAddress, _GetMore, _Query, _randint
from pymongo.monitoring import ConnectionClosedReason, _EventListeners
from pymongo.operations import (
DeleteMany,
Expand Down Expand Up @@ -1837,6 +1837,8 @@ async def _select_server(
be pinned to a mongos server address.
- `address` (optional): Address when sending a message
to a specific server, used for getMore.
- `operation_id` (optional): Stable operation id shared across retries,
used for command monitoring.
"""
try:
topology = await self._get_topology()
Expand Down Expand Up @@ -1932,6 +1934,7 @@ async def _run_operation(
async with operation.conn_mgr._lock:
async with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type]
err_handler.contribute_socket(operation.conn_mgr.conn)
operation.conn_mgr.conn.op_id = _randint()
return await server.run_operation(
operation.conn_mgr.conn,
operation,
Expand Down Expand Up @@ -2023,6 +2026,7 @@ async def _retry_internal(
:param retryable: If the operation should be retried once, defaults to None
:param is_run_command: If this is a runCommand operation, defaults to False
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
:param operation_id: Stable operation id shared across retries, defaults to None

:return: Output of the calling func()
"""
Expand Down Expand Up @@ -2069,6 +2073,7 @@ async def _retryable_read(
(may not always be supported even if supplied), defaults to False
:param is_run_command: If this is a runCommand operation, defaults to False.
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
:param operation_id: Stable operation id shared across retries, defaults to None
"""

# Ensure that the client supports retrying on reads and there is no session in
Expand Down Expand Up @@ -2112,6 +2117,7 @@ async def _retryable_write(
:param session: Client session we will use to execute write operation
:param operation: The name of the operation that the server is being selected for
:param bulk: bulk abstraction to execute operations in bulk, defaults to None
:param operation_id: Stable operation id shared across retries, defaults to None
"""
async with self._tmp_session(session) as s:
return await self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
Expand Down Expand Up @@ -2795,7 +2801,7 @@ def __init__(
self._server: Server = None # type: ignore
self._deprioritized_servers: list[Server] = []
self._operation = operation
self._operation_id = operation_id
self._operation_id = operation_id if operation_id is not None else _randint()
self._attempt_number = 0
self._is_run_command = is_run_command
self._is_aggregate_write = is_aggregate_write
Expand Down Expand Up @@ -3001,6 +3007,7 @@ async def _write(self) -> T:
is_mongos = False
self._server = await self._get_server()
async with self._client._checkout(self._server, self._session) as conn:
conn.op_id = self._operation_id
max_wire_version = conn.max_wire_version
sessions_supported = (
self._session
Expand Down Expand Up @@ -3040,6 +3047,7 @@ async def _read(self) -> T:
conn,
read_pref,
):
conn.op_id = self._operation_id
if self._retrying and not self._retryable and not self._always_retryable:
self._check_last_error()
if self._retrying:
Expand Down
4 changes: 4 additions & 0 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ def __init__(
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None
# Stable operation id for the operation currently using this connection.
self.op_id: Optional[int] = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to track op_id on connections. It introduces more risk of accidental inclusion or staleness on operations like auth compared to explicitly passing it through each operation. What about passing op_id as a kwarg on Connection.command() and on _Query/_GetMore where operations are actually initiated?


def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
Expand Down Expand Up @@ -416,6 +418,7 @@ async def command(
user_fields=user_fields,
exhaust_allowed=exhaust_allowed,
write_concern=write_concern,
op_id=self.op_id,
)
except (OperationFailure, NotPrimaryError):
raise
Expand Down Expand Up @@ -1319,6 +1322,7 @@ async def checkin(self, conn: AsyncConnection) -> None:
txn = conn.pinned_txn
cursor = conn.pinned_cursor
conn.active = False
conn.op_id = None
conn.pinned_txn = False
conn.pinned_cursor = False
self.__pinned_sockets.discard(conn)
Expand Down
1 change: 1 addition & 0 deletions pymongo/asynchronous/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ async def run_operation(
more_to_come=more_to_come,
unpack_res=unpack_res,
cursor_id=operation.cursor_id,
op_id=conn.op_id,
)
assert reply is not None

Expand Down
6 changes: 6 additions & 0 deletions pymongo/synchronous/command_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ def run_cursor_command(
more_to_come: bool = False,
unpack_res: Optional[Callable[..., Any]] = None,
cursor_id: Optional[int] = None,
op_id: Optional[int] = None,
) -> tuple[list[dict[str, Any]], Optional[_OpMsg], datetime.timedelta]:
"""Run a cursor ``find``/``getMore`` operation over ``conn``.

Expand All @@ -304,6 +305,7 @@ def run_cursor_command(
:param unpack_res: A callable decoding the wire response; when ``None`` the
reply's own ``unpack_response`` is used.
:param cursor_id: The cursor id passed to ``unpack_res``.
:param op_id: The APM operation id; defaults to ``request_id``.
"""
topology_id = client._topology_id if client is not None else None
return _run_command(
Expand All @@ -325,6 +327,7 @@ def run_cursor_command(
more_to_come=more_to_come,
unpack_res=unpack_res,
cursor_id=cursor_id,
op_id=op_id,
)


Expand All @@ -348,6 +351,7 @@ def run_command(
user_fields: Optional[Mapping[str, Any]] = None,
exhaust_allowed: bool = False,
write_concern: Optional[WriteConcern] = None,
op_id: Optional[int] = None,
) -> _DocumentType:
"""Encode and execute a command over ``conn``, or raise socket.error.

Expand Down Expand Up @@ -376,6 +380,7 @@ def run_command(
passed to ``bson._decode_all_selective``.
:param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed.
:param write_concern: The write concern for this command. Applied via CSOT.
:param op_id: The APM operation id; defaults to ``request_id``.
"""
name = next(iter(spec))

Expand Down Expand Up @@ -428,6 +433,7 @@ def run_command(
codec_options=codec_options,
user_fields=user_fields,
orig=orig,
op_id=op_id,
check=check,
allowable_errors=allowable_errors,
parse_write_concern_error=parse_write_concern_error,
Expand Down
12 changes: 10 additions & 2 deletions pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
_log_client_error,
_log_or_warn,
)
from pymongo.message import _CursorAddress, _GetMore, _Query
from pymongo.message import _CursorAddress, _GetMore, _Query, _randint
from pymongo.monitoring import ConnectionClosedReason, _EventListeners
from pymongo.operations import (
DeleteMany,
Expand Down Expand Up @@ -1834,6 +1834,8 @@ def _select_server(
be pinned to a mongos server address.
- `address` (optional): Address when sending a message
to a specific server, used for getMore.
- `operation_id` (optional): Stable operation id shared across retries,
used for command monitoring.
"""
try:
topology = self._get_topology()
Expand Down Expand Up @@ -1929,6 +1931,7 @@ def _run_operation(
with operation.conn_mgr._lock:
with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type]
err_handler.contribute_socket(operation.conn_mgr.conn)
operation.conn_mgr.conn.op_id = _randint()
return server.run_operation(
operation.conn_mgr.conn,
operation,
Expand Down Expand Up @@ -2020,6 +2023,7 @@ def _retry_internal(
:param retryable: If the operation should be retried once, defaults to None
:param is_run_command: If this is a runCommand operation, defaults to False
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
:param operation_id: Stable operation id shared across retries, defaults to None

:return: Output of the calling func()
"""
Expand Down Expand Up @@ -2066,6 +2070,7 @@ def _retryable_read(
(may not always be supported even if supplied), defaults to False
:param is_run_command: If this is a runCommand operation, defaults to False.
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
:param operation_id: Stable operation id shared across retries, defaults to None
"""

# Ensure that the client supports retrying on reads and there is no session in
Expand Down Expand Up @@ -2109,6 +2114,7 @@ def _retryable_write(
:param session: Client session we will use to execute write operation
:param operation: The name of the operation that the server is being selected for
:param bulk: bulk abstraction to execute operations in bulk, defaults to None
:param operation_id: Stable operation id shared across retries, defaults to None
"""
with self._tmp_session(session) as s:
return self._retry_with_session(retryable, func, s, bulk, operation, operation_id)
Expand Down Expand Up @@ -2786,7 +2792,7 @@ def __init__(
self._server: Server = None # type: ignore
self._deprioritized_servers: list[Server] = []
self._operation = operation
self._operation_id = operation_id
self._operation_id = operation_id if operation_id is not None else _randint()
self._attempt_number = 0
self._is_run_command = is_run_command
self._is_aggregate_write = is_aggregate_write
Expand Down Expand Up @@ -2992,6 +2998,7 @@ def _write(self) -> T:
is_mongos = False
self._server = self._get_server()
with self._client._checkout(self._server, self._session) as conn:
conn.op_id = self._operation_id
max_wire_version = conn.max_wire_version
sessions_supported = (
self._session
Expand Down Expand Up @@ -3031,6 +3038,7 @@ def _read(self) -> T:
conn,
read_pref,
):
conn.op_id = self._operation_id
if self._retrying and not self._retryable and not self._always_retryable:
self._check_last_error()
if self._retrying:
Expand Down
4 changes: 4 additions & 0 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ def __init__(
self.creation_time = time.monotonic()
# For gossiping $clusterTime from the connection handshake to the client.
self._cluster_time = None
# Stable operation id for the operation currently using this connection.
self.op_id: Optional[int] = None

def set_conn_timeout(self, timeout: Optional[float]) -> None:
"""Cache last timeout to avoid duplicate calls to conn.settimeout."""
Expand Down Expand Up @@ -416,6 +418,7 @@ def command(
user_fields=user_fields,
exhaust_allowed=exhaust_allowed,
write_concern=write_concern,
op_id=self.op_id,
)
except (OperationFailure, NotPrimaryError):
raise
Expand Down Expand Up @@ -1315,6 +1318,7 @@ def checkin(self, conn: Connection) -> None:
txn = conn.pinned_txn
cursor = conn.pinned_cursor
conn.active = False
conn.op_id = None
conn.pinned_txn = False
conn.pinned_cursor = False
self.__pinned_sockets.discard(conn)
Expand Down
1 change: 1 addition & 0 deletions pymongo/synchronous/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def run_operation(
more_to_come=more_to_come,
unpack_res=unpack_res,
cursor_id=operation.cursor_id,
op_id=conn.op_id,
)
assert reply is not None

Expand Down
2 changes: 1 addition & 1 deletion test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ def require_failCommand_fail_point(self, func: Any) -> Any:
func=func,
)

def require_failCommand_appName(self, func):
def require_failCommand_appName(self, func: Any) -> Any:
"""Run a test only if the server supports the failCommand appName."""
# SERVER-47195 and SERVER-49336.
return self._require(
Expand Down
2 changes: 1 addition & 1 deletion test/asynchronous/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ def require_failCommand_fail_point(self, func: Any) -> Any:
func=func,
)

def require_failCommand_appName(self, func):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this change?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was needed to fix an untyped-decorator mypy error that was occurring in my test. This matches the signature of require_failCommand_fail_point.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need both the require_failCommand_fail_point and require_failCommand_appName annotations, the latter is a strict subset of the former.

def require_failCommand_appName(self, func: Any) -> Any:
"""Run a test only if the server supports the failCommand appName."""
# SERVER-47195 and SERVER-49336.
return self._require(
Expand Down
Loading
Loading