-
Notifications
You must be signed in to change notification settings - Fork 87
fix: make V1Channel re-subscribable after a failed subscribe #845
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9e7b5c6
c429fb2
7465eed
9090ef8
9e1edcc
5f25def
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -295,45 +295,75 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab | |
| if self._callback is not None: | ||
| raise ValueError("Only one subscription allowed at a time") | ||
|
|
||
| # Make an initial, optimistic attempt to connect to local with the | ||
| # cache. The cache information will be refreshed by the background task. | ||
| try: | ||
| await self._local_connect(prefer_cache=True) | ||
| except RoborockException as err: | ||
| self._logger.debug("First local connection attempt failed, will retry: %s", err) | ||
|
|
||
| # Start a background task to manage the local connection health. This | ||
| # happens independent of whether we were able to connect locally now. | ||
| if self._reconnect_task is None: | ||
| loop = asyncio.get_running_loop() | ||
| self._reconnect_task = loop.create_task(self._background_reconnect()) | ||
|
|
||
| # We maintain an active MQTT subscription even when connected locally to receive | ||
| # unsolicited status updates (DPS push messages) directly from the cloud. | ||
| # Claim the subscription up front. Any failure in the setup below routes | ||
| # through _teardown(), which clears this again so the channel is left in | ||
| # a clean, re-subscribable state. Without this, a partially-completed | ||
| # subscribe (e.g. a transient failure later in connect()) would leave a | ||
| # stale callback and the next subscribe() would raise the guard above. | ||
| self._callback = callback | ||
| try: | ||
| self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message) | ||
| except RoborockException as err: | ||
| if not self.is_local_connected: | ||
| # Propagate error if both local and MQTT failed | ||
| self._logger.debug("MQTT connection also failed: %s", err) | ||
| raise | ||
| self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err) | ||
|
|
||
| def unsub() -> None: | ||
| """Unsubscribe from all messages.""" | ||
| if self._reconnect_task: | ||
| self._reconnect_task.cancel() | ||
| self._reconnect_task = None | ||
| if self._mqtt_unsub: | ||
| self._mqtt_unsub() | ||
| self._mqtt_unsub = None | ||
| if self._local_unsub: | ||
| self._local_unsub() | ||
| self._local_unsub = None | ||
| self._logger.debug("Unsubscribed from device") | ||
| # Make an initial, optimistic attempt to connect to local with the | ||
| # cache. The cache information will be refreshed by the background task. | ||
| try: | ||
| await self._local_connect(prefer_cache=True) | ||
| except RoborockException as err: | ||
| self._logger.debug("First local connection attempt failed, will retry: %s", err) | ||
|
|
||
| self._callback = callback | ||
| return unsub | ||
| # Start a background task to manage the local connection health. This | ||
| # happens independent of whether we were able to connect locally now. | ||
| if self._reconnect_task is None: | ||
| loop = asyncio.get_running_loop() | ||
| self._reconnect_task = loop.create_task(self._background_reconnect()) | ||
|
|
||
| # We maintain an active MQTT subscription even when connected locally to receive | ||
| # unsolicited status updates (DPS push messages) directly from the cloud. | ||
| try: | ||
| self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message) | ||
| except RoborockException as err: | ||
| if not self.is_local_connected: | ||
| # Propagate error if both local and MQTT failed | ||
| self._logger.debug("MQTT connection also failed: %s", err) | ||
| raise | ||
| self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err) | ||
| except RoborockException: | ||
| # Expected failure path (e.g. both local and MQTT transports down). | ||
| # Release the channel so the next subscribe() starts clean. | ||
| self._teardown() | ||
| raise | ||
| except Exception: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a style nit but: I'd like to make this clear if its expected or unexpected exceptions here by explicitly catching and forwarding both That is, i'd not like it to be normal to catch Exception in this code base. We can violate the rules here given this case is important to not leak, but it's a "shouldn't happen".
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in 5f25def. Both The |
||
| # Not expected here. We normally avoid a bare ``except Exception`` in | ||
| # this codebase, but leaving a partial subscription behind (reconnect | ||
| # task, MQTT subscription, stale callback) would brick the device, so we | ||
| # deliberately catch broadly, log the unexpected error, and tear down | ||
| # before propagating. | ||
| self._logger.exception("Unexpected error during subscribe; tearing down to avoid a leak") | ||
| self._teardown() | ||
| raise | ||
|
|
||
| self._logger.debug("Subscribed to device") | ||
| return self._teardown | ||
|
|
||
| def _teardown(self) -> None: | ||
| """Tear down all subscriptions and reset the channel to a re-subscribable state. | ||
|
|
||
| Returned from subscribe() as the unsubscribe function and also invoked on | ||
| any failure partway through subscribe(). Idempotent: each resource is | ||
| guarded so repeat calls are no-ops. | ||
| """ | ||
| if self._reconnect_task: | ||
| self._reconnect_task.cancel() | ||
| self._reconnect_task = None | ||
| if self._mqtt_unsub: | ||
| self._mqtt_unsub() | ||
| self._mqtt_unsub = None | ||
| if self._local_unsub: | ||
| self._local_unsub() | ||
| self._local_unsub = None | ||
| if self._local_channel: | ||
| self._local_channel.close() | ||
| self._local_channel = None | ||
| self._callback = None | ||
| self._logger.debug("Unsubscribed from device") | ||
|
pike00 marked this conversation as resolved.
|
||
|
|
||
| def add_dps_listener(self, listener: Callable[[dict[RoborockDataProtocol, Any]], None]) -> Callable[[], None]: | ||
| """Add a listener for DPS updates. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.