Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions roborock/data/v1/v1_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,14 @@ class FieldNameBase(StrEnum):


class StatusField(FieldNameBase):
"""An enum that represents a field in the `Status` class.
"""An enum that represents a field in the `StatusV2` class.

This is used with `roborock.devices.traits.v1.status.DeviceFeaturesTrait`
to understand if a feature is supported by the device using `is_field_supported`.

The enum values are names of fields in the `Status` class. Each field is annotated
with a metadata value to determine if the field is supported by the device.
The enum values are names of fields in the `StatusV2` class. Each field is
annotated with `dps` metadata to map the field to a `RoborockDataProtocol`
value used to check support against the product schema.
"""

STATE = "state"
Expand Down Expand Up @@ -629,8 +630,9 @@ class ConsumableField(FieldNameBase):
This is used with `roborock.devices.traits.v1.status.DeviceFeaturesTrait`
to understand if a feature is supported by the device using `is_field_supported`.

The enum values are names of fields in the `Consumable` class. Each field is annotated
with a metadata value to determine if the field is supported by the device.
The enum values are names of fields in the `Consumable` class. Each field is
annotated with `dps` metadata to map the field to a `RoborockDataProtocol`
value used to check support against the product schema.
"""

MAIN_BRUSH_WORK_TIME = "main_brush_work_time"
Expand Down
4 changes: 3 additions & 1 deletion roborock/devices/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async def connect(self) -> None:
unsub = await self._channel.subscribe(self._on_message)
try:
if self.v1_properties is not None:
await self.v1_properties.discover_features()
await self.v1_properties.start()
elif self.b01_q10_properties is not None:
await self.b01_q10_properties.start()
except RoborockException:
Expand All @@ -216,6 +216,8 @@ async def close(self) -> None:
await self._connect_task
except asyncio.CancelledError:
pass
if self.v1_properties is not None:
self.v1_properties.close()
if self.b01_q10_properties is not None:
await self.b01_q10_properties.close()
if self._unsub:
Expand Down
1 change: 1 addition & 0 deletions roborock/devices/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDat
channel.rpc_channel,
channel.mqtt_rpc_channel,
channel.map_rpc_channel,
channel.add_dps_listener,
Comment on lines 236 to +239
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

device_creator types channel as the Channel protocol, but then passes channel.add_dps_listener into v1.create(...). Channel doesn’t define add_dps_listener, so this will fail mypy (which is enabled via pre-commit). Consider either (a) extending the Channel protocol to include add_dps_listener (possibly as an optional/no-op for non-V1 channels), or (b) using a more specific protocol/type for V1 channels in this match arm (e.g., a V1Channel protocol with add_dps_listener).

Copilot uses AI. Check for mistakes.
web_api,
device_cache=device_cache,
map_parser_config=map_parser_config,
Expand Down
40 changes: 34 additions & 6 deletions roborock/devices/rpc/v1_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dataclasses import dataclass
from typing import Any, TypeVar

from roborock.callbacks import CallbackList
from roborock.data import HomeDataDevice, NetworkInfo, RoborockBase, UserData
from roborock.devices.cache import DeviceCache
from roborock.devices.transport.channel import Channel
Expand All @@ -30,9 +31,10 @@
V1RpcChannel,
create_map_response_decoder,
create_security_data,
decode_data_protocol_message,
decode_rpc_response,
)
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
from roborock.roborock_message import RoborockDataProtocol, RoborockMessage, RoborockMessageProtocol
from roborock.roborock_typing import RoborockCommand
from roborock.util import RoborockLoggerAdapter

Expand Down Expand Up @@ -188,6 +190,7 @@ def __init__(
self._device_cache = device_cache
self._reconnect_task: asyncio.Task[None] | None = None
self._last_network_info_refresh: datetime.datetime | None = None
self._dps_listeners = CallbackList[dict[RoborockDataProtocol, Any]]()

@property
def is_connected(self) -> bool:
Expand Down Expand Up @@ -305,12 +308,16 @@ async def subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callab
loop = asyncio.get_running_loop()
self._reconnect_task = loop.create_task(self._background_reconnect())

if not self.is_local_connected:
# We were not able to connect locally, so fallback to MQTT and at least
# establish that connection explicitly. If this fails then raise an
# error and let the caller know we failed to subscribe.
# We maintain an active MQTT subscription even when connected locally to receive
# unsolicited status updates (DPS push messages) directly from the cloud.
try:
self._mqtt_unsub = await self._mqtt_channel.subscribe(self._on_mqtt_message)
self._logger.debug("V1Channel connected to device via MQTT")
except RoborockException as err:
if not self.is_local_connected:
# Propagate error if both local and MQTT failed
self._logger.debug("MQTT connection also failed: %s", err)
raise
self._logger.debug("MQTT subscription failed, continuing with local-only connection: %s", err)

def unsub() -> None:
"""Unsubscribe from all messages."""
Expand All @@ -328,6 +335,16 @@ def unsub() -> None:
self._callback = callback
return unsub

def add_dps_listener(self, listener: Callable[[dict[RoborockDataProtocol, Any]], None]) -> Callable[[], None]:
"""Add a listener for DPS updates.

This will attach a listener to the existing subscription, invoking
the listener whenever new DPS values arrive from the subscription.
This will only work if a subscription has already been setup, which is
handled by the device start.
"""
return self._dps_listeners.add_callback(listener)

async def _get_networking_info(self, *, prefer_cache: bool = True) -> NetworkInfo:
"""Retrieve networking information for the device.

Expand Down Expand Up @@ -428,6 +445,17 @@ def _on_mqtt_message(self, message: RoborockMessage) -> None:
self._logger.debug("V1Channel received MQTT message: %s", message)
if self._callback:
self._callback(message)
try:
datapoints = decode_data_protocol_message(message)
except RoborockException as e:
self._logger.debug("Error decoding data protocol message: %s", e)
return

if datapoints:
try:
self._dps_listeners(datapoints)
except Exception:
self._logger.exception("Error in DPS listener callback")

def _on_local_message(self, message: RoborockMessage) -> None:
"""Handle incoming local messages."""
Expand Down
31 changes: 30 additions & 1 deletion roborock/devices/traits/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@
"""

import logging
from collections.abc import Callable
from dataclasses import dataclass, field, fields
from typing import Any, get_args

from roborock.data.containers import HomeData, HomeDataProduct, RoborockBase
from roborock.data.v1.v1_code_mappings import RoborockDockTypeCode
from roborock.devices.cache import DeviceCache
from roborock.devices.traits import Trait
from roborock.exceptions import RoborockException
from roborock.map.map_parser import MapParserConfig
from roborock.protocols.v1_protocol import V1RpcChannel
from roborock.protocols.v1_protocol import V1RpcChannel, decode_data_protocol_message
from roborock.roborock_message import RoborockDataProtocol, RoborockMessage
from roborock.web_api import UserWebApiClient

from . import (
Expand Down Expand Up @@ -176,6 +179,7 @@ def __init__(
rpc_channel: V1RpcChannel,
mqtt_rpc_channel: V1RpcChannel,
map_rpc_channel: V1RpcChannel,
add_dps_listener: Callable[[Callable[[dict[RoborockDataProtocol, Any]], None]], Callable[[], None]],
web_api: UserWebApiClient,
device_cache: DeviceCache,
map_parser_config: MapParserConfig | None = None,
Expand All @@ -189,6 +193,8 @@ def __init__(
self._web_api = web_api
self._device_cache = device_cache
self._region = region
self._unsub: Callable[[], None] | None = None
self._add_dps_listener = add_dps_listener

self.device_features = DeviceFeaturesTrait(product, self._device_cache)
self.status = StatusTrait(self.device_features, region=self._region)
Expand Down Expand Up @@ -227,6 +233,27 @@ def _get_rpc_channel(self, trait: V1TraitMixin) -> V1RpcChannel:
else:
return self._rpc_channel

async def start(self) -> None:
"""Start the properties API and discover features."""
if self._unsub:
return
await self.discover_features()
self._unsub = self._add_dps_listener(self._on_dps_update)

def close(self) -> None:
if self._unsub:
self._unsub()
Comment on lines +240 to +245
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PropertiesApi.close() calls the stored unsubscribe but doesn’t clear self._unsub. If close() is called more than once (or if start() is called again), this can lead to double-unsubscribe attempts or holding a stale callable. Consider setting self._unsub = None after invoking it (and possibly guarding against start() being called twice without closing).

Suggested change
await self.discover_features()
self._unsub = self._add_dps_listener(self._on_dps_update)
def close(self) -> None:
if self._unsub:
self._unsub()
# Avoid registering multiple listeners if start is called more than once
if getattr(self, "_unsub", None):
return
await self.discover_features()
self._unsub = self._add_dps_listener(self._on_dps_update)
def close(self) -> None:
unsub = getattr(self, "_unsub", None)
if unsub:
unsub()
self._unsub = None

Copilot uses AI. Check for mistakes.
self._unsub = None

def _on_dps_update(self, dps: dict[RoborockDataProtocol, Any]) -> None:
"""Handle incoming messages from the device.

This will notify all traits of the new values.
"""
_LOGGER.debug("Received message from device: %s", dps)
self.status.update_from_dps(dps)
self.consumables.update_from_dps(dps)

async def discover_features(self) -> None:
"""Populate any supported traits that were not initialized in __init__."""
_LOGGER.debug("Starting optional trait discovery")
Expand Down Expand Up @@ -330,6 +357,7 @@ def create(
rpc_channel: V1RpcChannel,
mqtt_rpc_channel: V1RpcChannel,
map_rpc_channel: V1RpcChannel,
add_dps_listener: Callable[[Callable[[dict[RoborockDataProtocol, Any]], None]], Callable[[], None]],
web_api: UserWebApiClient,
device_cache: DeviceCache,
map_parser_config: MapParserConfig | None = None,
Expand All @@ -343,6 +371,7 @@ def create(
rpc_channel,
mqtt_rpc_channel,
map_rpc_channel,
add_dps_listener,
web_api,
device_cache,
map_parser_config,
Expand Down
25 changes: 23 additions & 2 deletions roborock/devices/traits/v1/consumeable.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,24 @@
periodically, such as filters, brushes, etc.
"""

import logging
from enum import StrEnum
from typing import Self
from typing import Any, Self

from roborock.data import Consumable
from roborock.devices.traits.common import DpsDataConverter, TraitUpdateListener
from roborock.devices.traits.v1 import common
from roborock.roborock_message import RoborockDataProtocol
from roborock.roborock_typing import RoborockCommand

__all__ = [
"ConsumableTrait",
]

_LOGGER = logging.getLogger(__name__)

_DPS_CONVERTER = DpsDataConverter.from_dataclass(Consumable)


class ConsumableAttribute(StrEnum):
"""Enum for consumable attributes."""
Expand All @@ -35,7 +42,7 @@ def from_str(cls, value: str) -> Self:
raise ValueError(f"Unknown ConsumableAttribute: {value}")


class ConsumableTrait(Consumable, common.V1TraitMixin):
class ConsumableTrait(Consumable, common.V1TraitMixin, TraitUpdateListener):
"""Trait for managing consumable attributes on Roborock devices.

After the first refresh, you can tell what consumables are supported by
Expand All @@ -45,7 +52,21 @@ class ConsumableTrait(Consumable, common.V1TraitMixin):
command = RoborockCommand.GET_CONSUMABLE
converter = common.DefaultConverter(Consumable)

def __init__(self) -> None:
"""Initialize the consumable trait."""
super().__init__()
TraitUpdateListener.__init__(self, logger=_LOGGER)

async def reset_consumable(self, consumable: ConsumableAttribute) -> None:
"""Reset a specific consumable attribute on the device."""
await self.rpc_channel.send_command(RoborockCommand.RESET_CONSUMABLE, params=[consumable.value])
await self.refresh()

def update_from_dps(self, decoded_dps: dict[RoborockDataProtocol, Any]) -> None:
"""Update the trait from data protocol push message data.

This handles unsolicited status updates pushed by the device
via RoborockDataProtocol codes (e.g. STATE=121, BATTERY=122).
"""
if _DPS_CONVERTER.update_from_dps(self, decoded_dps):
self._notify_update()
Comment on lines 45 to +72
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsumableTrait now mixes in TraitUpdateListener, but it never calls TraitUpdateListener.__init__(). As a result, _update_callbacks is never initialized and update_from_dps() will raise AttributeError when it tries to call _notify_update(). Add an __init__ that calls super().__init__() (to init the Consumable dataclass) and then TraitUpdateListener.__init__(..., logger=...) (and add a module logger similar to StatusTrait).

Copilot uses AI. Check for mistakes.
20 changes: 19 additions & 1 deletion roborock/devices/traits/v1/status.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
from functools import cached_property
from typing import Any

from roborock import (
CleanRoutes,
Expand All @@ -10,13 +12,19 @@
get_water_mode_mapping,
get_water_modes,
)
from roborock.devices.traits.common import DpsDataConverter, TraitUpdateListener
from roborock.roborock_message import RoborockDataProtocol
from roborock.roborock_typing import RoborockCommand

from . import common
from .device_features import DeviceFeaturesTrait

_LOGGER = logging.getLogger(__name__)

class StatusTrait(StatusV2, common.V1TraitMixin):
_DPS_CONVERTER = DpsDataConverter.from_dataclass(StatusV2)


class StatusTrait(StatusV2, common.V1TraitMixin, TraitUpdateListener):
"""Trait for managing the status of Roborock devices.

The StatusTrait gives you the access to the state of a Roborock vacuum.
Expand Down Expand Up @@ -47,6 +55,7 @@ class StatusTrait(StatusV2, common.V1TraitMixin):
def __init__(self, device_feature_trait: DeviceFeaturesTrait, region: str | None = None) -> None:
"""Initialize the StatusTrait."""
super().__init__()
TraitUpdateListener.__init__(self, logger=_LOGGER)
self._device_features_trait = device_feature_trait
self._region = region

Expand Down Expand Up @@ -91,3 +100,12 @@ def mop_route_name(self) -> str | None:
if self.mop_mode is None:
return None
return self.mop_route_mapping.get(self.mop_mode)

def update_from_dps(self, decoded_dps: dict[RoborockDataProtocol, Any]) -> None:
"""Update the trait from data protocol push message data.

This handles unsolicited status updates pushed by the device
via RoborockDataProtocol codes (e.g. STATE=121, BATTERY=122).
"""
if _DPS_CONVERTER.update_from_dps(self, decoded_dps):
self._notify_update()
Loading
Loading