diff --git a/.agents/skills/add-solver/SKILL.md b/.agents/skills/add-solver/SKILL.md new file mode 100644 index 00000000..115f46f3 --- /dev/null +++ b/.agents/skills/add-solver/SKILL.md @@ -0,0 +1,331 @@ +--- +name: add-solver +description: Use when adding a new kinematic (IK/FK) solver to EmbodiChain — implements the solver module, its Sphinx docs page, the unit test, and the benchmark entry together +--- + +# Add Solver + +Scaffold a new kinematic solver — and its three required companion artifacts +(docs, unit test, benchmark) — following EmbodiChain's `SolverCfg` / `BaseSolver` +pattern. The reference implementation is the UR analytic solver; use it as the +gold standard for structure and style. + +## When to Use + +- User asks to "add a solver", "add an IK solver", "add a kinematic solver" +- A new robot family needs a closed-form / numerical / Warp-kernel IK backend +- The request names a solver that does not yet exist under + `embodichain/lab/sim/solvers/` + +## The Four Artifacts + +A new solver is **not complete** until all four exist and pass. Each must follow +the conventions below. + +| # | Artifact | Path | +|---|----------|------| +| 1 | Solver module | `embodichain/lab/sim/solvers/_solver.py` | +| 2 | (GPU/Warp only) Warp kernel | `embodichain/utils/warp/kinematics/_solver.py` | +| 3 | Sphinx docs page | `docs/source/overview/sim/solvers/_solver.md` | +| 4 | Unit test | `tests/sim/solvers/test__solver.py` | +| 5 | Benchmark entry | extend `scripts/benchmark/robotics/kinematic_solver/run_benchmark.py` | + +Plus two registration edits: + +- Export the new `Cfg` + `Solver` classes from + `embodichain/lab/sim/solvers/__init__.py`. +- Add the docs page to the toctree in + `docs/source/overview/sim/solvers/index.rst`. + +## Steps + +### 1. Gather Solver Requirements + +Ask the user (only what is not already stated): + +1. **Solver name** (`_solver`) and a one-line description. +2. **Robot family / kinematic type** — which robot(s) it targets, DOF. +3. **Approach** — analytical closed-form (UR, OPW, SRS), numerical/Jacobian + (Pytorch, Differential, Pink, Pinocchio), or neural (NeuralIK). +4. **Backend** — pure PyTorch, NVIDIA Warp GPU kernel, or both. +5. **URDF / DH / OPW parameter source** — link or file the parameters come + from (cite it in the docs, as the UR solver cites `ur-analytic-ik`). + +### 2. Write the Solver Module + +File: `embodichain/lab/sim/solvers/_solver.py` + +Required pieces (mirror `ur_solver.py`): + +- Apache 2.0 copyright header (the 15-line block). +- `from __future__ import annotations` after the header. +- A `@configclass` config class `SolverCfg(SolverCfg)`: + - Fields for any robot-specific parameters (DH params, OPW params, etc.). + - A `__post_init__` that populates derived fields (e.g. per-variant DH + parameters) and **raises `ValueError` for unknown variants** — fail fast. + - An `init_solver(self, device=..., **kwargs) -> "Solver"` that + constructs the solver and calls `solver.set_tcp(self._get_tcp_as_numpy())`. +- A `Solver(BaseSolver)` class: + - `__init__(self, cfg, device, **kwargs)` calls `super().__init__(...)`, + sets `self.dof`, and initializes solver-specific state. + - Implements `get_ik(self, target_xpos, qpos_seed, return_all_solutions=False, **kwargs)` + returning `(success, ik_qpos)` (or `(validity, all_solutions)` when + `return_all_solutions=True`). Shapes follow the `BaseSolver.get_ik` + contract. + - Reuses `get_fk`, `set_tcp`, `get_qpos_limits`, etc. from `BaseSolver` — + do **not** reimplement FK unless the solver has a custom chain. + - Add static helpers (e.g. `dh_matrix`) only when genuinely needed. +- `__all__ = ["SolverCfg", "Solver"]`. + +Template: + +```python +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# ... (full Apache 2.0 header) ... +# ---------------------------------------------------------------------------- + +from __future__ import annotations + +import torch +import numpy as np +from embodichain.utils import configclass +from embodichain.lab.sim.solvers import SolverCfg, BaseSolver +from embodichain.data import get_data_path + + +@configclass +class FooSolverCfg(SolverCfg): + # robot-specific parameters, with sensible defaults + robot_type: str = "foo" + urdf_path: str = get_data_path("Foo/foo.urdf") + + def __post_init__(self): + super().__post_init__() + if self.robot_type == "foo": + ... + else: + raise ValueError(f"Unknown robot type: {self.robot_type}") + + def init_solver(self, device: torch.device = torch.device("cpu"), **kwargs) -> "FooSolver": + """Initialize the solver with the configuration. + + Args: + device: The device to use for the solver. Defaults to CPU. + **kwargs: Additional keyword arguments for solver initialization. + + Returns: + FooSolver: An initialized solver instance. + """ + solver = FooSolver(cfg=self, device=device, **kwargs) + solver.set_tcp(self._get_tcp_as_numpy()) + return solver + + +class FooSolver(BaseSolver): + def __init__(self, cfg: FooSolverCfg, device: str, **kwargs): + super().__init__(cfg, device, **kwargs) + self.dof = 6 + # init solver-specific state / Warp params here + + def get_ik(self, target_xpos, qpos_seed, return_all_solutions: bool = False, **kwargs): + """Compute target joint positions. + + Args: + target_xpos (torch.Tensor): Target end-effector pose, shape (n_sample, 4, 4). + qpos_seed (torch.Tensor): Reference joint positions, shape (n_sample, num_joints). + return_all_solutions (bool): Return all candidates instead of the closest. Defaults to False. + **kwargs: Additional arguments for future extensions. + + Returns: + Tuple[torch.Tensor, torch.Tensor]: (success, target_joints). + """ + ... + return ik_validity, ik_qpos + + +__all__ = ["FooSolverCfg", "FooSolver"] +``` + +### 3. (GPU/Warp only) Write the Warp Kernel + +For analytic solvers evaluated in batch on the GPU (UR, OPW, SRS), put the +`@wp.kernel` / `@wp.func` / `@wp.struct` definitions in +`embodichain/utils/warp/kinematics/_solver.py` — **not** in the solver +module. The solver module imports the kernel and any param struct from there +and launches it with `wp.launch`. + +Conventions (see `ur_solver.py` under `utils/warp/kinematics/`): + +- Apache header + `from __future__ import annotations`. +- Define a `@wp.struct` for solver parameters (e.g. `URParam`) and pass it + to the kernel as an input. +- One kernel thread per target pose (`dim=(n_sample,)`). +- Write candidate solutions and validity flags into preallocated Warp arrays, + then convert back to torch with `wp.to_torch(...)`. +- Use `standardize_device_string(self.device)` from + `embodichain.utils.device_utils` to get the Warp device string. + +For pure-PyTorch / numerical solvers, skip this step entirely and implement +`get_ik` directly with torch ops. + +### 4. Register in `__init__.py` + +Add the import + keep `__all__` (if present) consistent in +`embodichain/lab/sim/solvers/__init__.py`: + +```python +from .foo_solver import FooSolverCfg, FooSolver +``` + +### 5. Write the Docs Page + +File: `docs/source/overview/sim/solvers/_solver.md` — mirror the structure +of `ur_solver.md`: + +1. `# Solver` — one-paragraph intro (what it solves, why it's fast / what + approach it uses, the GPU/numerical backend). +2. Cite the reference implementation / parameter source with a link. +3. **Key Features** — bullet list. +4. **Kinematic model / parameters** (DH table, OPW params, etc.) as needed. +5. **Configuration** — a `python` code block constructing the `Cfg` and calling + `cfg.init_solver(device=...)`. Use a `.. tip::` Sphinx directive for the + one parameter that usually matters. +6. **Main Methods** — document `get_fk` (inherited), `get_ik` (with full + signature, parameters, returns, and a runnable **Example** code block showing + both `return_all_solutions=False` and `True`), `set_tcp`, and any static + helpers. Use Google-style param lists and `+` bullets as in `ur_solver.md`. +7. **How It Works** — numbered explanation of the solve pipeline. +8. **References** — markdown links. + +Then add the page to the toctree in `docs/source/overview/sim/solvers/index.rst`: + +```rst +.. toctree:: + :maxdepth: 1 + + pytorch_solver.md + ... + foo_solver.md +``` + +### 6. Write the Unit Test + +File: `tests/sim/solvers/test__solver.py` — follow `test_ur_solver.py` +exactly: + +- Apache header + `from __future__ import annotations` (after header). +- A `grid_sample_qpos_from_limits(...)` helper (reuse the one from + `test_ur_solver.py`) to sample joint configs within limits with a safety + margin. +- A `BaseSolverTest` class with: + - `setup_simulation(self, sim_device)` — builds a `SimulationManagerCfg`, + a `RobotCfg` whose `solver_cfg={"arm": SolverCfg(...)}` uses the new + solver, and adds the robot via `self.sim.add_robot(cfg=cfg)`. + - `test_ik(self)` — the round-trip contract: + 1. Sample qpos from the robot's joint limits. + 2. `compute_batch_fk` → `fk_xpos` (both matrix and xyzquat forms). + 3. `compute_batch_ik` on both pose forms; assert the two IK results match. + 4. Re-run FK on the IK output; assert `sample_qpos ≈ ik_qpos` and + `fk_xpos ≈ ik_xpos` with `atol=5e-3, rtol=5e-3`. + 5. Feed an unreachable pose; assert `res[0] == False` and the output shape. + - `teardown_method` calling `self.sim.destroy()`. +- Two concrete subclasses driving `setup_method`: + - `class TestFooSolverCUDA(BaseSolverTest): setup_method → "cuda"` + - `class TestFooSolver(BaseSolverTest): setup_method → "cpu"` +- `if __name__ == "__main__":` block running `pytest.main(["-v", "-s", __file__])`. + +### 7. Add the Benchmark Entry + +Extend `scripts/benchmark/robotics/kinematic_solver/run_benchmark.py` (do **not** +create a separate benchmark file — the kinematic-solver benchmark is unified): + +1. Add module-level constants for the new solver's joint limits, joint names, + TCP, etc. (mirror `UR_LOWER_LIMITS` / `UR_UPPER_LIMITS` / `UR_TCP`). +2. If the solver is not yet in `SUPPORTED_SOLVERS`, add its short name there + and update `_normalize_selected_solvers` / the `--solvers` argparse choices. +3. Write `_init__solver(device) -> Solver` and + `_timed__ik_call(solver, fk_xpos, qpos_seed)` helpers, mirroring + `_init_ur_solver` / `_timed_ur_ik_call` (3-iteration timing skipping the + first run, `_sync_cuda()`, `_reset_peak_gpu_memory()`, + `_memory_snapshot()`). +4. Write `benchmark__solver() -> (perf_rows, metric_rows)` mirroring + `benchmark_ur_solver`: iterate `SAMPLE_SIZES`, run CPU (+ optional CUDA), + verify accuracy via `get_pose_err`, and append rows for both the + `Time & Memory` and `Success & Other Metrics` tables. +5. Wire it into `run_all_benchmarks()` behind an `if "" in solvers_to_run:` + guard, extending `perf_rows` / `metric_rows`. The leaderboard + (`_build_leaderboard_rows`) and the markdown report + (`_write_markdown_report`) are shared — the report must contain exactly the + three tables (`Time & Memory`, `Success & Other Metrics`, `Leaderboard`). + +For the full benchmark conventions (timing, memory, three-table report), defer +to the `benchmark` skill (`.agents/skills/benchmark/SKILL.md`). + +### 8. Format and Verify + +```bash +conda activate embodichain +black embodichain/lab/sim/solvers/_solver.py +black embodichain/utils/warp/kinematics/_solver.py # if added +black tests/sim/solvers/test__solver.py +black scripts/benchmark/robotics/kinematic_solver/run_benchmark.py +``` + +Run the unit test (CPU class is enough for a quick check): + +```bash +pytest tests/sim/solvers/test__solver.py::TestFooSolver -v +``` + +Smoke-run the benchmark for the new solver only: + +```bash +python -m scripts.benchmark.robotics.kinematic_solver.run_benchmark -s +``` + +Finally, run the `/pre-commit-check` skill to catch all CI violations locally. + +## Code Style Checklist + +- [ ] Apache 2.0 header on every new file (solver, warp kernel, test) +- [ ] `from __future__ import annotations` after the header +- [ ] `@configclass` on the Cfg, inheriting `SolverCfg` +- [ ] `__post_init__` raises `ValueError` on unknown variants +- [ ] `init_solver` constructs the solver and calls `set_tcp(...)` +- [ ] Solver inherits `BaseSolver`, sets `self.dof`, implements `get_ik` +- [ ] `__all__` declared in the solver module +- [ ] Google-style docstrings with Sphinx directives (`.. tip::`, `.. attention::`) +- [ ] Cfg + Solver exported from `embodichain/lab/sim/solvers/__init__.py` +- [ ] Docs page added to `index.rst` toctree +- [ ] Test covers FK↔IK round-trip + unreachable-pose failure, CPU & CUDA classes +- [ ] Benchmark entry added with CPU (+CUDA) timing, accuracy, and the 3-table report +- [ ] `black` run on all changed files + +## Common Mistakes + +| Mistake | Fix | +|---------|-----| +| Reimplementing FK / TCP / joint limits in the solver | Reuse `BaseSolver.get_fk`, `set_tcp`, `set_qpos_limits` | +| Putting the Warp kernel inside the solver module | Put `@wp.kernel`/`@wp.struct` in `embodichain/utils/warp/kinematics/_solver.py` and import it | +| Not exporting the Cfg/Solver from `__init__.py` | Add the import line so `from embodichain.lab.sim.solvers import FooSolverCfg` works | +| Forgetting the docs toctree entry | Add the `.md` to `docs/source/overview/sim/solvers/index.rst` | +| Test only checks happy-path IK | Must verify FK↔IK round-trip equality AND an unreachable-pose returns `False` | +| Creating a separate benchmark file | Extend the unified `run_benchmark.py` instead | +| Skipping `black` / pre-commit | CI checks every file including tests and benchmarks | +| Missing `__post_init__` validation | Unknown robot variants must `raise ValueError` at config time | + +## Quick Reference + +| Action | Command / Path | +|--------|----------------| +| Reference solver | `embodichain/lab/sim/solvers/ur_solver.py` | +| Reference Warp kernel | `embodichain/utils/warp/kinematics/ur_solver.py` | +| Reference docs | `docs/source/overview/sim/solvers/ur_solver.md` | +| Reference test | `tests/sim/solvers/test_ur_solver.py` | +| Benchmark file | `scripts/benchmark/robotics/kinematic_solver/run_benchmark.py` | +| Python env | `conda activate embodichain` | +| Run test (CPU) | `pytest tests/sim/solvers/test__solver.py::TestFooSolver -v` | +| Run benchmark | `python -m scripts.benchmark.robotics.kinematic_solver.run_benchmark -s ` | +| Format | `black ` | +| Pre-commit | `/pre-commit-check` | diff --git a/.claude/skills/add-solver/SKILL.md b/.claude/skills/add-solver/SKILL.md new file mode 100644 index 00000000..9c0ce1d6 --- /dev/null +++ b/.claude/skills/add-solver/SKILL.md @@ -0,0 +1,24 @@ +--- +name: add-solver +description: Claude adapter for the canonical EmbodiChain add-solver skill. +--- + +# Add Solver - Claude Adapter + +Canonical source: `.agents/skills/add-solver/` + +## When to use + +- adding a new kinematic (IK/FK) solver +- scaffolding the solver module plus its docs, unit test, and benchmark entry together +- a new robot family needs a closed-form / numerical / Warp-kernel IK backend + +## Start here + +1. Use this adapter when the task asks for a new kinematic solver. +2. Then follow `.agents/skills/add-solver/SKILL.md`. + +The full procedure — solver module, (optional) Warp kernel, Sphinx docs page, +unit test, benchmark entry, registration in `__init__.py` and the docs toctree, +plus `black` and verification under `conda activate embodichain` — is defined in +the canonical skill. diff --git a/docs/source/overview/sim/solvers/index.rst b/docs/source/overview/sim/solvers/index.rst index 30ad1043..322195af 100644 --- a/docs/source/overview/sim/solvers/index.rst +++ b/docs/source/overview/sim/solvers/index.rst @@ -48,7 +48,7 @@ approaches implemented in the repository: Analytical IK ~~~~~~~~~~~~~ -Analytical solvers (see the OPW) exploit kinematic +Analytical solvers (see the OPW and UR solvers) exploit kinematic structure to derive algebraic inverse mappings. Benefits include: - very fast runtime @@ -77,7 +77,7 @@ embed loop-closure constraints in the solver as equality constraints. Choosing a solver ----------------- -- Use analytic solvers (OPW for 6-DOF arms or SRS for 7-DOF arms) when available for speed and +- Use analytic solvers (OPW for 6-DOF arms, UR for the Universal Robots family, or SRS for 7-DOF arms) when available for speed and determinism. - Use numerical solvers (PyTorch/optimization, Differential) when you need flexibility. @@ -96,4 +96,5 @@ See also pinocchio_solver.md opw_solver.md srs_solver.md + ur_solver.md neural_ik_solver.md diff --git a/docs/source/overview/sim/solvers/ur_solver.md b/docs/source/overview/sim/solvers/ur_solver.md new file mode 100644 index 00000000..17e3dfac --- /dev/null +++ b/docs/source/overview/sim/solvers/ur_solver.md @@ -0,0 +1,184 @@ +# URSolver + +`URSolver` is a specialized analytical inverse kinematics (IK) solver for the +Universal Robots family of 6-DOF manipulators (UR3, UR3e, UR5, UR5e, UR10, +UR10e). Unlike numerical solvers, it derives closed-form joint solutions from +the robot's Denavit–Hartenberg (DH) parameters and evaluates them in batch on +the GPU through a [NVIDIA Warp](https://github.com/NVIDIA/warp) kernel. This +makes it fast, deterministic, and well suited to large-scale batch IK tasks and +real-time control where exact solutions are preferred over iterative refinement. + +The analytic formulation follows the +[ur-analytic-ik](https://github.com/Victorlouisdg/ur-analytic-ik) repository, +and the DH parameters for each robot variant are taken from +[`dh_parameters.hh`](https://github.com/Victorlouisdg/ur-analytic-ik/blob/main/src/ur_analytic_ik/dh_parameters.hh). + +## Key Features + +* Analytical, closed-form IK for the full UR series (UR3/UR3e/UR5/UR5e/UR10/UR10e) +* DH parameters auto-populated from the selected `ur_type`, sourced from `ur-analytic-ik` +* Batched GPU computation via a Warp kernel for many target poses at once +* Up to 8 base analytical solutions, expanded to 512 FK-equivalent candidates per pose through per-joint `±2π` shifts +* Strict enforcement of joint limits; invalid candidates are flagged and filtered out +* Forward-kinematics (FK) error check that validates each candidate against the target pose +* Tool Center Point (TCP) support via `set_tcp` +* Flexible configuration via `URSolverCfg` + +## DH Parameters + +The UR kinematic chain is described with the standard DH convention: + +| Joint | theta | d | a | alpha | +|-------|-------|-----|-----|---------| +| 1 | q1 | d1 | 0 | +π/2 | +| 2 | q2 | 0 | a2 | 0 | +| 3 | q3 | 0 | a3 | 0 | +| 4 | q4 | d4 | 0 | +π/2 | +| 5 | q5 | d5 | 0 | -π/2 | +| 6 | q6 | d6 | 0 | 0 | + +Setting `ur_type` in `URSolverCfg` selects the matching `(d1, a2, a3, d4, d5, d6)` +values and the corresponding URDF asset. The values are reproduced from +[`dh_parameters.hh`](https://github.com/Victorlouisdg/ur-analytic-ik/blob/main/src/ur_analytic_ik/dh_parameters.hh). + +## Configuration + +The solver is configured using the `URSolverCfg` class. Selecting `ur_type` +automatically fills in the DH parameters and the URDF path; the remaining fields +default to sensible values and rarely need to be overridden. + +```python +import torch +from embodichain.lab.sim.solvers.ur_solver import URSolver, URSolverCfg + +cfg = URSolverCfg( + ur_type="ur10", # one of: ur3, ur3e, ur5, ur5e, ur10, ur10e + end_link_name="ee_link", + root_link_name="base_link", + # DH parameters are set automatically from ur_type; override only if needed: + # d1=0.1273, a2=-0.612, a3=-0.5723, d4=0.163941, d5=0.1157, d6=0.0922, + tcp=[ + [0.0, 1.0, 0.0, 0.0], + [-1.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 1.0, 0.12], + [0.0, 0.0, 0.0, 1.0], + ], +) + +solver = cfg.init_solver(device="cuda") +``` + +.. tip:: + ``ur_type`` is the only parameter that usually needs to be set. The + ``__post_init__`` hook raises ``ValueError`` for any unrecognized value, so + an invalid robot variant fails fast at configuration time. + +## Main Methods + +* `get_fk(self, qpos: torch.Tensor) -> torch.Tensor` + Computes the end-effector pose (homogeneous transformation matrix) for the + given joint positions. Inherited from `BaseSolver` and computed from the URDF + kinematic chain, with the TCP applied. + + **Parameters:** + + `qpos` (`torch.Tensor` or `list[float]`): Joint positions, shape `(num_envs, num_joints)` or `(num_joints,)`. + + **Returns:** + + `torch.Tensor`: End-effector pose(s), shape `(num_envs, 4, 4)`. + + **Example:** + +```python + fk = solver.get_fk(qpos=[0.0, -1.5708, 0.0, -1.5708, 0.0, 0.0]) + print(fk) + # Output: + # tensor([[[ -1.0000, -0.0000, 0.0000, 0.0000], + # [ -0.0000, -0.0000, -1.0000, -0.2561], + # [ 0.0000, -1.0000, 0.0000, 1.4273], + # [ 0.0000, 0.0000, 0.0000, 1.0000]]], device='cuda:0') +``` + +* `get_ik(self, target_xpos: torch.Tensor, qpos_seed: torch.Tensor, return_all_solutions: bool = False, **kwargs) -> Tuple[torch.Tensor, torch.Tensor]` + Computes joint positions (inverse kinematics) for the given target + end-effector pose(s) using the closed-form UR solution evaluated in a Warp + kernel. + + For each target pose the kernel first derives the 8 base analytical solutions + (combinations of the two `theta1` branches, the two `theta5` sign branches, + and the elbow-up/down `theta3` branch). Because UR joints are `2π`-periodic, + each base solution is then expanded into `2**6 = 64` per-joint `±2π` shift + variants that are FK-equivalent, yielding `8 * 64 = 512` candidate solutions. + A candidate is flagged valid only if its forward kinematics matches the target + pose *and* every joint lies within the configured joint limits. + + When `return_all_solutions=False`, the valid candidate closest to `qpos_seed` + (by Euclidean distance in joint space) is returned for each target. + + **Parameters:** + + `target_xpos` (`torch.Tensor`): Target end-effector pose(s), shape `(4, 4)` or `(num_envs, 4, 4)`. + + `qpos_seed` (`torch.Tensor`): Reference joint positions used to pick the closest solution, shape `(num_envs, num_joints)` or `(num_joints,)`. + + `return_all_solutions` (`bool`, optional): If `True`, returns all 512 candidates and their validity flags instead of the single closest solution. Default is `False`. + + `**kwargs`: Additional arguments for future extensions. + + **Returns:** + + `Tuple[torch.Tensor, torch.Tensor]`: + - If `return_all_solutions=False`: + - First element: validity flag per environment, shape `(num_envs,)`. + - Second element: closest joint positions, shape `(num_envs, num_joints)`. + - If `return_all_solutions=True`: + - First element: validity flag per candidate, shape `(num_envs, 512)`. + - Second element: all candidate joint positions, shape `(num_envs, 512, num_joints)`. + + **Example:** + +```python + import torch + # Reuse the FK pose above as the IK target. + xpos = torch.tensor([[[ -1.0000, -0.0000, 0.0000, 0.0000], + [ -0.0000, -0.0000, -1.0000, -0.2561], + [ 0.0000, -1.0000, 0.0000, 1.4273], + [ 0.0000, 0.0000, 0.0000, 1.0000]]], + device=solver.device) + qpos_seed = torch.zeros((1, 6), device=solver.device) + valid, ik_qpos = solver.get_ik(target_xpos=xpos, qpos_seed=qpos_seed) + print("IK valid:", valid) + print("IK solution:", ik_qpos) + # IK valid: tensor([True], device='cuda:0') + # IK solution: tensor([[ 0.0004, -1.5704, -0.0007, -1.5705, -0.0004, -0.0000]], + # device='cuda:0') + + # Return every candidate instead of just the closest one: + valid_all, all_solutions = solver.get_ik( + target_xpos=xpos, qpos_seed=qpos_seed, return_all_solutions=True + ) + print(all_solutions.shape) # torch.Size([1, 512, 6]) +``` + +* `set_tcp(self, tcp: np.ndarray)` + Sets the Tool Center Point transform and precomputes its inverse so that IK + targets are mapped into the flange frame before solving. + +* `dh_matrix(theta_i, d_i, a_i, alpha_i) -> torch.Tensor` *(staticmethod)* + Computes a single 4×4 Denavit–Hartenberg transformation matrix from the + standard DH parameters `(theta, d, a, alpha)`. Useful for building custom FK + chains or debugging the kinematic model. + +## How It Works + +1. **DH model**: The robot is parameterized by `(d1, a2, a3, d4, d5, d6)` and + the fixed twists `alpha ∈ {+π/2, 0, -π/2}` (see the table above). +2. **Analytic solve**: For a target pose, `theta1` has two branches; for each, + `theta5` (sign) and `theta6` are recovered, then `theta2/3/4` are solved in + closed form with an elbow-up/down split — yielding 8 base solutions. +3. **Shift expansion**: Each base solution is expanded into 64 per-joint `±2π` + shift variants that preserve the end-effector pose, producing 512 candidates. +4. **Validation**: A candidate is valid only if its forward kinematics matches + the target (translation ≤ 1e-2, rotation ≤ 1e-1) *and* all joints lie within + the joint limits. The Warp kernel runs one thread per target pose, so the + whole batch is evaluated in parallel on the GPU. + +## References + +* [ur-analytic-ik (GitHub)](https://github.com/Victorlouisdg/ur-analytic-ik) — reference implementation of the analytic UR IK +* [DH parameters source (`dh_parameters.hh`)](https://github.com/Victorlouisdg/ur-analytic-ik/blob/main/src/ur_analytic_ik/dh_parameters.hh) +* [NVIDIA Warp Documentation](https://github.com/NVIDIA/warp) diff --git a/embodichain/lab/sim/solvers/__init__.py b/embodichain/lab/sim/solvers/__init__.py index 25a932ba..bfea96b2 100644 --- a/embodichain/lab/sim/solvers/__init__.py +++ b/embodichain/lab/sim/solvers/__init__.py @@ -22,3 +22,4 @@ from .opw_solver import OPWSolverCfg, OPWSolver from .srs_solver import SRSSolverCfg, SRSSolver from .neural_ik_solver import NeuralIKSolverCfg, NeuralIKSolver +from .ur_solver import URSolverCfg, URSolver diff --git a/embodichain/lab/sim/solvers/ur_solver.py b/embodichain/lab/sim/solvers/ur_solver.py new file mode 100644 index 00000000..1ff83afb --- /dev/null +++ b/embodichain/lab/sim/solvers/ur_solver.py @@ -0,0 +1,254 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +import torch +import numpy as np +import warp as wp +from embodichain.utils import configclass +from embodichain.lab.sim.solvers import SolverCfg, BaseSolver +from embodichain.data import get_data_path +from embodichain.utils.warp.kinematics.ur_solver import ( + URParam, + ur_ik_kernel, +) +import math +from embodichain.utils.device_utils import standardize_device_string + + +@configclass +class URSolverCfg(SolverCfg): + ur_type: str = "ur10" + end_link_name: str = "ee_link" + root_link_name: str = "base_link" + urdf_path: str = get_data_path("UniversalRobots/UR10/UR10.urdf") + # DH parameters: default ur10 parameters + d1: float = 0.1273 + a2: float = -0.612 + a3: float = -0.5723 + d4: float = 0.163941 + d5: float = 0.1157 + d6: float = 0.0922 + + alpha1: float = torch.pi * 0.5 + alpha4: float = torch.pi * 0.5 + alpha5: float = -torch.pi * 0.5 + + def __post_init__(self): + super().__post_init__() + # from https://github.com/Victorlouisdg/ur-analytic-ik/blob/main/src/ur_analytic_ik/dh_parameters.hh + if self.ur_type == "ur3": + self.d1 = 0.1519 + self.d4 = 0.11235 + self.d5 = 0.08535 + self.d6 = 0.0819 + self.a2 = -0.24365 + self.a3 = -0.21325 + self.urdf_path = get_data_path("UniversalRobots/UR3/UR3.urdf") + elif self.ur_type == "ur3e": + self.d1 = 0.15185 + self.d4 = 0.13105 + self.d5 = 0.08535 + self.d6 = 0.0921 + self.a2 = -0.24355 + self.a3 = -0.2132 + self.urdf_path = get_data_path("UniversalRobots/UR3e/UR3e.urdf") + elif self.ur_type == "ur5": + self.d1 = 0.089159 + self.d4 = 0.10915 + self.d5 = 0.09465 + self.d6 = 0.0823 + self.a2 = -0.425 + self.a3 = -0.39225 + self.urdf_path = get_data_path("UniversalRobots/UR5/UR5.urdf") + elif self.ur_type == "ur5e": + self.d1 = 0.1625 + self.d4 = 0.1333 + self.d5 = 0.0997 + self.d6 = 0.0996 + self.a2 = -0.425 + self.a3 = -0.3922 + self.urdf_path = get_data_path("UniversalRobots/UR5e/UR5e.urdf") + elif self.ur_type == "ur10": + self.d1 = 0.1273 + self.d4 = 0.163941 + self.d5 = 0.1157 + self.d6 = 0.0922 + self.a2 = -0.612 + self.a3 = -0.5723 + self.urdf_path = get_data_path("UniversalRobots/UR10/UR10.urdf") + elif self.ur_type == "ur10e": + self.d1 = 0.1807 + self.d4 = 0.17415 + self.d5 = 0.11985 + self.d6 = 0.11655 + self.a2 = -0.612 + self.a3 = -0.5723 + self.urdf_path = get_data_path("UniversalRobots/UR10e/UR10e.urdf") + else: + raise ValueError(f"Unknown UR type: {self.ur_type}") + + def init_solver( + self, device: torch.device = torch.device("cpu"), **kwargs + ) -> "URSolver": + """Initialize the solver with the configuration. + + Args: + device (torch.device): The device to use for the solver. Defaults to CPU. + **kwargs: Additional keyword arguments that may be used for solver initialization. + + Returns: + URSolver: An initialized solver instance. + """ + solver = URSolver(cfg=self, device=device, **kwargs) + solver.set_tcp(self._get_tcp_as_numpy()) + return solver + + +class URSolver(BaseSolver): + def __init__(self, cfg: URSolverCfg, device: str, **kwargs): + super().__init__(cfg, device, **kwargs) + self.dof = 6 + self._init_warp_solver(cfg) + + def _init_warp_solver(self, cfg: URSolverCfg): + self._ur_params = URParam() + self._ur_params.d1 = cfg.d1 + self._ur_params.a2 = cfg.a2 + self._ur_params.a3 = cfg.a3 + self._ur_params.d4 = cfg.d4 + self._ur_params.d5 = cfg.d5 + self._ur_params.d6 = cfg.d6 + + def set_tcp(self, tcp: np.ndarray): + super().set_tcp(tcp) + self._tcp_inv = np.eye(4, dtype=float) + self._tcp_inv[:3, :3] = self.tcp_xpos[:3, :3].T + self._tcp_inv[:3, 3] = -self._tcp_inv[:3, :3] @ self.tcp_xpos[:3, 3] + + def get_ik( + self, + target_xpos: torch.Tensor, + qpos_seed: torch.Tensor, + return_all_solutions: bool = False, + **kwargs, + ): + """Compute target joint positions using OPW inverse kinematics. + + Args: + target_xpos (torch.Tensor): Current end-effector pose, shape (n_sample, 4, 4). + qpos_seed (torch.Tensor): Current joint positions, shape (n_sample, num_joints). + return_all_solutions (bool, optional): Whether to return all IK solutions or just the best one. Defaults to False. + **kwargs: Additional keyword arguments for future extensions. + + Returns: + Tuple[torch.Tensor, torch.Tensor]: + - target_joints (torch.Tensor): Computed target joint positions, shape (n_sample, n_solution, num_joints). + - success (torch.Tensor): Boolean tensor indicating IK solution validity for each environment, shape (n_sample,). + """ + N_SOL = 512 + DOF = 6 + if target_xpos.shape == (4, 4): + target_xpos_batch = target_xpos[None, :, :] + else: + target_xpos_batch = target_xpos + tcp_inv = torch.tensor(self._tcp_inv, dtype=torch.float32, device=self.device) + target_xpos_batch = target_xpos_batch @ tcp_inv[None, :, :] + n_sample = target_xpos_batch.shape[0] + + device = self.device + wp_device = standardize_device_string(self.device) + # Flatten target poses to a 1-D float array for the Warp kernel. + xpos_wp = wp.from_torch(target_xpos_batch.reshape(-1)) + all_qpos_wp = wp.zeros(n_sample * N_SOL * DOF, dtype=float, device=wp_device) + all_ik_valid_wp = wp.zeros(n_sample * N_SOL, dtype=int, device=wp_device) + lower_qpos_limits_wp = wp.from_torch(self.lower_qpos_limits) + upper_qpos_limits_wp = wp.from_torch(self.upper_qpos_limits) + wp.launch( + kernel=ur_ik_kernel, + dim=(n_sample,), + inputs=[ + xpos_wp, + self._ur_params, + lower_qpos_limits_wp, + upper_qpos_limits_wp, + ], + outputs=[all_qpos_wp, all_ik_valid_wp], + device=wp_device, + ) + + all_solutions = ( + wp.to_torch(all_qpos_wp) + .reshape(n_sample, N_SOL, DOF) + .to(dtype=torch.float32, device=device) + ) + all_solutions_validity = ( + wp.to_torch(all_ik_valid_wp) + .reshape(n_sample, N_SOL) + .bool() + .to(device=device) + ) + + if return_all_solutions: + return all_solutions_validity, all_solutions + # Select ik qpos based on the closest distance to the seed qpos + qpos_seed_expanded = qpos_seed.unsqueeze(1).expand(-1, N_SOL, -1) + distances = torch.norm(all_solutions - qpos_seed_expanded, dim=-1) + # fill invalid solutions with inf distance + distances[~all_solutions_validity] = float("inf") + closest_indices = torch.argmin(distances, dim=1) + ik_qpos = all_solutions[torch.arange(n_sample), closest_indices] + ik_validity = all_solutions_validity[torch.arange(n_sample), closest_indices] + return ik_validity, ik_qpos + + @staticmethod + def dh_matrix(theta_i, d_i, a_i, alpha_i): + """ + Compute the Denavit-Hartenberg transformation matrix. + + Args: + theta_i (float): Joint angle in radians. + d_i (float): Link offset along the previous z-axis. + a_i (float): Link length along the previous x-axis. + alpha_i (float): Link twist angle in radians. + + Returns: + torch.Tensor: A 4x4 transformation matrix representing the pose of the next link. + """ + m = torch.zeros((4, 4), dtype=torch.float32) + s_ai = math.sin(alpha_i) + c_ai = math.cos(alpha_i) + + m[0, 0] = torch.cos(theta_i) + m[0, 1] = -torch.sin(theta_i) * c_ai + m[0, 2] = torch.sin(theta_i) * s_ai + m[0, 3] = a_i * torch.cos(theta_i) + + m[1, 0] = torch.sin(theta_i) + m[1, 1] = torch.cos(theta_i) * c_ai + m[1, 2] = -torch.cos(theta_i) * s_ai + m[1, 3] = a_i * torch.sin(theta_i) + + m[2, 0] = 0.0 + m[2, 1] = s_ai + m[2, 2] = c_ai + m[2, 3] = d_i + + m[3, 0] = 0.0 + m[3, 1] = 0.0 + m[3, 2] = 0.0 + m[3, 3] = 1.0 + + return m diff --git a/embodichain/utils/warp/kinematics/__init__.py b/embodichain/utils/warp/kinematics/__init__.py index c351defc..36642597 100644 --- a/embodichain/utils/warp/kinematics/__init__.py +++ b/embodichain/utils/warp/kinematics/__init__.py @@ -16,4 +16,5 @@ from . import interpolate from . import opw_solver +from . import ur_solver from . import warp_trajectory diff --git a/embodichain/utils/warp/kinematics/ur_solver.py b/embodichain/utils/warp/kinematics/ur_solver.py new file mode 100644 index 00000000..d886750c --- /dev/null +++ b/embodichain/utils/warp/kinematics/ur_solver.py @@ -0,0 +1,500 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +from __future__ import annotations + +from typing import Tuple + +import warp as wp + +wp_vec6f = wp.types.vector(length=6, dtype=float) +wp_vec48f = wp.types.vector(length=48, dtype=float) + + +@wp.func +def normalize_to_pi(angle: float) -> float: + # TODO: Cannot work in warp. + # return (angle + wp.pi) % (2.0 * wp.pi) - wp.pi + return wp.atan2(wp.sin(angle), wp.cos(angle)) + + +@wp.struct +class URParam: + d1: float + a2: float + a3: float + d4: float + d5: float + d6: float + + +@wp.func +def _safe_sqrt_ur(x: float) -> float: + return wp.sqrt(wp.max(x, float(0.0))) + + +@wp.func +def _ur_dh(theta: float, d: float, a: float, alpha: float) -> wp.mat44f: + """Compute a standard Denavit-Hartenberg transformation matrix.""" + sa = wp.sin(alpha) + ca = wp.cos(alpha) + st = wp.sin(theta) + ct = wp.cos(theta) + return wp.mat44f( + ct, + -st * ca, + st * sa, + a * ct, + st, + ct * ca, + -ct * sa, + a * st, + float(0.0), + sa, + ca, + d, + float(0.0), + float(0.0), + float(0.0), + float(1.0), + ) + + +@wp.func +def ur_single_fk( + q1: float, + q2: float, + q3: float, + q4: float, + q5: float, + q6: float, + params: URParam, +) -> wp.mat44f: + """Compute UR robot forward kinematics via sequential DH matrix products. + + The DH parameter convention follows the standard UR kinematic description: + - Joint 1: (theta1, d1, 0, pi/2) + - Joint 2: (theta2, 0, a2, 0 ) + - Joint 3: (theta3, 0, a3, 0 ) + - Joint 4: (theta4, d4, 0, pi/2) + - Joint 5: (theta5, d5, 0, -pi/2) + - Joint 6: (theta6, d6, 0, 0 ) + """ + half_pi = wp.pi * float(0.5) + T01 = _ur_dh(q1, params.d1, float(0.0), half_pi) + T12 = _ur_dh(q2, float(0.0), params.a2, float(0.0)) + T23 = _ur_dh(q3, float(0.0), params.a3, float(0.0)) + T34 = _ur_dh(q4, params.d4, float(0.0), half_pi) + T45 = _ur_dh(q5, params.d5, float(0.0), -half_pi) + T56 = _ur_dh(q6, params.d6, float(0.0), float(0.0)) + T02 = T01 * T12 + T03 = T02 * T23 + T04 = T03 * T34 + T05 = T04 * T45 + return T05 * T56 + + +@wp.func +def _ur_transform_err(t1: wp.mat44f, t2: wp.mat44f) -> Tuple[float, float]: + """Return (translation_error, rotation_error) between two homogeneous transforms.""" + dx = t1[0, 3] - t2[0, 3] + dy = t1[1, 3] - t2[1, 3] + dz = t1[2, 3] - t2[2, 3] + t_err = wp.sqrt(dx * dx + dy * dy + dz * dz) + + r1 = wp.mat33f( + t1[0, 0], + t1[0, 1], + t1[0, 2], + t1[1, 0], + t1[1, 1], + t1[1, 2], + t1[2, 0], + t1[2, 1], + t1[2, 2], + ) + r2 = wp.mat33f( + t2[0, 0], + t2[0, 1], + t2[0, 2], + t2[1, 0], + t2[1, 1], + t2[1, 2], + t2[2, 0], + t2[2, 1], + t2[2, 2], + ) + r_diff = wp.transpose(r1) * r2 + cos_val = wp.clamp( + float(0.5) * (wp.trace(r_diff) - float(1.0)), float(-1.0), float(1.0) + ) + r_err = wp.abs(wp.acos(cos_val)) + return t_err, r_err + + +@wp.func +def _ur_solve_234( + c1: float, + s1: float, + theta5: float, + theta6: float, + r11: float, + r21: float, + r31: float, + px: float, + py: float, + pz: float, + d1: float, + a2: float, + a3: float, + d5: float, + d6: float, +) -> Tuple[float, float, float, float, float, float]: + """Compute joint angles 2/3/4 given joint angles 1/5/6. + + Returns: + (theta2a, theta3a, theta4a, theta2b, theta3b, theta4b) + where a/b correspond to the two elbow-up/down branches from theta3. + """ + c5 = wp.cos(theta5) + s5 = wp.sin(theta5) + c6 = wp.cos(theta6) + s6 = wp.sin(theta6) + + A234 = c1 * r11 + s1 * r21 + H1_234 = c5 * c6 * r31 - s6 * A234 + H2_234 = c5 * c6 * A234 + s6 * r31 + theta234 = wp.atan2(H1_234, H2_234) + c234 = wp.cos(theta234) + s234 = wp.sin(theta234) + + KC = c1 * px + s1 * py - s234 * d5 + c234 * s5 * d6 + KS = pz - d1 + c234 * d5 + s234 * s5 * d6 + + H3_cos = (KS * KS + KC * KC - a2 * a2 - a3 * a3) / (float(2.0) * a2 * a3) + H3_sin = _safe_sqrt_ur(float(1.0) - H3_cos * H3_cos) + + theta3a = wp.atan2(H3_sin, H3_cos) + theta3b = -theta3a + + base_t2 = wp.atan2(KS, KC) + theta2a = base_t2 - wp.atan2(a3 * wp.sin(theta3a), a3 * wp.cos(theta3a) + a2) + theta2b = base_t2 - wp.atan2(a3 * wp.sin(theta3b), a3 * wp.cos(theta3b) + a2) + + theta4a = theta234 - theta2a - theta3a + theta4b = theta234 - theta2b - theta3b + + return theta2a, theta3a, theta4a, theta2b, theta3b, theta4b + + +@wp.func +def _shift_to_limit(q: float, lo: float, hi: float) -> float: + """Return an FK-equivalent value of ``q`` shifted by +/- 2*pi inside [lo, hi]. + + UR joints are 2*pi-periodic, so ``q`` and ``q +/- 2*pi`` yield identical + forward kinematics. When a 2*pi-shifted representative falls inside the + joint limits it is returned; otherwise ``q`` itself is returned as a + repeated fallback (the validity check later flags out-of-limit values). + + Args: + q: Joint angle normalized to [-pi, pi]. + lo: Lower joint limit. + hi: Upper joint limit. + + Returns: + A 2*pi-shifted copy of ``q`` inside [lo, hi] when one exists, else ``q``. + """ + two_pi = 2.0 * wp.pi + q_plus = q + two_pi + if q_plus >= lo and q_plus <= hi: + return q_plus + q_minus = q - two_pi + if q_minus >= lo and q_minus <= hi: + return q_minus + return q + + +@wp.kernel +def ur_ik_kernel( + xpos: wp.array(dtype=float), # [n_sample * 16] row-major 4x4 target poses + params: URParam, + lower_qpos_limits_wp: wp.array(dtype=float), # [6] lower joint limits + upper_qpos_limits_wp: wp.array(dtype=float), # [6] upper joint limits + qpos: wp.array(dtype=float), # [n_sample * 512 * DOF] output joint solutions + ik_valid: wp.array(dtype=int), # [n_sample * 512] output validity flags +): + """Compute expanded analytical IK solutions for a batch of UR poses. + + Each thread handles one target pose. The 8 base analytical solutions are + expanded to ``8 * 2**6 = 512`` candidates: for every base solution and every + joint, a second FK-equivalent value shifted by +/- 2*pi is generated when it + falls inside the joint limits (UR joints are 2*pi-periodic, so this preserves + the end-effector pose). When no shifted value fits the limits, the joint's own + value is repeated. Each candidate is flagged valid only if the base FK matches + the target *and* every joint lies within its limits. + """ + i = wp.tid() + DOF = int(6) + N_SOL = int(8) + N_SHIFT = int(64) # 2**6 per-joint +/- 2*pi shift combinations + base = i * 16 + + # Load rotation and translation from the row-major 4x4 target pose. + r11 = xpos[base + 0] + r12 = xpos[base + 1] + r13 = xpos[base + 2] + px = xpos[base + 3] + r21 = xpos[base + 4] + r22 = xpos[base + 5] + r23 = xpos[base + 6] + py = xpos[base + 7] + r31 = xpos[base + 8] + r32 = xpos[base + 9] + r33 = xpos[base + 10] + pz = xpos[base + 11] + + # Reconstruct target pose matrix for FK error check. + target_pose = wp.mat44f( + r11, + r12, + r13, + px, + r21, + r22, + r23, + py, + r31, + r32, + r33, + pz, + float(0.0), + float(0.0), + float(0.0), + float(1.0), + ) + + d1 = params.d1 + a2 = params.a2 + a3 = params.a3 + d4 = params.d4 + d5 = params.d5 + d6 = params.d6 + + # ---- theta1: two solutions ---- + A1 = px - d6 * r13 + B1 = d6 * r23 - py + H1_sq = A1 * A1 + B1 * B1 - d4 * d4 + H2 = wp.atan2(_safe_sqrt_ur(H1_sq), d4) + base_ang1 = wp.atan2(A1, B1) + theta1a = base_ang1 + H2 + theta1b = base_ang1 - H2 + + # ---- theta5 & theta6 for theta1a ---- + c1a = wp.cos(theta1a) + s1a = wp.sin(theta1a) + c5_val_a = s1a * r13 - c1a * r23 + s5_sq_a = (s1a * r11 - c1a * r21) * (s1a * r11 - c1a * r21) + ( + s1a * r12 - c1a * r22 + ) * (s1a * r12 - c1a * r22) + s5p_a = _safe_sqrt_ur(s5_sq_a) + t5_pos_a = wp.atan2(s5p_a, c5_val_a) # theta5 branch with positive s5 + t5_neg_a = wp.atan2(-s5p_a, c5_val_a) # theta5 branch with negative s5 + H61_a = c1a * r22 - s1a * r12 + H62_a = s1a * r11 - c1a * r21 + sin5p_a = wp.sin(t5_pos_a) + sin5n_a = wp.sin(t5_neg_a) + sgn5p_a = float(1.0) + sgn5n_a = float(-1.0) + if wp.abs(sin5p_a) > float(1e-12): + if sin5p_a < float(0.0): + sgn5p_a = float(-1.0) + if sin5n_a < float(0.0): + sgn5n_a = float(-1.0) + else: + sgn5n_a = float(1.0) + t6_5pa = wp.atan2(sgn5p_a * H61_a, sgn5p_a * H62_a) # theta6 for (1a, 5pos) + t6_5na = wp.atan2(sgn5n_a * H61_a, sgn5n_a * H62_a) # theta6 for (1a, 5neg) + + # ---- theta5 & theta6 for theta1b ---- + c1b = wp.cos(theta1b) + s1b = wp.sin(theta1b) + c5_val_b = s1b * r13 - c1b * r23 + s5_sq_b = (s1b * r11 - c1b * r21) * (s1b * r11 - c1b * r21) + ( + s1b * r12 - c1b * r22 + ) * (s1b * r12 - c1b * r22) + s5p_b = _safe_sqrt_ur(s5_sq_b) + t5_pos_b = wp.atan2(s5p_b, c5_val_b) + t5_neg_b = wp.atan2(-s5p_b, c5_val_b) + H61_b = c1b * r22 - s1b * r12 + H62_b = s1b * r11 - c1b * r21 + sin5p_b = wp.sin(t5_pos_b) + sin5n_b = wp.sin(t5_neg_b) + sgn5p_b = float(1.0) + sgn5n_b = float(-1.0) + if wp.abs(sin5p_b) > float(1e-12): + if sin5p_b < float(0.0): + sgn5p_b = float(-1.0) + if sin5n_b < float(0.0): + sgn5n_b = float(-1.0) + else: + sgn5n_b = float(1.0) + t6_5pb = wp.atan2(sgn5p_b * H61_b, sgn5p_b * H62_b) + t6_5nb = wp.atan2(sgn5n_b * H61_b, sgn5n_b * H62_b) + + # ---- theta2/3/4 for each (theta1, theta5, theta6) group ---- + # Group 0: theta1a + positive theta5 -> solutions 0, 1 + t2a_g0, t3a_g0, t4a_g0, t2b_g0, t3b_g0, t4b_g0 = _ur_solve_234( + c1a, s1a, t5_pos_a, t6_5pa, r11, r21, r31, px, py, pz, d1, a2, a3, d5, d6 + ) + # Group 1: theta1a + negative theta5 -> solutions 2, 3 + t2a_g1, t3a_g1, t4a_g1, t2b_g1, t3b_g1, t4b_g1 = _ur_solve_234( + c1a, s1a, t5_neg_a, t6_5na, r11, r21, r31, px, py, pz, d1, a2, a3, d5, d6 + ) + # Group 2: theta1b + positive theta5 -> solutions 4, 5 + t2a_g2, t3a_g2, t4a_g2, t2b_g2, t3b_g2, t4b_g2 = _ur_solve_234( + c1b, s1b, t5_pos_b, t6_5pb, r11, r21, r31, px, py, pz, d1, a2, a3, d5, d6 + ) + # Group 3: theta1b + negative theta5 -> solutions 6, 7 + t2a_g3, t3a_g3, t4a_g3, t2b_g3, t3b_g3, t4b_g3 = _ur_solve_234( + c1b, s1b, t5_neg_b, t6_5nb, r11, r21, r31, px, py, pz, d1, a2, a3, d5, d6 + ) + + # Assemble all 8 solutions as a flat 48-element vector [t1,t2,t3,t4,t5,t6, ...] + theta = wp_vec48f( + theta1a, + t2a_g0, + t3a_g0, + t4a_g0, + t5_pos_a, + t6_5pa, # sol 0 + theta1a, + t2b_g0, + t3b_g0, + t4b_g0, + t5_pos_a, + t6_5pa, # sol 1 + theta1a, + t2a_g1, + t3a_g1, + t4a_g1, + t5_neg_a, + t6_5na, # sol 2 + theta1a, + t2b_g1, + t3b_g1, + t4b_g1, + t5_neg_a, + t6_5na, # sol 3 + theta1b, + t2a_g2, + t3a_g2, + t4a_g2, + t5_pos_b, + t6_5pb, # sol 4 + theta1b, + t2b_g2, + t3b_g2, + t4b_g2, + t5_pos_b, + t6_5pb, # sol 5 + theta1b, + t2a_g3, + t3a_g3, + t4a_g3, + t5_neg_b, + t6_5nb, # sol 6 + theta1b, + t2b_g3, + t3b_g3, + t4b_g3, + t5_neg_b, + t6_5nb, # sol 7 + ) + + # Expand each of the 8 base solutions into 2**6 = 64 per-joint +/- 2*pi shift + # variants, yielding 8 * 64 = 512 candidates total. Shifting is FK-equivalent + # and only applied when it lands inside the joint limits; otherwise the joint's + # own value is repeated. Validity requires both FK match and joint-limit fit. + for j in range(N_SOL): + q1 = normalize_to_pi(theta[j * DOF + 0]) + q2 = normalize_to_pi(theta[j * DOF + 1]) + q3 = normalize_to_pi(theta[j * DOF + 2]) + q4 = normalize_to_pi(theta[j * DOF + 3]) + q5 = normalize_to_pi(theta[j * DOF + 4]) + q6 = normalize_to_pi(theta[j * DOF + 5]) + + # Precompute the per-joint shifted representative inside [lo, hi]. + q1_shift = _shift_to_limit(q1, lower_qpos_limits_wp[0], upper_qpos_limits_wp[0]) + q2_shift = _shift_to_limit(q2, lower_qpos_limits_wp[1], upper_qpos_limits_wp[1]) + q3_shift = _shift_to_limit(q3, lower_qpos_limits_wp[2], upper_qpos_limits_wp[2]) + q4_shift = _shift_to_limit(q4, lower_qpos_limits_wp[3], upper_qpos_limits_wp[3]) + q5_shift = _shift_to_limit(q5, lower_qpos_limits_wp[4], upper_qpos_limits_wp[4]) + q6_shift = _shift_to_limit(q6, lower_qpos_limits_wp[5], upper_qpos_limits_wp[5]) + + fk_result = ur_single_fk(q1, q2, q3, q4, q5, q6, params) + t_err, r_err = _ur_transform_err(fk_result, target_pose) + fk_ok = int(1) + if t_err > float(1e-2) or r_err > float(1e-1): + fk_ok = int(0) + + for k in range(N_SHIFT): + out_start = i * DOF * N_SOL * N_SHIFT + (j * N_SHIFT + k) * DOF + oq1 = q1 if (k & 1) == 0 else q1_shift + oq2 = q2 if (k & 2) == 0 else q2_shift + oq3 = q3 if (k & 4) == 0 else q3_shift + oq4 = q4 if (k & 8) == 0 else q4_shift + oq5 = q5 if (k & 16) == 0 else q5_shift + oq6 = q6 if (k & 32) == 0 else q6_shift + qpos[out_start + 0] = oq1 + qpos[out_start + 1] = oq2 + qpos[out_start + 2] = oq3 + qpos[out_start + 3] = oq4 + qpos[out_start + 4] = oq5 + qpos[out_start + 5] = oq6 + + valid = fk_ok + tol = float(1e-9) + if ( + oq1 < lower_qpos_limits_wp[0] - tol + or oq1 > upper_qpos_limits_wp[0] + tol + ): + valid = int(0) + if ( + oq2 < lower_qpos_limits_wp[1] - tol + or oq2 > upper_qpos_limits_wp[1] + tol + ): + valid = int(0) + if ( + oq3 < lower_qpos_limits_wp[2] - tol + or oq3 > upper_qpos_limits_wp[2] + tol + ): + valid = int(0) + if ( + oq4 < lower_qpos_limits_wp[3] - tol + or oq4 > upper_qpos_limits_wp[3] + tol + ): + valid = int(0) + if ( + oq5 < lower_qpos_limits_wp[4] - tol + or oq5 > upper_qpos_limits_wp[4] + tol + ): + valid = int(0) + if ( + oq6 < lower_qpos_limits_wp[5] - tol + or oq6 > upper_qpos_limits_wp[5] + tol + ): + valid = int(0) + ik_valid[i * N_SOL * N_SHIFT + j * N_SHIFT + k] = valid diff --git a/scripts/benchmark/robotics/kinematic_solver/run_benchmark.py b/scripts/benchmark/robotics/kinematic_solver/run_benchmark.py index 5f4451ae..584f0e51 100644 --- a/scripts/benchmark/robotics/kinematic_solver/run_benchmark.py +++ b/scripts/benchmark/robotics/kinematic_solver/run_benchmark.py @@ -14,10 +14,11 @@ # limitations under the License. # ---------------------------------------------------------------------------- -"""Unified benchmark for OPW and Pytorch kinematic solvers. +"""Unified benchmark for OPW, UR, and Pytorch kinematic solvers. Measures IK wall-clock latency, pose accuracy, success rate, and memory usage -across OPW (Warp CUDA vs CPU) and Pytorch solver (CPU vs optional CUDA). +across OPW (Warp CUDA vs CPU), UR analytic (Warp CPU vs CUDA), and the +Pytorch solver (CPU vs optional CUDA). Run: python -m scripts.benchmark.robotics.kinematic_solver.run_benchmark """ @@ -36,6 +37,7 @@ from embodichain.data import get_data_path from embodichain.lab.sim.solvers.opw_solver import OPWSolverCfg from embodichain.lab.sim.solvers.pytorch_solver import PytorchSolver, PytorchSolverCfg +from embodichain.lab.sim.solvers.ur_solver import URSolver, URSolverCfg OPW_LOWER_LIMITS = [-2.618, 0.0, -2.967, -1.745, -1.22, -2.0944] OPW_UPPER_LIMITS = [2.618, 3.14159, 0.0, 1.745, 1.22, 2.0944] @@ -46,8 +48,20 @@ PYTORCH_LOWER_LIMITS = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0] PYTORCH_UPPER_LIMITS = [2.5, 2.5, 2.5, 2.5, 2.5, 2.5] +# UR10 analytic IK is robust over the full range; use +/- pi (matching test_ur_solver) +# with a small safety margin applied at sampling time. +UR_LOWER_LIMITS = [-np.pi, -np.pi, -np.pi, -np.pi, -np.pi, -np.pi] +UR_UPPER_LIMITS = [np.pi, np.pi, np.pi, np.pi, np.pi, np.pi] +UR_JOINT_NAMES = ["Joint1", "Joint2", "Joint3", "Joint4", "Joint5", "Joint6"] +UR_TCP = [ + [0.0, 1.0, 0.0, 0.0], + [-1.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 1.0, 0.12], + [0.0, 0.0, 0.0, 1.0], +] + SAMPLE_SIZES = [100, 1000, 10000] -SUPPORTED_SOLVERS = ("opw", "pytorch") +SUPPORTED_SOLVERS = ("opw", "pytorch", "ur") def _parse_args() -> argparse.Namespace: @@ -62,7 +76,7 @@ def _parse_args() -> argparse.Namespace: choices=(*SUPPORTED_SOLVERS, "all"), default=["all"], help=( - "Solvers to benchmark. Use one or more of: opw, pytorch, all. " + "Solvers to benchmark. Use one or more of: opw, pytorch, ur, all. " "Default: all" ), ) @@ -82,6 +96,18 @@ def _sync_cuda() -> None: torch.cuda.synchronize() +def _ensure_warp_initialized() -> None: + """Initialize the Warp runtime if it has not been initialized yet. + + Solvers that launch Warp kernels (e.g. the analytic UR/OPW solvers) require + an initialized Warp runtime. ``SimulationManager`` normally handles this, but + the benchmark drives the solvers directly, so we initialize it here. + """ + import warp as wp + + wp.init() + + def _reset_peak_gpu_memory() -> None: """Reset PyTorch peak GPU memory stats when CUDA is available.""" if torch.cuda.is_available(): @@ -659,8 +685,195 @@ def benchmark_opw_solver() -> tuple[list[dict[str, object]], list[dict[str, obje return perf_rows, metric_rows +def _init_ur_solver(device: torch.device) -> URSolver: + """Initialize the UR analytic IK solver on the target device.""" + solver_cfg = URSolverCfg( + ur_type="ur10", + end_link_name="ee_link", + root_link_name="base_link", + joint_names=UR_JOINT_NAMES, + tcp=UR_TCP, + user_qpos_limits=[UR_LOWER_LIMITS, UR_UPPER_LIMITS], + ) + return solver_cfg.init_solver(device=device) + + +def _timed_ur_ik_call( + solver: URSolver, + fk_xpos: torch.Tensor, + qpos_seed: torch.Tensor, +) -> tuple[float, dict[str, float], float, torch.Tensor, torch.Tensor]: + """Run a timed UR analytic IK call and return elapsed/memory/outputs.""" + _reset_peak_gpu_memory() + mem_before = _memory_snapshot() + _sync_cuda() + + start = time.perf_counter() + for i in range(3): + if i == 1: # skip first run to avoid initialization overhead + start = time.perf_counter() + ik_success, ik_qpos = solver.get_ik(fk_xpos, qpos_seed=qpos_seed) + _sync_cuda() + elapsed = time.perf_counter() - start + elapsed /= 2.0 + + mem_after = _memory_snapshot() + deltas = { + "cpu_mb": mem_after["cpu_mb"] - mem_before["cpu_mb"], + "gpu_mb": mem_after["gpu_mb"] - mem_before["gpu_mb"], + } + return elapsed, deltas, _peak_gpu_memory_mb(), ik_success, ik_qpos + + +def benchmark_ur_solver() -> tuple[list[dict[str, object]], list[dict[str, object]]]: + """Benchmark the UR analytic IK solver for CPU and optional CUDA.""" + perf_rows: list[dict[str, object]] = [] + metric_rows: list[dict[str, object]] = [] + + _ensure_warp_initialized() + + cpu_solver = _init_ur_solver(device=torch.device("cpu")) + has_cuda = torch.cuda.is_available() + cuda_solver = _init_ur_solver(device=torch.device("cuda")) if has_cuda else None + + print("\n=== UR Analytic Solver Benchmark ===") + if not has_cuda: + print(" CUDA unavailable; CUDA benchmark is skipped.") + + for n_sample in SAMPLE_SIZES: + print(f"**** Test over {n_sample} samples:") + + qpos_cpu = _sample_qpos( + n_samples=n_sample, + lower_limits=UR_LOWER_LIMITS, + upper_limits=UR_UPPER_LIMITS, + margin=1e-1, + device=torch.device("cpu"), + dtype=torch.float32, + ) + fk_xpos_cpu = cpu_solver.get_fk(qpos_cpu) + ( + cpu_elapsed, + cpu_mem, + cpu_peak_gpu, + cpu_success, + cpu_ik_qpos, + ) = _timed_ur_ik_call(cpu_solver, fk_xpos_cpu, qpos_cpu) + check_xpos_cpu = cpu_solver.get_fk(cpu_ik_qpos) + cpu_t_err, cpu_r_err = get_pose_err(fk_xpos_cpu, check_xpos_cpu) + + cpu_result = { + "cost_time_ms": cpu_elapsed * 1000.0, + "cpu_delta_mb": cpu_mem["cpu_mb"], + "gpu_delta_mb": cpu_mem["gpu_mb"], + "peak_gpu_mb": cpu_peak_gpu, + "success_rate": float(cpu_success.float().mean().item()), + "translation_err_mm": cpu_t_err.mean().item() * 1000.0, + "rotation_err_deg": cpu_r_err.mean().item() * 180.0 / np.pi, + } + + perf_rows.append( + { + "sample_size": n_sample, + "impl": "ur_cpu", + "component": "ur_ik", + "cost_time_ms": f"{cpu_result['cost_time_ms']:.6f}", + "cpu_delta_mb": f"{cpu_result['cpu_delta_mb']:.6f}", + "gpu_delta_mb": f"{cpu_result['gpu_delta_mb']:.6f}", + "peak_gpu_mb": f"{cpu_result['peak_gpu_mb']:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n_sample, + "impl": "ur_cpu", + "component": "ur_ik", + "success_rate": f"{cpu_result['success_rate']:.6f}", + "translation_err_mm": f"{cpu_result['translation_err_mm']:.6f}", + "rotation_err_deg": f"{cpu_result['rotation_err_deg']:.6f}", + } + ) + + print(f"===UR CPU IK time: {cpu_result['cost_time_ms']:.6f} ms") + print(f" Translation mean error: {cpu_result['translation_err_mm']:.6f} mm") + print( + f" Rotation mean error: {cpu_result['rotation_err_deg']:.6f} degrees" + ) + print(f" Success rate: {cpu_result['success_rate'] * 100.0:.2f}%") + print( + " " + f"CPU Δ={cpu_result['cpu_delta_mb']:+.1f} MB " + f"GPU Δ={cpu_result['gpu_delta_mb']:+.1f} MB " + f"peak GPU={cpu_result['peak_gpu_mb']:.1f} MB" + ) + + if has_cuda and cuda_solver is not None: + qpos_cuda = qpos_cpu.to(torch.device("cuda")) + fk_xpos_cuda = cuda_solver.get_fk(qpos_cuda) + ( + cuda_elapsed, + cuda_mem, + cuda_peak_gpu, + cuda_success, + cuda_ik_qpos, + ) = _timed_ur_ik_call(cuda_solver, fk_xpos_cuda, qpos_cuda) + check_xpos_cuda = cuda_solver.get_fk(cuda_ik_qpos) + cuda_t_err, cuda_r_err = get_pose_err(fk_xpos_cuda, check_xpos_cuda) + + cuda_result = { + "cost_time_ms": cuda_elapsed * 1000.0, + "cpu_delta_mb": cuda_mem["cpu_mb"], + "gpu_delta_mb": cuda_mem["gpu_mb"], + "peak_gpu_mb": cuda_peak_gpu, + "success_rate": float(cuda_success.float().mean().item()), + "translation_err_mm": cuda_t_err.mean().item() * 1000.0, + "rotation_err_deg": cuda_r_err.mean().item() * 180.0 / np.pi, + } + + perf_rows.append( + { + "sample_size": n_sample, + "impl": "ur_cuda", + "component": "ur_ik", + "cost_time_ms": f"{cuda_result['cost_time_ms']:.6f}", + "cpu_delta_mb": f"{cuda_result['cpu_delta_mb']:.6f}", + "gpu_delta_mb": f"{cuda_result['gpu_delta_mb']:.6f}", + "peak_gpu_mb": f"{cuda_result['peak_gpu_mb']:.6f}", + } + ) + metric_rows.append( + { + "sample_size": n_sample, + "impl": "ur_cuda", + "component": "ur_ik", + "success_rate": f"{cuda_result['success_rate']:.6f}", + "translation_err_mm": f"{cuda_result['translation_err_mm']:.6f}", + "rotation_err_deg": f"{cuda_result['rotation_err_deg']:.6f}", + } + ) + + print(f"===UR CUDA IK time: {cuda_result['cost_time_ms']:.6f} ms") + print( + f" Translation mean error: {cuda_result['translation_err_mm']:.6f} mm" + ) + print( + f" Rotation mean error: {cuda_result['rotation_err_deg']:.6f} degrees" + ) + print( + f" Success rate: {cuda_result['success_rate'] * 100.0:.2f}%" + ) + print( + " " + f"CPU Δ={cuda_result['cpu_delta_mb']:+.1f} MB " + f"GPU Δ={cuda_result['gpu_delta_mb']:+.1f} MB " + f"peak GPU={cuda_result['peak_gpu_mb']:.1f} MB" + ) + + return perf_rows, metric_rows + + def run_all_benchmarks(selected_solvers: list[str] | None = None) -> None: - """Run unified OPW + Pytorch kinematic solver benchmarks.""" + """Run unified OPW + UR + Pytorch kinematic solver benchmarks.""" solvers_to_run = _normalize_selected_solvers(selected_solvers) print("=" * 60) @@ -675,6 +888,7 @@ def run_all_benchmarks(selected_solvers: list[str] | None = None) -> None: "opw-specific joint limits." ) print("- Pytorch solver: UR10 URDF-based PytorchSolver with " "UR10 joint limits.") + print("- UR solver: analytic UR10 IK via URSolverCfg with UR10 DH parameters.") perf_rows: list[dict[str, object]] = [] metric_rows: list[dict[str, object]] = [] @@ -689,6 +903,11 @@ def run_all_benchmarks(selected_solvers: list[str] | None = None) -> None: perf_rows.extend(pytorch_perf_rows) metric_rows.extend(pytorch_metric_rows) + if "ur" in solvers_to_run: + ur_perf_rows, ur_metric_rows = benchmark_ur_solver() + perf_rows.extend(ur_perf_rows) + metric_rows.extend(ur_metric_rows) + leaderboard_rows = _build_leaderboard_rows(metric_rows) benchmark_name = "kinematic_solver" diff --git a/scripts/tutorials/grasp/grasp_generator.py b/scripts/tutorials/grasp/grasp_generator.py index 42fe250f..cec81a4c 100644 --- a/scripts/tutorials/grasp/grasp_generator.py +++ b/scripts/tutorials/grasp/grasp_generator.py @@ -28,7 +28,7 @@ from embodichain.lab.sim.objects import Robot, RigidObject from embodichain.lab.sim.utility.action_utils import interpolate_with_distance from embodichain.lab.sim.shapes import MeshCfg -from embodichain.lab.sim.solvers import PytorchSolverCfg +from embodichain.lab.sim.solvers import URSolverCfg from embodichain.data import get_data_path from embodichain.lab.gym.utils.gym_utils import add_env_launcher_args_to_parser from embodichain.utils import logger @@ -129,16 +129,14 @@ def create_robot(sim: SimulationManager, position=[0.0, 0.0, 0.0]) -> Robot: "hand": ["FINGER[1-2]"], }, solver_cfg={ - "arm": PytorchSolverCfg( - end_link_name="ee_link", - root_link_name="base_link", + "arm": URSolverCfg( + ur_type="ur10", tcp=[ [0.0, 1.0, 0.0, 0.0], [-1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 1.0, 0.12], [0.0, 0.0, 0.0, 1.0], ], - num_samples=30, ) }, init_qpos=[0.0, -np.pi / 2, -np.pi / 2, np.pi / 2, -np.pi / 2, 0.0, 0.0, 0.0], diff --git a/tests/sim/solvers/test_ur_solver.py b/tests/sim/solvers/test_ur_solver.py new file mode 100644 index 00000000..626ff9d6 --- /dev/null +++ b/tests/sim/solvers/test_ur_solver.py @@ -0,0 +1,207 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +import torch +import pytest +import numpy as np + +from embodichain.data import get_data_path +from embodichain.lab.sim import SimulationManager, SimulationManagerCfg +from embodichain.lab.sim.objects import Robot +from embodichain.lab.sim.solvers import URSolverCfg +from embodichain.lab.sim.cfg import ( + RenderCfg, + JointDrivePropertiesCfg, + RobotCfg, + LightCfg, + RigidBodyAttributesCfg, + RigidObjectCfg, + URDFCfg, +) + + +def grid_sample_qpos_from_limits( + qpos_limits: torch.Tensor, + steps_per_joint: int = 4, + device=None, + max_samples: int = 4096, + safe_margin: float = 5 / 180 * np.pi, # 5 degrees in radians +) -> torch.Tensor: + """Generate grid samples for qpos from qpos_limits. + + Args: + qpos_limits: tensor of shape (1, n, 2) or (n, 2) where each row is [low, high]. + steps_per_joint: number of values per joint (defaults to 2: low and high). + device: torch device to place the samples on. + max_samples: cap the number of returned samples (take first N if grid is larger). + + Returns: + Tensor of shape (N, n) where N <= max_samples. + """ + if device is None: + device = qpos_limits.device + + limits = qpos_limits.squeeze(0) if qpos_limits.dim() == 3 else qpos_limits + lows = limits[:, 0].to(device) + safe_margin * 1.01 + highs = limits[:, 1].to(device) - safe_margin * 1.01 + + # create per-joint linspaces + grids = [ + torch.linspace(l.item(), h.item(), steps_per_joint, device=device) + for l, h in zip(lows, highs) + ] + + # meshgrid and stack + mesh = torch.meshgrid(*grids, indexing="ij") + stacked = torch.stack([m.reshape(-1) for m in mesh], dim=1) + + if stacked.shape[0] > max_samples: + return stacked[:max_samples] + return stacked + + +# Base test class for OPWSolver +class BaseSolverTest: + sim = None # Define as a class attribute + + def setup_simulation(self, sim_device): + config = SimulationManagerCfg(headless=True, sim_device=sim_device) + self.sim = SimulationManager(config) + self.sim.set_manual_update(False) + + ur10_urdf_path = get_data_path("UniversalRobots/UR10/UR10.urdf") + gripper_urdf_path = get_data_path("DH_PGC_140_50_M/DH_PGC_140_50_M.urdf") + # Configure the robot with its components and control properties + cfg = RobotCfg( + uid="UR10", + urdf_cfg=URDFCfg( + components=[ + {"component_type": "arm", "urdf_path": ur10_urdf_path}, + {"component_type": "hand", "urdf_path": gripper_urdf_path}, + ] + ), + drive_pros=JointDrivePropertiesCfg( + stiffness={"JOINT[0-9]": 1e4, "FINGER[1-2]": 1e3}, + damping={"JOINT[0-9]": 1e3, "FINGER[1-2]": 1e2}, + max_effort={"JOINT[0-9]": 1e5, "FINGER[1-2]": 1e4}, + drive_type="force", + ), + control_parts={ + "arm": ["JOINT[0-9]"], + "hand": ["FINGER[1-2]"], + }, + solver_cfg={ + "arm": URSolverCfg( + ur_type="ur10", + tcp=[ + [0.0, 1.0, 0.0, 0.0], + [-1.0, 0.0, 0.0, 0.0], + [0.0, 0.0, 1.0, 0.12], + [0.0, 0.0, 0.0, 1.0], + ], + ) + }, + init_qpos=[ + 0.0, + -np.pi / 2, + -np.pi / 2, + np.pi / 2, + -np.pi / 2, + 0.0, + 0.0, + 0.0, + ], + init_pos=(0, 0, 0), + ) + self.robot: Robot = self.sim.add_robot(cfg=cfg) + + def test_ik(self): + # Test inverse kinematics (IK) with a 1x4x4 homogeneous matrix pose and a joint_seed + arm_name = "arm" + # generate a small grid of qpos samples from the joint limits (low/high) + qpos_limit = self.robot.get_qpos_limits(name=arm_name) + sample_qpos = grid_sample_qpos_from_limits( + qpos_limit, steps_per_joint=8, device=self.robot.device, max_samples=65536 + ) + sample_qpos = sample_qpos[None, :, :] + fk_xpos = self.robot.compute_batch_fk( + qpos=sample_qpos, name=arm_name, to_matrix=True + ) + fk_xpos_xyzquat = self.robot.compute_batch_fk( + qpos=sample_qpos, name=arm_name, to_matrix=False + ) + + res, ik_qpos = self.robot.compute_batch_ik( + pose=fk_xpos, joint_seed=sample_qpos, name=arm_name + ) + + res, ik_qpos_xyzquat = self.robot.compute_batch_ik( + pose=fk_xpos_xyzquat, joint_seed=sample_qpos, name=arm_name + ) + + assert torch.allclose( + ik_qpos, ik_qpos_xyzquat, atol=5e-3, rtol=5e-3 + ), "IK results do not match for different pose formats" + + ik_xpos = self.robot.compute_batch_fk( + qpos=ik_qpos_xyzquat, name=arm_name, to_matrix=True + ) + assert torch.allclose( + sample_qpos, ik_qpos, atol=5e-3, rtol=5e-3 + ), f"FK and IK qpos do not match for {arm_name}" + + assert torch.allclose( + fk_xpos, ik_xpos, atol=5e-3, rtol=5e-3 + ), f"FK and IK xpos do not match for {arm_name}" + # test for failed xpos + invalid_pose = torch.tensor( + [ + [ + [1.0, 0.0, 0.0, 10.0], + [0.0, 1.0, 0.0, 10.0], + [0.0, 0.0, 1.0, 10.0], + [0.0, 0.0, 0.0, 1.0], + ] + ], + dtype=torch.float32, + device=self.robot.device, + ) + res, ik_qpos = self.robot.compute_ik( + pose=invalid_pose, joint_seed=ik_qpos[:, 0, :], name=arm_name + ) + dof = ik_qpos.shape[-1] + assert res[0] == False + assert ik_qpos.shape == (1, dof) + + def teardown_method(self): + """Clean up resources after each test method.""" + self.sim.destroy() + + +class TestURSolverCUDA(BaseSolverTest): + def setup_method(self): + self.setup_simulation("cuda") + + +class TestURSolver(BaseSolverTest): + def setup_method(self): + self.setup_simulation("cpu") + + +if __name__ == "__main__": + np.set_printoptions(precision=5, suppress=True) + pytest_args = ["-v", "-s", __file__] + pytest.main(pytest_args)