Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions can/interfaces/gs_usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,42 @@
logger = logging.getLogger(__name__)


def _scan_gs_usb_devices() -> list[GsUsb]:
"""Scan for gs_usb devices using auto-detected backend.
Unlike :meth:`GsUsb.scan`, this does not force the ``libusb1`` backend,
allowing ``pyusb`` to auto-detect the best available backend. This enables
support for WinUSB on Windows in addition to libusbK.
"""
return [
GsUsb(dev)
for dev in (
usb.core.find(
find_all=True,
custom_match=GsUsb.is_gs_usb_device,
)
or []
)
]


def _find_gs_usb_device(bus: int, address: int) -> GsUsb | None:
"""Find a specific gs_usb device using auto-detected backend.
Unlike :meth:`GsUsb.find`, this does not force the ``libusb1`` backend,
allowing ``pyusb`` to auto-detect the best available backend. This enables
support for WinUSB on Windows in addition to libusbK.
"""
dev = usb.core.find(
custom_match=GsUsb.is_gs_usb_device,
bus=bus,
address=address,
)
if dev:
return GsUsb(dev)
return None


class GsUsbBus(can.BusABC):
def __init__(
self,
Expand All @@ -32,7 +68,6 @@ def __init__(
:param can_filters: not supported
:param bitrate: CAN network bandwidth (bits/s)
"""
self._is_shutdown = False
if (index is not None) and ((bus or address) is not None):
raise CanInitializationError(
"index and bus/address cannot be used simultaneously"
Expand All @@ -43,15 +78,15 @@ def __init__(

self._index = None
if index is not None:
devs = GsUsb.scan()
devs = _scan_gs_usb_devices()
if len(devs) <= index:
raise CanInitializationError(
f"Cannot find device {index}. Devices found: {len(devs)}"
)
gs_usb = devs[index]
self._index = index
else:
gs_usb = GsUsb.find(bus=bus, address=address)
gs_usb = _find_gs_usb_device(bus=bus, address=address)
if not gs_usb:
raise CanInitializationError(f"Cannot find device {channel}")

Expand Down Expand Up @@ -139,7 +174,7 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo
frame = GsUsbFrame()

# Do not set timeout as None or zero here to avoid blocking
timeout_ms = round(timeout * 1000) if timeout else 1
timeout_ms = round(timeout * 1000) if timeout else 0
if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms):
return None, False

Expand All @@ -158,15 +193,16 @@ def _recv_internal(self, timeout: float | None) -> tuple[can.Message | None, boo
return msg, False

def shutdown(self):
if self._is_shutdown:
already_shutdown = self._is_shutdown
super().shutdown()
if already_shutdown:
return

super().shutdown()
self.gs_usb.stop()
if self._index is not None:
# Avoid errors on subsequent __init() by repeating the .scan() and .start() that would otherwise fail
# the next time the device is opened in __init__()
devs = GsUsb.scan()
devs = _scan_gs_usb_devices()
if self._index < len(devs):
gs_usb = devs[self._index]
try:
Expand All @@ -175,4 +211,3 @@ def shutdown(self):
gs_usb.stop()
except usb.core.USBError:
pass
self._is_shutdown = True
2 changes: 2 additions & 0 deletions doc/changelog.d/2031.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
* make gs_usb use WinUSB instead of requiring libusbK.
* gs_usb: timeout=None means foreever
6 changes: 4 additions & 2 deletions doc/interfaces/gs_usb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ Windows, Linux and Mac.
The backend driver depends on `pyusb <https://pyusb.github.io/pyusb/>`_ so a ``pyusb`` backend driver library such as
``libusb`` must be installed.

On Windows a tool such as `Zadig <https://zadig.akeo.ie/>`_ can be used to set the USB device driver to
``libusbK``.
On Windows, WinUSB and libusbK are both supported. Devices with WCID (Windows Compatible ID) descriptors,
such as candleLight firmware, will automatically use WinUSB without any additional driver installation.
Alternatively, a tool such as `Zadig <https://zadig.akeo.ie/>`_ can be used to set the USB device driver to
either ``WinUSB`` or ``libusbK``.


Supplementary Info
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ neovi = ["filelock", "python-ics>=2.12"]
canalystii = ["canalystii>=0.1.0"]
cantact = ["cantact>=0.0.7"]
cvector = ["python-can-cvector"]
gs-usb = ["gs-usb>=0.2.1"]
gs-usb = ["gs-usb>=0.2.1", "pyusb>=1.0.2"]
nixnet = ["nixnet>=0.3.2"]
pcan = ["uptime~=3.0.1"]
remote = ["python-can-remote"]
Expand Down Expand Up @@ -103,6 +103,7 @@ test = [
"coverage==7.13.*",
"hypothesis==6.*",
"parameterized==0.9.*",
"gs-usb==0.3.*",
]
dev = [
{include-group = "docs"},
Expand Down
90 changes: 90 additions & 0 deletions test/test_interface_gs_usb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Tests for the gs_usb interface."""

from unittest.mock import MagicMock, patch

import pytest

from can.interfaces.gs_usb import (
GsUsbBus,
_find_gs_usb_device,
_scan_gs_usb_devices,
)


@patch("can.interfaces.gs_usb.usb.core.find")
def test_scan_does_not_force_backend(mock_find):
"""Verify that _scan_gs_usb_devices does not pass a backend argument,
allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.)."""
mock_find.return_value = []

_scan_gs_usb_devices()

mock_find.assert_called_once()
call_kwargs = mock_find.call_args[1]
assert (
"backend" not in call_kwargs
), "backend should not be specified so pyusb can auto-detect"
assert call_kwargs["find_all"] is True


@patch("can.interfaces.gs_usb.usb.core.find")
def test_find_does_not_force_backend(mock_find):
"""Verify that _find_gs_usb_device does not pass a backend argument,
allowing pyusb to auto-detect the best available backend (WinUSB, libusbK, etc.)."""
mock_find.return_value = None

_find_gs_usb_device(bus=1, address=2)

mock_find.assert_called_once()
call_kwargs = mock_find.call_args[1]
assert (
"backend" not in call_kwargs
), "backend should not be specified so pyusb can auto-detect"
assert call_kwargs["bus"] == 1
assert call_kwargs["address"] == 2


@patch("can.interfaces.gs_usb.usb.core.find")
def test_scan_returns_gs_usb_devices(mock_find):
"""Verify that _scan_gs_usb_devices wraps found USB devices in GsUsb objects."""
mock_dev1 = MagicMock()
mock_dev2 = MagicMock()
mock_find.return_value = [mock_dev1, mock_dev2]

devices = _scan_gs_usb_devices()

assert len(devices) == 2
assert devices[0].gs_usb is mock_dev1
assert devices[1].gs_usb is mock_dev2


@patch("can.interfaces.gs_usb.usb.core.find")
def test_find_returns_gs_usb_device(mock_find):
"""Verify that _find_gs_usb_device wraps the found USB device in a GsUsb object."""
mock_dev = MagicMock()
mock_find.return_value = mock_dev

device = _find_gs_usb_device(bus=1, address=2)

assert device is not None
assert device.gs_usb is mock_dev


@patch("can.interfaces.gs_usb.usb.core.find")
def test_find_returns_none_when_no_device(mock_find):
"""Verify that _find_gs_usb_device returns None when no device is found."""
mock_find.return_value = None

device = _find_gs_usb_device(bus=1, address=2)

assert device is None


@patch("can.interfaces.gs_usb.usb.core.find")
def test_scan_returns_empty_list_when_no_devices(mock_find):
"""Verify that _scan_gs_usb_devices returns an empty list when no devices are found."""
mock_find.return_value = []

devices = _scan_gs_usb_devices()

assert devices == []