Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions roborock/devices/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,16 @@ async def connect(self) -> None:
elif self.b01_q10_properties is not None:
await self.b01_q10_properties.start()
except RoborockException:
# Expected: start() can fail transiently. Unsubscribe before propagating
# so the retry by connect_loop() gets a clean channel.
unsub()
raise
except Exception:
# Not expected here. We normally avoid a bare ``except Exception`` in
# this codebase, but a leaked subscription would stop connect_loop() from
# ever reconnecting, so we deliberately catch broadly, log the unexpected
# error, and release the channel before propagating.
self._logger.exception("Unexpected error during connect; releasing channel to avoid a leak")
unsub()
raise
self._logger.info("Connected to device")
Expand Down
104 changes: 67 additions & 37 deletions roborock/devices/rpc/v1_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,45 +295,75 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab
if self._callback is not None:
raise ValueError("Only one subscription allowed at a time")

# Make an initial, optimistic attempt to connect to local with the
# cache. The cache information will be refreshed by the background task.
try:
await self._local_connect(prefer_cache=True)
except RoborockException as err:
self._logger.debug("First local connection attempt failed, will retry: %s", err)

# Start a background task to manage the local connection health. This
# happens independent of whether we were able to connect locally now.
if self._reconnect_task is None:
loop = asyncio.get_running_loop()
self._reconnect_task = loop.create_task(self._background_reconnect())

# We maintain an active MQTT subscription even when connected locally to receive
# unsolicited status updates (DPS push messages) directly from the cloud.
# Claim the subscription up front. Any failure in the setup below routes
# through _teardown(), which clears this again so the channel is left in
# a clean, re-subscribable state. Without this, a partially-completed
# subscribe (e.g. a transient failure later in connect()) would leave a
# stale callback and the next subscribe() would raise the guard above.
self._callback = callback
Comment thread
allenporter marked this conversation as resolved.
try:
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
except RoborockException as err:
if not self.is_local_connected:
# Propagate error if both local and MQTT failed
self._logger.debug("MQTT connection also failed: %s", err)
raise
self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err)

def unsub() -> None:
"""Unsubscribe from all messages."""
if self._reconnect_task:
self._reconnect_task.cancel()
self._reconnect_task = None
if self._mqtt_unsub:
self._mqtt_unsub()
self._mqtt_unsub = None
if self._local_unsub:
self._local_unsub()
self._local_unsub = None
self._logger.debug("Unsubscribed from device")
# Make an initial, optimistic attempt to connect to local with the
# cache. The cache information will be refreshed by the background task.
try:
await self._local_connect(prefer_cache=True)
except RoborockException as err:
self._logger.debug("First local connection attempt failed, will retry: %s", err)

self._callback = callback
return unsub
# Start a background task to manage the local connection health. This
# happens independent of whether we were able to connect locally now.
if self._reconnect_task is None:
loop = asyncio.get_running_loop()
self._reconnect_task = loop.create_task(self._background_reconnect())

# We maintain an active MQTT subscription even when connected locally to receive
# unsolicited status updates (DPS push messages) directly from the cloud.
try:
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
except RoborockException as err:
if not self.is_local_connected:
# Propagate error if both local and MQTT failed
self._logger.debug("MQTT connection also failed: %s", err)
raise
self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err)
except RoborockException:
# Expected failure path (e.g. both local and MQTT transports down).
# Release the channel so the next subscribe() starts clean.
self._teardown()
raise
except Exception:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a style nit but: I'd like to make this clear if its expected or unexpected exceptions here by explicitly catching and forwarding both RoborockException and Exception. w/ teardown code. Essentially i'd say it like: We do expect RoborockException but this is so critical to get right to avoid leaks that we also catch Exception. I'd also like to log that it was un expected uncaught exception here. Same in Device.

That is, i'd not like it to be normal to catch Exception in this code base. We can violate the rules here given this case is important to not leak, but it's a "shouldn't happen".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 5f25def. Both V1Channel.subscribe() and RoborockDevice.connect() now catch RoborockException (expected) and Exception (unexpected) as separate branches; each runs the teardown/unsub and re-raises, so behavior is unchanged.

The except Exception branch logs at exception level ("Unexpected error during subscribe/connect; ... to avoid a leak") and carries a comment making clear it is a deliberate, signposted exception to the usual "don't catch Exception" rule, kept only because a leaked subscription or reconnect task would brick the device. The expected RoborockException path stays quiet (the transient cause is already debug-logged at the point it happens).

# Not expected here. We normally avoid a bare ``except Exception`` in
# this codebase, but leaving a partial subscription behind (reconnect
# task, MQTT subscription, stale callback) would brick the device, so we
# deliberately catch broadly, log the unexpected error, and tear down
# before propagating.
self._logger.exception("Unexpected error during subscribe; tearing down to avoid a leak")
self._teardown()
raise

self._logger.debug("Subscribed to device")
return self._teardown

def _teardown(self) -> None:
"""Tear down all subscriptions and reset the channel to a re-subscribable state.

Returned from subscribe() as the unsubscribe function and also invoked on
any failure partway through subscribe(). Idempotent: each resource is
guarded so repeat calls are no-ops.
"""
if self._reconnect_task:
self._reconnect_task.cancel()
self._reconnect_task = None
if self._mqtt_unsub:
self._mqtt_unsub()
self._mqtt_unsub = None
if self._local_unsub:
self._local_unsub()
self._local_unsub = None
if self._local_channel:
self._local_channel.close()
self._local_channel = None
self._callback = None
self._logger.debug("Unsubscribed from device")
Comment thread
pike00 marked this conversation as resolved.

def add_dps_listener(self, listener: Callable[[dict[RoborockDataProtocol, Any]], None]) -> Callable[[], None]:
"""Add a listener for DPS updates.
Expand Down
68 changes: 67 additions & 1 deletion tests/devices/rpc/test_v1_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
USER_DATA = UserData.from_dict(mock_data.USER_DATA)
TEST_DEVICE_UID = "abc123"
TEST_LOCAL_KEY = "local_key"
TEST_SECURITY_DATA = SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byte")
TEST_SECURITY_DATA = SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byt")
TEST_HOST = mock_data.TEST_LOCAL_API_HOST


Expand Down Expand Up @@ -642,3 +642,69 @@ async def test_v1_channel_dps_listener_raises_exception(

unsub_dps1()
unsub_dps2()


async def test_v1_channel_resubscribe_after_unsub(
v1_channel: V1Channel,
mock_mqtt_channel: FakeChannel,
) -> None:
"""A subscribe -> unsub -> subscribe cycle must not raise, and the new callback works.

Regression: unsub() previously failed to reset the subscription, so the second
subscribe() tripped the "Only one subscription allowed at a time" guard.
This is the exact failure that bricked a second vacuum sharing an account.
"""
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
callback = Mock()
unsub = await v1_channel.subscribe(callback)

# The subscribed callback receives messages arriving on the channel.
mock_mqtt_channel.notify_subscribers(TEST_RESPONSE)
callback.assert_called_once_with(TEST_RESPONSE)

# After unsub, the old callback no longer receives messages.
unsub()
callback.reset_mock()
mock_mqtt_channel.notify_subscribers(TEST_RESPONSE)
callback.assert_not_called()

# Re-subscribing must succeed (network info is now cached, no MQTT needed) and
# the new callback then receives messages.
new_callback = Mock()
unsub2 = await v1_channel.subscribe(new_callback)
mock_mqtt_channel.notify_subscribers(TEST_RESPONSE)
new_callback.assert_called_once_with(TEST_RESPONSE)
unsub2()


async def test_v1_channel_subscribe_failure_is_atomic(
v1_channel: V1Channel,
mock_mqtt_channel: FakeChannel,
mock_local_channel: FakeChannel,
) -> None:
"""A failure partway through subscribe() leaves the channel re-subscribable.

Regression: a failed subscribe() previously leaked the background reconnect
task and a partial subscription, so the next attempt could neither reuse nor
cleanly recreate the channel.
"""
# Both transports down: local connect fails and the MQTT subscribe fails.
mock_local_channel.connect.side_effect = RoborockException("local down")
mock_mqtt_channel.subscribe.side_effect = RoborockException("mqtt down")

with pytest.raises(RoborockException):
await v1_channel.subscribe(Mock())

# The failed subscribe left no dangling subscription on the channel.
assert not mock_mqtt_channel.subscribers

# And the channel is re-subscribable once the transports recover: the new
# subscription succeeds and its callback receives messages.
mock_local_channel.connect.side_effect = None
mock_mqtt_channel.subscribe.side_effect = mock_mqtt_channel._subscribe
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
callback = Mock()
unsub = await v1_channel.subscribe(callback)
mock_mqtt_channel.notify_subscribers(TEST_RESPONSE)
callback.assert_called_once_with(TEST_RESPONSE)
unsub()
105 changes: 102 additions & 3 deletions tests/devices/test_v1_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
import pytest
from syrupy import SnapshotAssertion

from roborock.data import HomeData, S7MaxVStatus, UserData
from roborock.devices.cache import DeviceCache, NoCache
from roborock.data import HomeData, NetworkInfo, S7MaxVStatus, UserData
from roborock.devices.cache import DeviceCache, DeviceCacheData, InMemoryCache, NoCache
from roborock.devices.device import RoborockDevice
from roborock.devices.rpc.v1_channel import V1Channel
from roborock.devices.traits import v1
from roborock.devices.traits.v1.common import V1TraitMixin
from roborock.protocols.v1_protocol import decode_rpc_response
from roborock.devices.transport.local_channel import LocalSession
from roborock.exceptions import RoborockException
from roborock.protocols.v1_protocol import SecurityData, decode_rpc_response
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
from tests import mock_data
from tests.fixtures.channel_fixtures import FakeChannel

USER_DATA = UserData.from_dict(mock_data.USER_DATA)
HOME_DATA = HomeData.from_dict(mock_data.HOME_DATA_RAW)
Expand Down Expand Up @@ -181,3 +185,98 @@ async def test_device_trait_command_parsing(
assert device.v1_properties
device_dict = device.diagnostic_data()
assert device_dict == snapshot


@pytest.mark.parametrize(
"start_error",
[RoborockException("transient status fetch failed"), ValueError("unexpected")],
ids=["roborock-exception", "non-roborock-exception"],
)
async def test_connect_unsubscribes_when_start_fails(
device: RoborockDevice,
channel: AsyncMock,
start_error: Exception,
) -> None:
"""connect() must release the channel when start() fails, for any exception.

Regression: the cleanup was scoped to ``except RoborockException``, so a
non-Roborock failure in start() propagated without unsubscribing, leaving the
channel subscribed and the next attempt unable to re-subscribe.
"""
unsub = Mock()
channel.subscribe = AsyncMock(return_value=unsub)
device.v1_properties.start = AsyncMock(side_effect=start_error) # type: ignore[method-assign, union-attr]

with pytest.raises(type(start_error)):
await device.connect()

# The channel was released before the error propagated.
channel.subscribe.assert_awaited_once()
unsub.assert_called_once()

# The device is left re-connectable: once start() recovers, connect()
# succeeds instead of raising "Already connected to the device".
device.v1_properties.start = AsyncMock(return_value=None) # type: ignore[method-assign, union-attr]
await device.connect()
assert channel.subscribe.await_count == 2


async def test_connect_retries_after_transient_start_failure() -> None:
"""End-to-end regression for the Q5 multi-vacuum bug.

A device backed by a real V1Channel: the first connect() subscribes, then
start() fails transiently. The retry must re-subscribe cleanly rather than
raising "Only one subscription allowed at a time", and the device must end
up connected.
"""
duid = HOME_DATA.devices[0].duid

mqtt_channel = FakeChannel()
await mqtt_channel.connect()
local_channel = FakeChannel()
local_session = Mock(spec=LocalSession, return_value=local_channel)

# Cache the network info so local connect doesn't need an MQTT round-trip.
cache = InMemoryCache()
device_cache = DeviceCache(duid, cache)
await device_cache.set(DeviceCacheData(network_info=NetworkInfo.from_dict(mock_data.NETWORK_INFO)))

v1_channel = V1Channel(
device_uid=duid,
security_data=SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byt"),
mqtt_channel=mqtt_channel, # type: ignore[arg-type]
local_session=local_session,
device_cache=device_cache,
)
device = RoborockDevice(
device_info=HOME_DATA.devices[0],
product=HOME_DATA.products[0],
channel=v1_channel,
trait=v1.create(
duid,
HOME_DATA.products[0],
HOME_DATA,
AsyncMock(),
AsyncMock(),
AsyncMock(),
Mock(),
AsyncMock(),
device_cache=device_cache,
region=USER_DATA.region,
),
)

# First connect() subscribes successfully, then start() fails transiently;
# the second succeeds.
device.v1_properties.start = AsyncMock(side_effect=[RoborockException("transient"), None]) # type: ignore[method-assign, union-attr]

with pytest.raises(RoborockException):
await device.connect()

# The retry must NOT raise "Only one subscription allowed at a time"; the
# clean release after the transient failure lets connect() re-subscribe and
# the device ends up connected.
await device.connect()
assert device.is_connected

await device.close()
Loading