Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/source/overview/rl/trainer.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ trainer.save_checkpoint()
- The event mechanism can be used for automated experiments, data collection, and environment reset.
- Logging and monitoring help analyze training progress and tune hyperparameters.

## API References
- VLA backend lookup: `embodichain.agents.rl.vla_registry.get_vla_backend()`
- VLA backend listing: `embodichain.agents.rl.vla_registry.get_registered_vla_backend_names()`
- VLA backend creation: `embodichain.agents.rl.vla_registry.create_vla_backend()`

---
13 changes: 10 additions & 3 deletions embodichain/agents/rl/algo/grpo.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ def _compute_step_group_advantages(
return advantages.view(n_envs, t_steps) * seq_mask

def update(self, rollout: TensorDict) -> Dict[str, float]:
rollout = rollout.clone()
raw_obs = getattr(rollout, "raw_obs", None)
chunk_step = getattr(rollout, "chunk_step", None)
if raw_obs is not None:
rollout.raw_obs = raw_obs
if chunk_step is not None:
rollout.chunk_step = chunk_step
num_envs = rollout.batch_size[0]
if num_envs % self.cfg.group_size != 0:
raise ValueError(
Expand Down Expand Up @@ -149,7 +154,7 @@ def update(self, rollout: TensorDict) -> Dict[str, float]:
seq_mask_batch = batch["seq_mask"].float()

policy_module = getattr(self.policy, "module", self.policy)
eval_batch = policy_module.evaluate_actions(batch)
eval_batch = policy_module.evaluate_actions(batch, rollout=rollout)
logprobs = eval_batch["sample_log_prob"]
entropy = eval_batch["entropy"]
ratio = (logprobs - old_logprobs).exp()
Expand All @@ -168,7 +173,9 @@ def update(self, rollout: TensorDict) -> Dict[str, float]:

if self.ref_policy is not None:
with torch.no_grad():
ref_batch = self.ref_policy.evaluate_actions(batch)
ref_batch = self.ref_policy.evaluate_actions(
batch, rollout=rollout
)
ref_logprobs = ref_batch["sample_log_prob"]
log_ref_over_pi = ref_logprobs - logprobs
kl_per = torch.exp(log_ref_over_pi) - log_ref_over_pi - 1.0
Expand Down
1 change: 0 additions & 1 deletion embodichain/agents/rl/algo/ppo.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ def __init__(self, cfg: PPOCfg, policy):

def update(self, rollout: TensorDict) -> Dict[str, float]:
"""Update the policy using a collected rollout."""
rollout = rollout.clone()
compute_gae(rollout, gamma=self.cfg.gamma, gae_lambda=self.cfg.gae_lambda)
flat_rollout = transition_view(rollout, flatten=True)

Expand Down
71 changes: 61 additions & 10 deletions embodichain/agents/rl/buffer/standard_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,24 @@ def __init__(
obs_dim: int,
action_dim: int,
device: torch.device,
use_raw_obs: bool = False,
action_chunk_size: int = 0,
store_flat_obs: bool = True,
) -> None:
if use_raw_obs and store_flat_obs:
raise ValueError(
"RolloutBuffer does not support storing flat observations when "
"use_raw_obs=True. Set store_flat_obs=False for raw-observation "
"policies."
)
self.num_envs = num_envs
self.rollout_len = rollout_len
self.obs_dim = obs_dim
self.action_dim = action_dim
self.device = device
self.use_raw_obs = use_raw_obs
self.action_chunk_size = action_chunk_size
self.store_flat_obs = store_flat_obs
self._rollout = self._allocate_rollout()
self._is_full = False

Expand All @@ -58,6 +70,8 @@ def start_rollout(self) -> TensorDict:
if self._is_full:
raise RuntimeError("RolloutBuffer already contains a rollout.")
self._clear_dynamic_fields()
if self.use_raw_obs:
self._rollout.raw_obs = [None] * (self.rollout_len + 1)
return self._rollout

def add(self, rollout: TensorDict) -> None:
Expand Down Expand Up @@ -97,15 +111,18 @@ def is_full(self) -> bool:

def _allocate_rollout(self) -> TensorDict:
"""Preallocate rollout storage with uniform `[num_envs, time + 1]` shape."""
return TensorDict(
rollout_tensors = {}
if self.store_flat_obs:
rollout_tensors["obs"] = torch.empty(
self.num_envs,
self.rollout_len + 1,
self.obs_dim,
dtype=torch.float32,
device=self.device,
)
td = TensorDict(
{
"obs": torch.empty(
self.num_envs,
self.rollout_len + 1,
self.obs_dim,
dtype=torch.float32,
device=self.device,
),
**rollout_tensors,
"action": torch.empty(
self.num_envs,
self.rollout_len + 1,
Expand Down Expand Up @@ -153,12 +170,34 @@ def _allocate_rollout(self) -> TensorDict:
batch_size=[self.num_envs, self.rollout_len + 1],
device=self.device,
)
if self.action_chunk_size > 0:
td["action_chunk"] = torch.zeros(
self.num_envs,
self.rollout_len + 1,
self.action_chunk_size,
self.action_dim,
dtype=torch.float32,
device=self.device,
)
return td

def _clear_dynamic_fields(self) -> None:
"""Drop algorithm-added fields before reusing the shared rollout."""
for key in ("advantage", "return", "seq_mask", "seq_return", "entropy"):
for key in (
"advantage",
"return",
"seq_mask",
"seq_return",
"entropy",
"step_repeat",
"execute_full_chunk",
):
if key in self._rollout.keys():
del self._rollout[key]
if self.use_raw_obs and hasattr(self._rollout, "raw_obs"):
delattr(self._rollout, "raw_obs")
if hasattr(self._rollout, "chunk_step"):
delattr(self._rollout, "chunk_step")
self._reset_padding_slot()

def _reset_padding_slot(self) -> None:
Expand All @@ -170,11 +209,12 @@ def _reset_padding_slot(self) -> None:
self._rollout["done"][:, last_idx].fill_(False)
self._rollout["terminated"][:, last_idx].fill_(False)
self._rollout["truncated"][:, last_idx].fill_(False)
if "action_chunk" in self._rollout.keys():
self._rollout["action_chunk"][:, last_idx].zero_()

def _validate_rollout_layout(self, rollout: TensorDict) -> None:
"""Validate the expected tensor shapes for the shared rollout."""
expected_shapes = {
"obs": (self.num_envs, self.rollout_len + 1, self.obs_dim),
"action": (self.num_envs, self.rollout_len + 1, self.action_dim),
"sample_log_prob": (self.num_envs, self.rollout_len + 1),
"value": (self.num_envs, self.rollout_len + 1),
Expand All @@ -183,10 +223,21 @@ def _validate_rollout_layout(self, rollout: TensorDict) -> None:
"terminated": (self.num_envs, self.rollout_len + 1),
"truncated": (self.num_envs, self.rollout_len + 1),
}
if self.store_flat_obs:
expected_shapes["obs"] = (
self.num_envs,
self.rollout_len + 1,
self.obs_dim,
)
for key, expected_shape in expected_shapes.items():
actual_shape = tuple(rollout[key].shape)
if actual_shape != expected_shape:
raise ValueError(
f"Rollout field '{key}' shape mismatch: expected {expected_shape}, "
f"got {actual_shape}."
)
if not self.store_flat_obs and "obs" in rollout.keys():
raise ValueError(
"RolloutBuffer configured with store_flat_obs=False must not contain "
"a preallocated 'obs' field."
)
48 changes: 34 additions & 14 deletions embodichain/agents/rl/buffer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,42 @@ def transition_view(rollout: TensorDict, flatten: bool = False) -> TensorDict:
"""
action = rollout["action"][:, :-1]
num_envs, time_dim = action.shape[:2]
transition_fields = {
"action": action,
"sample_log_prob": rollout["sample_log_prob"][:, :-1],
"value": rollout["value"][:, :-1],
"next_value": rollout["value"][:, 1:],
"reward": rollout["reward"][:, :-1],
"done": rollout["done"][:, :-1],
"terminated": rollout["terminated"][:, :-1],
"truncated": rollout["truncated"][:, :-1],
}
if "obs" in rollout.keys():
transition_fields["obs"] = rollout["obs"][:, :-1]
td = TensorDict(
{
"obs": rollout["obs"][:, :-1],
"action": action,
"sample_log_prob": rollout["sample_log_prob"][:, :-1],
"value": rollout["value"][:, :-1],
"next_value": rollout["value"][:, 1:],
"reward": rollout["reward"][:, :-1],
"done": rollout["done"][:, :-1],
"terminated": rollout["terminated"][:, :-1],
"truncated": rollout["truncated"][:, :-1],
},
transition_fields,
batch_size=[num_envs, time_dim],
device=rollout.device,
)

for key in ("advantage", "return", "seq_mask", "seq_return", "entropy"):
for key in (
"advantage",
"return",
"seq_mask",
"seq_return",
"entropy",
"step_repeat",
"execute_full_chunk",
):
if key in rollout.keys():
td[key] = rollout[key][:, :-1]

if hasattr(rollout, "chunk_step") and rollout.chunk_step is not None:
td["chunk_step"] = rollout.chunk_step

if "action_chunk" in rollout.keys():
td["action_chunk"] = rollout["action_chunk"][:, :-1]

if flatten:
return td.reshape(num_envs * time_dim)
return td
Expand All @@ -72,6 +88,10 @@ def iterate_minibatches(
) -> Iterator[TensorDict]:
"""Yield shuffled minibatches from a flattened rollout."""
total = rollout.batch_size[0]
indices = torch.randperm(total, device=device)
idx_device = rollout.device if rollout.device is not None else device
indices = torch.randperm(total, device=idx_device)
for start in range(0, total, batch_size):
yield rollout[indices[start : start + batch_size]]
batch_indices = indices[start : start + batch_size]
batch = rollout[batch_indices].clone()
batch["_indices"] = batch_indices
yield batch
Loading
Loading