Skip to content

Commit

Permalink
add env_ids propagation test and run formatter
Browse files Browse the repository at this point in the history
  • Loading branch information
jtigue-bdai committed Oct 15, 2024
1 parent 2349118 commit 5a35952
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def _update_buffers_impl(self, env_ids: Sequence[int]):

# numerical derivative
lin_acc_w = (lin_vel_w - self._prev_lin_vel_w[env_ids]) / self._dt + self._gravity_bias_w[env_ids]
ang_acc_w = (ang_vel_w - self._prev_ang_vel_w[env_ids) / self._dt
ang_acc_w = (ang_vel_w - self._prev_ang_vel_w[env_ids]) / self._dt
# store the velocities
self._data.lin_vel_b[env_ids] = math_utils.quat_rotate_inverse(self._data.quat_w[env_ids], lin_vel_w)
self._data.ang_vel_b[env_ids] = math_utils.quat_rotate_inverse(self._data.quat_w[env_ids], ang_vel_w)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@

def constant_noise(data: torch.Tensor, cfg: noise_cfg.ConstantNoiseCfg) -> torch.Tensor:
"""Applies a constant noise bias to a given data set.
Args:
data: The unmodified data set to apply noise to.
cfg: The configuration parameters for constant noise.
Returns:
The data modified by the noise parameters provided.
The data modified by the noise parameters provided.
"""

# fix tensor device for bias on first call and update config parameters
if isinstance(cfg.bias,torch.Tensor):
if isinstance(cfg.bias, torch.Tensor):
if cfg.bias.device is not data.device:
cfg.bias = cfg.bias.to(data.device,)
cfg.bias = cfg.bias.to(
data.device,
)

if cfg.operation == "add":
return data + cfg.bias
Expand All @@ -45,23 +47,23 @@ def constant_noise(data: torch.Tensor, cfg: noise_cfg.ConstantNoiseCfg) -> torch

def uniform_noise(data: torch.Tensor, cfg: noise_cfg.UniformNoiseCfg) -> torch.Tensor:
"""Applies a uniform noise to a given data set.
Args:
data: The unmodified data set to apply noise to.
cfg: The configuration parameters for uniform noise.
Returns:
The data modified by the noise parameters provided.
The data modified by the noise parameters provided.
"""

# fix tensor device for n_max on first call and update config parameters
if isinstance(cfg.n_max,torch.Tensor):
if isinstance(cfg.n_max, torch.Tensor):
if cfg.n_max.device is not data.device:
cfg.n_max = cfg.n_max.to(data.device)
# fix tensor device for n_min on first call and update config parameters
if isinstance(cfg.n_min,torch.Tensor):
if isinstance(cfg.n_min, torch.Tensor):
if cfg.n_min.device is not data.device:
cfg.n_min = cfg.n_min.to(data.device)
cfg.n_min = cfg.n_min.to(data.device)

if cfg.operation == "add":
return data + torch.rand_like(data) * (cfg.n_max - cfg.n_min) + cfg.n_min
Expand All @@ -75,23 +77,23 @@ def uniform_noise(data: torch.Tensor, cfg: noise_cfg.UniformNoiseCfg) -> torch.T

def gaussian_noise(data: torch.Tensor, cfg: noise_cfg.GaussianNoiseCfg) -> torch.Tensor:
"""Applies a gaussian noise to a given data set.
Args:
data: The unmodified data set to apply noise to.
cfg: The configuration parameters for gaussian noise.
Returns:
The data modified by the noise parameters provided.
The data modified by the noise parameters provided.
"""

# fix tensor device for mean on first call and update config parameters
if isinstance(cfg.mean,torch.Tensor):
if isinstance(cfg.mean, torch.Tensor):
if cfg.mean.device is not data.device:
cfg.mean = cfg.mean.to(data.device)
# fix tensor device for std on first call and update config parameters
if isinstance(cfg.std,torch.Tensor):
# fix tensor device for std on first call and update config parameters
if isinstance(cfg.std, torch.Tensor):
if cfg.std.device is not data.device:
cfg.std = cfg.std.to(data.device)
cfg.std = cfg.std.to(data.device)

if cfg.operation == "add":
return data + cfg.mean + cfg.std * torch.randn_like(data)
Expand Down
40 changes: 37 additions & 3 deletions source/extensions/omni.isaac.lab/test/sensors/test_imu.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class MySceneCfg(InteractiveSceneCfg):
prim_path="{ENV_REGEX_NS}/pendulum/imu_link",
debug_vis=not app_launcher._headless,
visualizer_cfg=RED_ARROW_X_MARKER_CFG.replace(prim_path="/Visuals/Acceleration/imu_link"),
gravity_bias=(0.0, 0.0, 0.0),
gravity_bias=(0.0, 0.0, 9.81),
)
imu_pendulum_base: ImuCfg = ImuCfg(
prim_path="{ENV_REGEX_NS}/pendulum/link_1",
Expand All @@ -134,7 +134,7 @@ class MySceneCfg(InteractiveSceneCfg):
),
debug_vis=not app_launcher._headless,
visualizer_cfg=GREEN_ARROW_X_MARKER_CFG.replace(prim_path="/Visuals/Acceleration/base"),
gravity_bias=(0.0, 0.0, 0.0),
gravity_bias=(0.0, 0.0, 9.81),
)

def __post_init__(self):
Expand Down Expand Up @@ -346,7 +346,11 @@ def test_single_dof_pendulum(self):

ax = -joint_acc * pend_length * torch.sin(joint_pos) - joint_vel**2 * pend_length * torch.cos(joint_pos)
ay = torch.zeros(2, 1, device=self.scene.device)
az = -joint_acc * pend_length * torch.cos(joint_pos) + joint_vel**2 * pend_length * torch.sin(joint_pos)
az = (
-joint_acc * pend_length * torch.cos(joint_pos)
+ joint_vel**2 * pend_length * torch.sin(joint_pos)
+ 9.81
)
gt_linear_acc_w = torch.cat([ax, ay, az], dim=-1)

# skip first step where initial velocity is zero
Expand Down Expand Up @@ -430,6 +434,7 @@ def test_single_dof_pendulum(self):
)

def test_offset_calculation(self):
"""Test offset configuration argument."""
# should achieve same results between the two imu sensors on the robot
for idx in range(500):
# set acceleration
Expand Down Expand Up @@ -494,5 +499,34 @@ def test_offset_calculation(self):
)


def test_env_ids_propogation(self):
"""Test that env_ids argument propagates through update and reset methods"""
self.scene.reset()

for idx in range(10):
# set acceleration
self.scene.articulations["robot"].write_root_velocity_to_sim(
torch.tensor([[0.5, 0.0, 0.0, 0.0, 0.0, 0.0]], dtype=torch.float32, device=self.scene.device).repeat(
self.scene.num_envs, 1
)
* (idx + 1)
)
# write data to sim
self.scene.write_data_to_sim()
# perform step
self.sim.step()
# read data from sim
self.scene.update(self.sim.get_physics_dt())

# reset scene for env 1
self.scene.reset(env_ids=[1])
# read data from sim
self.scene.update(self.sim.get_physics_dt())
# perform step
self.sim.step()
# read data from sim
self.scene.update(self.sim.get_physics_dt())


if __name__ == "__main__":
run_tests()
116 changes: 55 additions & 61 deletions source/extensions/omni.isaac.lab/test/utils/test_noise.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,10 @@

"""Rest everything follows."""

import time
import torch
import unittest
from dataclasses import MISSING

import omni.isaac.lab.utils.noise as noise
from omni.isaac.lab.utils import configclass



class TestNoise(unittest.TestCase):
Expand All @@ -29,96 +25,94 @@ class TestNoise(unittest.TestCase):
def test_gaussian_noise(self):
"""Test guassian_noise function."""

for device in ["cpu","cuda"]:
for noise_device in ["cpu","cuda"]:
for op in ["add","scale","abs"]:
for device in ["cpu", "cuda"]:
for noise_device in ["cpu", "cuda"]:
for op in ["add", "scale", "abs"]:
with self.subTest(device=device, noise_device=noise_device, operation=op):
# create random data set
data = torch.rand(10000, 3, device=device)
# define standard deviation and mean
std = torch.tensor([0.1,0.2,0.3],device=noise_device)
mean = torch.tensor([0.4,0.5,0.6],device=noise_device)
std = torch.tensor([0.1, 0.2, 0.3], device=noise_device)
mean = torch.tensor([0.4, 0.5, 0.6], device=noise_device)
# create noise config
noise_cfg = noise.GaussianNoiseCfg(std=std,
mean=mean,
operation=op)
noise_cfg = noise.GaussianNoiseCfg(std=std, mean=mean, operation=op)

for i in range(10):
# apply noise
noisy_data = noise_cfg.func(data,cfg=noise_cfg)
noisy_data = noise_cfg.func(data, cfg=noise_cfg)
# calculate resulting noise compared to original data set
if op=="add":
std_result, mean_result = torch.std_mean(noisy_data-data,dim=0)
elif op=="scale":
std_result, mean_result = torch.std_mean(noisy_data/data,dim=0)
elif op=="abs":
std_result, mean_result = torch.std_mean(noisy_data,dim=0)

self.assertTrue(noise_cfg.mean.device,device)
self.assertTrue(noise_cfg.std.device,device)
torch.testing.assert_close(noise_cfg.std,std_result,atol=1e-2,rtol=1e-2)
torch.testing.assert_close(noise_cfg.mean,mean_result,atol=1e-2,rtol=1e-2)

if op == "add":
std_result, mean_result = torch.std_mean(noisy_data - data, dim=0)
elif op == "scale":
std_result, mean_result = torch.std_mean(noisy_data / data, dim=0)
elif op == "abs":
std_result, mean_result = torch.std_mean(noisy_data, dim=0)

self.assertTrue(noise_cfg.mean.device, device)
self.assertTrue(noise_cfg.std.device, device)
torch.testing.assert_close(noise_cfg.std, std_result, atol=1e-2, rtol=1e-2)
torch.testing.assert_close(noise_cfg.mean, mean_result, atol=1e-2, rtol=1e-2)

def test_uniform_noise(self):
"""Test uniform_noise function."""
for device in ["cpu","cuda"]:
for noise_device in ["cpu","cuda"]:
for op in ["add","scale","abs"]:
with self.subTest(device=device, noise_device=noise_device,operation=op):
for device in ["cpu", "cuda"]:
for noise_device in ["cpu", "cuda"]:
for op in ["add", "scale", "abs"]:
with self.subTest(device=device, noise_device=noise_device, operation=op):
# create random data set
data = torch.rand(10000, 3, device=device)
# define uniform minimum and maximum
n_min = torch.tensor([0.1,0.2,0.3],device=noise_device)
n_max = torch.tensor([0.4,0.5,0.6],device=noise_device)
n_min = torch.tensor([0.1, 0.2, 0.3], device=noise_device)
n_max = torch.tensor([0.4, 0.5, 0.6], device=noise_device)
# create noise config
noise_cfg = noise.UniformNoiseCfg(n_max=n_max, n_min=n_min,operation=op)
noise_cfg = noise.UniformNoiseCfg(n_max=n_max, n_min=n_min, operation=op)

for i in range(10):
# apply noise
noisy_data = noise_cfg.func(data,cfg=noise_cfg)
noisy_data = noise_cfg.func(data, cfg=noise_cfg)
# calculate resulting noise compared to original data set
if op=="add":
min_result, _ = torch.min(noisy_data-data,dim=0)
max_result, _ = torch.max(noisy_data-data,dim=0)
elif op=="scale":
min_result, _ = torch.min(torch.div(noisy_data,data),dim=0)
max_result, _ = torch.max(torch.div(noisy_data,data),dim=0)
elif op=="abs":
min_result, _ = torch.min(noisy_data,dim=0)
max_result, _ = torch.max(noisy_data,dim=0)

self.assertTrue(noise_cfg.n_min.device,device)
self.assertTrue(noise_cfg.n_max.device,device)
if op == "add":
min_result, _ = torch.min(noisy_data - data, dim=0)
max_result, _ = torch.max(noisy_data - data, dim=0)
elif op == "scale":
min_result, _ = torch.min(torch.div(noisy_data, data), dim=0)
max_result, _ = torch.max(torch.div(noisy_data, data), dim=0)
elif op == "abs":
min_result, _ = torch.min(noisy_data, dim=0)
max_result, _ = torch.max(noisy_data, dim=0)

self.assertTrue(noise_cfg.n_min.device, device)
self.assertTrue(noise_cfg.n_max.device, device)
self.assertTrue(all(torch.le(noise_cfg.n_min, min_result).tolist()))
self.assertTrue(all(torch.ge(noise_cfg.n_max, max_result).tolist()))

def test_constant_noise(self):
"""Test constant_noise"""
for device in ["cpu","cuda"]:
for noise_device in ["cpu","cuda"]:
for op in ["add","scale","abs"]:
with self.subTest(device=device, noise_device=noise_device,operation=op):
for device in ["cpu", "cuda"]:
for noise_device in ["cpu", "cuda"]:
for op in ["add", "scale", "abs"]:
with self.subTest(device=device, noise_device=noise_device, operation=op):
# create random data set
data = torch.rand(10000, 3, device=device)
# define a bias
bias = torch.tensor([0.1,0.2,0.3],device=noise_device)
bias = torch.tensor([0.1, 0.2, 0.3], device=noise_device)
# create noise config
noise_cfg = noise.ConstantNoiseCfg(bias=bias, operation=op)

for i in range(10):
# apply noise
noisy_data = noise_cfg.func(data,cfg=noise_cfg)
noisy_data = noise_cfg.func(data, cfg=noise_cfg)
# calculate resulting noise compared to original data set
if op=="add":
bias_result = noisy_data-data
elif op=="scale":
bias_result = noisy_data/data
elif op=="abs":
if op == "add":
bias_result = noisy_data - data
elif op == "scale":
bias_result = noisy_data / data
elif op == "abs":
bias_result = noisy_data

self.assertTrue(noise_cfg.bias.device,device)
torch.testing.assert_close(noise_cfg.bias.repeat(data.shape[0],1),bias_result)
self.assertTrue(noise_cfg.bias.device, device)
torch.testing.assert_close(noise_cfg.bias.repeat(data.shape[0], 1), bias_result)


if __name__ == "__main__":
run_tests()
run_tests()

0 comments on commit 5a35952

Please sign in to comment.