From 786e403f03749f678fb414bbaadd3b7e4aaa1eaf Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Thu, 25 Jun 2026 12:27:54 -0400 Subject: [PATCH 1/6] PYTHON-5856 Move cursor execution logic into cursor class Move the command preparation and response construction that lived in Server.run_operation into _AsyncCursorBase._run_with_conn, so cursors call run_cursor_command directly without routing through Server. - Extract _CURSOR_DOC_FIELDS, _split_message, and _operation_to_command as module-level helpers in cursor_base.py - Add abstract _unpack_response to _AsyncCursorBase to make the interface explicit - Add _run_with_conn (carrying the @_handle_reauth decorator) to _AsyncCursorBase; this is the new home for all of run_operation's logic - client._run_operation now accepts execute_fn(conn, op, rp) -> Response instead of routing through server.run_operation - Remove Server.run_operation, Server.operation_to_command, and Server._split_message entirely --- pymongo/asynchronous/command_cursor.py | 2 +- pymongo/asynchronous/cursor.py | 2 +- pymongo/asynchronous/cursor_base.py | 119 ++++++++++++++++++++++++- pymongo/asynchronous/mongo_client.py | 25 ++---- pymongo/asynchronous/server.py | 26 +----- pymongo/synchronous/command_cursor.py | 4 +- pymongo/synchronous/cursor.py | 4 +- pymongo/synchronous/cursor_base.py | 119 ++++++++++++++++++++++++- pymongo/synchronous/mongo_client.py | 25 ++---- pymongo/synchronous/server.py | 26 +----- test/asynchronous/test_client.py | 2 +- test/test_client.py | 2 +- 12 files changed, 255 insertions(+), 101 deletions(-) diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index a06b501fa6..71404281a4 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -173,7 +173,7 @@ async def _send_message(self, operation: _GetMore) -> None: client = self._collection.database.client try: response = await client._run_operation( - operation, self._unpack_response, address=self._address + operation, self._run_with_conn, address=self._address ) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS: diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index 0b2b09f742..51c00a0b36 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -980,7 +980,7 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None: try: response = await client._run_operation( - operation, self._unpack_response, address=self._address + operation, self._run_with_conn, address=self._address ) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS or self._exhaust: diff --git a/pymongo/asynchronous/cursor_base.py b/pymongo/asynchronous/cursor_base.py index ce3114684a..fee9814870 100644 --- a/pymongo/asynchronous/cursor_base.py +++ b/pymongo/asynchronous/cursor_base.py @@ -16,20 +16,57 @@ from __future__ import annotations +import datetime from abc import abstractmethod -from typing import TYPE_CHECKING, Any, Optional +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any, Optional, Union from pymongo import _csot +from pymongo.asynchronous.command_runner import run_cursor_command +from pymongo.asynchronous.helpers import _handle_reauth from pymongo.cursor_shared import _AgnosticCursorBase from pymongo.lock import _async_create_lock -from pymongo.typings import _DocumentType +from pymongo.message import _GetMore, _OpMsg, _Query +from pymongo.response import PinnedResponse, Response +from pymongo.typings import _DocumentOut, _DocumentType if TYPE_CHECKING: from pymongo.asynchronous.client_session import AsyncClientSession from pymongo.asynchronous.pool import AsyncConnection + from pymongo.read_preferences import _ServerMode _IS_SYNC = False +_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} + + +def _split_message( + message: Union[tuple[int, Any], tuple[int, Any, int]], +) -> tuple[int, Any, int]: + """Return request_id, data, max_doc_size. + + :param message: (request_id, data, max_doc_size) or (request_id, data) + """ + if len(message) == 3: + return message # type: ignore[return-value] + # get_more and kill_cursors messages don't include BSON documents. + request_id, data = message # type: ignore[misc] + return request_id, data, 0 + + +async def _operation_to_command( + operation: Union[_Query, _GetMore], + conn: AsyncConnection, + use_cmd: bool, +) -> tuple[dict[str, Any], str]: + cmd, db = operation.as_command(conn, use_cmd) + if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: + cmd = await operation.client._encrypter.encrypt( # type: ignore[misc, assignment] + operation.db, cmd, operation.codec_options + ) + operation.update_command(cmd) + return cmd, db + class _ConnectionManager: """Used with exhaust cursors to ensure the connection is returned.""" @@ -66,6 +103,84 @@ def session(self) -> Optional[AsyncClientSession]: async def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg] ... + @abstractmethod + def _unpack_response( + self, + response: _OpMsg, + cursor_id: Optional[int], + codec_options: Any, + user_fields: Optional[Mapping[str, Any]] = None, + legacy_response: bool = False, + ) -> Sequence[_DocumentOut]: ... + + @_handle_reauth + async def _run_with_conn( + self, + conn: AsyncConnection, + operation: Union[_Query, _GetMore], + read_preference: _ServerMode, + ) -> Response: + """Execute a cursor operation on the given connection and return a Response.""" + client = self._collection.database.client + use_cmd = operation.use_command(conn) + more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) + cmd, dbn = await _operation_to_command(operation, conn, use_cmd) + if more_to_come: + request_id, data, max_doc_size = 0, b"", 0 + else: + message = operation.get_message(read_preference, conn, use_cmd) + request_id, data, max_doc_size = _split_message(message) + user_fields = _CURSOR_DOC_FIELDS if use_cmd else None + docs, reply, duration = await run_cursor_command( + conn, + cmd, + dbn, + request_id, + data, + client=client, + session=operation.session, # type: ignore[arg-type] + listeners=client._event_listeners, + address=conn.address, + start=datetime.datetime.now(), + codec_options=operation.codec_options, + user_fields=user_fields, + command_name=operation.name, + pool_opts=conn.opts, + max_doc_size=max_doc_size, + more_to_come=more_to_come, + unpack_res=self._unpack_response, + cursor_id=operation.cursor_id, + ) + assert reply is not None + if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] + conn.pin_cursor() + if isinstance(reply, _OpMsg): + # In OP_MSG, the server keeps sending only if the more_to_come flag is set. + more_to_come = reply.more_to_come + else: + # In OP_REPLY, the server keeps sending until cursor_id is 0. + more_to_come = bool(operation.exhaust and reply.cursor_id) + if operation.conn_mgr: + operation.conn_mgr.update_exhaust(more_to_come) + return PinnedResponse( + data=reply, + address=conn.address, + conn=conn, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + more_to_come=more_to_come, + ) + return Response( + data=reply, + address=conn.address, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + ) + async def _die_lock(self) -> None: """Closes this cursor.""" try: diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index a558f96356..ed2ffc7306 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -1911,13 +1911,14 @@ async def _conn_for_reads( async def _run_operation( self, operation: Union[_Query, _GetMore], - unpack_res: Callable, # type: ignore[type-arg] + execute_fn: Callable, # type: ignore[type-arg] address: Optional[_Address] = None, ) -> Response: """Run a _Query/_GetMore operation and return a Response. :param operation: a _Query or _GetMore object. - :param unpack_res: A callable that decodes the wire protocol response. + :param execute_fn: A callable ``(conn, operation, read_preference) -> Response`` + that executes the operation on a given connection. :param address: Optional address when sending a message to a specific server, used for getMore. """ @@ -1932,30 +1933,18 @@ async def _run_operation( async with operation.conn_mgr._lock: async with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type] err_handler.contribute_socket(operation.conn_mgr.conn) - return await server.run_operation( - operation.conn_mgr.conn, - operation, - operation.read_preference, - self._event_listeners, - unpack_res, - self, + return await execute_fn( + operation.conn_mgr.conn, operation, operation.read_preference ) async def _cmd( _session: Optional[AsyncClientSession], - server: Server, + _server: Server, conn: AsyncConnection, read_preference: _ServerMode, ) -> Response: operation.reset() # Reset op in case of retry. - return await server.run_operation( - conn, - operation, - read_preference, - self._event_listeners, - unpack_res, - self, - ) + return await execute_fn(conn, operation, read_preference) return await self._retryable_read( _cmd, diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 57158dfc44..1d9ace1f44 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -21,38 +21,28 @@ from typing import ( TYPE_CHECKING, Any, - Callable, Optional, - Union, ) -from pymongo.asynchronous.command_runner import run_cursor_command -from pymongo.asynchronous.helpers import _handle_reauth from pymongo.logger import ( _SDAM_LOGGER, _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _GetMore, _OpMsg, _Query -from pymongo.response import PinnedResponse, Response if TYPE_CHECKING: from queue import Queue from weakref import ReferenceType from bson.objectid import ObjectId - from pymongo.asynchronous.mongo_client import AsyncMongoClient, _MongoClientErrorHandler + from pymongo.asynchronous.mongo_client import _MongoClientErrorHandler from pymongo.asynchronous.monitor import Monitor from pymongo.asynchronous.pool import AsyncConnection, Pool from pymongo.monitoring import _EventListeners - from pymongo.read_preferences import _ServerMode from pymongo.server_description import ServerDescription - from pymongo.typings import _DocumentOut _IS_SYNC = False -_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} - class Server: def __init__( @@ -241,19 +231,5 @@ def description(self, server_description: ServerDescription) -> None: def pool(self) -> Pool: return self._pool - def _split_message( - self, message: Union[tuple[int, Any], tuple[int, Any, int]] - ) -> tuple[int, Any, int]: - """Return request_id, data, max_doc_size. - - :param message: (request_id, data, max_doc_size) or (request_id, data) - """ - if len(message) == 3: - return message # type: ignore[return-value] - else: - # get_more and kill_cursors messages don't include BSON documents. - request_id, data = message # type: ignore[misc] - return request_id, data, 0 - def __repr__(self) -> str: return f"<{self.__class__.__name__} {self._description!r}>" diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 1e7f26064d..8868d87939 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -172,9 +172,7 @@ def _send_message(self, operation: _GetMore) -> None: """Send a getmore message and handle the response.""" client = self._collection.database.client try: - response = client._run_operation( - operation, self._unpack_response, address=self._address - ) + response = client._run_operation(operation, self._run_with_conn, address=self._address) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS: # Don't send killCursors because the cursor is already closed. diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index dd0fafdf3a..7ea81ea9ac 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -977,9 +977,7 @@ def _send_message(self, operation: Union[_Query, _GetMore]) -> None: raise InvalidOperation("exhaust cursors do not support auto encryption") try: - response = client._run_operation( - operation, self._unpack_response, address=self._address - ) + response = client._run_operation(operation, self._run_with_conn, address=self._address) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS or self._exhaust: # Don't send killCursors because the cursor is already closed. diff --git a/pymongo/synchronous/cursor_base.py b/pymongo/synchronous/cursor_base.py index 96e69cb6ee..4806a65854 100644 --- a/pymongo/synchronous/cursor_base.py +++ b/pymongo/synchronous/cursor_base.py @@ -16,20 +16,57 @@ from __future__ import annotations +import datetime from abc import abstractmethod -from typing import TYPE_CHECKING, Any, Optional +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any, Optional, Union from pymongo import _csot from pymongo.cursor_shared import _AgnosticCursorBase from pymongo.lock import _create_lock -from pymongo.typings import _DocumentType +from pymongo.message import _GetMore, _OpMsg, _Query +from pymongo.response import PinnedResponse, Response +from pymongo.synchronous.command_runner import run_cursor_command +from pymongo.synchronous.helpers import _handle_reauth +from pymongo.typings import _DocumentOut, _DocumentType if TYPE_CHECKING: + from pymongo.read_preferences import _ServerMode from pymongo.synchronous.client_session import ClientSession from pymongo.synchronous.pool import Connection _IS_SYNC = True +_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} + + +def _split_message( + message: Union[tuple[int, Any], tuple[int, Any, int]], +) -> tuple[int, Any, int]: + """Return request_id, data, max_doc_size. + + :param message: (request_id, data, max_doc_size) or (request_id, data) + """ + if len(message) == 3: + return message # type: ignore[return-value] + # get_more and kill_cursors messages don't include BSON documents. + request_id, data = message # type: ignore[misc] + return request_id, data, 0 + + +def _operation_to_command( + operation: Union[_Query, _GetMore], + conn: Connection, + use_cmd: bool, +) -> tuple[dict[str, Any], str]: + cmd, db = operation.as_command(conn, use_cmd) + if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: + cmd = operation.client._encrypter.encrypt( # type: ignore[misc, assignment] + operation.db, cmd, operation.codec_options + ) + operation.update_command(cmd) + return cmd, db + class _ConnectionManager: """Used with exhaust cursors to ensure the connection is returned.""" @@ -66,6 +103,84 @@ def session(self) -> Optional[ClientSession]: def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg] ... + @abstractmethod + def _unpack_response( + self, + response: _OpMsg, + cursor_id: Optional[int], + codec_options: Any, + user_fields: Optional[Mapping[str, Any]] = None, + legacy_response: bool = False, + ) -> Sequence[_DocumentOut]: ... + + @_handle_reauth + def _run_with_conn( + self, + conn: Connection, + operation: Union[_Query, _GetMore], + read_preference: _ServerMode, + ) -> Response: + """Execute a cursor operation on the given connection and return a Response.""" + client = self._collection.database.client + use_cmd = operation.use_command(conn) + more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) + cmd, dbn = _operation_to_command(operation, conn, use_cmd) + if more_to_come: + request_id, data, max_doc_size = 0, b"", 0 + else: + message = operation.get_message(read_preference, conn, use_cmd) + request_id, data, max_doc_size = _split_message(message) + user_fields = _CURSOR_DOC_FIELDS if use_cmd else None + docs, reply, duration = run_cursor_command( + conn, + cmd, + dbn, + request_id, + data, + client=client, + session=operation.session, # type: ignore[arg-type] + listeners=client._event_listeners, + address=conn.address, + start=datetime.datetime.now(), + codec_options=operation.codec_options, + user_fields=user_fields, + command_name=operation.name, + pool_opts=conn.opts, + max_doc_size=max_doc_size, + more_to_come=more_to_come, + unpack_res=self._unpack_response, + cursor_id=operation.cursor_id, + ) + assert reply is not None + if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] + conn.pin_cursor() + if isinstance(reply, _OpMsg): + # In OP_MSG, the server keeps sending only if the more_to_come flag is set. + more_to_come = reply.more_to_come + else: + # In OP_REPLY, the server keeps sending until cursor_id is 0. + more_to_come = bool(operation.exhaust and reply.cursor_id) + if operation.conn_mgr: + operation.conn_mgr.update_exhaust(more_to_come) + return PinnedResponse( + data=reply, + address=conn.address, + conn=conn, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + more_to_come=more_to_come, + ) + return Response( + data=reply, + address=conn.address, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + ) + def _die_lock(self) -> None: """Closes this cursor.""" try: diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 5f321afe5c..a4ae093282 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -1908,13 +1908,14 @@ def _conn_for_reads( def _run_operation( self, operation: Union[_Query, _GetMore], - unpack_res: Callable, # type: ignore[type-arg] + execute_fn: Callable, # type: ignore[type-arg] address: Optional[_Address] = None, ) -> Response: """Run a _Query/_GetMore operation and return a Response. :param operation: a _Query or _GetMore object. - :param unpack_res: A callable that decodes the wire protocol response. + :param execute_fn: A callable ``(conn, operation, read_preference) -> Response`` + that executes the operation on a given connection. :param address: Optional address when sending a message to a specific server, used for getMore. """ @@ -1929,30 +1930,16 @@ def _run_operation( with operation.conn_mgr._lock: with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type] err_handler.contribute_socket(operation.conn_mgr.conn) - return server.run_operation( - operation.conn_mgr.conn, - operation, - operation.read_preference, - self._event_listeners, - unpack_res, - self, - ) + return execute_fn(operation.conn_mgr.conn, operation, operation.read_preference) def _cmd( _session: Optional[ClientSession], - server: Server, + _server: Server, conn: Connection, read_preference: _ServerMode, ) -> Response: operation.reset() # Reset op in case of retry. - return server.run_operation( - conn, - operation, - read_preference, - self._event_listeners, - unpack_res, - self, - ) + return execute_fn(conn, operation, read_preference) return self._retryable_read( _cmd, diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 09d8fb75e1..3c074362eb 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -21,9 +21,7 @@ from typing import ( TYPE_CHECKING, Any, - Callable, Optional, - Union, ) from pymongo.logger import ( @@ -31,10 +29,6 @@ _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _GetMore, _OpMsg, _Query -from pymongo.response import PinnedResponse, Response -from pymongo.synchronous.command_runner import run_cursor_command -from pymongo.synchronous.helpers import _handle_reauth if TYPE_CHECKING: from queue import Queue @@ -42,17 +36,13 @@ from bson.objectid import ObjectId from pymongo.monitoring import _EventListeners - from pymongo.read_preferences import _ServerMode from pymongo.server_description import ServerDescription - from pymongo.synchronous.mongo_client import MongoClient, _MongoClientErrorHandler + from pymongo.synchronous.mongo_client import _MongoClientErrorHandler from pymongo.synchronous.monitor import Monitor from pymongo.synchronous.pool import Connection, Pool - from pymongo.typings import _DocumentOut _IS_SYNC = True -_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} - class Server: def __init__( @@ -241,19 +231,5 @@ def description(self, server_description: ServerDescription) -> None: def pool(self) -> Pool: return self._pool - def _split_message( - self, message: Union[tuple[int, Any], tuple[int, Any, int]] - ) -> tuple[int, Any, int]: - """Return request_id, data, max_doc_size. - - :param message: (request_id, data, max_doc_size) or (request_id, data) - """ - if len(message) == 3: - return message # type: ignore[return-value] - else: - # get_more and kill_cursors messages don't include BSON documents. - request_id, data = message # type: ignore[misc] - return request_id, data, 0 - def __repr__(self) -> str: return f"<{self.__class__.__name__} {self._description!r}>" diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index 5da186931a..0a65ff70b7 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -1676,7 +1676,7 @@ async def test_stale_getmore(self): False, None, ), - unpack_res=AsyncCursor(client.pymongo_test.collection)._unpack_response, + execute_fn=AsyncCursor(client.pymongo_test.collection)._run_with_conn, address=("not-a-member", 27017), ) diff --git a/test/test_client.py b/test/test_client.py index b37b5e57ac..a32de9db72 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1633,7 +1633,7 @@ def test_stale_getmore(self): False, None, ), - unpack_res=Cursor(client.pymongo_test.collection)._unpack_response, + execute_fn=Cursor(client.pymongo_test.collection)._run_with_conn, address=("not-a-member", 27017), ) From 31d154fd8e548c89ca56b9508648846ef02dfd83 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Wed, 1 Jul 2026 12:42:37 -0400 Subject: [PATCH 2/6] Claude review --- pymongo/asynchronous/command_runner.py | 2 +- pymongo/asynchronous/cursor_base.py | 3 - pymongo/asynchronous/server.py | 106 ------------------------- pymongo/synchronous/command_runner.py | 2 +- pymongo/synchronous/cursor_base.py | 3 - pymongo/synchronous/server.py | 106 ------------------------- 6 files changed, 2 insertions(+), 220 deletions(-) diff --git a/pymongo/asynchronous/command_runner.py b/pymongo/asynchronous/command_runner.py index 1f32cc744e..3fc048fc39 100644 --- a/pymongo/asynchronous/command_runner.py +++ b/pymongo/asynchronous/command_runner.py @@ -24,7 +24,7 @@ batches. Pre-encrypted, so decryption is skipped. Callers: ``bulk.py``, ``client_bulk.py``. - :func:`run_cursor_command` — cursor ``find``/``getMore`` operations with - exhaust-cursor handling. Caller: ``server.py``. + exhaust-cursor handling. Caller: ``cursor_base.py``. :func:`_run_command` owns the entire shared skeleton: command logging, APM event publishing, ``send``/``receive``, ``$clusterTime`` gossip, diff --git a/pymongo/asynchronous/cursor_base.py b/pymongo/asynchronous/cursor_base.py index fee9814870..f4fdb0a340 100644 --- a/pymongo/asynchronous/cursor_base.py +++ b/pymongo/asynchronous/cursor_base.py @@ -16,7 +16,6 @@ from __future__ import annotations -import datetime from abc import abstractmethod from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any, Optional, Union @@ -140,8 +139,6 @@ async def _run_with_conn( client=client, session=operation.session, # type: ignore[arg-type] listeners=client._event_listeners, - address=conn.address, - start=datetime.datetime.now(), codec_options=operation.codec_options, user_fields=user_fields, command_name=operation.name, diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 1d9ace1f44..0b19fd16a3 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -107,112 +107,6 @@ def request_check(self) -> None: """Check the server's state soon.""" self._monitor.request_check() - async def operation_to_command( - self, operation: Union[_Query, _GetMore], conn: AsyncConnection, apply_timeout: bool = False - ) -> tuple[dict[str, Any], str]: - cmd, db = operation.as_command(conn, apply_timeout) - # Support auto encryption - if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: - cmd = await operation.client._encrypter.encrypt( # type: ignore[misc, assignment] - operation.db, cmd, operation.codec_options - ) - operation.update_command(cmd) - - return cmd, db - - @_handle_reauth - async def run_operation( - self, - conn: AsyncConnection, - operation: Union[_Query, _GetMore], - read_preference: _ServerMode, - listeners: Optional[_EventListeners], - unpack_res: Callable[..., list[_DocumentOut]], - client: AsyncMongoClient[Any], - ) -> Response: - """Run a _Query or _GetMore operation and return a Response object. - - This method is used only to run _Query/_GetMore operations from - cursors. - Can raise ConnectionFailure, OperationFailure, etc. - - :param conn: An AsyncConnection instance. - :param operation: A _Query or _GetMore object. - :param read_preference: The read preference to use. - :param listeners: Instance of _EventListeners or None. - :param unpack_res: A callable that decodes the wire protocol response. - :param client: An AsyncMongoClient instance. - """ - assert listeners is not None - - use_cmd = operation.use_command(conn) - more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) - cmd, dbn = await self.operation_to_command(operation, conn, use_cmd) - if more_to_come: - request_id = 0 - data = b"" - max_doc_size = 0 - else: - message = operation.get_message(read_preference, conn, use_cmd) - request_id, data, max_doc_size = self._split_message(message) - - user_fields = _CURSOR_DOC_FIELDS if use_cmd else None - - docs, reply, duration = await run_cursor_command( - conn, - cmd, - dbn, - request_id, - data, - client=client, - session=operation.session, # type: ignore[arg-type] - listeners=listeners, - codec_options=operation.codec_options, - user_fields=user_fields, - command_name=operation.name, - pool_opts=conn.opts, - max_doc_size=max_doc_size, - more_to_come=more_to_come, - unpack_res=unpack_res, - cursor_id=operation.cursor_id, - ) - assert reply is not None - - response: Response - - if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] - conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) - if operation.conn_mgr: - operation.conn_mgr.update_exhaust(more_to_come) - response = PinnedResponse( - data=reply, - address=self._description.address, - conn=conn, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - more_to_come=more_to_come, - ) - else: - response = Response( - data=reply, - address=self._description.address, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - ) - - return response - async def checkout( self, handler: Optional[_MongoClientErrorHandler] = None ) -> AbstractAsyncContextManager[AsyncConnection]: diff --git a/pymongo/synchronous/command_runner.py b/pymongo/synchronous/command_runner.py index 077e0f9409..eec58722bb 100644 --- a/pymongo/synchronous/command_runner.py +++ b/pymongo/synchronous/command_runner.py @@ -24,7 +24,7 @@ batches. Pre-encrypted, so decryption is skipped. Callers: ``bulk.py``, ``client_bulk.py``. - :func:`run_cursor_command` — cursor ``find``/``getMore`` operations with - exhaust-cursor handling. Caller: ``server.py``. + exhaust-cursor handling. Caller: ``cursor_base.py``. :func:`_run_command` owns the entire shared skeleton: command logging, APM event publishing, ``send``/``receive``, ``$clusterTime`` gossip, diff --git a/pymongo/synchronous/cursor_base.py b/pymongo/synchronous/cursor_base.py index 4806a65854..17d76cd597 100644 --- a/pymongo/synchronous/cursor_base.py +++ b/pymongo/synchronous/cursor_base.py @@ -16,7 +16,6 @@ from __future__ import annotations -import datetime from abc import abstractmethod from collections.abc import Mapping, Sequence from typing import TYPE_CHECKING, Any, Optional, Union @@ -140,8 +139,6 @@ def _run_with_conn( client=client, session=operation.session, # type: ignore[arg-type] listeners=client._event_listeners, - address=conn.address, - start=datetime.datetime.now(), codec_options=operation.codec_options, user_fields=user_fields, command_name=operation.name, diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 3c074362eb..8b0ad4bcb0 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -107,112 +107,6 @@ def request_check(self) -> None: """Check the server's state soon.""" self._monitor.request_check() - def operation_to_command( - self, operation: Union[_Query, _GetMore], conn: Connection, apply_timeout: bool = False - ) -> tuple[dict[str, Any], str]: - cmd, db = operation.as_command(conn, apply_timeout) - # Support auto encryption - if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: - cmd = operation.client._encrypter.encrypt( # type: ignore[misc, assignment] - operation.db, cmd, operation.codec_options - ) - operation.update_command(cmd) - - return cmd, db - - @_handle_reauth - def run_operation( - self, - conn: Connection, - operation: Union[_Query, _GetMore], - read_preference: _ServerMode, - listeners: Optional[_EventListeners], - unpack_res: Callable[..., list[_DocumentOut]], - client: MongoClient[Any], - ) -> Response: - """Run a _Query or _GetMore operation and return a Response object. - - This method is used only to run _Query/_GetMore operations from - cursors. - Can raise ConnectionFailure, OperationFailure, etc. - - :param conn: A Connection instance. - :param operation: A _Query or _GetMore object. - :param read_preference: The read preference to use. - :param listeners: Instance of _EventListeners or None. - :param unpack_res: A callable that decodes the wire protocol response. - :param client: A MongoClient instance. - """ - assert listeners is not None - - use_cmd = operation.use_command(conn) - more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) - cmd, dbn = self.operation_to_command(operation, conn, use_cmd) - if more_to_come: - request_id = 0 - data = b"" - max_doc_size = 0 - else: - message = operation.get_message(read_preference, conn, use_cmd) - request_id, data, max_doc_size = self._split_message(message) - - user_fields = _CURSOR_DOC_FIELDS if use_cmd else None - - docs, reply, duration = run_cursor_command( - conn, - cmd, - dbn, - request_id, - data, - client=client, - session=operation.session, # type: ignore[arg-type] - listeners=listeners, - codec_options=operation.codec_options, - user_fields=user_fields, - command_name=operation.name, - pool_opts=conn.opts, - max_doc_size=max_doc_size, - more_to_come=more_to_come, - unpack_res=unpack_res, - cursor_id=operation.cursor_id, - ) - assert reply is not None - - response: Response - - if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] - conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) - if operation.conn_mgr: - operation.conn_mgr.update_exhaust(more_to_come) - response = PinnedResponse( - data=reply, - address=self._description.address, - conn=conn, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - more_to_come=more_to_come, - ) - else: - response = Response( - data=reply, - address=self._description.address, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - ) - - return response - def checkout( self, handler: Optional[_MongoClientErrorHandler] = None ) -> AbstractContextManager[Connection]: From 2deb975e3dd9add1322bb21cbb74468199689db8 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Wed, 1 Jul 2026 13:16:03 -0400 Subject: [PATCH 3/6] Copilot review --- pymongo/asynchronous/cursor_base.py | 8 ++++---- pymongo/asynchronous/mongo_client.py | 2 +- pymongo/synchronous/cursor_base.py | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pymongo/asynchronous/cursor_base.py b/pymongo/asynchronous/cursor_base.py index f4fdb0a340..9d99cb7443 100644 --- a/pymongo/asynchronous/cursor_base.py +++ b/pymongo/asynchronous/cursor_base.py @@ -40,8 +40,8 @@ def _split_message( - message: Union[tuple[int, Any], tuple[int, Any, int]], -) -> tuple[int, Any, int]: + message: Union[tuple[int, bytes], tuple[int, bytes, int]], +) -> tuple[int, bytes, int]: """Return request_id, data, max_doc_size. :param message: (request_id, data, max_doc_size) or (request_id, data) @@ -56,9 +56,9 @@ def _split_message( async def _operation_to_command( operation: Union[_Query, _GetMore], conn: AsyncConnection, - use_cmd: bool, + apply_timeout: bool, ) -> tuple[dict[str, Any], str]: - cmd, db = operation.as_command(conn, use_cmd) + cmd, db = operation.as_command(conn, apply_timeout) if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: cmd = await operation.client._encrypter.encrypt( # type: ignore[misc, assignment] operation.db, cmd, operation.codec_options diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index ed2ffc7306..98377f2963 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -1917,7 +1917,7 @@ async def _run_operation( """Run a _Query/_GetMore operation and return a Response. :param operation: a _Query or _GetMore object. - :param execute_fn: A callable ``(conn, operation, read_preference) -> Response`` + :param execute_fn: A callable ``(conn, operation, read_preference) -> Awaitable[Response]`` that executes the operation on a given connection. :param address: Optional address when sending a message to a specific server, used for getMore. diff --git a/pymongo/synchronous/cursor_base.py b/pymongo/synchronous/cursor_base.py index 17d76cd597..c43e1848ac 100644 --- a/pymongo/synchronous/cursor_base.py +++ b/pymongo/synchronous/cursor_base.py @@ -40,8 +40,8 @@ def _split_message( - message: Union[tuple[int, Any], tuple[int, Any, int]], -) -> tuple[int, Any, int]: + message: Union[tuple[int, bytes], tuple[int, bytes, int]], +) -> tuple[int, bytes, int]: """Return request_id, data, max_doc_size. :param message: (request_id, data, max_doc_size) or (request_id, data) @@ -56,9 +56,9 @@ def _split_message( def _operation_to_command( operation: Union[_Query, _GetMore], conn: Connection, - use_cmd: bool, + apply_timeout: bool, ) -> tuple[dict[str, Any], str]: - cmd, db = operation.as_command(conn, use_cmd) + cmd, db = operation.as_command(conn, apply_timeout) if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: cmd = operation.client._encrypter.encrypt( # type: ignore[misc, assignment] operation.db, cmd, operation.codec_options From eff0e8bff5854e9a92e846a4a11603102c481a59 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Wed, 1 Jul 2026 14:53:14 -0400 Subject: [PATCH 4/6] Claude review --- pymongo/asynchronous/cursor_base.py | 25 +++++++------------------ pymongo/cursor_shared.py | 16 ++++++++++++++++ pymongo/synchronous/cursor_base.py | 25 +++++++------------------ 3 files changed, 30 insertions(+), 36 deletions(-) diff --git a/pymongo/asynchronous/cursor_base.py b/pymongo/asynchronous/cursor_base.py index 9d99cb7443..e8ac4c3139 100644 --- a/pymongo/asynchronous/cursor_base.py +++ b/pymongo/asynchronous/cursor_base.py @@ -23,7 +23,7 @@ from pymongo import _csot from pymongo.asynchronous.command_runner import run_cursor_command from pymongo.asynchronous.helpers import _handle_reauth -from pymongo.cursor_shared import _AgnosticCursorBase +from pymongo.cursor_shared import _CURSOR_DOC_FIELDS, _AgnosticCursorBase, _split_message from pymongo.lock import _async_create_lock from pymongo.message import _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response @@ -36,22 +36,6 @@ _IS_SYNC = False -_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} - - -def _split_message( - message: Union[tuple[int, bytes], tuple[int, bytes, int]], -) -> tuple[int, bytes, int]: - """Return request_id, data, max_doc_size. - - :param message: (request_id, data, max_doc_size) or (request_id, data) - """ - if len(message) == 3: - return message # type: ignore[return-value] - # get_more and kill_cursors messages don't include BSON documents. - request_id, data = message # type: ignore[misc] - return request_id, data, 0 - async def _operation_to_command( operation: Union[_Query, _GetMore], @@ -119,7 +103,12 @@ async def _run_with_conn( operation: Union[_Query, _GetMore], read_preference: _ServerMode, ) -> Response: - """Execute a cursor operation on the given connection and return a Response.""" + """Execute a cursor operation on the given connection and return a Response. + + :param conn: An AsyncConnection instance. + :param operation: A _Query or _GetMore object. + :param read_preference: The read preference to use. + """ client = self._collection.database.client use_cmd = operation.use_command(conn) more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) diff --git a/pymongo/cursor_shared.py b/pymongo/cursor_shared.py index 5d58f1845b..df0e1e2f58 100644 --- a/pymongo/cursor_shared.py +++ b/pymongo/cursor_shared.py @@ -24,6 +24,22 @@ from pymongo.message import _CursorAddress from pymongo.typings import _Address, _DocumentType +_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} + + +def _split_message( + message: Union[tuple[int, bytes], tuple[int, bytes, int]], +) -> tuple[int, bytes, int]: + """Return request_id, data, max_doc_size. + + :param message: (request_id, data, max_doc_size) or (request_id, data) + """ + if len(message) == 3: + return message + # get_more and kill_cursors messages don't include BSON documents. + request_id, data = message + return request_id, data, 0 + class _AgnosticCursorBase(Generic[_DocumentType], ABC): """ diff --git a/pymongo/synchronous/cursor_base.py b/pymongo/synchronous/cursor_base.py index c43e1848ac..4cad4e0c09 100644 --- a/pymongo/synchronous/cursor_base.py +++ b/pymongo/synchronous/cursor_base.py @@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any, Optional, Union from pymongo import _csot -from pymongo.cursor_shared import _AgnosticCursorBase +from pymongo.cursor_shared import _CURSOR_DOC_FIELDS, _AgnosticCursorBase, _split_message from pymongo.lock import _create_lock from pymongo.message import _GetMore, _OpMsg, _Query from pymongo.response import PinnedResponse, Response @@ -36,22 +36,6 @@ _IS_SYNC = True -_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} - - -def _split_message( - message: Union[tuple[int, bytes], tuple[int, bytes, int]], -) -> tuple[int, bytes, int]: - """Return request_id, data, max_doc_size. - - :param message: (request_id, data, max_doc_size) or (request_id, data) - """ - if len(message) == 3: - return message # type: ignore[return-value] - # get_more and kill_cursors messages don't include BSON documents. - request_id, data = message # type: ignore[misc] - return request_id, data, 0 - def _operation_to_command( operation: Union[_Query, _GetMore], @@ -119,7 +103,12 @@ def _run_with_conn( operation: Union[_Query, _GetMore], read_preference: _ServerMode, ) -> Response: - """Execute a cursor operation on the given connection and return a Response.""" + """Execute a cursor operation on the given connection and return a Response. + + :param conn: A Connection instance. + :param operation: A _Query or _GetMore object. + :param read_preference: The read preference to use. + """ client = self._collection.database.client use_cmd = operation.use_command(conn) more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) From f610eb7434ad07b5f698a27c445497b0e58ee240 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Wed, 1 Jul 2026 15:23:29 -0400 Subject: [PATCH 5/6] Rename execute_fn -> run_with_conn --- pymongo/asynchronous/mongo_client.py | 8 ++++---- pymongo/synchronous/mongo_client.py | 10 ++++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 98377f2963..c745720ed9 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -1911,13 +1911,13 @@ async def _conn_for_reads( async def _run_operation( self, operation: Union[_Query, _GetMore], - execute_fn: Callable, # type: ignore[type-arg] + run_with_conn: Callable, # type: ignore[type-arg] address: Optional[_Address] = None, ) -> Response: """Run a _Query/_GetMore operation and return a Response. :param operation: a _Query or _GetMore object. - :param execute_fn: A callable ``(conn, operation, read_preference) -> Awaitable[Response]`` + :param run_with_conn: A callable ``(conn, operation, read_preference) -> Awaitable[Response]`` that executes the operation on a given connection. :param address: Optional address when sending a message to a specific server, used for getMore. @@ -1933,7 +1933,7 @@ async def _run_operation( async with operation.conn_mgr._lock: async with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type] err_handler.contribute_socket(operation.conn_mgr.conn) - return await execute_fn( + return await run_with_conn( operation.conn_mgr.conn, operation, operation.read_preference ) @@ -1944,7 +1944,7 @@ async def _cmd( read_preference: _ServerMode, ) -> Response: operation.reset() # Reset op in case of retry. - return await execute_fn(conn, operation, read_preference) + return await run_with_conn(conn, operation, read_preference) return await self._retryable_read( _cmd, diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index a4ae093282..90912dd6f5 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -1908,13 +1908,13 @@ def _conn_for_reads( def _run_operation( self, operation: Union[_Query, _GetMore], - execute_fn: Callable, # type: ignore[type-arg] + run_with_conn: Callable, # type: ignore[type-arg] address: Optional[_Address] = None, ) -> Response: """Run a _Query/_GetMore operation and return a Response. :param operation: a _Query or _GetMore object. - :param execute_fn: A callable ``(conn, operation, read_preference) -> Response`` + :param run_with_conn: A callable ``(conn, operation, read_preference) -> Response`` that executes the operation on a given connection. :param address: Optional address when sending a message to a specific server, used for getMore. @@ -1930,7 +1930,9 @@ def _run_operation( with operation.conn_mgr._lock: with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type] err_handler.contribute_socket(operation.conn_mgr.conn) - return execute_fn(operation.conn_mgr.conn, operation, operation.read_preference) + return run_with_conn( + operation.conn_mgr.conn, operation, operation.read_preference + ) def _cmd( _session: Optional[ClientSession], @@ -1939,7 +1941,7 @@ def _cmd( read_preference: _ServerMode, ) -> Response: operation.reset() # Reset op in case of retry. - return execute_fn(conn, operation, read_preference) + return run_with_conn(conn, operation, read_preference) return self._retryable_read( _cmd, From 6ff89cb287afc62efd67a6a99b769d56cd8fb972 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Wed, 1 Jul 2026 15:29:08 -0400 Subject: [PATCH 6/6] PYTHON-5856 Fix stale execute_fn kwarg in test_stale_getmore The execute_fn -> run_with_conn rename missed this call site. --- test/asynchronous/test_client.py | 2 +- test/test_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index 0a65ff70b7..a7f66d9620 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -1676,7 +1676,7 @@ async def test_stale_getmore(self): False, None, ), - execute_fn=AsyncCursor(client.pymongo_test.collection)._run_with_conn, + run_with_conn=AsyncCursor(client.pymongo_test.collection)._run_with_conn, address=("not-a-member", 27017), ) diff --git a/test/test_client.py b/test/test_client.py index a32de9db72..8f0da71321 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1633,7 +1633,7 @@ def test_stale_getmore(self): False, None, ), - execute_fn=Cursor(client.pymongo_test.collection)._run_with_conn, + run_with_conn=Cursor(client.pymongo_test.collection)._run_with_conn, address=("not-a-member", 27017), )