22
33from __future__ import annotations
44
5+ import asyncio
56import logging
7+ from typing import Any , Iterable
68
79from roborock .data .b01_q10 .b01_q10_code_mappings import B01_Q10_DP
810from roborock .devices .transport .mqtt_channel import MqttChannel
911from roborock .exceptions import RoborockException
1012from roborock .protocols .b01_q10_protocol import (
1113 ParamsType ,
14+ decode_rpc_response ,
1215 encode_mqtt_payload ,
1316)
17+ from roborock .roborock_message import RoborockMessage
1418
1519_LOGGER = logging .getLogger (__name__ )
20+ _TIMEOUT = 10.0
1621
1722
1823async def send_command (
@@ -34,3 +39,61 @@ async def send_command(
3439 ex ,
3540 )
3641 raise
42+
43+
44+ async def send_decoded_command (
45+ mqtt_channel : MqttChannel ,
46+ command : B01_Q10_DP ,
47+ params : ParamsType ,
48+ expected_dps : Iterable [B01_Q10_DP ] | None = None ,
49+ ) -> dict [B01_Q10_DP , Any ]:
50+ """Send a command and await the first decoded response.
51+
52+ Q10 responses are not correlated with a message id, so we filter on
53+ expected datapoints when provided.
54+ """
55+ roborock_message = encode_mqtt_payload (command , params )
56+ future : asyncio .Future [dict [B01_Q10_DP , Any ]] = asyncio .get_running_loop ().create_future ()
57+
58+ expected_set = set (expected_dps ) if expected_dps is not None else None
59+
60+ def find_response (response_message : RoborockMessage ) -> None :
61+ try :
62+ decoded_dps = decode_rpc_response (response_message )
63+ except RoborockException as ex :
64+ _LOGGER .debug (
65+ "Failed to decode B01 Q10 RPC response (expecting %s): %s: %s" ,
66+ command ,
67+ response_message ,
68+ ex ,
69+ )
70+ return
71+ if expected_set and not any (dps in decoded_dps for dps in expected_set ):
72+ return
73+ if not future .done ():
74+ future .set_result (decoded_dps )
75+
76+ unsub = await mqtt_channel .subscribe (find_response )
77+
78+ _LOGGER .debug ("Sending MQTT message: %s" , roborock_message )
79+ try :
80+ await mqtt_channel .publish (roborock_message )
81+ return await asyncio .wait_for (future , timeout = _TIMEOUT )
82+ except TimeoutError as ex :
83+ raise RoborockException (f"B01 Q10 command timed out after { _TIMEOUT } s ({ command } )" ) from ex
84+ except RoborockException as ex :
85+ _LOGGER .warning (
86+ "Error sending B01 Q10 decoded command (%s): %s" ,
87+ command ,
88+ ex ,
89+ )
90+ raise
91+ except Exception as ex :
92+ _LOGGER .exception (
93+ "Error sending B01 Q10 decoded command (%s): %s" ,
94+ command ,
95+ ex ,
96+ )
97+ raise
98+ finally :
99+ unsub ()
0 commit comments