Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ docs/_build
.codex
CLAUDE.md
uv.lock
*.parquet

11 changes: 7 additions & 4 deletions examples/teleop/franka.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
from rcs.operator.gello import GelloConfig, GelloOperator
from rcs.operator.interface import TeleopLoop
from rcs.operator.quest import QuestConfig, QuestOperator
from rcs_fr3.configs import DefaultFR3MultiHardwareEnv
from rcs_fr3.creators import HardwareCameraCreatorConfig
from simpub.sim.mj_publisher import MujocoPublisher

import rcs
Expand Down Expand Up @@ -67,7 +65,9 @@
}

config: QuestConfig | GelloConfig
config = QuestConfig(mq3_addr=MQ3_ADDR, simulation=ROBOT_INSTANCE == RobotPlatform.SIMULATION, switched_left_right=True)
config = QuestConfig(
mq3_addr=MQ3_ADDR, simulation=ROBOT_INSTANCE == RobotPlatform.SIMULATION, switched_left_right=False
)
# config = GelloConfig(
# arms={
# "right": GelloArmConfig(com_port="/dev/serial/by-id/usb-ROBOTIS_OpenRB-150_E505008B503059384C2E3120FF07332D-if00"),
Expand All @@ -79,6 +79,9 @@

def get_env():
if ROBOT_INSTANCE == RobotPlatform.HARDWARE:
from rcs_fr3.configs import DefaultFR3MultiHardwareEnv
from rcs_fr3.creators import HardwareCameraCreatorConfig

env_creator = DefaultFR3MultiHardwareEnv()
env_creator.left_ip = ROBOT2IP["left"]
env_creator.right_ip = ROBOT2IP["right"]
Expand Down Expand Up @@ -151,7 +154,7 @@ def get_env():
if sim_cfg_data.root_frame_objects is None:
sim_cfg_data.root_frame_objects = {}
# cfg.root_frame_objects["green_cube"] = (rcs.OBJECT_PATHS["green_cube"], Pose(translation=[0.5, 0, 0.5], quaternion=[0, 0, 0, 1]))
sim_cfg_data.task_cfg = PickTaskConfig(robot_name="left")
sim_cfg_data.task_cfg = PickTaskConfig(robot_name="right")

env_rel = scene.create_env(sim_cfg_data)
env_rel = StorageWrapper(
Expand Down
37 changes: 35 additions & 2 deletions python/rcs/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@

import typer
from rcs.envs.storage_wrapper import StorageWrapper
from rcs.sim_state_replay import replay as replay_command
from rcs.sim.replayer import replay as replay_dataset

app = typer.Typer()
app.command()(replay_command)


@app.command()
Expand Down Expand Up @@ -34,5 +33,39 @@ def consolidate(
typer.echo("Done.")


@app.command("replay")
def replay(
dataset: Annotated[
Path,
typer.Argument(
exists=True,
help="Parquet dataset directory to replay.",
),
],
headless: Annotated[bool, typer.Option(help="Whether to run without GUI.")] = True,
frequency: Annotated[int, typer.Option(help="Simulation frequency to use during replay.")] = 30,
relative_to: Annotated[
str,
typer.Option(help="RelativeTo enum name: CONFIGURED_ORIGIN, LAST_STEP, or NONE."),
] = "CONFIGURED_ORIGIN",
scene: Annotated[
str,
typer.Option(help="Python expression that evaluates to a scene instance."),
] = "env_configs.EmptyWorldFR3Duo()",
task_cfg: Annotated[
str,
typer.Option(help="Python expression that evaluates to a task config."),
] = 'env_tasks.PickTaskConfig(robot_name="right")',
):
replay_dataset(
dataset=dataset,
headless=headless,
frequency=frequency,
relative_to=relative_to,
scene=scene,
task_cfg=task_cfg,
)


if __name__ == "__main__":
app()
25 changes: 22 additions & 3 deletions python/rcs/envs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,23 +200,36 @@ class HardwareEnv(BaseEnv):

class SimEnv(BaseEnv):
PLATFORM = RobotPlatform.SIMULATION
STATE_KEY = "sim_state"
STATE_SPEC_KEY = "sim_state_spec"

def __init__(self, sim: simulation.Sim) -> None:
def __init__(self, sim: simulation.Sim, return_state=True) -> None:
self.sim = sim
cfg = self.sim.get_config()
self.frame_rate = SimpleFrameRate(cfg.frequency, "MoJoCo Simulation Loop")
self.main_greenlet: greenlet | None = None
self.return_state = return_state
self._replay_state: tuple[np.ndarray, int | None] | None = None

def set_replay_state(self, state: np.ndarray, spec: int | None = None):
self._replay_state = (state, spec)

def step(self, action: dict[str, Any]) -> tuple[dict[str, Any], float, bool, bool, dict]:
if self.main_greenlet is not None:
self.main_greenlet.switch()
else:
self.step_sim()
return super().step(action)
obs, reward, terminated, truncated, info = super().step(action)
if self.return_state:
obs, info = self.observation(obs, info)
return obs, reward, terminated, truncated, info

def step_sim(self):
cfg = self.sim.get_config()
if cfg.async_control:
if self._replay_state is not None:
self.sim.set_state(self._replay_state[0], self._replay_state[1])
self._replay_state = None
elif cfg.async_control:
self.sim.step(round(1 / cfg.frequency / self.sim.model.opt.timestep))
if cfg.realtime:
self.frame_rate.frame_rate = cfg.frequency
Expand All @@ -236,6 +249,12 @@ def reset(
self.apply_sim_state()
return super().reset(seed=seed, options=options)

def observation(self, observation: dict[str, Any], info: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
sim_state = self.sim.get_state()
info[self.STATE_KEY] = sim_state
info[self.STATE_SPEC_KEY] = self.sim.get_state_spec()
return observation, info


class CoverWrapper(gym.Wrapper):
"""The CoverWrapper must be the last wrapper on the stack
Expand Down
17 changes: 0 additions & 17 deletions python/rcs/envs/sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,6 @@ def reset(
return super().reset(seed=seed, options=options)


class SimStateObservationWrapper(ActObsInfoWrapper):
STATE_KEY = "sim_state"
STATE_SPEC_KEY = "sim_state_spec"

def __init__(self, env):
super().__init__(env)
assert self.env.get_wrapper_attr("PLATFORM") == RobotPlatform.SIMULATION, "Base environment must be simulation."
self.sim = cast(sim.Sim, self.get_wrapper_attr("sim"))

def observation(self, observation: dict[str, Any], info: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
observation = dict(observation)
sim_state = self.sim.get_state()
observation[self.STATE_KEY] = sim_state
observation[self.STATE_SPEC_KEY] = self.sim.get_state_spec()
return observation, info


class GripperWrapperSim(ActObsInfoWrapper):
def __init__(self, env):
super().__init__(env)
Expand Down
55 changes: 24 additions & 31 deletions python/rcs/envs/storage_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import pyarrow.dataset as ds
import simplejpeg
from PIL import Image
from rcs.envs.base import RelativeActionSpace


class StorageWrapper(gym.Wrapper):
Expand All @@ -31,6 +30,7 @@ def __init__(
basename_template: Optional[str] = None,
max_rows_per_group: Optional[int] = None,
max_rows_per_file: Optional[int] = None,
success_from_env: bool = False,
):
"""
Asynchronously log environment transitions to a Parquet dataset on disk.
Expand Down Expand Up @@ -91,6 +91,7 @@ def __init__(
self._success = False
self._prev_action = None
self._prev_absolute_action = None
self.success_from_env = success_from_env

self.thread_pool = ThreadPoolExecutor()
self.queue: Queue[pa.Table | pa.RecordBatch] = Queue(maxsize=2)
Expand Down Expand Up @@ -249,49 +250,41 @@ def step(self, action):

if not self._pause:
assert isinstance(obs, dict)
if "frames" in obs and not obs["frames"]:
del obs["frames"]
if "frames" in obs:
self._encode_images(obs)
self._flatten_arrays(obs)
if info.get("success"):
if info.get("success") and self.success_from_env:
self.success()

self.buffer.append(
{
"obs": obs,
"info": info,
"reward": reward,
"step": self.step_cnt,
"uuid": self.uuid.hex,
"date": datetime.date.today().isoformat(),
"success": self._success,
"action": self._prev_action,
"instruction": self.instruction,
"timestamp": datetime.datetime.now().timestamp(),
RelativeActionSpace.ABSOLUTE_ACTION_KEY: self._prev_absolute_action,
}
)
frame = {
"obs": obs,
"info": info,
"reward": reward,
"step": self.step_cnt,
"uuid": self.uuid.hex,
"date": datetime.date.today().isoformat(),
"success": self._success,
"action": self._prev_action,
"env_action": action,
"instruction": self.instruction,
"timestamp": datetime.datetime.now().timestamp(),
}

self._prev_action = action
if RelativeActionSpace.ABSOLUTE_ACTION_KEY in obs:
# single env
self._prev_absolute_action = obs[RelativeActionSpace.ABSOLUTE_ACTION_KEY]
else:
# multi env wrapper
act_dict = {}
try:
for key in self.get_wrapper_attr("envs"):
if RelativeActionSpace.ABSOLUTE_ACTION_KEY in obs[key]:
act_dict[key] = obs[key][RelativeActionSpace.ABSOLUTE_ACTION_KEY]
except AttributeError:
pass
if len(act_dict) != 0:
self._prev_absolute_action = act_dict # type: ignore

self.buffer.append(frame)

self.step_cnt += 1
if len(self.buffer) == self.batch_size:
self._flush()

return obs, reward, terminated, truncated, info

def set_instruction(self, instruction: str):
self.instruction = instruction

def success(self):
self._success = True

Expand Down
Loading
Loading