Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 234 additions & 0 deletions selfdrive/car/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from opendbc.car.fingerprints import MIGRATION
from opendbc.car.honda.values import CAR as HONDA, HondaFlags
from opendbc.car.structs import car
from opendbc.car.toyota.values import CarControllerParams as ToyotaCarControllerParams
from opendbc.car.tests.routes import non_tested_cars, routes, CarTestRoute
from opendbc.car.values import Platform, PLATFORMS
from opendbc.safety.tests.libsafety import libsafety_py
Expand All @@ -32,8 +33,17 @@
INTERNAL_SEG_LIST = os.environ.get("INTERNAL_SEG_LIST", "")
INTERNAL_SEG_CNT = int(os.environ.get("INTERNAL_SEG_CNT", "0"))
MAX_EXAMPLES = int(os.environ.get("MAX_EXAMPLES", "300"))
TX_FUZZ_MAX_EXAMPLES = int(os.environ.get("TX_FUZZ_MAX_EXAMPLES", "5"))
CI = os.environ.get("CI", None) is not None

TX_FUZZ_HISTORY = 300
TX_FUZZ_WINDOW = 50
TX_FUZZ_MAX_FRAMES = 200
TX_FUZZ_MIN_START = 300
MAX_FUZZ_CURVATURE = 0.02
MAX_FUZZ_STEERING_ANGLE_DELTA = 10.0
TOYOTA_TORQUE_HISTORY_SAMPLES = 6


def get_test_cases() -> list[tuple[str, CarTestRoute | None]]:
# build list of test cases
Expand Down Expand Up @@ -171,6 +181,113 @@ def setUp(self):
self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}")
self.safety.init_tests()

def _reset_panda_safety(self):
cfg = self.CP.safetyConfigs[-1]
set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam)
self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}")
self.safety.init_tests()

def _replay_can_frame(self, CI, can, start_ts: int, assert_rx: bool) -> car.CarState:
t = int((can[0] - start_ts) / 1e3)
self.safety.set_timer(t)

CS = CI.update(can).as_reader()
for msg in filter(lambda m: m.src < 64, can[1]):
to_send = libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
ret = self.safety.safety_rx_hook(to_send)
if assert_rx:
self.assertEqual(1, ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}")

self.safety.safety_tick_current_safety_config()
return CS

def _get_fuzzy_car_control(self, draw, CS: car.CarState):
CC = car.CarControl.new_message()

controls_allowed = bool(self.safety.get_controls_allowed())
cruise_engaged = bool(self.safety.get_cruise_engaged_prev())
longitudinal_allowed = bool(self.safety.get_longitudinal_allowed())

enabled = controls_allowed and draw(st.sampled_from([True, True, False]))
lat_active = enabled and draw(st.sampled_from([True, True, False]))
long_active = enabled and self.CP.openpilotLongitudinalControl and longitudinal_allowed and draw(st.sampled_from([True, True, False]))

CC.enabled = enabled
CC.latActive = lat_active
CC.longActive = long_active

cruise_actions = ["none"]
if cruise_engaged:
cruise_actions.append("cancel")
if controls_allowed and cruise_engaged:
cruise_actions.append("resume")
cruise_action = draw(st.sampled_from(cruise_actions))
if cruise_action == "cancel":
CC.cruiseControl.cancel = True
elif cruise_action == "resume":
CC.cruiseControl.resume = True

if long_active:
accel_min, accel_max = self.CarInterface.get_pid_accel_limits(self.CP, CS.vEgo, max(CS.vEgo, 0.0))
accel_min, accel_max = sorted((float(accel_min), float(accel_max)))
CC.actuators.accel = draw(st.floats(min_value=accel_min, max_value=accel_max,
allow_nan=False, allow_infinity=False))
CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.pid
else:
CC.actuators.accel = 0.0
CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.off

if lat_active:
if self.CP.steerControlType == SteerControlType.angle:
angle = float(CS.steeringAngleDeg)
CC.actuators.steeringAngleDeg = draw(st.floats(min_value=angle - MAX_FUZZ_STEERING_ANGLE_DELTA,
max_value=angle + MAX_FUZZ_STEERING_ANGLE_DELTA,
allow_nan=False, allow_infinity=False))
else:
CC.actuators.torque = draw(st.floats(min_value=-1.0, max_value=1.0,
allow_nan=False, allow_infinity=False))
CC.actuators.curvature = draw(st.floats(min_value=-MAX_FUZZ_CURVATURE, max_value=MAX_FUZZ_CURVATURE,
allow_nan=False, allow_infinity=False))
else:
CC.actuators.torque = 0.0
CC.actuators.steeringAngleDeg = 0.0
CC.actuators.curvature = 0.0

return CC.as_reader()

def _get_sendcan_message(self, sendcan, addr: int):
return next(((dat, bus) for tx_addr, dat, bus in sendcan if tx_addr == addr), None)

def _inject_toyota_torque_history(self, CI, eps_torque: int, driver_torque: int):
eps_scale = self.CP.safetyConfigs[-1].safetyParam & 0xFF
self.assertGreater(eps_scale, 0, "invalid Toyota EPS scale")

values = {
"STEER_TORQUE_EPS": (eps_torque / eps_scale) * 100.0,
"STEER_TORQUE_DRIVER": driver_torque,
}
addr, dat, bus = CI.CC.packer.make_can_msg("STEER_TORQUE_SENSOR", 0, values)

for _ in range(TOYOTA_TORQUE_HISTORY_SAMPLES):
ret = self.safety.safety_rx_hook(libsafety_py.make_CANPacket(addr, bus % 4, dat))
self.assertEqual(1, ret, f"Toyota torque history rx failed: {(hex(addr), dat.hex(), bus)}")

self.safety.safety_tick_current_safety_config()

def _get_toyota_lta_regression_control(self, CS: car.CarState, lat_active: bool, angle_offset: float):
CC = car.CarControl.new_message()

CC.enabled = True
CC.latActive = lat_active
CC.longActive = False
CC.actuators.accel = 0.0
CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.off
CC.actuators.torque = 0.0
CC.actuators.curvature = 0.0
CC.actuators.steeringAngleDeg = float(CS.steeringAngleDeg + (angle_offset if lat_active else 0.0))

return CC.as_reader()

def test_car_params(self):
if self.CP.dashcamOnly:
self.skipTest("no need to check carParams for dashcamOnly")
Expand Down Expand Up @@ -295,6 +412,123 @@ def test_car_controller(car_control):
test_car_controller(CC.as_reader())

# Skip stdout/stderr capture with pytest, causes elevated memory usage
@pytest.mark.nocapture
@settings(max_examples=TX_FUZZ_MAX_EXAMPLES, deadline=None,
phases=(Phase.reuse, Phase.generate, Phase.shrink))
@given(data=st.data())
def test_panda_safety_tx_fuzzy(self, data):
"""
Replay short real CAN windows into panda safety and the CarInterface, then
fuzz plausible openpilot control outputs on top and ensure panda accepts
every generated TX message.
"""
if self.CP.dashcamOnly:
self.skipTest("no need to check panda safety for dashcamOnly")

if self.CP.notCar:
self.skipTest("Skipping test for notCar")

min_start_idx = max(TX_FUZZ_MIN_START, self.elm_frame or 0, self.car_safety_mode_frame or 0)
max_start_idx = len(self.can_msgs) - TX_FUZZ_MAX_FRAMES
self.assertGreater(max_start_idx, min_start_idx, "route too short for tx fuzz testing")
start_idx = data.draw(st.integers(min_value=min_start_idx, max_value=max_start_idx))

CI = self.CarInterface(self.CP.copy())
self._reset_panda_safety()

warmup_start_idx = max(0, start_idx - TX_FUZZ_HISTORY)
start_ts = self.can_msgs[warmup_start_idx][0]

for can in self.can_msgs[warmup_start_idx:start_idx]:
self._replay_can_frame(CI, can, start_ts, assert_rx=False)

checked_msgs = 0
valid_frames = 0
for frame_idx, can in enumerate(self.can_msgs[start_idx:start_idx + TX_FUZZ_MAX_FRAMES], start=start_idx):
CS = self._replay_can_frame(CI, can, start_ts, assert_rx=True)
if not CS.canValid or not self.safety.safety_config_valid():
continue

valid_frames += 1
CC = self._get_fuzzy_car_control(data.draw, CS)
_, sendcan = CI.apply(CC, can[0])

for addr, dat, bus in sendcan:
to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat)
self.assertTrue(self.safety.safety_tx_hook(to_send),
f"panda blocked openpilot tx at frame {frame_idx}: {(hex(addr), dat.hex(), bus)}")
checked_msgs += 1

if checked_msgs > 0 and valid_frames >= TX_FUZZ_WINDOW:
break

self.assertGreaterEqual(valid_frames, TX_FUZZ_WINDOW, "tx fuzz test did not find enough valid frames")
self.assertGreater(checked_msgs, 0, "tx fuzz test did not exercise any messages")

@pytest.mark.nocapture
def test_toyota_lta_tx_regression(self):
"""
Exercise accepted, blocked, inactive, and recovered Toyota LTA TX paths on
real route data.
"""
if self.CP.brand != "toyota" or self.CP.steerControlType != SteerControlType.angle:
self.skipTest("only applies to Toyota angle-control platforms")

min_start_idx = max(TX_FUZZ_MIN_START, self.elm_frame or 0, self.car_safety_mode_frame or 0)
CI = self.CarInterface(self.CP.copy())
self._reset_panda_safety()

warmup_start_idx = max(0, min_start_idx - TX_FUZZ_HISTORY)
start_ts = self.can_msgs[warmup_start_idx][0]
for can in self.can_msgs[warmup_start_idx:min_start_idx]:
self._replay_can_frame(CI, can, start_ts, assert_rx=False)

phase_plan = (
("accepted_active", True, 0.5, None, True, 100),
("blocked_active", True, 1.0, (0, ToyotaCarControllerParams.MAX_LTA_DRIVER_TORQUE_ALLOWANCE + 1), False, 100),
("recovered_active", True, -0.5, (0, 0), True, 100),
("accepted_inactive", False, 0.0, None, True, 0),
)
phase_idx = 0

for frame_idx, can in enumerate(self.can_msgs[min_start_idx:], start=min_start_idx):
CS = self._replay_can_frame(CI, can, start_ts, assert_rx=True)
if not CS.canValid or not self.safety.safety_config_valid():
continue

phase_name, lat_active, angle_offset, torque_history, should_allow_lta, expected_wind_down = phase_plan[phase_idx]

self.safety.set_controls_allowed(True)
self.safety.set_cruise_engaged_prev(True)
if torque_history is not None:
self._inject_toyota_torque_history(CI, *torque_history)

CC = self._get_toyota_lta_regression_control(CS, lat_active, angle_offset)
_, sendcan = CI.apply(CC, can[0])
if self._get_sendcan_message(sendcan, 0x191) is None:
continue

saw_lta = False
for addr, dat, bus in sendcan:
to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat)
allowed = self.safety.safety_tx_hook(to_send)

if addr == 0x191:
saw_lta = True
self.assertEqual(should_allow_lta, allowed,
f"unexpected Toyota LTA tx result for {phase_name} at frame {frame_idx}: {(hex(addr), dat.hex(), bus)}")
self.assertEqual(lat_active, bool(dat[0] & 0x1), f"unexpected Toyota LTA active bit for {phase_name}")
self.assertEqual(expected_wind_down, dat[5], f"unexpected Toyota LTA torque wind down for {phase_name}")
else:
self.assertTrue(allowed, f"toyota LTA regression blocked non-LTA tx for {phase_name}: {(hex(addr), dat.hex(), bus)}")

self.assertTrue(saw_lta, f"Toyota regression did not emit LTA message for {phase_name}")
phase_idx += 1
if phase_idx >= len(phase_plan):
break

self.assertEqual(phase_idx, len(phase_plan), "failed to exercise full Toyota LTA regression sequence")

@pytest.mark.nocapture
@settings(max_examples=MAX_EXAMPLES, deadline=None,
phases=(Phase.reuse, Phase.generate, Phase.shrink))
Expand Down
Loading