From e51f85cdb59a141fcebd9d5d3ab1e718df36c54e Mon Sep 17 00:00:00 2001 From: Luke Date: Sat, 7 Mar 2026 10:35:25 -0500 Subject: [PATCH 1/3] fix: don't reconnect on no active subscribers --- roborock/mqtt/roborock_session.py | 12 +++++++- .../e2e/__snapshots__/test_mqtt_session.ambr | 4 +-- tests/mqtt/test_roborock_session.py | 29 +++++++++++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/roborock/mqtt/roborock_session.py b/roborock/mqtt/roborock_session.py index 353d3fd2..19f90dc1 100644 --- a/roborock/mqtt/roborock_session.py +++ b/roborock/mqtt/roborock_session.py @@ -27,7 +27,7 @@ _LOGGER = logging.getLogger(__name__) _MQTT_LOGGER = logging.getLogger(f"{__name__}.aiomqtt") -CLIENT_KEEPALIVE = datetime.timedelta(seconds=60) +CLIENT_KEEPALIVE = datetime.timedelta(seconds=45) TOPIC_KEEPALIVE = datetime.timedelta(seconds=60) # Exponential backoff parameters @@ -175,6 +175,16 @@ async def _run_reconnect_loop(self, start_future: asyncio.Future[None] | None) - if self._stop: _LOGGER.debug("MQTT session closed, stopping retry loop") return + if not self._client_subscribed_topics and not self._listeners.keys(): + _LOGGER.debug("MQTT session disconnected with no active subscriptions, deferring reconnect") + self._diagnostics.increment("reconnect_deferred") + while not self._stop and not self._client_subscribed_topics and not self._listeners.keys(): + await asyncio.sleep(0.1) + if self._stop: + _LOGGER.debug("MQTT session closed while waiting for active subscriptions") + return + self._backoff = MIN_BACKOFF_INTERVAL + continue _LOGGER.info("MQTT session disconnected, retrying in %s seconds", self._backoff.total_seconds()) self._diagnostics.increment("reconnect_wait") await asyncio.sleep(self._backoff.total_seconds()) diff --git a/tests/e2e/__snapshots__/test_mqtt_session.ambr b/tests/e2e/__snapshots__/test_mqtt_session.ambr index 533a573d..7caf6eef 100644 --- a/tests/e2e/__snapshots__/test_mqtt_session.ambr +++ b/tests/e2e/__snapshots__/test_mqtt_session.ambr @@ -1,7 +1,7 @@ # serializer version: 1 # name: test_session_e2e_publish_message [mqtt >] - 00000000 10 21 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.!..MQTT...<....| + 00000000 10 21 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.!..MQTT...-....| 00000010 08 75 73 65 72 6e 61 6d 65 00 08 70 61 73 73 77 |.username..passw| 00000020 6f 72 64 |ord| [mqtt <] @@ -15,7 +15,7 @@ # --- # name: test_session_e2e_receive_message [mqtt >] - 00000000 10 21 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.!..MQTT...<....| + 00000000 10 21 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.!..MQTT...-....| 00000010 08 75 73 65 72 6e 61 6d 65 00 08 70 61 73 73 77 |.username..passw| 00000020 6f 72 64 |ord| [mqtt <] diff --git a/tests/mqtt/test_roborock_session.py b/tests/mqtt/test_roborock_session.py index 17ca9c78..4b2a1149 100644 --- a/tests/mqtt/test_roborock_session.py +++ b/tests/mqtt/test_roborock_session.py @@ -529,8 +529,37 @@ def succeed_then_fail_unauthorized() -> Any: session = await create_mqtt_session(params) assert session.connected + # Keep an active subscription so reconnect attempts are not deferred. + await session.subscribe("topic-1", Subscriber().append) + try: async with asyncio.timeout(10): assert await unauthorized.wait() finally: await session.close() + + +async def test_session_defers_reconnect_when_idle() -> None: + """Test that reconnects are deferred when there are no active subscriptions.""" + + session = RoborockMqttSession(FAKE_PARAMS) + start_future: asyncio.Future[None] = asyncio.Future() + connect_attempts = 0 + + async def fake_run_connection(start: asyncio.Future[None] | None) -> None: + nonlocal connect_attempts + connect_attempts += 1 + if start and not start.done(): + start.set_result(None) + + with patch.object(session, "_run_connection", side_effect=fake_run_connection): + reconnect_task = asyncio.create_task(session._run_reconnect_loop(start_future)) + try: + await start_future + await asyncio.sleep(0.1) + assert connect_attempts == 1 + assert session._diagnostics.as_dict().get("reconnect_deferred", 0) >= 1 + finally: + session._stop = True + reconnect_task.cancel() + await asyncio.gather(reconnect_task, return_exceptions=True) From aa2b846a3b49e76d6e2cafba3ce4a46ab29dbbcb Mon Sep 17 00:00:00 2001 From: Luke Date: Sat, 7 Mar 2026 10:44:56 -0500 Subject: [PATCH 2/3] fix: snapshot --- tests/e2e/__snapshots__/test_device_manager.ambr | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/e2e/__snapshots__/test_device_manager.ambr b/tests/e2e/__snapshots__/test_device_manager.ambr index 7ef8b342..17a045d4 100644 --- a/tests/e2e/__snapshots__/test_device_manager.ambr +++ b/tests/e2e/__snapshots__/test_device_manager.ambr @@ -1,7 +1,7 @@ # serializer version: 1 # name: test_a01_device[home_data0] [mqtt >] - 00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....| + 00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....| 00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467| 00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e| [mqtt <] @@ -29,7 +29,7 @@ # --- # name: test_l01_device [mqtt >] - 00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....| + 00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....| 00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467| 00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e| [mqtt <] @@ -240,7 +240,7 @@ # --- # name: test_q10_device[home_data0] [mqtt >] - 00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....| + 00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....| 00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467| 00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e| [mqtt <] @@ -262,7 +262,7 @@ # --- # name: test_q7_device[home_data0] [mqtt >] - 00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....| + 00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....| 00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467| 00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e| [mqtt <] @@ -326,7 +326,7 @@ # --- # name: test_v1_device [mqtt >] - 00000000 10 29 00 04 4d 51 54 54 05 c2 00 3c 00 00 00 00 |.)..MQTT...<....| + 00000000 10 29 00 04 4d 51 54 54 05 c2 00 2d 00 00 00 00 |.)..MQTT...-....| 00000010 08 31 39 36 34 38 66 39 34 00 10 32 33 34 36 37 |.19648f94..23467| 00000020 38 65 61 38 35 34 66 31 39 39 65 |8ea854f199e| [mqtt <] From f5fb7940e021df898f4234444bd7742da0d69a7e Mon Sep 17 00:00:00 2001 From: Luke Date: Sat, 7 Mar 2026 13:09:40 -0500 Subject: [PATCH 3/3] chore: address comments --- roborock/mqtt/roborock_session.py | 9 ++++--- tests/mqtt/test_roborock_session.py | 40 +++++++++++++---------------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/roborock/mqtt/roborock_session.py b/roborock/mqtt/roborock_session.py index 19f90dc1..ec6e5aa7 100644 --- a/roborock/mqtt/roborock_session.py +++ b/roborock/mqtt/roborock_session.py @@ -56,9 +56,12 @@ class RoborockMqttSession(MqttSession): The client is run as a background task that will run until shutdown. Once connected, the client will wait for messages to be received in a loop. If the connection is lost, the client will be re-created and reconnected. There - is backoff to avoid spamming the broker with connection attempts. The client - will automatically re-establish any subscriptions when the connection is - re-established. + is backoff to avoid spamming the broker with connection attempts. + + Reconnect attempts are deferred while there are no active subscriptions, + which avoids unnecessary reconnect churn for idle sessions. Reconnects + resume as soon as a subscription is added again. The client automatically + re-establishes any existing subscriptions when the connection returns. """ def __init__( diff --git a/tests/mqtt/test_roborock_session.py b/tests/mqtt/test_roborock_session.py index 4b2a1149..65ccbbdf 100644 --- a/tests/mqtt/test_roborock_session.py +++ b/tests/mqtt/test_roborock_session.py @@ -539,27 +539,23 @@ def succeed_then_fail_unauthorized() -> Any: await session.close() -async def test_session_defers_reconnect_when_idle() -> None: +async def test_session_defers_reconnect_when_idle( + mock_aenter_client: AsyncMock, + message_iterator: FakeAsyncIterator, + mqtt_client_lite: AsyncMock, +) -> None: """Test that reconnects are deferred when there are no active subscriptions.""" - session = RoborockMqttSession(FAKE_PARAMS) - start_future: asyncio.Future[None] = asyncio.Future() - connect_attempts = 0 - - async def fake_run_connection(start: asyncio.Future[None] | None) -> None: - nonlocal connect_attempts - connect_attempts += 1 - if start and not start.done(): - start.set_result(None) - - with patch.object(session, "_run_connection", side_effect=fake_run_connection): - reconnect_task = asyncio.create_task(session._run_reconnect_loop(start_future)) - try: - await start_future - await asyncio.sleep(0.1) - assert connect_attempts == 1 - assert session._diagnostics.as_dict().get("reconnect_deferred", 0) >= 1 - finally: - session._stop = True - reconnect_task.cancel() - await asyncio.gather(reconnect_task, return_exceptions=True) + params = copy.deepcopy(FAKE_PARAMS) + message_iterator.loop = False + + session = await create_mqtt_session(params) + + assert mqtt_client_lite.messages is message_iterator + + try: + await asyncio.sleep(0.1) + assert mock_aenter_client.await_count == 1 + assert params.diagnostics.as_dict().get("reconnect_deferred", 0) >= 1 + finally: + await session.close()