diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index a06b501fa6..71404281a4 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -173,7 +173,7 @@ async def _send_message(self, operation: _GetMore) -> None: client = self._collection.database.client try: response = await client._run_operation( - operation, self._unpack_response, address=self._address + operation, self._run_with_conn, address=self._address ) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS: diff --git a/pymongo/asynchronous/command_runner.py b/pymongo/asynchronous/command_runner.py index 1f32cc744e..3fc048fc39 100644 --- a/pymongo/asynchronous/command_runner.py +++ b/pymongo/asynchronous/command_runner.py @@ -24,7 +24,7 @@ batches. Pre-encrypted, so decryption is skipped. Callers: ``bulk.py``, ``client_bulk.py``. - :func:`run_cursor_command` — cursor ``find``/``getMore`` operations with - exhaust-cursor handling. Caller: ``server.py``. + exhaust-cursor handling. Caller: ``cursor_base.py``. :func:`_run_command` owns the entire shared skeleton: command logging, APM event publishing, ``send``/``receive``, ``$clusterTime`` gossip, diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index 0b2b09f742..51c00a0b36 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -980,7 +980,7 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None: try: response = await client._run_operation( - operation, self._unpack_response, address=self._address + operation, self._run_with_conn, address=self._address ) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS or self._exhaust: diff --git a/pymongo/asynchronous/cursor_base.py b/pymongo/asynchronous/cursor_base.py index ce3114684a..e8ac4c3139 100644 --- a/pymongo/asynchronous/cursor_base.py +++ b/pymongo/asynchronous/cursor_base.py @@ -17,20 +17,40 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING, Any, Optional +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any, Optional, Union from pymongo import _csot -from pymongo.cursor_shared import _AgnosticCursorBase +from pymongo.asynchronous.command_runner import run_cursor_command +from pymongo.asynchronous.helpers import _handle_reauth +from pymongo.cursor_shared import _CURSOR_DOC_FIELDS, _AgnosticCursorBase, _split_message from pymongo.lock import _async_create_lock -from pymongo.typings import _DocumentType +from pymongo.message import _GetMore, _OpMsg, _Query +from pymongo.response import PinnedResponse, Response +from pymongo.typings import _DocumentOut, _DocumentType if TYPE_CHECKING: from pymongo.asynchronous.client_session import AsyncClientSession from pymongo.asynchronous.pool import AsyncConnection + from pymongo.read_preferences import _ServerMode _IS_SYNC = False +async def _operation_to_command( + operation: Union[_Query, _GetMore], + conn: AsyncConnection, + apply_timeout: bool, +) -> tuple[dict[str, Any], str]: + cmd, db = operation.as_command(conn, apply_timeout) + if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: + cmd = await operation.client._encrypter.encrypt( # type: ignore[misc, assignment] + operation.db, cmd, operation.codec_options + ) + operation.update_command(cmd) + return cmd, db + + class _ConnectionManager: """Used with exhaust cursors to ensure the connection is returned.""" @@ -66,6 +86,87 @@ def session(self) -> Optional[AsyncClientSession]: async def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg] ... + @abstractmethod + def _unpack_response( + self, + response: _OpMsg, + cursor_id: Optional[int], + codec_options: Any, + user_fields: Optional[Mapping[str, Any]] = None, + legacy_response: bool = False, + ) -> Sequence[_DocumentOut]: ... + + @_handle_reauth + async def _run_with_conn( + self, + conn: AsyncConnection, + operation: Union[_Query, _GetMore], + read_preference: _ServerMode, + ) -> Response: + """Execute a cursor operation on the given connection and return a Response. + + :param conn: An AsyncConnection instance. + :param operation: A _Query or _GetMore object. + :param read_preference: The read preference to use. + """ + client = self._collection.database.client + use_cmd = operation.use_command(conn) + more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) + cmd, dbn = await _operation_to_command(operation, conn, use_cmd) + if more_to_come: + request_id, data, max_doc_size = 0, b"", 0 + else: + message = operation.get_message(read_preference, conn, use_cmd) + request_id, data, max_doc_size = _split_message(message) + user_fields = _CURSOR_DOC_FIELDS if use_cmd else None + docs, reply, duration = await run_cursor_command( + conn, + cmd, + dbn, + request_id, + data, + client=client, + session=operation.session, # type: ignore[arg-type] + listeners=client._event_listeners, + codec_options=operation.codec_options, + user_fields=user_fields, + command_name=operation.name, + pool_opts=conn.opts, + max_doc_size=max_doc_size, + more_to_come=more_to_come, + unpack_res=self._unpack_response, + cursor_id=operation.cursor_id, + ) + assert reply is not None + if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] + conn.pin_cursor() + if isinstance(reply, _OpMsg): + # In OP_MSG, the server keeps sending only if the more_to_come flag is set. + more_to_come = reply.more_to_come + else: + # In OP_REPLY, the server keeps sending until cursor_id is 0. + more_to_come = bool(operation.exhaust and reply.cursor_id) + if operation.conn_mgr: + operation.conn_mgr.update_exhaust(more_to_come) + return PinnedResponse( + data=reply, + address=conn.address, + conn=conn, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + more_to_come=more_to_come, + ) + return Response( + data=reply, + address=conn.address, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + ) + async def _die_lock(self) -> None: """Closes this cursor.""" try: diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index a558f96356..c745720ed9 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -1911,13 +1911,14 @@ async def _conn_for_reads( async def _run_operation( self, operation: Union[_Query, _GetMore], - unpack_res: Callable, # type: ignore[type-arg] + run_with_conn: Callable, # type: ignore[type-arg] address: Optional[_Address] = None, ) -> Response: """Run a _Query/_GetMore operation and return a Response. :param operation: a _Query or _GetMore object. - :param unpack_res: A callable that decodes the wire protocol response. + :param run_with_conn: A callable ``(conn, operation, read_preference) -> Awaitable[Response]`` + that executes the operation on a given connection. :param address: Optional address when sending a message to a specific server, used for getMore. """ @@ -1932,30 +1933,18 @@ async def _run_operation( async with operation.conn_mgr._lock: async with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type] err_handler.contribute_socket(operation.conn_mgr.conn) - return await server.run_operation( - operation.conn_mgr.conn, - operation, - operation.read_preference, - self._event_listeners, - unpack_res, - self, + return await run_with_conn( + operation.conn_mgr.conn, operation, operation.read_preference ) async def _cmd( _session: Optional[AsyncClientSession], - server: Server, + _server: Server, conn: AsyncConnection, read_preference: _ServerMode, ) -> Response: operation.reset() # Reset op in case of retry. - return await server.run_operation( - conn, - operation, - read_preference, - self._event_listeners, - unpack_res, - self, - ) + return await run_with_conn(conn, operation, read_preference) return await self._retryable_read( _cmd, diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 57158dfc44..0b19fd16a3 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -21,38 +21,28 @@ from typing import ( TYPE_CHECKING, Any, - Callable, Optional, - Union, ) -from pymongo.asynchronous.command_runner import run_cursor_command -from pymongo.asynchronous.helpers import _handle_reauth from pymongo.logger import ( _SDAM_LOGGER, _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _GetMore, _OpMsg, _Query -from pymongo.response import PinnedResponse, Response if TYPE_CHECKING: from queue import Queue from weakref import ReferenceType from bson.objectid import ObjectId - from pymongo.asynchronous.mongo_client import AsyncMongoClient, _MongoClientErrorHandler + from pymongo.asynchronous.mongo_client import _MongoClientErrorHandler from pymongo.asynchronous.monitor import Monitor from pymongo.asynchronous.pool import AsyncConnection, Pool from pymongo.monitoring import _EventListeners - from pymongo.read_preferences import _ServerMode from pymongo.server_description import ServerDescription - from pymongo.typings import _DocumentOut _IS_SYNC = False -_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} - class Server: def __init__( @@ -117,112 +107,6 @@ def request_check(self) -> None: """Check the server's state soon.""" self._monitor.request_check() - async def operation_to_command( - self, operation: Union[_Query, _GetMore], conn: AsyncConnection, apply_timeout: bool = False - ) -> tuple[dict[str, Any], str]: - cmd, db = operation.as_command(conn, apply_timeout) - # Support auto encryption - if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: - cmd = await operation.client._encrypter.encrypt( # type: ignore[misc, assignment] - operation.db, cmd, operation.codec_options - ) - operation.update_command(cmd) - - return cmd, db - - @_handle_reauth - async def run_operation( - self, - conn: AsyncConnection, - operation: Union[_Query, _GetMore], - read_preference: _ServerMode, - listeners: Optional[_EventListeners], - unpack_res: Callable[..., list[_DocumentOut]], - client: AsyncMongoClient[Any], - ) -> Response: - """Run a _Query or _GetMore operation and return a Response object. - - This method is used only to run _Query/_GetMore operations from - cursors. - Can raise ConnectionFailure, OperationFailure, etc. - - :param conn: An AsyncConnection instance. - :param operation: A _Query or _GetMore object. - :param read_preference: The read preference to use. - :param listeners: Instance of _EventListeners or None. - :param unpack_res: A callable that decodes the wire protocol response. - :param client: An AsyncMongoClient instance. - """ - assert listeners is not None - - use_cmd = operation.use_command(conn) - more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) - cmd, dbn = await self.operation_to_command(operation, conn, use_cmd) - if more_to_come: - request_id = 0 - data = b"" - max_doc_size = 0 - else: - message = operation.get_message(read_preference, conn, use_cmd) - request_id, data, max_doc_size = self._split_message(message) - - user_fields = _CURSOR_DOC_FIELDS if use_cmd else None - - docs, reply, duration = await run_cursor_command( - conn, - cmd, - dbn, - request_id, - data, - client=client, - session=operation.session, # type: ignore[arg-type] - listeners=listeners, - codec_options=operation.codec_options, - user_fields=user_fields, - command_name=operation.name, - pool_opts=conn.opts, - max_doc_size=max_doc_size, - more_to_come=more_to_come, - unpack_res=unpack_res, - cursor_id=operation.cursor_id, - ) - assert reply is not None - - response: Response - - if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] - conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) - if operation.conn_mgr: - operation.conn_mgr.update_exhaust(more_to_come) - response = PinnedResponse( - data=reply, - address=self._description.address, - conn=conn, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - more_to_come=more_to_come, - ) - else: - response = Response( - data=reply, - address=self._description.address, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - ) - - return response - async def checkout( self, handler: Optional[_MongoClientErrorHandler] = None ) -> AbstractAsyncContextManager[AsyncConnection]: @@ -241,19 +125,5 @@ def description(self, server_description: ServerDescription) -> None: def pool(self) -> Pool: return self._pool - def _split_message( - self, message: Union[tuple[int, Any], tuple[int, Any, int]] - ) -> tuple[int, Any, int]: - """Return request_id, data, max_doc_size. - - :param message: (request_id, data, max_doc_size) or (request_id, data) - """ - if len(message) == 3: - return message # type: ignore[return-value] - else: - # get_more and kill_cursors messages don't include BSON documents. - request_id, data = message # type: ignore[misc] - return request_id, data, 0 - def __repr__(self) -> str: return f"<{self.__class__.__name__} {self._description!r}>" diff --git a/pymongo/cursor_shared.py b/pymongo/cursor_shared.py index 5d58f1845b..df0e1e2f58 100644 --- a/pymongo/cursor_shared.py +++ b/pymongo/cursor_shared.py @@ -24,6 +24,22 @@ from pymongo.message import _CursorAddress from pymongo.typings import _Address, _DocumentType +_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} + + +def _split_message( + message: Union[tuple[int, bytes], tuple[int, bytes, int]], +) -> tuple[int, bytes, int]: + """Return request_id, data, max_doc_size. + + :param message: (request_id, data, max_doc_size) or (request_id, data) + """ + if len(message) == 3: + return message + # get_more and kill_cursors messages don't include BSON documents. + request_id, data = message + return request_id, data, 0 + class _AgnosticCursorBase(Generic[_DocumentType], ABC): """ diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 1e7f26064d..8868d87939 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -172,9 +172,7 @@ def _send_message(self, operation: _GetMore) -> None: """Send a getmore message and handle the response.""" client = self._collection.database.client try: - response = client._run_operation( - operation, self._unpack_response, address=self._address - ) + response = client._run_operation(operation, self._run_with_conn, address=self._address) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS: # Don't send killCursors because the cursor is already closed. diff --git a/pymongo/synchronous/command_runner.py b/pymongo/synchronous/command_runner.py index 077e0f9409..eec58722bb 100644 --- a/pymongo/synchronous/command_runner.py +++ b/pymongo/synchronous/command_runner.py @@ -24,7 +24,7 @@ batches. Pre-encrypted, so decryption is skipped. Callers: ``bulk.py``, ``client_bulk.py``. - :func:`run_cursor_command` — cursor ``find``/``getMore`` operations with - exhaust-cursor handling. Caller: ``server.py``. + exhaust-cursor handling. Caller: ``cursor_base.py``. :func:`_run_command` owns the entire shared skeleton: command logging, APM event publishing, ``send``/``receive``, ``$clusterTime`` gossip, diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index dd0fafdf3a..7ea81ea9ac 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -977,9 +977,7 @@ def _send_message(self, operation: Union[_Query, _GetMore]) -> None: raise InvalidOperation("exhaust cursors do not support auto encryption") try: - response = client._run_operation( - operation, self._unpack_response, address=self._address - ) + response = client._run_operation(operation, self._run_with_conn, address=self._address) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS or self._exhaust: # Don't send killCursors because the cursor is already closed. diff --git a/pymongo/synchronous/cursor_base.py b/pymongo/synchronous/cursor_base.py index 96e69cb6ee..4cad4e0c09 100644 --- a/pymongo/synchronous/cursor_base.py +++ b/pymongo/synchronous/cursor_base.py @@ -17,20 +17,40 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING, Any, Optional +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any, Optional, Union from pymongo import _csot -from pymongo.cursor_shared import _AgnosticCursorBase +from pymongo.cursor_shared import _CURSOR_DOC_FIELDS, _AgnosticCursorBase, _split_message from pymongo.lock import _create_lock -from pymongo.typings import _DocumentType +from pymongo.message import _GetMore, _OpMsg, _Query +from pymongo.response import PinnedResponse, Response +from pymongo.synchronous.command_runner import run_cursor_command +from pymongo.synchronous.helpers import _handle_reauth +from pymongo.typings import _DocumentOut, _DocumentType if TYPE_CHECKING: + from pymongo.read_preferences import _ServerMode from pymongo.synchronous.client_session import ClientSession from pymongo.synchronous.pool import Connection _IS_SYNC = True +def _operation_to_command( + operation: Union[_Query, _GetMore], + conn: Connection, + apply_timeout: bool, +) -> tuple[dict[str, Any], str]: + cmd, db = operation.as_command(conn, apply_timeout) + if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: + cmd = operation.client._encrypter.encrypt( # type: ignore[misc, assignment] + operation.db, cmd, operation.codec_options + ) + operation.update_command(cmd) + return cmd, db + + class _ConnectionManager: """Used with exhaust cursors to ensure the connection is returned.""" @@ -66,6 +86,87 @@ def session(self) -> Optional[ClientSession]: def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg] ... + @abstractmethod + def _unpack_response( + self, + response: _OpMsg, + cursor_id: Optional[int], + codec_options: Any, + user_fields: Optional[Mapping[str, Any]] = None, + legacy_response: bool = False, + ) -> Sequence[_DocumentOut]: ... + + @_handle_reauth + def _run_with_conn( + self, + conn: Connection, + operation: Union[_Query, _GetMore], + read_preference: _ServerMode, + ) -> Response: + """Execute a cursor operation on the given connection and return a Response. + + :param conn: A Connection instance. + :param operation: A _Query or _GetMore object. + :param read_preference: The read preference to use. + """ + client = self._collection.database.client + use_cmd = operation.use_command(conn) + more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) + cmd, dbn = _operation_to_command(operation, conn, use_cmd) + if more_to_come: + request_id, data, max_doc_size = 0, b"", 0 + else: + message = operation.get_message(read_preference, conn, use_cmd) + request_id, data, max_doc_size = _split_message(message) + user_fields = _CURSOR_DOC_FIELDS if use_cmd else None + docs, reply, duration = run_cursor_command( + conn, + cmd, + dbn, + request_id, + data, + client=client, + session=operation.session, # type: ignore[arg-type] + listeners=client._event_listeners, + codec_options=operation.codec_options, + user_fields=user_fields, + command_name=operation.name, + pool_opts=conn.opts, + max_doc_size=max_doc_size, + more_to_come=more_to_come, + unpack_res=self._unpack_response, + cursor_id=operation.cursor_id, + ) + assert reply is not None + if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] + conn.pin_cursor() + if isinstance(reply, _OpMsg): + # In OP_MSG, the server keeps sending only if the more_to_come flag is set. + more_to_come = reply.more_to_come + else: + # In OP_REPLY, the server keeps sending until cursor_id is 0. + more_to_come = bool(operation.exhaust and reply.cursor_id) + if operation.conn_mgr: + operation.conn_mgr.update_exhaust(more_to_come) + return PinnedResponse( + data=reply, + address=conn.address, + conn=conn, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + more_to_come=more_to_come, + ) + return Response( + data=reply, + address=conn.address, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + ) + def _die_lock(self) -> None: """Closes this cursor.""" try: diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 5f321afe5c..90912dd6f5 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -1908,13 +1908,14 @@ def _conn_for_reads( def _run_operation( self, operation: Union[_Query, _GetMore], - unpack_res: Callable, # type: ignore[type-arg] + run_with_conn: Callable, # type: ignore[type-arg] address: Optional[_Address] = None, ) -> Response: """Run a _Query/_GetMore operation and return a Response. :param operation: a _Query or _GetMore object. - :param unpack_res: A callable that decodes the wire protocol response. + :param run_with_conn: A callable ``(conn, operation, read_preference) -> Response`` + that executes the operation on a given connection. :param address: Optional address when sending a message to a specific server, used for getMore. """ @@ -1929,30 +1930,18 @@ def _run_operation( with operation.conn_mgr._lock: with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type] err_handler.contribute_socket(operation.conn_mgr.conn) - return server.run_operation( - operation.conn_mgr.conn, - operation, - operation.read_preference, - self._event_listeners, - unpack_res, - self, + return run_with_conn( + operation.conn_mgr.conn, operation, operation.read_preference ) def _cmd( _session: Optional[ClientSession], - server: Server, + _server: Server, conn: Connection, read_preference: _ServerMode, ) -> Response: operation.reset() # Reset op in case of retry. - return server.run_operation( - conn, - operation, - read_preference, - self._event_listeners, - unpack_res, - self, - ) + return run_with_conn(conn, operation, read_preference) return self._retryable_read( _cmd, diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 09d8fb75e1..8b0ad4bcb0 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -21,9 +21,7 @@ from typing import ( TYPE_CHECKING, Any, - Callable, Optional, - Union, ) from pymongo.logger import ( @@ -31,10 +29,6 @@ _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _GetMore, _OpMsg, _Query -from pymongo.response import PinnedResponse, Response -from pymongo.synchronous.command_runner import run_cursor_command -from pymongo.synchronous.helpers import _handle_reauth if TYPE_CHECKING: from queue import Queue @@ -42,17 +36,13 @@ from bson.objectid import ObjectId from pymongo.monitoring import _EventListeners - from pymongo.read_preferences import _ServerMode from pymongo.server_description import ServerDescription - from pymongo.synchronous.mongo_client import MongoClient, _MongoClientErrorHandler + from pymongo.synchronous.mongo_client import _MongoClientErrorHandler from pymongo.synchronous.monitor import Monitor from pymongo.synchronous.pool import Connection, Pool - from pymongo.typings import _DocumentOut _IS_SYNC = True -_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} - class Server: def __init__( @@ -117,112 +107,6 @@ def request_check(self) -> None: """Check the server's state soon.""" self._monitor.request_check() - def operation_to_command( - self, operation: Union[_Query, _GetMore], conn: Connection, apply_timeout: bool = False - ) -> tuple[dict[str, Any], str]: - cmd, db = operation.as_command(conn, apply_timeout) - # Support auto encryption - if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: - cmd = operation.client._encrypter.encrypt( # type: ignore[misc, assignment] - operation.db, cmd, operation.codec_options - ) - operation.update_command(cmd) - - return cmd, db - - @_handle_reauth - def run_operation( - self, - conn: Connection, - operation: Union[_Query, _GetMore], - read_preference: _ServerMode, - listeners: Optional[_EventListeners], - unpack_res: Callable[..., list[_DocumentOut]], - client: MongoClient[Any], - ) -> Response: - """Run a _Query or _GetMore operation and return a Response object. - - This method is used only to run _Query/_GetMore operations from - cursors. - Can raise ConnectionFailure, OperationFailure, etc. - - :param conn: A Connection instance. - :param operation: A _Query or _GetMore object. - :param read_preference: The read preference to use. - :param listeners: Instance of _EventListeners or None. - :param unpack_res: A callable that decodes the wire protocol response. - :param client: A MongoClient instance. - """ - assert listeners is not None - - use_cmd = operation.use_command(conn) - more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) - cmd, dbn = self.operation_to_command(operation, conn, use_cmd) - if more_to_come: - request_id = 0 - data = b"" - max_doc_size = 0 - else: - message = operation.get_message(read_preference, conn, use_cmd) - request_id, data, max_doc_size = self._split_message(message) - - user_fields = _CURSOR_DOC_FIELDS if use_cmd else None - - docs, reply, duration = run_cursor_command( - conn, - cmd, - dbn, - request_id, - data, - client=client, - session=operation.session, # type: ignore[arg-type] - listeners=listeners, - codec_options=operation.codec_options, - user_fields=user_fields, - command_name=operation.name, - pool_opts=conn.opts, - max_doc_size=max_doc_size, - more_to_come=more_to_come, - unpack_res=unpack_res, - cursor_id=operation.cursor_id, - ) - assert reply is not None - - response: Response - - if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] - conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) - if operation.conn_mgr: - operation.conn_mgr.update_exhaust(more_to_come) - response = PinnedResponse( - data=reply, - address=self._description.address, - conn=conn, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - more_to_come=more_to_come, - ) - else: - response = Response( - data=reply, - address=self._description.address, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - ) - - return response - def checkout( self, handler: Optional[_MongoClientErrorHandler] = None ) -> AbstractContextManager[Connection]: @@ -241,19 +125,5 @@ def description(self, server_description: ServerDescription) -> None: def pool(self) -> Pool: return self._pool - def _split_message( - self, message: Union[tuple[int, Any], tuple[int, Any, int]] - ) -> tuple[int, Any, int]: - """Return request_id, data, max_doc_size. - - :param message: (request_id, data, max_doc_size) or (request_id, data) - """ - if len(message) == 3: - return message # type: ignore[return-value] - else: - # get_more and kill_cursors messages don't include BSON documents. - request_id, data = message # type: ignore[misc] - return request_id, data, 0 - def __repr__(self) -> str: return f"<{self.__class__.__name__} {self._description!r}>" diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index 5da186931a..a7f66d9620 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -1676,7 +1676,7 @@ async def test_stale_getmore(self): False, None, ), - unpack_res=AsyncCursor(client.pymongo_test.collection)._unpack_response, + run_with_conn=AsyncCursor(client.pymongo_test.collection)._run_with_conn, address=("not-a-member", 27017), ) diff --git a/test/test_client.py b/test/test_client.py index b37b5e57ac..8f0da71321 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1633,7 +1633,7 @@ def test_stale_getmore(self): False, None, ), - unpack_res=Cursor(client.pymongo_test.collection)._unpack_response, + run_with_conn=Cursor(client.pymongo_test.collection)._run_with_conn, address=("not-a-member", 27017), )