From 9e7b5c6627e70e2b501f2c74dd17c9a547f150ba Mon Sep 17 00:00:00 2001 From: claude <6687499+pike00@users.noreply.github.com> Date: Tue, 9 Jun 2026 07:46:17 -0500 Subject: [PATCH 1/4] fix: make V1Channel re-subscribable after a failed subscribe --- roborock/devices/device.py | 5 +- roborock/devices/rpc/v1_channel.py | 95 +++++++++++++++----------- tests/devices/rpc/test_v1_channel.py | 60 +++++++++++++++++ tests/devices/test_v1_device.py | 99 +++++++++++++++++++++++++++- 4 files changed, 218 insertions(+), 41 deletions(-) diff --git a/roborock/devices/device.py b/roborock/devices/device.py index bf020814..50cda7be 100644 --- a/roborock/devices/device.py +++ b/roborock/devices/device.py @@ -202,7 +202,10 @@ async def connect(self) -> None: await self.v1_properties.start() elif self.b01_q10_properties is not None: await self.b01_q10_properties.start() - except RoborockException: + except BaseException: + # Any failure in start() must unsubscribe before propagating, so a + # retry by connect_loop() gets a clean channel. Broader than + # RoborockException so non-Roborock errors also release the channel. unsub() raise self._logger.info("Connected to device") diff --git a/roborock/devices/rpc/v1_channel.py b/roborock/devices/rpc/v1_channel.py index 6d310543..51250017 100644 --- a/roborock/devices/rpc/v1_channel.py +++ b/roborock/devices/rpc/v1_channel.py @@ -295,45 +295,66 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab if self._callback is not None: raise ValueError("Only one subscription allowed at a time") - # Make an initial, optimistic attempt to connect to local with the - # cache. The cache information will be refreshed by the background task. - try: - await self._local_connect(prefer_cache=True) - except RoborockException as err: - self._logger.debug("First local connection attempt failed, will retry: %s", err) - - # Start a background task to manage the local connection health. 
This - # happens independent of whether we were able to connect locally now. - if self._reconnect_task is None: - loop = asyncio.get_running_loop() - self._reconnect_task = loop.create_task(self._background_reconnect()) - - # We maintain an active MQTT subscription even when connected locally to receive - # unsolicited status updates (DPS push messages) directly from the cloud. + # Claim the subscription up front. Any failure in the setup below routes + # through _teardown(), which clears this again so the channel is left in + # a clean, re-subscribable state. Without this, a partially-completed + # subscribe (e.g. a transient failure later in connect()) would leave a + # stale callback and the next subscribe() would raise the guard above. + self._callback = callback try: - self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message) - except RoborockException as err: - if not self.is_local_connected: - # Propagate error if both local and MQTT failed - self._logger.debug("MQTT connection also failed: %s", err) - raise - self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err) - - def unsub() -> None: - """Unsubscribe from all messages.""" - if self._reconnect_task: - self._reconnect_task.cancel() - self._reconnect_task = None - if self._mqtt_unsub: - self._mqtt_unsub() - self._mqtt_unsub = None - if self._local_unsub: - self._local_unsub() - self._local_unsub = None - self._logger.debug("Unsubscribed from device") + # Make an initial, optimistic attempt to connect to local with the + # cache. The cache information will be refreshed by the background task. + try: + await self._local_connect(prefer_cache=True) + except RoborockException as err: + self._logger.debug("First local connection attempt failed, will retry: %s", err) - self._callback = callback - return unsub + # Start a background task to manage the local connection health. This + # happens independent of whether we were able to connect locally now. 
+ if self._reconnect_task is None: + loop = asyncio.get_running_loop() + self._reconnect_task = loop.create_task(self._background_reconnect()) + + # We maintain an active MQTT subscription even when connected locally to receive + # unsolicited status updates (DPS push messages) directly from the cloud. + try: + self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message) + except RoborockException as err: + if not self.is_local_connected: + # Propagate error if both local and MQTT failed + self._logger.debug("MQTT connection also failed: %s", err) + raise + self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err) + except BaseException: + # Any failure during setup must leave the channel re-subscribable: + # cancel the reconnect task, drop subscriptions, and clear _callback. + self._teardown() + raise + + self._logger.debug("Subscribed to device") + return self._teardown + + def _teardown(self) -> None: + """Tear down all subscriptions and reset the channel to a re-subscribable state. + + Returned from subscribe() as the unsubscribe function and also invoked on + any failure partway through subscribe(). Idempotent: each resource is + guarded so repeat calls are no-ops. + """ + if self._reconnect_task: + self._reconnect_task.cancel() + self._reconnect_task = None + if self._mqtt_unsub: + self._mqtt_unsub() + self._mqtt_unsub = None + if self._local_unsub: + self._local_unsub() + self._local_unsub = None + if self._local_channel: + self._local_channel.close() + self._local_channel = None + self._callback = None + self._logger.debug("Unsubscribed from device") def add_dps_listener(self, listener: Callable[[dict[RoborockDataProtocol, Any]], None]) -> Callable[[], None]: """Add a listener for DPS updates. 
diff --git a/tests/devices/rpc/test_v1_channel.py b/tests/devices/rpc/test_v1_channel.py index fdd4eda9..f77efa02 100644 --- a/tests/devices/rpc/test_v1_channel.py +++ b/tests/devices/rpc/test_v1_channel.py @@ -642,3 +642,63 @@ async def test_v1_channel_dps_listener_raises_exception( unsub_dps1() unsub_dps2() + + +async def test_v1_channel_resubscribe_after_unsub( + v1_channel: V1Channel, + mock_mqtt_channel: FakeChannel, +) -> None: + """A subscribe -> unsub -> subscribe cycle must not raise. + + Regression: unsub() previously failed to clear ``_callback``, so the second + subscribe() tripped the "Only one subscription allowed at a time" guard. + This is the exact failure that bricked a second vacuum sharing an account. + """ + mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE) + unsub = await v1_channel.subscribe(Mock()) + assert v1_channel._callback is not None + + unsub() + # The whole point of the fix: tearing down clears the callback. + assert v1_channel._callback is None + assert v1_channel._reconnect_task is None + assert not mock_mqtt_channel.subscribers + + # Re-subscribing must succeed (network info is now cached, no MQTT needed). + unsub2 = await v1_channel.subscribe(Mock()) + assert v1_channel._callback is not None + assert mock_mqtt_channel.subscribers + unsub2() + + +async def test_v1_channel_subscribe_failure_is_atomic( + v1_channel: V1Channel, + mock_mqtt_channel: FakeChannel, + mock_local_channel: FakeChannel, +) -> None: + """A failure partway through subscribe() leaves the channel re-subscribable. + + Regression: a failed subscribe() previously leaked the background reconnect + task and a partial subscription, so the next attempt could neither reuse nor + cleanly recreate the channel. + """ + # Both transports down: local connect fails and the MQTT subscribe fails. 
+ mock_local_channel.connect.side_effect = RoborockException("local down") + mock_mqtt_channel.subscribe.side_effect = RoborockException("mqtt down") + + with pytest.raises(RoborockException): + await v1_channel.subscribe(Mock()) + + # No leaked task, no stale callback, no dangling subscription. + assert v1_channel._callback is None + assert v1_channel._reconnect_task is None + assert v1_channel._mqtt_unsub is None + assert not mock_mqtt_channel.subscribers + + # And the channel is re-subscribable once the transports recover. + mock_local_channel.connect.side_effect = None + mock_mqtt_channel.subscribe.side_effect = mock_mqtt_channel._subscribe + mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE) + unsub = await v1_channel.subscribe(Mock()) + assert v1_channel._callback is not None + unsub() diff --git a/tests/devices/test_v1_device.py b/tests/devices/test_v1_device.py index 8afc62cd..44ab8466 100644 --- a/tests/devices/test_v1_device.py +++ b/tests/devices/test_v1_device.py @@ -8,14 +8,18 @@ import pytest from syrupy import SnapshotAssertion -from roborock.data import HomeData, S7MaxVStatus, UserData -from roborock.devices.cache import DeviceCache, NoCache +from roborock.data import HomeData, NetworkInfo, S7MaxVStatus, UserData +from roborock.devices.cache import DeviceCache, DeviceCacheData, InMemoryCache, NoCache from roborock.devices.device import RoborockDevice +from roborock.devices.rpc.v1_channel import V1Channel from roborock.devices.traits import v1 from roborock.devices.traits.v1.common import V1TraitMixin -from roborock.protocols.v1_protocol import decode_rpc_response +from roborock.devices.transport.local_channel import LocalSession +from roborock.exceptions import RoborockException +from roborock.protocols.v1_protocol import SecurityData, decode_rpc_response from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol from tests import mock_data +from tests.fixtures.channel_fixtures import FakeChannel USER_DATA = 
UserData.from_dict(mock_data.USER_DATA) HOME_DATA = HomeData.from_dict(mock_data.HOME_DATA_RAW) @@ -181,3 +185,92 @@ async def test_device_trait_command_parsing( assert device.v1_properties device_dict = device.diagnostic_data() assert device_dict == snapshot + + +@pytest.mark.parametrize( + "start_error", + [RoborockException("transient status fetch failed"), ValueError("unexpected")], + ids=["roborock-exception", "non-roborock-exception"], +) +async def test_connect_unsubscribes_when_start_fails( + device: RoborockDevice, + channel: AsyncMock, + start_error: Exception, +) -> None: + """connect() must release the channel when start() fails, for any exception. + + Regression: the cleanup was scoped to ``except RoborockException``, so a + non-Roborock failure in start() propagated without unsubscribing, leaving the + channel subscribed and the next attempt unable to re-subscribe. + """ + unsub = Mock() + channel.subscribe = AsyncMock(return_value=unsub) + device.v1_properties.start = AsyncMock(side_effect=start_error) + + with pytest.raises(type(start_error)): + await device.connect() + + channel.subscribe.assert_awaited_once() + unsub.assert_called_once() # channel released before propagating + assert device._unsub is None # not marked connected; safe for connect_loop to retry + + +async def test_connect_retries_after_transient_start_failure() -> None: + """End-to-end regression for the Q5 multi-vacuum bug. + + A device backed by a real V1Channel: the first connect() subscribes, then + start() fails transiently. The retry must re-subscribe cleanly rather than + raising "Only one subscription allowed at a time", and the device must end + up connected. + """ + duid = HOME_DATA.devices[0].duid + + mqtt_channel = FakeChannel() + await mqtt_channel.connect() + local_channel = FakeChannel() + local_session = Mock(spec=LocalSession, return_value=local_channel) + + # Cache the network info so local connect doesn't need an MQTT round-trip. 
+ cache = InMemoryCache() + device_cache = DeviceCache(duid, cache) + await device_cache.set(DeviceCacheData(network_info=NetworkInfo.from_dict(mock_data.NETWORK_INFO))) + + v1_channel = V1Channel( + device_uid=duid, + security_data=SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byte"), + mqtt_channel=mqtt_channel, + local_session=local_session, + device_cache=device_cache, + ) + device = RoborockDevice( + device_info=HOME_DATA.devices[0], + product=HOME_DATA.products[0], + channel=v1_channel, + trait=v1.create( + duid, + HOME_DATA.products[0], + HOME_DATA, + AsyncMock(), + AsyncMock(), + AsyncMock(), + Mock(), + AsyncMock(), + device_cache=device_cache, + region=USER_DATA.region, + ), + ) + + # First connect() subscribes successfully, then start() fails transiently; + # the second succeeds. + device.v1_properties.start = AsyncMock(side_effect=[RoborockException("transient"), None]) + + with pytest.raises(RoborockException): + await device.connect() + assert device._unsub is None # channel released after the transient failure + + # The retry must NOT raise "Only one subscription allowed at a time". + await device.connect() + assert device._unsub is not None + assert device.is_connected + + await device.close() From c429fb2dda59eca022496deb0518251b1a005b05 Mon Sep 17 00:00:00 2001 From: claude <6687499+pike00@users.noreply.github.com> Date: Sat, 20 Jun 2026 09:19:18 -0500 Subject: [PATCH 2/4] fix: narrow subscribe/connect cleanup to Exception; use 16-byte test nonce Address review feedback on #845: - Narrow the cleanup guards in RoborockDevice.connect() and V1Channel.subscribe() from BaseException to Exception so KeyboardInterrupt/SystemExit/CancelledError propagate untouched while still releasing the channel on normal failures (incl. the ValueError and RoborockException retry paths the fix targets). - Use an actually-16-byte test nonce to match its name and avoid relying on absent length validation. 
--- roborock/devices/device.py | 2 +- roborock/devices/rpc/v1_channel.py | 2 +- tests/devices/rpc/test_v1_channel.py | 2 +- tests/devices/test_v1_device.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/roborock/devices/device.py b/roborock/devices/device.py index 50cda7be..35338d44 100644 --- a/roborock/devices/device.py +++ b/roborock/devices/device.py @@ -202,7 +202,7 @@ async def connect(self) -> None: await self.v1_properties.start() elif self.b01_q10_properties is not None: await self.b01_q10_properties.start() - except BaseException: + except Exception: # Any failure in start() must unsubscribe before propagating, so a # retry by connect_loop() gets a clean channel. Broader than # RoborockException so non-Roborock errors also release the channel. diff --git a/roborock/devices/rpc/v1_channel.py b/roborock/devices/rpc/v1_channel.py index 51250017..356d97f9 100644 --- a/roborock/devices/rpc/v1_channel.py +++ b/roborock/devices/rpc/v1_channel.py @@ -325,7 +325,7 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab self._logger.debug("MQTT connection also failed: %s", err) raise self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err) - except BaseException: + except Exception: # Any failure during setup must leave the channel re-subscribable: # cancel the reconnect task, drop subscriptions, and clear _callback. 
self._teardown() diff --git a/tests/devices/rpc/test_v1_channel.py b/tests/devices/rpc/test_v1_channel.py index f77efa02..5c8b8c0b 100644 --- a/tests/devices/rpc/test_v1_channel.py +++ b/tests/devices/rpc/test_v1_channel.py @@ -31,7 +31,7 @@ USER_DATA = UserData.from_dict(mock_data.USER_DATA) TEST_DEVICE_UID = "abc123" TEST_LOCAL_KEY = "local_key" -TEST_SECURITY_DATA = SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byte") +TEST_SECURITY_DATA = SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byt") TEST_HOST = mock_data.TEST_LOCAL_API_HOST diff --git a/tests/devices/test_v1_device.py b/tests/devices/test_v1_device.py index 44ab8466..cfdbecbe 100644 --- a/tests/devices/test_v1_device.py +++ b/tests/devices/test_v1_device.py @@ -237,7 +237,7 @@ async def test_connect_retries_after_transient_start_failure() -> None: v1_channel = V1Channel( device_uid=duid, - security_data=SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byte"), + security_data=SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byt"), mqtt_channel=mqtt_channel, local_session=local_session, device_cache=device_cache, From 9090ef84feb00ce01015a468e042bcdb4e511352 Mon Sep 17 00:00:00 2001 From: claude <6687499+pike00@users.noreply.github.com> Date: Mon, 22 Jun 2026 10:38:11 -0500 Subject: [PATCH 3/4] test: verify v1 subscribe/connect behavior via public api, not internals Rewrite the V1Channel/RoborockDevice regression tests to assert observable behavior (the callback fires on message arrival; re-subscribe and re-connect succeed) instead of private attributes (_callback, _reconnect_task, _mqtt_unsub, _unsub), per review feedback. Add type: ignore for the test-only mock assignments and FakeChannel construction that were failing the mypy pre-commit hook. 
--- tests/devices/rpc/test_v1_channel.py | 46 ++++++++++++++++------------ tests/devices/test_v1_device.py | 22 ++++++++----- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/tests/devices/rpc/test_v1_channel.py b/tests/devices/rpc/test_v1_channel.py index 5c8b8c0b..24e63c77 100644 --- a/tests/devices/rpc/test_v1_channel.py +++ b/tests/devices/rpc/test_v1_channel.py @@ -648,26 +648,32 @@ async def test_v1_channel_resubscribe_after_unsub( v1_channel: V1Channel, mock_mqtt_channel: FakeChannel, ) -> None: - """A subscribe -> unsub -> subscribe cycle must not raise. + """A subscribe -> unsub -> subscribe cycle must not raise, and the new callback works. - Regression: unsub() previously failed to clear ``_callback``, so the second + Regression: unsub() previously failed to reset the subscription, so the second subscribe() tripped the "Only one subscription allowed at a time" guard. This is the exact failure that bricked a second vacuum sharing an account. """ mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE) - unsub = await v1_channel.subscribe(Mock()) - assert v1_channel._callback is not None + callback = Mock() + unsub = await v1_channel.subscribe(callback) - unsub() - # The whole point of the fix: tearing down clears the callback. - assert v1_channel._callback is None - assert v1_channel._reconnect_task is None - assert not mock_mqtt_channel.subscribers + # The subscribed callback receives messages arriving on the channel. + mock_mqtt_channel.notify_subscribers(TEST_RESPONSE) + callback.assert_called_once_with(TEST_RESPONSE) - # Re-subscribing must succeed (network info is now cached, no MQTT needed). - unsub2 = await v1_channel.subscribe(Mock()) - assert v1_channel._callback is not None - assert mock_mqtt_channel.subscribers + # After unsub, the old callback no longer receives messages. 
+ unsub() + callback.reset_mock() + mock_mqtt_channel.notify_subscribers(TEST_RESPONSE) + callback.assert_not_called() + + # Re-subscribing must succeed (network info is now cached, no MQTT needed) and + # the new callback then receives messages. + new_callback = Mock() + unsub2 = await v1_channel.subscribe(new_callback) + mock_mqtt_channel.notify_subscribers(TEST_RESPONSE) + new_callback.assert_called_once_with(TEST_RESPONSE) unsub2() @@ -689,16 +695,16 @@ async def test_v1_channel_subscribe_failure_is_atomic( with pytest.raises(RoborockException): await v1_channel.subscribe(Mock()) - # No leaked task, no stale callback, no dangling subscription. - assert v1_channel._callback is None - assert v1_channel._reconnect_task is None - assert v1_channel._mqtt_unsub is None + # The failed subscribe left no dangling subscription on the channel. assert not mock_mqtt_channel.subscribers - # And the channel is re-subscribable once the transports recover. + # And the channel is re-subscribable once the transports recover: the new + # subscription succeeds and its callback receives messages. 
mock_local_channel.connect.side_effect = None mock_mqtt_channel.subscribe.side_effect = mock_mqtt_channel._subscribe mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE) - unsub = await v1_channel.subscribe(Mock()) - assert v1_channel._callback is not None + callback = Mock() + unsub = await v1_channel.subscribe(callback) + mock_mqtt_channel.notify_subscribers(TEST_RESPONSE) + callback.assert_called_once_with(TEST_RESPONSE) unsub() diff --git a/tests/devices/test_v1_device.py b/tests/devices/test_v1_device.py index cfdbecbe..c327bdf0 100644 --- a/tests/devices/test_v1_device.py +++ b/tests/devices/test_v1_device.py @@ -205,14 +205,20 @@ async def test_connect_unsubscribes_when_start_fails( """ unsub = Mock() channel.subscribe = AsyncMock(return_value=unsub) - device.v1_properties.start = AsyncMock(side_effect=start_error) + device.v1_properties.start = AsyncMock(side_effect=start_error) # type: ignore[method-assign, union-attr] with pytest.raises(type(start_error)): await device.connect() + # The channel was released before the error propagated. channel.subscribe.assert_awaited_once() - unsub.assert_called_once() # channel released before propagating - assert device._unsub is None # not marked connected; safe for connect_loop to retry + unsub.assert_called_once() + + # The device is left re-connectable: once start() recovers, connect() + # succeeds instead of raising "Already connected to the device". 
+ device.v1_properties.start = AsyncMock(return_value=None) # type: ignore[method-assign, union-attr] + await device.connect() + assert channel.subscribe.await_count == 2 async def test_connect_retries_after_transient_start_failure() -> None: @@ -238,7 +244,7 @@ async def test_connect_retries_after_transient_start_failure() -> None: v1_channel = V1Channel( device_uid=duid, security_data=SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byt"), - mqtt_channel=mqtt_channel, + mqtt_channel=mqtt_channel, # type: ignore[arg-type] local_session=local_session, device_cache=device_cache, ) @@ -262,15 +268,15 @@ async def test_connect_retries_after_transient_start_failure() -> None: # First connect() subscribes successfully, then start() fails transiently; # the second succeeds. - device.v1_properties.start = AsyncMock(side_effect=[RoborockException("transient"), None]) + device.v1_properties.start = AsyncMock(side_effect=[RoborockException("transient"), None]) # type: ignore[method-assign, union-attr] with pytest.raises(RoborockException): await device.connect() - assert device._unsub is None # channel released after the transient failure - # The retry must NOT raise "Only one subscription allowed at a time". + # The retry must NOT raise "Only one subscription allowed at a time"; the + # clean release after the transient failure lets connect() re-subscribe and + # the device ends up connected. 
await device.connect() - assert device._unsub is not None assert device.is_connected await device.close() From 5f25deff75a84d9eea4f0e087a6f9b181df38219 Mon Sep 17 00:00:00 2001 From: claude <6687499+pike00@users.noreply.github.com> Date: Mon, 22 Jun 2026 11:19:40 -0500 Subject: [PATCH 4/4] refactor: distinguish expected vs unexpected exceptions in subscribe/connect cleanup Split the leak-safety catch-all in V1Channel.subscribe() and RoborockDevice.connect() into an explicit `except RoborockException` (the expected failure path) and an `except Exception` that logs the unexpected, shouldn't-happen error before tearing down. Both branches still run teardown/unsub and re-raise, so behavior is unchanged; the broad catch is now a deliberate, signposted exception to the usual don't-catch-Exception rule, and unexpected errors become observable in logs. --- roborock/devices/device.py | 13 ++++++++++--- roborock/devices/rpc/v1_channel.py | 13 +++++++++++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/roborock/devices/device.py b/roborock/devices/device.py index 35338d44..f6226f07 100644 --- a/roborock/devices/device.py +++ b/roborock/devices/device.py @@ -202,10 +202,17 @@ async def connect(self) -> None: await self.v1_properties.start() elif self.b01_q10_properties is not None: await self.b01_q10_properties.start() + except RoborockException: + # Expected: start() can fail transiently. Unsubscribe before propagating + # so the retry by connect_loop() gets a clean channel. + unsub() + raise except Exception: - # Any failure in start() must unsubscribe before propagating, so a - # retry by connect_loop() gets a clean channel. Broader than - # RoborockException so non-Roborock errors also release the channel. + # Not expected here. 
We normally avoid a broad ``except Exception`` in + # this codebase, but a leaked subscription would stop connect_loop() from + # ever reconnecting, so we deliberately catch broadly, log the unexpected + # error, and release the channel before propagating. + self._logger.exception("Unexpected error during connect; releasing channel to avoid a leak") unsub() raise self._logger.info("Connected to device") diff --git a/roborock/devices/rpc/v1_channel.py b/roborock/devices/rpc/v1_channel.py index 356d97f9..3197c6b3 100644 --- a/roborock/devices/rpc/v1_channel.py +++ b/roborock/devices/rpc/v1_channel.py @@ -325,9 +325,18 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab self._logger.debug("MQTT connection also failed: %s", err) raise self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err) + except RoborockException: + # Expected failure path (e.g. both local and MQTT transports down). + # Release the channel so the next subscribe() starts clean. + self._teardown() + raise except Exception: - # Any failure during setup must leave the channel re-subscribable: - # cancel the reconnect task, drop subscriptions, and clear _callback. + # Not expected here. We normally avoid a broad ``except Exception`` in + # this codebase, but leaving a partial subscription behind (reconnect + # task, MQTT subscription, stale callback) would brick the device, so we + # deliberately catch broadly, log the unexpected error, and tear down + # before propagating. + self._logger.exception("Unexpected error during subscribe; tearing down to avoid a leak") self._teardown() raise