diff --git a/can/interfaces/gs_usb.py b/can/interfaces/gs_usb.py
index 6297fc1f5..35de63668 100644
--- a/can/interfaces/gs_usb.py
+++ b/can/interfaces/gs_usb.py
@@ -12,6 +12,42 @@
logger = logging.getLogger(__name__)
+def _scan_gs_usb_devices() -> list[GsUsb]:
+ """Scan for gs_usb devices using auto-detected backend.
+
+ Unlike :meth:`GsUsb.scan`, this does not force the ``libusb1`` backend,
+ allowing ``pyusb`` to auto-detect the best available backend. This enables
+ support for WinUSB on Windows in addition to libusbK.
+ """
+ return [
+ GsUsb(dev)
+ for dev in (
+ usb.core.find(
+ find_all=True,
+ custom_match=GsUsb.is_gs_usb_device,
+ )
+ or []
+ )
+ ]
+
+
+def _find_gs_usb_device(bus: int, address: int) -> GsUsb | None:
+ """Find a specific gs_usb device using auto-detected backend.
+
+ Unlike :meth:`GsUsb.find`, this does not force the ``libusb1`` backend,
+ allowing ``pyusb`` to auto-detect the best available backend. This enables
+ support for WinUSB on Windows in addition to libusbK.
+ """
+ dev = usb.core.find(
+ custom_match=GsUsb.is_gs_usb_device,
+ bus=bus,
+ address=address,
+ )
+ if dev:
+ return GsUsb(dev)
+ return None
+
+
class GsUsbBus(can.BusABC):
def __init__(
self,
@@ -32,7 +68,6 @@ def __init__(
:param can_filters: not supported
:param bitrate: CAN network bandwidth (bits/s)
"""
- self._is_shutdown = False
if (index is not None) and ((bus or address) is not None):
raise CanInitializationError(
"index and bus/address cannot be used simultaneously"
@@ -43,7 +78,7 @@ def __init__(
self._index = None
if index is not None:
- devs = GsUsb.scan()
+ devs = _scan_gs_usb_devices()
if len(devs) <= index:
raise CanInitializationError(
f"Cannot find device {index}. Devices found: {len(devs)}"
@@ -51,7 +86,7 @@ def __init__(
gs_usb = devs[index]
self._index = index
else:
- gs_usb = GsUsb.find(bus=bus, address=address)
+ gs_usb = _find_gs_usb_device(bus=bus, address=address)
if not gs_usb:
raise CanInitializationError(f"Cannot find device {channel}")
@@ -139,7 +174,7 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo
frame = GsUsbFrame()
# Do not set timeout as None or zero here to avoid blocking
- timeout_ms = round(timeout * 1000) if timeout else 1
+ timeout_ms = round(timeout * 1000) if timeout else 0
if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms):
return None, False
@@ -158,15 +193,16 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo
return msg, False
def shutdown(self):
- if self._is_shutdown:
+ already_shutdown = self._is_shutdown
+ super().shutdown()
+ if already_shutdown:
return
- super().shutdown()
self.gs_usb.stop()
if self._index is not None:
# Avoid errors on subsequent __init() by repeating the .scan() and .start() that would otherwise fail
# the next time the device is opened in __init__()
- devs = GsUsb.scan()
+ devs = _scan_gs_usb_devices()
if self._index < len(devs):
gs_usb = devs[self._index]
try:
@@ -175,4 +211,3 @@ def shutdown(self):
gs_usb.stop()
except usb.core.USBError:
pass
- self._is_shutdown = True
diff --git a/doc/changelog.d/2031.changed.md b/doc/changelog.d/2031.changed.md
new file mode 100644
index 000000000..7d33a6c80
--- /dev/null
+++ b/doc/changelog.d/2031.changed.md
@@ -0,0 +1,2 @@
+* make gs_usb use WinUSB instead of requiring libusbK.
+* gs_usb: timeout=None means foreever
diff --git a/doc/interfaces/gs_usb.rst b/doc/interfaces/gs_usb.rst
index 8bab07c6f..580a994fc 100755
--- a/doc/interfaces/gs_usb.rst
+++ b/doc/interfaces/gs_usb.rst
@@ -52,8 +52,10 @@ Windows, Linux and Mac.
The backend driver depends on `pyusb `_ so a ``pyusb`` backend driver library such as
``libusb`` must be installed.
- On Windows a tool such as `Zadig `_ can be used to set the USB device driver to
- ``libusbK``.
+ On Windows, WinUSB and libusbK are both supported. Devices with WCID (Windows Compatible ID) descriptors,
+ such as candleLight firmware, will automatically use WinUSB without any additional driver installation.
+ Alternatively, a tool such as `Zadig `_ can be used to set the USB device driver to
+ either ``WinUSB`` or ``libusbK``.
Supplementary Info
diff --git a/pyproject.toml b/pyproject.toml
index ddaf61ef5..1f4475176 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -64,7 +64,7 @@ neovi = ["filelock", "python-ics>=2.12"]
canalystii = ["canalystii>=0.1.0"]
cantact = ["cantact>=0.0.7"]
cvector = ["python-can-cvector"]
-gs-usb = ["gs-usb>=0.2.1"]
+gs-usb = ["gs-usb>=0.2.1", "pyusb>=1.0.2"]
nixnet = ["nixnet>=0.3.2"]
pcan = ["uptime~=3.0.1"]
remote = ["python-can-remote"]
@@ -103,6 +103,7 @@ test = [
"coverage==7.13.*",
"hypothesis==6.*",
"parameterized==0.9.*",
+ "gs-usb==0.3.*",
]
dev = [
{include-group = "docs"},
diff --git a/test/test_interface_gs_usb.py b/test/test_interface_gs_usb.py
new file mode 100644
index 000000000..b4ec76e5a
--- /dev/null
+++ b/test/test_interface_gs_usb.py
@@ -0,0 +1,90 @@
+"""Tests for the gs_usb interface."""
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from can.interfaces.gs_usb import (
+ GsUsbBus,
+ _find_gs_usb_device,
+ _scan_gs_usb_devices,
+)
+
+
+@patch("can.interfaces.gs_usb.usb.core.find")
+def test_scan_does_not_force_backend(mock_find):
+ """Verify that _scan_gs_usb_devices does not pass a backend argument,
+ allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.)."""
+ mock_find.return_value = []
+
+ _scan_gs_usb_devices()
+
+ mock_find.assert_called_once()
+ call_kwargs = mock_find.call_args[1]
+ assert (
+ "backend" not in call_kwargs
+ ), "backend should not be specified so pyusb can auto-detect"
+ assert call_kwargs["find_all"] is True
+
+
+@patch("can.interfaces.gs_usb.usb.core.find")
+def test_find_does_not_force_backend(mock_find):
+ """Verify that _find_gs_usb_device does not pass a backend argument,
+ allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.)."""
+ mock_find.return_value = None
+
+ _find_gs_usb_device(bus=1, address=2)
+
+ mock_find.assert_called_once()
+ call_kwargs = mock_find.call_args[1]
+ assert (
+ "backend" not in call_kwargs
+ ), "backend should not be specified so pyusb can auto-detect"
+ assert call_kwargs["bus"] == 1
+ assert call_kwargs["address"] == 2
+
+
+@patch("can.interfaces.gs_usb.usb.core.find")
+def test_scan_returns_gs_usb_devices(mock_find):
+ """Verify that _scan_gs_usb_devices wraps found USB devices in GsUsb objects."""
+ mock_dev1 = MagicMock()
+ mock_dev2 = MagicMock()
+ mock_find.return_value = [mock_dev1, mock_dev2]
+
+ devices = _scan_gs_usb_devices()
+
+ assert len(devices) == 2
+ assert devices[0].gs_usb is mock_dev1
+ assert devices[1].gs_usb is mock_dev2
+
+
+@patch("can.interfaces.gs_usb.usb.core.find")
+def test_find_returns_gs_usb_device(mock_find):
+ """Verify that _find_gs_usb_device wraps the found USB device in a GsUsb object."""
+ mock_dev = MagicMock()
+ mock_find.return_value = mock_dev
+
+ device = _find_gs_usb_device(bus=1, address=2)
+
+ assert device is not None
+ assert device.gs_usb is mock_dev
+
+
+@patch("can.interfaces.gs_usb.usb.core.find")
+def test_find_returns_none_when_no_device(mock_find):
+ """Verify that _find_gs_usb_device returns None when no device is found."""
+ mock_find.return_value = None
+
+ device = _find_gs_usb_device(bus=1, address=2)
+
+ assert device is None
+
+
+@patch("can.interfaces.gs_usb.usb.core.find")
+def test_scan_returns_empty_list_when_no_devices(mock_find):
+ """Verify that _scan_gs_usb_devices returns an empty list when no devices are found."""
+ mock_find.return_value = []
+
+ devices = _scan_gs_usb_devices()
+
+ assert devices == []