diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py index 94f5b332319ebc..71fab6450f2223 100644 --- a/selfdrive/car/tests/test_models.py +++ b/selfdrive/car/tests/test_models.py @@ -14,6 +14,7 @@ from opendbc.car.fingerprints import MIGRATION from opendbc.car.honda.values import CAR as HONDA, HondaFlags from opendbc.car.structs import car +from opendbc.car.toyota.values import CarControllerParams as ToyotaCarControllerParams from opendbc.car.tests.routes import non_tested_cars, routes, CarTestRoute from opendbc.car.values import Platform, PLATFORMS from opendbc.safety.tests.libsafety import libsafety_py @@ -32,8 +33,17 @@ INTERNAL_SEG_LIST = os.environ.get("INTERNAL_SEG_LIST", "") INTERNAL_SEG_CNT = int(os.environ.get("INTERNAL_SEG_CNT", "0")) MAX_EXAMPLES = int(os.environ.get("MAX_EXAMPLES", "300")) +TX_FUZZ_MAX_EXAMPLES = int(os.environ.get("TX_FUZZ_MAX_EXAMPLES", "5")) CI = os.environ.get("CI", None) is not None +TX_FUZZ_HISTORY = 300 +TX_FUZZ_WINDOW = 50 +TX_FUZZ_MAX_FRAMES = 200 +TX_FUZZ_MIN_START = 300 +MAX_FUZZ_CURVATURE = 0.02 +MAX_FUZZ_STEERING_ANGLE_DELTA = 10.0 +TOYOTA_TORQUE_HISTORY_SAMPLES = 6 + def get_test_cases() -> list[tuple[str, CarTestRoute | None]]: # build list of test cases @@ -171,6 +181,113 @@ def setUp(self): self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") self.safety.init_tests() + def _reset_panda_safety(self): + cfg = self.CP.safetyConfigs[-1] + set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam) + self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") + self.safety.init_tests() + + def _replay_can_frame(self, CI, can, start_ts: int, assert_rx: bool) -> car.CarState: + t = int((can[0] - start_ts) / 1e3) + self.safety.set_timer(t) + + CS = CI.update(can).as_reader() + for msg in filter(lambda m: m.src < 64, can[1]): + to_send = libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) + ret = self.safety.safety_rx_hook(to_send) + if assert_rx: + self.assertEqual(1, ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}") + + self.safety.safety_tick_current_safety_config() + return CS + + def _get_fuzzy_car_control(self, draw, CS: car.CarState): + CC = car.CarControl.new_message() + + controls_allowed = bool(self.safety.get_controls_allowed()) + cruise_engaged = bool(self.safety.get_cruise_engaged_prev()) + longitudinal_allowed = bool(self.safety.get_longitudinal_allowed()) + + enabled = controls_allowed and draw(st.sampled_from([True, True, False])) + lat_active = enabled and draw(st.sampled_from([True, True, False])) + long_active = enabled and self.CP.openpilotLongitudinalControl and longitudinal_allowed and draw(st.sampled_from([True, True, False])) + + CC.enabled = enabled + CC.latActive = lat_active + CC.longActive = long_active + + cruise_actions = ["none"] + if cruise_engaged: + cruise_actions.append("cancel") + if controls_allowed and cruise_engaged: + cruise_actions.append("resume") + cruise_action = draw(st.sampled_from(cruise_actions)) + if cruise_action == "cancel": + CC.cruiseControl.cancel = True + elif cruise_action == "resume": + CC.cruiseControl.resume = True + + if long_active: + accel_min, accel_max = self.CarInterface.get_pid_accel_limits(self.CP, CS.vEgo, max(CS.vEgo, 0.0)) + accel_min, accel_max = sorted((float(accel_min), float(accel_max))) + CC.actuators.accel = draw(st.floats(min_value=accel_min, max_value=accel_max, + allow_nan=False, allow_infinity=False)) + CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.pid + else: + CC.actuators.accel = 0.0 + CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.off + + if lat_active: + if self.CP.steerControlType == SteerControlType.angle: + angle = float(CS.steeringAngleDeg) + CC.actuators.steeringAngleDeg = draw(st.floats(min_value=angle - MAX_FUZZ_STEERING_ANGLE_DELTA, + max_value=angle + MAX_FUZZ_STEERING_ANGLE_DELTA, + allow_nan=False, allow_infinity=False)) + else: + CC.actuators.torque = draw(st.floats(min_value=-1.0, max_value=1.0, + allow_nan=False, allow_infinity=False)) + CC.actuators.curvature = draw(st.floats(min_value=-MAX_FUZZ_CURVATURE, max_value=MAX_FUZZ_CURVATURE, + allow_nan=False, allow_infinity=False)) + else: + CC.actuators.torque = 0.0 + CC.actuators.steeringAngleDeg = 0.0 + CC.actuators.curvature = 0.0 + + return CC.as_reader() + + def _get_sendcan_message(self, sendcan, addr: int): + return next(((dat, bus) for tx_addr, dat, bus in sendcan if tx_addr == addr), None) + + def _inject_toyota_torque_history(self, CI, eps_torque: int, driver_torque: int): + eps_scale = self.CP.safetyConfigs[-1].safetyParam & 0xFF + self.assertGreater(eps_scale, 0, "invalid Toyota EPS scale") + + values = { + "STEER_TORQUE_EPS": (eps_torque / eps_scale) * 100.0, + "STEER_TORQUE_DRIVER": driver_torque, + } + addr, dat, bus = CI.CC.packer.make_can_msg("STEER_TORQUE_SENSOR", 0, values) + + for _ in range(TOYOTA_TORQUE_HISTORY_SAMPLES): + ret = self.safety.safety_rx_hook(libsafety_py.make_CANPacket(addr, bus % 4, dat)) + self.assertEqual(1, ret, f"Toyota torque history rx failed: {(hex(addr), dat.hex(), bus)}") + + self.safety.safety_tick_current_safety_config() + + def _get_toyota_lta_regression_control(self, CS: car.CarState, lat_active: bool, angle_offset: float): + CC = car.CarControl.new_message() + + CC.enabled = True + CC.latActive = lat_active + CC.longActive = False + CC.actuators.accel = 0.0 + CC.actuators.longControlState = car.CarControl.Actuators.LongControlState.off + CC.actuators.torque = 0.0 + CC.actuators.curvature = 0.0 + CC.actuators.steeringAngleDeg = float(CS.steeringAngleDeg + (angle_offset if lat_active else 0.0)) + + return CC.as_reader() + def test_car_params(self): if self.CP.dashcamOnly: self.skipTest("no need to check carParams for dashcamOnly") @@ -295,6 +412,123 @@ def test_car_controller(car_control): test_car_controller(CC.as_reader()) # Skip stdout/stderr capture with pytest, causes elevated memory usage + @pytest.mark.nocapture + @settings(max_examples=TX_FUZZ_MAX_EXAMPLES, deadline=None, + phases=(Phase.reuse, Phase.generate, Phase.shrink)) + @given(data=st.data()) + def test_panda_safety_tx_fuzzy(self, data): + """ + Replay short real CAN windows into panda safety and the CarInterface, then + fuzz plausible openpilot control outputs on top and ensure panda accepts + every generated TX message. + """ + if self.CP.dashcamOnly: + self.skipTest("no need to check panda safety for dashcamOnly") + + if self.CP.notCar: + self.skipTest("Skipping test for notCar") + + min_start_idx = max(TX_FUZZ_MIN_START, self.elm_frame or 0, self.car_safety_mode_frame or 0) + max_start_idx = len(self.can_msgs) - TX_FUZZ_MAX_FRAMES + self.assertGreater(max_start_idx, min_start_idx, "route too short for tx fuzz testing") + start_idx = data.draw(st.integers(min_value=min_start_idx, max_value=max_start_idx)) + + CI = self.CarInterface(self.CP.copy()) + self._reset_panda_safety() + + warmup_start_idx = max(0, start_idx - TX_FUZZ_HISTORY) + start_ts = self.can_msgs[warmup_start_idx][0] + + for can in self.can_msgs[warmup_start_idx:start_idx]: + self._replay_can_frame(CI, can, start_ts, assert_rx=False) + + checked_msgs = 0 + valid_frames = 0 + for frame_idx, can in enumerate(self.can_msgs[start_idx:start_idx + TX_FUZZ_MAX_FRAMES], start=start_idx): + CS = self._replay_can_frame(CI, can, start_ts, assert_rx=True) + if not CS.canValid or not self.safety.safety_config_valid(): + continue + + valid_frames += 1 + CC = self._get_fuzzy_car_control(data.draw, CS) + _, sendcan = CI.apply(CC, can[0]) + + for addr, dat, bus in sendcan: + to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat) + self.assertTrue(self.safety.safety_tx_hook(to_send), + f"panda blocked openpilot tx at frame {frame_idx}: {(hex(addr), dat.hex(), bus)}") + checked_msgs += 1 + + if checked_msgs > 0 and valid_frames >= TX_FUZZ_WINDOW: + break + + self.assertGreaterEqual(valid_frames, TX_FUZZ_WINDOW, "tx fuzz test did not find enough valid frames") + self.assertGreater(checked_msgs, 0, "tx fuzz test did not exercise any messages") + + @pytest.mark.nocapture + def test_toyota_lta_tx_regression(self): + """ + Exercise accepted, blocked, inactive, and recovered Toyota LTA TX paths on + real route data. + """ + if self.CP.brand != "toyota" or self.CP.steerControlType != SteerControlType.angle: + self.skipTest("only applies to Toyota angle-control platforms") + + min_start_idx = max(TX_FUZZ_MIN_START, self.elm_frame or 0, self.car_safety_mode_frame or 0) + CI = self.CarInterface(self.CP.copy()) + self._reset_panda_safety() + + warmup_start_idx = max(0, min_start_idx - TX_FUZZ_HISTORY) + start_ts = self.can_msgs[warmup_start_idx][0] + for can in self.can_msgs[warmup_start_idx:min_start_idx]: + self._replay_can_frame(CI, can, start_ts, assert_rx=False) + + phase_plan = ( + ("accepted_active", True, 0.5, None, True, 100), + ("blocked_active", True, 1.0, (0, ToyotaCarControllerParams.MAX_LTA_DRIVER_TORQUE_ALLOWANCE + 1), False, 100), + ("recovered_active", True, -0.5, (0, 0), True, 100), + ("accepted_inactive", False, 0.0, None, True, 0), + ) + phase_idx = 0 + + for frame_idx, can in enumerate(self.can_msgs[min_start_idx:], start=min_start_idx): + CS = self._replay_can_frame(CI, can, start_ts, assert_rx=True) + if not CS.canValid or not self.safety.safety_config_valid(): + continue + + phase_name, lat_active, angle_offset, torque_history, should_allow_lta, expected_wind_down = phase_plan[phase_idx] + + self.safety.set_controls_allowed(True) + self.safety.set_cruise_engaged_prev(True) + if torque_history is not None: + self._inject_toyota_torque_history(CI, *torque_history) + + CC = self._get_toyota_lta_regression_control(CS, lat_active, angle_offset) + _, sendcan = CI.apply(CC, can[0]) + if self._get_sendcan_message(sendcan, 0x191) is None: + continue + + saw_lta = False + for addr, dat, bus in sendcan: + to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat) + allowed = self.safety.safety_tx_hook(to_send) + + if addr == 0x191: + saw_lta = True + self.assertEqual(should_allow_lta, allowed, + f"unexpected Toyota LTA tx result for {phase_name} at frame {frame_idx}: {(hex(addr), dat.hex(), bus)}") + self.assertEqual(lat_active, bool(dat[0] & 0x1), f"unexpected Toyota LTA active bit for {phase_name}") + self.assertEqual(expected_wind_down, dat[5], f"unexpected Toyota LTA torque wind down for {phase_name}") + else: + self.assertTrue(allowed, f"toyota LTA regression blocked non-LTA tx for {phase_name}: {(hex(addr), dat.hex(), bus)}") + + self.assertTrue(saw_lta, f"Toyota regression did not emit LTA message for {phase_name}") + phase_idx += 1 + if phase_idx >= len(phase_plan): + break + + self.assertEqual(phase_idx, len(phase_plan), "failed to exercise full Toyota LTA regression sequence") + @pytest.mark.nocapture @settings(max_examples=MAX_EXAMPLES, deadline=None, phases=(Phase.reuse, Phase.generate, Phase.shrink))