diff --git a/can/interfaces/gs_usb.py b/can/interfaces/gs_usb.py index 6297fc1f5..35de63668 100644 --- a/can/interfaces/gs_usb.py +++ b/can/interfaces/gs_usb.py @@ -12,6 +12,42 @@ logger = logging.getLogger(__name__) +def _scan_gs_usb_devices() -> list[GsUsb]: + """Scan for gs_usb devices using auto-detected backend. + + Unlike :meth:`GsUsb.scan`, this does not force the ``libusb1`` backend, + allowing ``pyusb`` to auto-detect the best available backend. This enables + support for WinUSB on Windows in addition to libusbK. + """ + return [ + GsUsb(dev) + for dev in ( + usb.core.find( + find_all=True, + custom_match=GsUsb.is_gs_usb_device, + ) + or [] + ) + ] + + +def _find_gs_usb_device(bus: int, address: int) -> GsUsb | None: + """Find a specific gs_usb device using auto-detected backend. + + Unlike :meth:`GsUsb.find`, this does not force the ``libusb1`` backend, + allowing ``pyusb`` to auto-detect the best available backend. This enables + support for WinUSB on Windows in addition to libusbK. + """ + dev = usb.core.find( + custom_match=GsUsb.is_gs_usb_device, + bus=bus, + address=address, + ) + if dev: + return GsUsb(dev) + return None + + class GsUsbBus(can.BusABC): def __init__( self, @@ -32,7 +68,6 @@ def __init__( :param can_filters: not supported :param bitrate: CAN network bandwidth (bits/s) """ - self._is_shutdown = False if (index is not None) and ((bus or address) is not None): raise CanInitializationError( "index and bus/address cannot be used simultaneously" @@ -43,7 +78,7 @@ def __init__( self._index = None if index is not None: - devs = GsUsb.scan() + devs = _scan_gs_usb_devices() if len(devs) <= index: raise CanInitializationError( f"Cannot find device {index}. Devices found: {len(devs)}" @@ -51,7 +86,7 @@ def __init__( gs_usb = devs[index] self._index = index else: - gs_usb = GsUsb.find(bus=bus, address=address) + gs_usb = _find_gs_usb_device(bus=bus, address=address) if not gs_usb: raise CanInitializationError(f"Cannot find device {channel}") @@ -139,7 +174,7 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo frame = GsUsbFrame() # Do not set timeout as None or zero here to avoid blocking - timeout_ms = round(timeout * 1000) if timeout else 1 + timeout_ms = round(timeout * 1000) if timeout else 0 if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms): return None, False @@ -158,15 +193,16 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo return msg, False def shutdown(self): - if self._is_shutdown: + already_shutdown = self._is_shutdown + super().shutdown() + if already_shutdown: return - super().shutdown() self.gs_usb.stop() if self._index is not None: # Avoid errors on subsequent __init() by repeating the .scan() and .start() that would otherwise fail # the next time the device is opened in __init__() - devs = GsUsb.scan() + devs = _scan_gs_usb_devices() if self._index < len(devs): gs_usb = devs[self._index] try: @@ -175,4 +211,3 @@ def shutdown(self): gs_usb.stop() except usb.core.USBError: pass - self._is_shutdown = True diff --git a/doc/changelog.d/2031.changed.md b/doc/changelog.d/2031.changed.md new file mode 100644 index 000000000..7d33a6c80 --- /dev/null +++ b/doc/changelog.d/2031.changed.md @@ -0,0 +1,2 @@ +* make gs_usb use WinUSB instead of requiring libusbK. +* gs_usb: timeout=None means foreever diff --git a/doc/interfaces/gs_usb.rst b/doc/interfaces/gs_usb.rst index 8bab07c6f..580a994fc 100755 --- a/doc/interfaces/gs_usb.rst +++ b/doc/interfaces/gs_usb.rst @@ -52,8 +52,10 @@ Windows, Linux and Mac. The backend driver depends on `pyusb `_ so a ``pyusb`` backend driver library such as ``libusb`` must be installed. - On Windows a tool such as `Zadig `_ can be used to set the USB device driver to - ``libusbK``. + On Windows, WinUSB and libusbK are both supported. Devices with WCID (Windows Compatible ID) descriptors, + such as candleLight firmware, will automatically use WinUSB without any additional driver installation. + Alternatively, a tool such as `Zadig `_ can be used to set the USB device driver to + either ``WinUSB`` or ``libusbK``. Supplementary Info diff --git a/pyproject.toml b/pyproject.toml index ddaf61ef5..1f4475176 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,7 +64,7 @@ neovi = ["filelock", "python-ics>=2.12"] canalystii = ["canalystii>=0.1.0"] cantact = ["cantact>=0.0.7"] cvector = ["python-can-cvector"] -gs-usb = ["gs-usb>=0.2.1"] +gs-usb = ["gs-usb>=0.2.1", "pyusb>=1.0.2"] nixnet = ["nixnet>=0.3.2"] pcan = ["uptime~=3.0.1"] remote = ["python-can-remote"] @@ -103,6 +103,7 @@ test = [ "coverage==7.13.*", "hypothesis==6.*", "parameterized==0.9.*", + "gs-usb==0.3.*", ] dev = [ {include-group = "docs"}, diff --git a/test/test_interface_gs_usb.py b/test/test_interface_gs_usb.py new file mode 100644 index 000000000..b4ec76e5a --- /dev/null +++ b/test/test_interface_gs_usb.py @@ -0,0 +1,90 @@ +"""Tests for the gs_usb interface.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from can.interfaces.gs_usb import ( + GsUsbBus, + _find_gs_usb_device, + _scan_gs_usb_devices, +) + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_scan_does_not_force_backend(mock_find): + """Verify that _scan_gs_usb_devices does not pass a backend argument, + allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.).""" + mock_find.return_value = [] + + _scan_gs_usb_devices() + + mock_find.assert_called_once() + call_kwargs = mock_find.call_args[1] + assert ( + "backend" not in call_kwargs + ), "backend should not be specified so pyusb can auto-detect" + assert call_kwargs["find_all"] is True + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_find_does_not_force_backend(mock_find): + """Verify that _find_gs_usb_device does not pass a backend argument, + allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.).""" + mock_find.return_value = None + + _find_gs_usb_device(bus=1, address=2) + + mock_find.assert_called_once() + call_kwargs = mock_find.call_args[1] + assert ( + "backend" not in call_kwargs + ), "backend should not be specified so pyusb can auto-detect" + assert call_kwargs["bus"] == 1 + assert call_kwargs["address"] == 2 + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_scan_returns_gs_usb_devices(mock_find): + """Verify that _scan_gs_usb_devices wraps found USB devices in GsUsb objects.""" + mock_dev1 = MagicMock() + mock_dev2 = MagicMock() + mock_find.return_value = [mock_dev1, mock_dev2] + + devices = _scan_gs_usb_devices() + + assert len(devices) == 2 + assert devices[0].gs_usb is mock_dev1 + assert devices[1].gs_usb is mock_dev2 + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_find_returns_gs_usb_device(mock_find): + """Verify that _find_gs_usb_device wraps the found USB device in a GsUsb object.""" + mock_dev = MagicMock() + mock_find.return_value = mock_dev + + device = _find_gs_usb_device(bus=1, address=2) + + assert device is not None + assert device.gs_usb is mock_dev + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_find_returns_none_when_no_device(mock_find): + """Verify that _find_gs_usb_device returns None when no device is found.""" + mock_find.return_value = None + + device = _find_gs_usb_device(bus=1, address=2) + + assert device is None + + +@patch("can.interfaces.gs_usb.usb.core.find") +def test_scan_returns_empty_list_when_no_devices(mock_find): + """Verify that _scan_gs_usb_devices returns an empty list when no devices are found.""" + mock_find.return_value = [] + + devices = _scan_gs_usb_devices() + + assert devices == []