From d3f04ca726685232414d8cc5e0708b27ba29aff3 Mon Sep 17 00:00:00 2001 From: Felmon Fekadu Date: Tue, 24 Mar 2026 00:24:30 -0600 Subject: [PATCH 1/3] selfdrive: add route-backed panda TX fuzz coverage Replay real CAN windows into panda safety and the CarInterface before fuzzing plausible CarControl outputs, so TX checks run against warmed-up safety state instead of random payloads alone. Also add a Toyota LTA route-backed regression that alternates active and inactive angle commands to cover desired-angle history across state transitions. --- selfdrive/car/tests/test_models.py | 195 +++++++++++++++++++++++++++++ 1 file changed, 195 insertions(+) diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py index 94f5b332319ebc..87ecf2ee391a4b 100644 --- a/selfdrive/car/tests/test_models.py +++ b/selfdrive/car/tests/test_models.py @@ -32,8 +32,16 @@ INTERNAL_SEG_LIST = os.environ.get("INTERNAL_SEG_LIST", "") INTERNAL_SEG_CNT = int(os.environ.get("INTERNAL_SEG_CNT", "0")) MAX_EXAMPLES = int(os.environ.get("MAX_EXAMPLES", "300")) +TX_FUZZ_MAX_EXAMPLES = int(os.environ.get("TX_FUZZ_MAX_EXAMPLES", "5")) CI = os.environ.get("CI", None) is not None +TX_FUZZ_HISTORY = 300 +TX_FUZZ_WINDOW = 50 +TX_FUZZ_MIN_START = 300 +MAX_FUZZ_CURVATURE = 0.02 +MAX_FUZZ_STEERING_ANGLE_DELTA = 10.0 +TOYOTA_LTA_REGRESSION_FRAMES = 12 + def get_test_cases() -> list[tuple[str, CarTestRoute | None]]: # build list of test cases @@ -171,6 +179,96 @@ def setUp(self): self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") self.safety.init_tests() + def _reset_panda_safety(self): + cfg = self.CP.safetyConfigs[-1] + set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam) + self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") + self.safety.init_tests() + + def _replay_can_frame(self, CI, can, start_ts: int, assert_rx: bool) -> car.CarState: + t = int((can[0] - start_ts) / 1e3) + self.safety.set_timer(t) + + CS = CI.update(can).as_reader() + for msg in filter(lambda m: m.src < 64, can[1]): + to_send = libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) + ret = self.safety.safety_rx_hook(to_send) + if assert_rx: + self.assertEqual(1, ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}") + + self.safety.safety_tick_current_safety_config() + return CS + + def _get_fuzzy_car_control(self, draw, CS: car.CarState): + CC = car.CarControl.new_message() + + controls_allowed = bool(self.safety.get_controls_allowed()) + cruise_engaged = bool(self.safety.get_cruise_engaged_prev()) + longitudinal_allowed = bool(self.safety.get_longitudinal_allowed()) + + enabled = controls_allowed and draw(st.sampled_from([True, True, False])) + lat_active = enabled and draw(st.sampled_from([True, True, False])) + long_active = enabled and self.CP.openpilotLongitudinalControl and longitudinal_allowed and draw(st.sampled_from([True, True, False])) + + CC.enabled = enabled + CC.latActive = lat_active + CC.longActive = long_active + + cruise_actions = ["none"] + if cruise_engaged: + cruise_actions.append("cancel") + if controls_allowed and cruise_engaged: + cruise_actions.append("resume") + cruise_action = draw(st.sampled_from(cruise_actions)) + if cruise_action == "cancel": + CC.cruiseControl.cancel = True + elif cruise_action == "resume": + CC.cruiseControl.resume = True + + if long_active: + accel_min, accel_max = self.CarInterface.get_pid_accel_limits(self.CP, CS.vEgo, max(CS.vEgo, 0.0)) + accel_min, accel_max = sorted((float(accel_min), float(accel_max))) + CC.actuators.accel = draw(st.floats(min_value=accel_min, max_value=accel_max, + allow_nan=False, allow_infinity=False)) + CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.pid + else: + CC.actuators.accel = 0.0 + CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.off + + if lat_active: + if self.CP.steerControlType == SteerControlType.angle: + angle = float(CS.steeringAngleDeg) + CC.actuators.steeringAngleDeg = draw(st.floats(min_value=angle - MAX_FUZZ_STEERING_ANGLE_DELTA, + max_value=angle + MAX_FUZZ_STEERING_ANGLE_DELTA, + allow_nan=False, allow_infinity=False)) + else: + CC.actuators.torque = draw(st.floats(min_value=-1.0, max_value=1.0, + allow_nan=False, allow_infinity=False)) + CC.actuators.curvature = draw(st.floats(min_value=-MAX_FUZZ_CURVATURE, max_value=MAX_FUZZ_CURVATURE, + allow_nan=False, allow_infinity=False)) + else: + CC.actuators.torque = 0.0 + CC.actuators.steeringAngleDeg = 0.0 + CC.actuators.curvature = 0.0 + + return CC.as_reader() + + def _get_toyota_lta_regression_control(self, CS: car.CarState, step: int): + CC = car.CarControl.new_message() + active = (step % 4) < 2 + angle_offset = 0.5 if (step % 8) < 4 else -0.5 + + CC.enabled = True + CC.latActive = active + CC.longActive = False + CC.actuators.accel = 0.0 + CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.off + CC.actuators.torque = 0.0 + CC.actuators.curvature = 0.0 + CC.actuators.steeringAngleDeg = float(CS.steeringAngleDeg + (angle_offset if active else 0.0)) + + return CC.as_reader() + def test_car_params(self): if self.CP.dashcamOnly: self.skipTest("no need to check carParams for dashcamOnly") @@ -295,6 +393,103 @@ def test_car_controller(car_control): test_car_controller(CC.as_reader()) # Skip stdout/stderr capture with pytest, causes elevated memory usage + @pytest.mark.nocapture + @settings(max_examples=TX_FUZZ_MAX_EXAMPLES, deadline=None, + phases=(Phase.reuse, Phase.generate, Phase.shrink)) + @given(data=st.data()) + def test_panda_safety_tx_fuzzy(self, data): + """ + Replay short real CAN windows into panda safety and the CarInterface, then + fuzz plausible openpilot control outputs on top and ensure panda accepts + every generated TX message. + """ + if self.CP.dashcamOnly: + self.skipTest("no need to check panda safety for dashcamOnly") + + if self.CP.notCar: + self.skipTest("Skipping test for notCar") + + min_start_idx = max(TX_FUZZ_MIN_START, self.elm_frame or 0, self.car_safety_mode_frame or 0) + max_start_idx = len(self.can_msgs) - TX_FUZZ_WINDOW + self.assertGreater(max_start_idx, min_start_idx, "route too short for tx fuzz testing") + start_idx = data.draw(st.integers(min_value=min_start_idx, max_value=max_start_idx)) + + CI = self.CarInterface(self.CP.copy()) + self._reset_panda_safety() + + warmup_start_idx = max(0, start_idx - TX_FUZZ_HISTORY) + start_ts = self.can_msgs[warmup_start_idx][0] + + for can in self.can_msgs[warmup_start_idx:start_idx]: + self._replay_can_frame(CI, can, start_ts, assert_rx=False) + + checked_msgs = 0 + for frame_idx, can in enumerate(self.can_msgs[start_idx:start_idx + TX_FUZZ_WINDOW], start=start_idx): + CS = self._replay_can_frame(CI, can, start_ts, assert_rx=True) + if not CS.canValid or not self.safety.safety_config_valid(): + continue + + CC = self._get_fuzzy_car_control(data.draw, CS) + _, sendcan = CI.apply(CC, can[0]) + + for addr, dat, bus in sendcan: + to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat) + self.assertTrue(self.safety.safety_tx_hook(to_send), + f"panda blocked openpilot tx at frame {frame_idx}: {(hex(addr), dat.hex(), bus)}") + checked_msgs += 1 + + self.assertGreater(checked_msgs, 0, "tx fuzz test did not exercise any messages") + + @pytest.mark.nocapture + def test_toyota_lta_history_regression(self): + """ + Exercise a deterministic active/inactive LTA sequence on real route data to cover + desired-angle history handling across state transitions. + """ + if self.CP.brand != "toyota" or self.CP.steerControlType != SteerControlType.angle: + self.skipTest("only applies to Toyota angle-control platforms") + + min_start_idx = max(TX_FUZZ_MIN_START, self.elm_frame or 0, self.car_safety_mode_frame or 0) + CI = self.CarInterface(self.CP.copy()) + self._reset_panda_safety() + + warmup_start_idx = max(0, min_start_idx - TX_FUZZ_HISTORY) + start_ts = self.can_msgs[warmup_start_idx][0] + for can in self.can_msgs[warmup_start_idx:min_start_idx]: + self._replay_can_frame(CI, can, start_ts, assert_rx=False) + + exercised_frames = 0 + lta_msgs = 0 + saw_active_lta = False + saw_inactive_lta = False + + for frame_idx, can in enumerate(self.can_msgs[min_start_idx:], start=min_start_idx): + CS = self._replay_can_frame(CI, can, start_ts, assert_rx=True) + if not CS.canValid or not self.safety.safety_config_valid(): + continue + + CC = self._get_toyota_lta_regression_control(CS, exercised_frames) + _, sendcan = CI.apply(CC, can[0]) + + for addr, dat, bus in sendcan: + to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat) + self.assertTrue(self.safety.safety_tx_hook(to_send), + f"toyota LTA regression blocked tx at frame {frame_idx}: {(hex(addr), dat.hex(), bus)}") + + if addr == 0x191: + lta_msgs += 1 + active = bool(dat[0] & 0x1) + saw_active_lta |= active + saw_inactive_lta |= not active + + exercised_frames += 1 + if exercised_frames >= TOYOTA_LTA_REGRESSION_FRAMES: + break + + self.assertEqual(exercised_frames, TOYOTA_LTA_REGRESSION_FRAMES, "failed to find enough valid Toyota LTA frames") + self.assertGreater(lta_msgs, 0, "Toyota regression did not exercise any LTA messages") + self.assertTrue(saw_active_lta and saw_inactive_lta, "Toyota regression did not cover active and inactive LTA states") + @pytest.mark.nocapture @settings(max_examples=MAX_EXAMPLES, deadline=None, phases=(Phase.reuse, Phase.generate, Phase.shrink)) From 03dddeeca524ab53f2f895265e92924a6f38b0e5 Mon Sep 17 00:00:00 2001 From: Felmon Fekadu Date: Tue, 24 Mar 2026 00:45:28 -0600 Subject: [PATCH 2/3] selfdrive: harden replayed panda TX fuzz coverage Bound the replayed TX fuzz test on valid frames so it reliably exercises controller output instead of failing on empty route slices. Also replace the Toyota LTA all-pass sequence with a deterministic accepted/blocked/inactive/recovered regression that forces the over-threshold panda torque path on openpilot-generated messages. --- selfdrive/car/tests/test_models.py | 81 ++++++++++++++++++++---------- 1 file changed, 54 insertions(+), 27 deletions(-) diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py index 87ecf2ee391a4b..60158799307f26 100644 --- a/selfdrive/car/tests/test_models.py +++ b/selfdrive/car/tests/test_models.py @@ -14,6 +14,7 @@ from opendbc.car.fingerprints import MIGRATION from opendbc.car.honda.values import CAR as HONDA, HondaFlags from opendbc.car.structs import car +from opendbc.car.toyota.values import CarControllerParams as ToyotaCarControllerParams from opendbc.car.tests.routes import non_tested_cars, routes, CarTestRoute from opendbc.car.values import Platform, PLATFORMS from opendbc.safety.tests.libsafety import libsafety_py @@ -37,10 +38,10 @@ TX_FUZZ_HISTORY = 300 TX_FUZZ_WINDOW = 50 +TX_FUZZ_MAX_FRAMES = 200 TX_FUZZ_MIN_START = 300 MAX_FUZZ_CURVATURE = 0.02 MAX_FUZZ_STEERING_ANGLE_DELTA = 10.0 -TOYOTA_LTA_REGRESSION_FRAMES = 12 def get_test_cases() -> list[tuple[str, CarTestRoute | None]]: @@ -253,19 +254,20 @@ def _get_fuzzy_car_control(self, draw, CS: car.CarState): return CC.as_reader() - def _get_toyota_lta_regression_control(self, CS: car.CarState, step: int): + def _get_sendcan_message(self, sendcan, addr: int): + return next(((dat, bus) for tx_addr, dat, bus in sendcan if tx_addr == addr), None) + + def _get_toyota_lta_regression_control(self, CS: car.CarState, lat_active: bool, angle_offset: float): CC = car.CarControl.new_message() - active = (step % 4) < 2 - angle_offset = 0.5 if (step % 8) < 4 else -0.5 CC.enabled = True - CC.latActive = active + CC.latActive = lat_active CC.longActive = False CC.actuators.accel = 0.0 CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.off CC.actuators.torque = 0.0 CC.actuators.curvature = 0.0 - CC.actuators.steeringAngleDeg = float(CS.steeringAngleDeg + (angle_offset if active else 0.0)) + CC.actuators.steeringAngleDeg = float(CS.steeringAngleDeg + (angle_offset if lat_active else 0.0)) return CC.as_reader() @@ -410,7 +412,7 @@ def test_panda_safety_tx_fuzzy(self, data): self.skipTest("Skipping test for notCar") min_start_idx = max(TX_FUZZ_MIN_START, self.elm_frame or 0, self.car_safety_mode_frame or 0) - max_start_idx = len(self.can_msgs) - TX_FUZZ_WINDOW + max_start_idx = len(self.can_msgs) - TX_FUZZ_MAX_FRAMES self.assertGreater(max_start_idx, min_start_idx, "route too short for tx fuzz testing") start_idx = data.draw(st.integers(min_value=min_start_idx, max_value=max_start_idx)) @@ -424,11 +426,13 @@ def test_panda_safety_tx_fuzzy(self, data): self._replay_can_frame(CI, can, start_ts, assert_rx=False) checked_msgs = 0 - for frame_idx, can in enumerate(self.can_msgs[start_idx:start_idx + TX_FUZZ_WINDOW], start=start_idx): + valid_frames = 0 + for frame_idx, can in enumerate(self.can_msgs[start_idx:start_idx + TX_FUZZ_MAX_FRAMES], start=start_idx): CS = self._replay_can_frame(CI, can, start_ts, assert_rx=True) if not CS.canValid or not self.safety.safety_config_valid(): continue + valid_frames += 1 CC = self._get_fuzzy_car_control(data.draw, CS) _, sendcan = CI.apply(CC, can[0]) @@ -438,13 +442,17 @@ def test_panda_safety_tx_fuzzy(self, data): f"panda blocked openpilot tx at frame {frame_idx}: {(hex(addr), dat.hex(), bus)}") checked_msgs += 1 + if checked_msgs > 0 and valid_frames >= TX_FUZZ_WINDOW: + break + + self.assertGreaterEqual(valid_frames, TX_FUZZ_WINDOW, "tx fuzz test did not find enough valid frames") self.assertGreater(checked_msgs, 0, "tx fuzz test did not exercise any messages") @pytest.mark.nocapture - def test_toyota_lta_history_regression(self): + def test_toyota_lta_tx_regression(self): """ - Exercise a deterministic active/inactive LTA sequence on real route data to cover - desired-angle history handling across state transitions. + Exercise accepted, blocked, inactive, and recovered Toyota LTA TX paths on + real route data. """ if self.CP.brand != "toyota" or self.CP.steerControlType != SteerControlType.angle: self.skipTest("only applies to Toyota angle-control platforms") @@ -458,37 +466,56 @@ def test_toyota_lta_history_regression(self): for can in self.can_msgs[warmup_start_idx:min_start_idx]: self._replay_can_frame(CI, can, start_ts, assert_rx=False) - exercised_frames = 0 - lta_msgs = 0 - saw_active_lta = False - saw_inactive_lta = False + phase_plan = ( + ("accepted_active", True, 0.5, False, True, 100), + ("blocked_active", True, 1.0, True, False, 100), + ("accepted_inactive", False, 0.0, False, True, 0), + ("recovered_active", True, -0.5, False, True, 100), + ) + phase_idx = 0 for frame_idx, can in enumerate(self.can_msgs[min_start_idx:], start=min_start_idx): CS = self._replay_can_frame(CI, can, start_ts, assert_rx=True) if not CS.canValid or not self.safety.safety_config_valid(): continue - CC = self._get_toyota_lta_regression_control(CS, exercised_frames) + phase_name, lat_active, angle_offset, inject_torque_limit, should_allow_lta, expected_wind_down = phase_plan[phase_idx] + + self.safety.set_controls_allowed(True) + self.safety.set_cruise_engaged_prev(True) + if inject_torque_limit: + self.safety.set_torque_meas(ToyotaCarControllerParams.STEER_MAX + 1, ToyotaCarControllerParams.STEER_MAX + 1) + self.safety.set_torque_driver(ToyotaCarControllerParams.MAX_LTA_DRIVER_TORQUE_ALLOWANCE + 1, + ToyotaCarControllerParams.MAX_LTA_DRIVER_TORQUE_ALLOWANCE + 1) + else: + self.safety.set_torque_meas(0, 0) + self.safety.set_torque_driver(0, 0) + + CC = self._get_toyota_lta_regression_control(CS, lat_active, angle_offset) _, sendcan = CI.apply(CC, can[0]) + if self._get_sendcan_message(sendcan, 0x191) is None: + continue + saw_lta = False for addr, dat, bus in sendcan: to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat) - self.assertTrue(self.safety.safety_tx_hook(to_send), - f"toyota LTA regression blocked tx at frame {frame_idx}: {(hex(addr), dat.hex(), bus)}") + allowed = self.safety.safety_tx_hook(to_send) if addr == 0x191: - lta_msgs += 1 - active = bool(dat[0] & 0x1) - saw_active_lta |= active - saw_inactive_lta |= not active + saw_lta = True + self.assertEqual(should_allow_lta, allowed, + f"unexpected Toyota LTA tx result for {phase_name} at frame {frame_idx}: {(hex(addr), dat.hex(), bus)}") + self.assertEqual(lat_active, bool(dat[0] & 0x1), f"unexpected Toyota LTA active bit for {phase_name}") + self.assertEqual(expected_wind_down, dat[5], f"unexpected Toyota LTA torque wind down for {phase_name}") + else: + self.assertTrue(allowed, f"toyota LTA regression blocked non-LTA tx for {phase_name}: {(hex(addr), dat.hex(), bus)}") - exercised_frames += 1 - if exercised_frames >= TOYOTA_LTA_REGRESSION_FRAMES: + self.assertTrue(saw_lta, f"Toyota regression did not emit LTA message for {phase_name}") + phase_idx += 1 + if phase_idx >= len(phase_plan): break - self.assertEqual(exercised_frames, TOYOTA_LTA_REGRESSION_FRAMES, "failed to find enough valid Toyota LTA frames") - self.assertGreater(lta_msgs, 0, "Toyota regression did not exercise any LTA messages") - self.assertTrue(saw_active_lta and saw_inactive_lta, "Toyota regression did not cover active and inactive LTA states") + self.assertEqual(phase_idx, len(phase_plan), "failed to exercise full Toyota LTA regression sequence") @pytest.mark.nocapture @settings(max_examples=MAX_EXAMPLES, deadline=None, From a119a1bd0045eadf35703d3d1b2a8d8ab215a056 Mon Sep 17 00:00:00 2001 From: Felmon Fekadu Date: Tue, 24 Mar 2026 01:00:01 -0600 Subject: [PATCH 3/3] selfdrive: drive Toyota TX regression through RX history Replace the Toyota regression's direct panda torque setters with repeated STEER_TORQUE_SENSOR RX samples, and move recovery ahead of the inactive phase so the test exercises blocked-history behavior through the real safety RX path. --- selfdrive/car/tests/test_models.py | 36 ++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py index 60158799307f26..71fab6450f2223 100644 --- a/selfdrive/car/tests/test_models.py +++ b/selfdrive/car/tests/test_models.py @@ -42,6 +42,7 @@ TX_FUZZ_MIN_START = 300 MAX_FUZZ_CURVATURE = 0.02 MAX_FUZZ_STEERING_ANGLE_DELTA = 10.0 +TOYOTA_TORQUE_HISTORY_SAMPLES = 6 def get_test_cases() -> list[tuple[str, CarTestRoute | None]]: @@ -257,6 +258,22 @@ def _get_fuzzy_car_control(self, draw, CS: car.CarState): def _get_sendcan_message(self, sendcan, addr: int): return next(((dat, bus) for tx_addr, dat, bus in sendcan if tx_addr == addr), None) + def _inject_toyota_torque_history(self, CI, eps_torque: int, driver_torque: int): + eps_scale = self.CP.safetyConfigs[-1].safetyParam & 0xFF + self.assertGreater(eps_scale, 0, "invalid Toyota EPS scale") + + values = { + "STEER_TORQUE_EPS": (eps_torque / eps_scale) * 100.0, + "STEER_TORQUE_DRIVER": driver_torque, + } + addr, dat, bus = CI.CC.packer.make_can_msg("STEER_TORQUE_SENSOR", 0, values) + + for _ in range(TOYOTA_TORQUE_HISTORY_SAMPLES): + ret = self.safety.safety_rx_hook(libsafety_py.make_CANPacket(addr, bus % 4, dat)) + self.assertEqual(1, ret, f"Toyota torque history rx failed: {(hex(addr), dat.hex(), bus)}") + + self.safety.safety_tick_current_safety_config() + def _get_toyota_lta_regression_control(self, CS: car.CarState, lat_active: bool, angle_offset: float): CC = car.CarControl.new_message() @@ -467,10 +484,10 @@ def test_toyota_lta_tx_regression(self): self._replay_can_frame(CI, can, start_ts, assert_rx=False) phase_plan = ( - ("accepted_active", True, 0.5, False, True, 100), - ("blocked_active", True, 1.0, True, False, 100), - ("accepted_inactive", False, 0.0, False, True, 0), - ("recovered_active", True, -0.5, False, True, 100), + ("accepted_active", True, 0.5, None, True, 100), + ("blocked_active", True, 1.0, (0, ToyotaCarControllerParams.MAX_LTA_DRIVER_TORQUE_ALLOWANCE + 1), False, 100), + ("recovered_active", True, -0.5, (0, 0), True, 100), + ("accepted_inactive", False, 0.0, None, True, 0), ) phase_idx = 0 @@ -479,17 +496,12 @@ def test_toyota_lta_tx_regression(self): if not CS.canValid or not self.safety.safety_config_valid(): continue - phase_name, lat_active, angle_offset, inject_torque_limit, should_allow_lta, expected_wind_down = phase_plan[phase_idx] + phase_name, lat_active, angle_offset, torque_history, should_allow_lta, expected_wind_down = phase_plan[phase_idx] self.safety.set_controls_allowed(True) self.safety.set_cruise_engaged_prev(True) - if inject_torque_limit: - self.safety.set_torque_meas(ToyotaCarControllerParams.STEER_MAX + 1, ToyotaCarControllerParams.STEER_MAX + 1) - self.safety.set_torque_driver(ToyotaCarControllerParams.MAX_LTA_DRIVER_TORQUE_ALLOWANCE + 1, - ToyotaCarControllerParams.MAX_LTA_DRIVER_TORQUE_ALLOWANCE + 1) - else: - self.safety.set_torque_meas(0, 0) - self.safety.set_torque_driver(0, 0) + if torque_history is not None: + self._inject_toyota_torque_history(CI, *torque_history) CC = self._get_toyota_lta_regression_control(CS, lat_active, angle_offset) _, sendcan = CI.apply(CC, can[0])