-
Notifications
You must be signed in to change notification settings - Fork 67
q7/b01: command-layer segment clean + map payload retrieval helpers (split 1/3) #778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
arduano
wants to merge
6
commits into
Python-roborock:main
Choose a base branch
from
arduano:leo/pr774-split-01-q7-commands
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+257
−3
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
2d03d6a
🦎 q7: split 1/3 command-layer segment + map retrieval APIs
arduano 861b338
Merge branch 'main' into leo/pr774-split-01-q7-commands
arduano 632b160
🦎 q7: harden map command decode + cover RPC payload tests
arduano 733159c
🦎 q7: drop RPC-wrapped map payload path
arduano a20ae01
🦎 q7: split map helpers into dedicated map trait
arduano 0b8f6fe
🦎 q7: align map trait with refresh-based pattern
arduano File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| """Map trait for B01 Q7 devices.""" | ||
|
|
||
| import asyncio | ||
| from dataclasses import dataclass, field | ||
|
|
||
| from roborock.data import RoborockBase | ||
| from roborock.devices.rpc.b01_q7_channel import send_decoded_command, send_map_command | ||
| from roborock.devices.traits import Trait | ||
| from roborock.devices.transport.mqtt_channel import MqttChannel | ||
| from roborock.exceptions import RoborockException | ||
| from roborock.protocols.b01_q7_protocol import Q7RequestMessage | ||
| from roborock.roborock_typing import RoborockB01Q7Methods | ||
|
|
||
| _Q7_DPS = 10000 | ||
|
|
||
|
|
||
| @dataclass | ||
| class Q7MapListEntry(RoborockBase): | ||
| """Single map list entry returned by `service.get_map_list`.""" | ||
|
|
||
| id: int | None = None | ||
| cur: bool | None = None | ||
|
|
||
|
|
||
| @dataclass | ||
| class Q7MapList(RoborockBase): | ||
| """Map list response returned by `service.get_map_list`.""" | ||
|
|
||
| map_list: list[Q7MapListEntry] = field(default_factory=list) | ||
|
|
||
|
|
||
| class MapTrait(Trait): | ||
| """Map retrieval + map metadata helpers for Q7 devices.""" | ||
|
|
||
| def __init__(self, channel: MqttChannel) -> None: | ||
| self._channel = channel | ||
| # Map uploads are serialized per-device to avoid response cross-wiring. | ||
| self._map_command_lock = asyncio.Lock() | ||
| self._map_list: Q7MapList | None = None | ||
|
|
||
| @property | ||
| def map_list(self) -> Q7MapList | None: | ||
| """Latest cached map list metadata, populated by ``refresh()``.""" | ||
| return self._map_list | ||
|
|
||
| @property | ||
| def current_map_id(self) -> int | None: | ||
| """Current map id derived from cached map list metadata.""" | ||
| if self._map_list is None: | ||
| return None | ||
| return self._extract_current_map_id(self._map_list) | ||
|
|
||
| async def refresh(self) -> None: | ||
| """Refresh cached map list metadata from the device.""" | ||
| response = await send_decoded_command( | ||
| self._channel, | ||
| Q7RequestMessage(dps=_Q7_DPS, command=RoborockB01Q7Methods.GET_MAP_LIST, params={}), | ||
| ) | ||
| if not isinstance(response, dict): | ||
| raise TypeError(f"Unexpected response type for GET_MAP_LIST: {type(response).__name__}: {response!r}") | ||
|
|
||
| parsed = Q7MapList.from_dict(response) | ||
| if parsed is None: | ||
| raise TypeError(f"Failed to decode map list response: {response!r}") | ||
|
|
||
| self._map_list = parsed | ||
|
|
||
| async def _get_map_payload(self, *, map_id: int) -> bytes: | ||
| """Fetch raw map payload bytes for the given map id.""" | ||
| request = Q7RequestMessage( | ||
| dps=_Q7_DPS, | ||
| command=RoborockB01Q7Methods.UPLOAD_BY_MAPID, | ||
| params={"map_id": map_id}, | ||
| ) | ||
| async with self._map_command_lock: | ||
| return await send_map_command(self._channel, request) | ||
|
|
||
| async def get_current_map_payload(self) -> bytes: | ||
| """Fetch raw map payload bytes for the currently selected map.""" | ||
| if self._map_list is None: | ||
| await self.refresh() | ||
|
|
||
| map_id = self.current_map_id | ||
| if map_id is None: | ||
| raise RoborockException(f"Unable to determine map_id from map list response: {self._map_list!r}") | ||
| return await self._get_map_payload(map_id=map_id) | ||
|
|
||
| @staticmethod | ||
| def _extract_current_map_id(map_list_response: Q7MapList) -> int | None: | ||
| map_list = map_list_response.map_list | ||
| if not map_list: | ||
| return None | ||
|
|
||
| for entry in map_list: | ||
| if entry.cur and isinstance(entry.id, int): | ||
| return entry.id | ||
|
|
||
| first = map_list[0] | ||
| if isinstance(first.id, int): | ||
| return first.id | ||
| return None |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR description says map uploads may return either a MAP_RESPONSE or a decoded RPC response with a hex string under
data.payload, but the added tests only cover the MAP_RESPONSE path. Add a test case that queues an RPC_RESPONSE containingdata: {payload: "...hex..."}and assertsget_current_map_payload()returns the decoded bytes (and errors clearly on non-hex payloads).