From 54e40b9868c034ba1ad29bdaccb674a05e684b64 Mon Sep 17 00:00:00 2001 From: jdbloom Date: Sat, 11 Apr 2026 10:18:21 -0400 Subject: [PATCH 1/8] feat: add HDF5 data pipeline to Main.py alongside pkl - HDF5Logger writes all raw episode data to compressed HDF5 - Runs in parallel with existing pkl data_logger (dual-write) - Removes inline SQLite registry writes (replaced by ingestion worker) - HDF5 enabled via STELARIS_ROOT env var, gracefully disabled otherwise - Zero overhead when HDF5 not configured Co-Authored-By: Claude Opus 4.6 (1M context) --- rl_code/Main.py | 67 +++++++++++++++++++------------------------------ 1 file changed, 26 insertions(+), 41 deletions(-) diff --git a/rl_code/Main.py b/rl_code/Main.py index 5a5cdce..318921d 100644 --- a/rl_code/Main.py +++ b/rl_code/Main.py @@ -2,7 +2,17 @@ #import python_code.Agent as Agent import src.agent as Agent from src.env import calculate_gsp_reward, ZMQ_Utility -from src.exp_data_structures import data_logger +from src.exp_data_structures import data_logger # kept for backward compat +# HDF5 data pipeline +try: + import sys as _sys + _stelaris_root = os.environ.get("STELARIS_ROOT", "") + if _stelaris_root and _stelaris_root not in _sys.path: + _sys.path.insert(0, _stelaris_root) + from tools.ingestion.hdf5_logger import HDF5Logger + HAS_HDF5 = True +except ImportError: + HAS_HDF5 = False from src.zmq_diagnostics import DiagnosticSocket from src.diagnostics import ExperimentLogger @@ -87,6 +97,13 @@ # Path to save data data_file_path = recording_path + '/Data/' +# Initialize HDF5 logger (one per experiment) +if HAS_HDF5: + hdf5_path = os.path.join(recording_path, os.path.basename(recording_path) + ".h5") + hdf5_writer = HDF5Logger(hdf5_path) +else: + hdf5_writer = None + if args.share_prox_values: num_obs = Utility.params['num_obs'] +Utility.params['num_robots'] #need to account for num_robots extra observations elif args.global_knowledge: @@ -547,6 +564,12 @@ obj_stats[5], gate, obstacles, gsp_reward, next_heading_gsp, time.time() - episode_start_time, robot_x_pos, robot_y_pos, robot_angle, robot_failures, com_X_poses=com_X_poses, com_Y_poses=com_Y_poses) + if hdf5_writer: + hdf5_writer.writerow(r, tmp_epsilon, reached_goal, loss, force_mags, force_angs, + [average_force_mag, math.degrees(average_force_ang)], obj_stats[0], obj_stats[1], + obj_stats[5], gate, obstacles, gsp_reward, next_heading_gsp, + time.time() - episode_start_time, robot_x_pos, robot_y_pos, robot_angle, + robot_failures, com_X_poses=com_X_poses, com_Y_poses=com_Y_poses) if episode_done: if args.independent_learning: @@ -558,50 +581,12 @@ model.reset_hidden_states() run_time = time.time() - episode_start_time data_writer.write_to_file() + if HAS_HDF5: + hdf5_writer.write_episode(ep_counter, os.path.basename(recording_path)) log.info( "Episode %d done: success=%s duration=%.1fs timesteps=%d", ep_counter, reached_goal, run_time, time_steps, ) - # Log to registry if available - try: - import sys as _sys - _stelaris_root = os.environ.get("STELARIS_ROOT", - os.path.abspath(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "..", "..", ".."))) - if _stelaris_root not in _sys.path: - _sys.path.insert(0, _stelaris_root) - from tools.registry.client import RegistryClient as _RC - _reg_db = os.environ.get("STELARIS_DB", - os.path.join(_stelaris_root, "data", "registry.db")) - if os.path.exists(_reg_db): - _reg = _RC(_reg_db) - _exp_id = f"{os.path.basename(recording_path)}_{config.get('SEED', 0)}" - # Ensure experiment exists (may already be created by runner) - if _reg.get_experiment(_exp_id) is None: - _reg.create_experiment( - id=_exp_id, name=os.path.basename(recording_path), - algorithm=config.get("LEARNING_SCHEME", "DQN"), - coordination="IC", environment="unknown", - num_robots=int(config.get("NUM_ROBOTS", 4)), - num_obstacles=int(config.get("NUM_OBSTACLES", 0)), - use_gate=bool(config.get("USE_GATE", 0)), - use_prisms=bool(config.get("USE_PRISMS", 0)), - num_episodes=int(config.get("NUM_EPISODES", 500)), - seed=int(config.get("SEED", 0)), - port=int(config.get("PORT", 55555)), - status="running", - ) - _reg.log_episode( - experiment_id=_exp_id, - episode_num=ep_counter, - total_reward=float(running_reward if not args.independent_learning else np.mean(running_reward)), - success=bool(reached_goal), - duration_s=run_time, - timesteps=time_steps, - epsilon=float(tmp_epsilon), - ) - _reg.close() - except Exception: - pass # Registry is optional if not args.no_print: print('[RUN TIME] %.2f' % run_time) if args.independent_learning: From 1b96a4dcacf85ffa48f8abe43b961f7efe1e9f87 Mon Sep 17 00:00:00 2001 From: jdbloom Date: Sat, 11 Apr 2026 10:31:36 -0400 Subject: [PATCH 2/8] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20move?= =?UTF-8?q?=20HDF5Logger=20into=20RL-CT,=20fix=20import=20order?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Fixed NameError: moved HDF5 import after os is available 2. HDF5Logger now lives in rl_code/src/hdf5_logger.py (no cross-repo import) 3. Removed STELARIS_ROOT dependency for HDF5 — always available via local import Co-Authored-By: Claude Opus 4.6 (1M context) --- rl_code/Main.py | 8 +- rl_code/src/hdf5_logger.py | 178 +++++++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 6 deletions(-) create mode 100644 rl_code/src/hdf5_logger.py diff --git a/rl_code/Main.py b/rl_code/Main.py index 318921d..353ed14 100644 --- a/rl_code/Main.py +++ b/rl_code/Main.py @@ -3,13 +3,9 @@ import src.agent as Agent from src.env import calculate_gsp_reward, ZMQ_Utility from src.exp_data_structures import data_logger # kept for backward compat -# HDF5 data pipeline +# HDF5 data pipeline (optional, enabled via USE_HDF5 env var or --hdf5 flag) try: - import sys as _sys - _stelaris_root = os.environ.get("STELARIS_ROOT", "") - if _stelaris_root and _stelaris_root not in _sys.path: - _sys.path.insert(0, _stelaris_root) - from tools.ingestion.hdf5_logger import HDF5Logger + from src.hdf5_logger import HDF5Logger HAS_HDF5 = True except ImportError: HAS_HDF5 = False diff --git a/rl_code/src/hdf5_logger.py b/rl_code/src/hdf5_logger.py new file mode 100644 index 0000000..24e9030 --- /dev/null +++ b/rl_code/src/hdf5_logger.py @@ -0,0 +1,178 @@ +"""HDF5-based episode data logger — drop-in replacement for pkl data_logger. + +Main.py creates one HDF5Logger per experiment. Each episode is written +as a group in the experiment's HDF5 file. No pkl files created. + +Usage: + logger = HDF5Logger("/path/to/experiment.h5") + + # Per timestep (same API as data_logger.writerow) + logger.writerow(rewards, epsilon, termination, loss, ...) + + # End of episode + logger.write_episode(episode_num) + + # Notify ingestion worker + logger.notify(experiment_name) +""" + +import os +from typing import Optional + +import h5py +import numpy as np + +# Notification handled by ingestion worker (optional, external) + + +class HDF5Logger: + """Accumulates timestep data and writes to HDF5 at episode boundaries.""" + + def __init__(self, hdf5_path: str): + self.hdf5_path = hdf5_path + os.makedirs(os.path.dirname(hdf5_path), exist_ok=True) + self._reset() + + def _reset(self): + """Clear buffers for a new episode.""" + self.reward = [] + self.epsilon = [] + self.termination = [] + self.loss = [] + self.force_magnitude = [] + self.force_angle = [] + self.average_force_vector = [] + self.cyl_x_pos = [] + self.cyl_y_pos = [] + self.cyl_angle = [] + self.gate_stats = [] + self.obstacle_stats = [] + self.gsp_reward = [] + self.gsp_heading = [] + self.run_time = [] + self.robots_x_pos = [] + self.robots_y_pos = [] + self.robot_angle = [] + self.robot_failures = [] + self.com_X_pos = [] + self.com_Y_pos = [] + + def writerow( + self, rewards, epsilons, terminations, losses, + force_magnitudes, force_angles, average_force_vectors, + cyl_x_poses, cyl_y_poses, cyl_angles, + gate_stats, obstacle_stats, + gsp_rewards, gsp_headings, + run_times, robots_x_poses, robots_y_poses, robot_angles, + robot_failure, com_X_poses=0, com_Y_poses=0, + ): + """Accumulate one timestep of data. Same signature as data_logger.writerow.""" + self.reward.append(rewards) + self.epsilon.append(epsilons) + self.termination.append(terminations) + self.loss.append(losses) + self.force_magnitude.append(force_magnitudes) + self.force_angle.append(force_angles) + self.average_force_vector.append(average_force_vectors) + self.cyl_x_pos.append(cyl_x_poses) + self.cyl_y_pos.append(cyl_y_poses) + self.cyl_angle.append(cyl_angles) + self.gate_stats.append(gate_stats) + self.obstacle_stats.append(obstacle_stats) + self.gsp_reward.append(gsp_rewards) + if isinstance(gsp_headings, np.ndarray): + gsp_headings = gsp_headings.tolist() + self.gsp_heading.append(gsp_headings) + self.run_time.append(run_times) + self.robots_x_pos.append(robots_x_poses) + self.robots_y_pos.append(robots_y_poses) + self.robot_angle.append(robot_angles) + self.robot_failures.append(robot_failure) + self.com_X_pos.append(com_X_poses) + self.com_Y_pos.append(com_Y_poses) + + def write_episode(self, episode_num: int, experiment_name: Optional[str] = None) -> dict: + """Write accumulated data to HDF5 and return summary dict. + + Call this instead of data_logger.write_to_file(). + Optionally notifies the ingestion worker. + """ + group_name = f"episode_{episode_num:04d}" + + with h5py.File(self.hdf5_path, "a") as h5f: + if group_name in h5f: + del h5f[group_name] + grp = h5f.create_group(group_name) + + # Store 2D arrays (timesteps × robots) + for key, data in [ + ("reward", self.reward), + ("gsp_reward", self.gsp_reward), + ("force_magnitude", self.force_magnitude), + ("force_angle", self.force_angle), + ("robot_x_pos", self.robots_x_pos), + ("robot_y_pos", self.robots_y_pos), + ("robot_angle", self.robot_angle), + ("robot_failure", self.robot_failures), + ("gsp_heading", self.gsp_heading), + ]: + arr = np.array(data, dtype=np.float32) + if arr.size > 0: + grp.create_dataset(key, data=arr, compression="gzip", compression_opts=4) + + # Store 1D arrays (timesteps) + for key, data in [ + ("epsilon", self.epsilon), + ("loss", self.loss), + ("cyl_x_pos", self.cyl_x_pos), + ("cyl_y_pos", self.cyl_y_pos), + ("cyl_angle", self.cyl_angle), + ("run_time", self.run_time), + ("comX", self.com_X_pos), + ("comY", self.com_Y_pos), + ]: + arr = np.array(data, dtype=np.float32) + if arr.size > 0: + grp.create_dataset(key, data=arr, compression="gzip", compression_opts=4) + + # Termination as bool + term_arr = np.array(self.termination, dtype=bool) + if term_arr.size > 0: + grp.create_dataset("termination", data=term_arr) + + # Compute and store summary attributes + rewards = np.array(self.reward, dtype=np.float32) + gsp_rewards = np.array(self.gsp_reward, dtype=np.float32) + timesteps = len(self.reward) + success = bool(np.any(term_arr)) if term_arr.size > 0 else False + + if rewards.ndim == 2: + reward_per_robot = np.sum(rewards, axis=0).tolist() + elif rewards.size > 0: + reward_per_robot = [float(np.sum(rewards))] + else: + reward_per_robot = [] + + if gsp_rewards.ndim == 2 and gsp_rewards.size > 0: + gsp_per_robot = np.sum(gsp_rewards, axis=0).tolist() + else: + gsp_per_robot = [] + + grp.attrs["episode_num"] = episode_num + grp.attrs["timesteps"] = timesteps + grp.attrs["success"] = success + grp.attrs["reward_per_robot"] = reward_per_robot + grp.attrs["gsp_reward_per_robot"] = gsp_per_robot + + # Reset for next episode + summary = { + "episode_num": episode_num, + "timesteps": timesteps, + "success": success, + "reward_per_robot": reward_per_robot, + "gsp_reward_per_robot": gsp_per_robot, + } + self._reset() + + + return summary From dc2938447e94986e8992cf6505d3b82e04f8fe29 Mon Sep 17 00:00:00 2001 From: jdbloom Date: Sat, 11 Apr 2026 11:11:51 -0400 Subject: [PATCH 3/8] =?UTF-8?q?fix:=20address=20review=20v2=20=E2=80=94=20?= =?UTF-8?q?add=20h5py=20dep,=20tests,=20remove=20dead=20param?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Added h5py as optional dependency in pyproject.toml 2. Removed unused experiment_name param from write_episode 3. Added 7 pytest tests for HDF5Logger (create, datasets, shapes, attributes, multi-episode, reset, disabled path) 4. Registry writes intentionally removed — HDF5 is the data layer, FIFO worker will restore SQLite indexing in a follow-up Co-Authored-By: Claude Opus 4.6 (1M context) --- pyproject.toml | 7 ++ rl_code/Main.py | 2 +- rl_code/src/hdf5_logger.py | 2 +- tests/test_diagnostics/test_hdf5_logger.py | 134 +++++++++++++++++++++ 4 files changed, 143 insertions(+), 2 deletions(-) create mode 100644 tests/test_diagnostics/test_hdf5_logger.py diff --git a/pyproject.toml b/pyproject.toml index 42f6b6c..a576938 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,13 @@ readme = "README.md" python = "^3.10" gsp-rl = {path = "../GSP-RL", develop = true} +[tool.poetry.extras] +hdf5 = ["h5py"] + +[tool.poetry.dependencies.h5py] +version = "^3.0" +optional = true + [tool.poetry.group.dev.dependencies] pytest = "^8.1.1" diff --git a/rl_code/Main.py b/rl_code/Main.py index 353ed14..d3d579e 100644 --- a/rl_code/Main.py +++ b/rl_code/Main.py @@ -578,7 +578,7 @@ run_time = time.time() - episode_start_time data_writer.write_to_file() if HAS_HDF5: - hdf5_writer.write_episode(ep_counter, os.path.basename(recording_path)) + hdf5_writer.write_episode(ep_counter) log.info( "Episode %d done: success=%s duration=%.1fs timesteps=%d", ep_counter, reached_goal, run_time, time_steps, diff --git a/rl_code/src/hdf5_logger.py b/rl_code/src/hdf5_logger.py index 24e9030..ee5f6b5 100644 --- a/rl_code/src/hdf5_logger.py +++ b/rl_code/src/hdf5_logger.py @@ -91,7 +91,7 @@ def writerow( self.com_X_pos.append(com_X_poses) self.com_Y_pos.append(com_Y_poses) - def write_episode(self, episode_num: int, experiment_name: Optional[str] = None) -> dict: + def write_episode(self, episode_num: int) -> dict: """Write accumulated data to HDF5 and return summary dict. Call this instead of data_logger.write_to_file(). diff --git a/tests/test_diagnostics/test_hdf5_logger.py b/tests/test_diagnostics/test_hdf5_logger.py new file mode 100644 index 0000000..3fc5a88 --- /dev/null +++ b/tests/test_diagnostics/test_hdf5_logger.py @@ -0,0 +1,134 @@ +"""Tests for HDF5Logger — the episode data writer.""" + +import os +import numpy as np +import pytest + +try: + import h5py + HAS_H5PY = True +except ImportError: + HAS_H5PY = False + +pytestmark = pytest.mark.skipif(not HAS_H5PY, reason="h5py not installed") + + +from src.hdf5_logger import HDF5Logger + + +class TestHDF5Logger: + def test_creates_h5_file(self, tmp_path): + path = str(tmp_path / "test.h5") + logger = HDF5Logger(path) + for _ in range(3): + logger.writerow( + [-2.0]*4, 0.5, False, 0.01, [0.1]*4, [45.0]*4, + [0.1, 45.0], -1.0, 0.5, 10.0, 0, 0, + [0.0]*4, [0.003]*4, 0.1, [0.0]*4, [0.0]*4, + [90.0]*4, [False]*4, + ) + logger.write_episode(0) + assert os.path.exists(path) + + def test_episode_has_all_datasets(self, tmp_path): + path = str(tmp_path / "test.h5") + logger = HDF5Logger(path) + for _ in range(5): + logger.writerow( + [-2.0]*4, 0.5, False, 0.01, [0.1]*4, [45.0]*4, + [0.1, 45.0], -1.0, 0.5, 10.0, 0, 0, + [0.0]*4, np.array([0.003]*4), 0.1, [0.0]*4, [0.0]*4, + [90.0]*4, [False]*4, + ) + logger.write_episode(0) + with h5py.File(path, "r") as f: + ep = f["episode_0000"] + expected = {"reward", "gsp_reward", "force_magnitude", "force_angle", + "robot_x_pos", "robot_y_pos", "robot_angle", "robot_failure", + "gsp_heading", "epsilon", "loss", "cyl_x_pos", "cyl_y_pos", + "cyl_angle", "run_time", "comX", "comY", "termination"} + assert expected.issubset(set(ep.keys())) + + def test_reward_shape(self, tmp_path): + path = str(tmp_path / "test.h5") + logger = HDF5Logger(path) + num_steps = 10 + num_robots = 4 + for _ in range(num_steps): + logger.writerow( + [-2.0]*num_robots, 0.5, False, 0.01, [0.1]*num_robots, [45.0]*num_robots, + [0.1, 45.0], -1.0, 0.5, 10.0, 0, 0, + [0.0]*num_robots, [0.003]*num_robots, 0.1, + [0.0]*num_robots, [0.0]*num_robots, + [90.0]*num_robots, [False]*num_robots, + ) + logger.write_episode(0) + with h5py.File(path, "r") as f: + assert f["episode_0000"]["reward"].shape == (num_steps, num_robots) + + def test_summary_attributes(self, tmp_path): + path = str(tmp_path / "test.h5") + logger = HDF5Logger(path) + for t in range(10): + logger.writerow( + [-2.0]*4, 0.5, (t == 9), 0.01, [0.1]*4, [45.0]*4, + [0.1, 45.0], -1.0, 0.5, 10.0, 0, 0, + [-0.01]*4, [0.003]*4, 0.1, [0.0]*4, [0.0]*4, + [90.0]*4, [False]*4, + ) + summary = logger.write_episode(0) + assert summary["timesteps"] == 10 + assert summary["success"] == True + assert len(summary["reward_per_robot"]) == 4 + assert all(r < 0 for r in summary["reward_per_robot"]) + + def test_multiple_episodes(self, tmp_path): + path = str(tmp_path / "test.h5") + logger = HDF5Logger(path) + for ep in range(3): + for _ in range(5): + logger.writerow( + [-1.0]*4, 0.5, False, 0.01, [0.1]*4, [45.0]*4, + [0.1, 45.0], -1.0, 0.5, 10.0, 0, 0, + [0.0]*4, [0.0]*4, 0.1, [0.0]*4, [0.0]*4, + [90.0]*4, [False]*4, + ) + logger.write_episode(ep) + with h5py.File(path, "r") as f: + episodes = [k for k in f.keys() if k.startswith("episode")] + assert len(episodes) == 3 + + def test_resets_between_episodes(self, tmp_path): + path = str(tmp_path / "test.h5") + logger = HDF5Logger(path) + for _ in range(10): + logger.writerow( + [-1.0]*4, 0.5, False, 0.01, [0.1]*4, [45.0]*4, + [0.1, 45.0], -1.0, 0.5, 10.0, 0, 0, + [0.0]*4, [0.0]*4, 0.1, [0.0]*4, [0.0]*4, + [90.0]*4, [False]*4, + ) + logger.write_episode(0) + for _ in range(5): + logger.writerow( + [-1.0]*4, 0.5, False, 0.01, [0.1]*4, [45.0]*4, + [0.1, 45.0], -1.0, 0.5, 10.0, 0, 0, + [0.0]*4, [0.0]*4, 0.1, [0.0]*4, [0.0]*4, + [90.0]*4, [False]*4, + ) + logger.write_episode(1) + with h5py.File(path, "r") as f: + assert f["episode_0000"]["reward"].shape[0] == 10 + assert f["episode_0001"]["reward"].shape[0] == 5 + + +class TestHDF5Disabled: + """Test that Main.py works when h5py is not available.""" + + def test_has_hdf5_flag_exists(self): + """The HAS_HDF5 flag should be importable.""" + # This tests the import pattern — if h5py is installed, HAS_HDF5=True + import importlib + spec = importlib.util.find_spec("h5py") + if spec: + assert HAS_H5PY is True From 4d9d1af902826e6b31cd34fbd76c1193f0efd1c8 Mon Sep 17 00:00:00 2001 From: jdbloom Date: Sat, 11 Apr 2026 16:01:25 -0400 Subject: [PATCH 4/8] fix(runner): wait for Python to finish after ARGoS exits normally After ARGoS exits with rc=0, the monitoring loop breaks but never waited for Main.py to finish. main_proc.returncode was None, causing the runner to report test failures (rc=None) even when all 100 episodes completed successfully. Co-Authored-By: Claude Opus 4.6 (1M context) --- run_baseline_experiments.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/run_baseline_experiments.py b/run_baseline_experiments.py index 1f235af..4c1303f 100644 --- a/run_baseline_experiments.py +++ b/run_baseline_experiments.py @@ -320,6 +320,8 @@ def run_experiment(exp_name, config, test_mode=False, model_path=None): # rc=0 means ARGoS finished normally — wait for Python to finish too break time.sleep(1) + # Wait for Python to finish after ARGoS exits normally + main_proc.wait() finally: argos_log_file.close() if argos_proc.poll() is None: From 6fa8ff303dab4a6e343a725fbfb276124ee81523 Mon Sep 17 00:00:00 2001 From: jdbloom Date: Sat, 11 Apr 2026 18:20:37 -0400 Subject: [PATCH 5/8] refactor: remove pkl writing, make HDF5 the sole data output - Main.py: removed data_logger, data_writer, pkl file creation - HDF5Logger is now required (not optional) - run_baseline_experiments.py: find_best_model reads HDF5 attrs - count_episodes reads HDF5 (fallback to pkl for backward compat) - Test results parsed from HDF5 summary attributes Co-Authored-By: Claude Opus 4.6 (1M context) --- rl_code/Main.py | 24 ++++---------------- run_baseline_experiments.py | 45 ++++++++++++++++++++----------------- 2 files changed, 28 insertions(+), 41 deletions(-) diff --git a/rl_code/Main.py b/rl_code/Main.py index d3d579e..4fb51c5 100644 --- a/rl_code/Main.py +++ b/rl_code/Main.py @@ -2,13 +2,7 @@ #import python_code.Agent as Agent import src.agent as Agent from src.env import calculate_gsp_reward, ZMQ_Utility -from src.exp_data_structures import data_logger # kept for backward compat -# HDF5 data pipeline (optional, enabled via USE_HDF5 env var or --hdf5 flag) -try: - from src.hdf5_logger import HDF5Logger - HAS_HDF5 = True -except ImportError: - HAS_HDF5 = False +from src.hdf5_logger import HDF5Logger from src.zmq_diagnostics import DiagnosticSocket from src.diagnostics import ExperimentLogger @@ -94,11 +88,8 @@ data_file_path = recording_path + '/Data/' # Initialize HDF5 logger (one per experiment) -if HAS_HDF5: - hdf5_path = os.path.join(recording_path, os.path.basename(recording_path) + ".h5") - hdf5_writer = HDF5Logger(hdf5_path) -else: - hdf5_writer = None +hdf5_path = os.path.join(recording_path, os.path.basename(recording_path) + ".h5") +hdf5_writer = HDF5Logger(hdf5_path) if args.share_prox_values: num_obs = Utility.params['num_obs'] +Utility.params['num_robots'] #need to account for num_robots extra observations @@ -555,13 +546,7 @@ else: tmp_epsilon = model.epsilon - data_writer.writerow(r, tmp_epsilon, reached_goal, loss, force_mags, force_angs, - [average_force_mag, math.degrees(average_force_ang)], obj_stats[0], obj_stats[1], - obj_stats[5], gate, obstacles, gsp_reward, next_heading_gsp, - time.time() - episode_start_time, robot_x_pos, robot_y_pos, robot_angle, - robot_failures, com_X_poses=com_X_poses, com_Y_poses=com_Y_poses) - if hdf5_writer: - hdf5_writer.writerow(r, tmp_epsilon, reached_goal, loss, force_mags, force_angs, + hdf5_writer.writerow(r, tmp_epsilon, reached_goal, loss, force_mags, force_angs, [average_force_mag, math.degrees(average_force_ang)], obj_stats[0], obj_stats[1], obj_stats[5], gate, obstacles, gsp_reward, next_heading_gsp, time.time() - episode_start_time, robot_x_pos, robot_y_pos, robot_angle, @@ -576,7 +561,6 @@ if hasattr(model, 'reset_hidden_states'): model.reset_hidden_states() run_time = time.time() - episode_start_time - data_writer.write_to_file() if HAS_HDF5: hdf5_writer.write_episode(ep_counter) log.info( diff --git a/run_baseline_experiments.py b/run_baseline_experiments.py index 4c1303f..9f58674 100644 --- a/run_baseline_experiments.py +++ b/run_baseline_experiments.py @@ -11,7 +11,7 @@ import sys import time import shutil -import pickle +import h5py import subprocess import threading from datetime import datetime @@ -338,12 +338,18 @@ def run_experiment(exp_name, config, test_mode=False, model_path=None): def count_episodes(exp_name): - """Count completed pkl files for an experiment.""" - data_dir = os.path.join(PROJECT_ROOT, "rl_code", "Data", exp_name, "Data") + """Count completed episodes from HDF5 file.""" + h5_path = os.path.join(PROJECT_ROOT, "rl_code", "Data", exp_name, exp_name + ".h5") try: - return len([f for f in os.listdir(data_dir) if f.endswith(".pkl")]) - except FileNotFoundError: - return 0 + with h5py.File(h5_path, "r") as f: + return len([k for k in f.keys() if k.startswith("episode")]) + except (FileNotFoundError, OSError): + # Fallback to pkl count + data_dir = os.path.join(PROJECT_ROOT, "rl_code", "Data", exp_name, "Data") + try: + return len([f for f in os.listdir(data_dir) if f.endswith(".pkl")]) + except FileNotFoundError: + return 0 def run_train_and_test(train_name): @@ -412,21 +418,18 @@ def run_train_and_test(train_name): # Compute test metrics test_rewards = [] successes = 0 - for f in sorted(os.listdir(test_data_dir)): - if f.endswith(".pkl"): - with open(os.path.join(test_data_dir, f), "rb") as fh: - d = pickle.load(fh) - rw = d.get("reward", d.get("rewards", [0])) - if isinstance(rw, list) and rw and isinstance(rw[0], (list, np.ndarray)): - test_rewards.append(sum(sum(r) for r in rw)) - else: - test_rewards.append(np.sum(rw)) - terminations = d.get("termination", []) - if terminations: - # termination is a list of bools per timestep - # any True means goal was reached during the episode - if isinstance(terminations, list) and any(terminations): - successes += 1 + # Read test results from HDF5 + test_h5 = os.path.join(os.path.dirname(test_data_dir.rstrip("/")), + os.path.basename(os.path.dirname(test_data_dir.rstrip("/"))) + ".h5") + try: + with h5py.File(test_h5, "r") as f: + for ep_key in sorted(k for k in f.keys() if k.startswith("episode")): + rpr = list(f[ep_key].attrs.get("reward_per_robot", [0])) + test_rewards.append(sum(rpr)) + if f[ep_key].attrs.get("success", False): + successes += 1 + except (FileNotFoundError, OSError): + pass avg_reward = np.mean(test_rewards) if test_rewards else 0 print(f" [TEST] ✓ {test_name} done — {test_ep_count} eps, " From 31b691e85dcf3ddf00630c4ab150fec3a6e5d8af Mon Sep 17 00:00:00 2001 From: jdbloom Date: Sat, 11 Apr 2026 18:24:17 -0400 Subject: [PATCH 6/8] refactor: update viz.py and trajectory plotter to read HDF5 Both scripts now check for HDF5 first, fallback to pkl for backward compat with historical data. Removed registry-db args from viz.py. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/plotting/make_cylinder_trajectory.py | 18 +++- rl_code/src/plotting/viz.py | 97 +++++++------------ 2 files changed, 50 insertions(+), 65 deletions(-) diff --git a/rl_code/src/plotting/make_cylinder_trajectory.py b/rl_code/src/plotting/make_cylinder_trajectory.py index 8cc062a..60513b1 100644 --- a/rl_code/src/plotting/make_cylinder_trajectory.py +++ b/rl_code/src/plotting/make_cylinder_trajectory.py @@ -6,6 +6,7 @@ import os import matplotlib.pyplot as plt import pickle +import h5py def angle_normalize_unsigned_deg(a): while a < 0: a += 360 @@ -40,8 +41,18 @@ def angle_normalize_signed_deg(a): print('. . . Loading Model Data') file_names = [] -for file in os.listdir(data_path): - file_names.append(file) +# Try HDF5 first +exp_name = os.path.basename(args.data_path.rstrip('/')) +h5_path = os.path.join(args.data_path, exp_name + '.h5') +use_hdf5 = os.path.exists(h5_path) +if use_hdf5: + h5_file = h5py.File(h5_path, 'r') + file_names = sorted([k for k in h5_file.keys() if k.startswith('episode')]) + print(f'Loading from HDF5: {len(file_names)} episodes') +else: + for file in os.listdir(data_path): + file_names.append(file) + print(f'Loading from pkl: {len(file_names)} files') df_list = [] for ep in range(len(file_names)-1): @@ -52,7 +63,8 @@ def angle_normalize_signed_deg(a): cyl_heading_diff = [] cyl_angle = data['cyl_angle'] - gsp = data['gsp_heading'] + gsp_raw = data["gsp_heading"] + gsp = [g[0] if isinstance(g, list) else g for g in gsp_raw] predicted_cyl_heading = [] for i in range(len(data['cyl_angle'])-1): predicted_cyl_heading.append(cyl_angle[i] + math.degrees(gsp[i+1]/10)) diff --git a/rl_code/src/plotting/viz.py b/rl_code/src/plotting/viz.py index ec34cab..57b9737 100644 --- a/rl_code/src/plotting/viz.py +++ b/rl_code/src/plotting/viz.py @@ -12,86 +12,59 @@ parser.add_argument("--IL", default = False, action = "store_true") parser.add_argument("--gate", default = False, action = "store_true") -parser.add_argument("--registry-db", default=None, help="Path to registry SQLite database") -parser.add_argument("--experiment-id", default=None, help="Experiment ID in registry") args = parser.parse_args() data_path = args.data_path + 'Data/' -if args.registry_db and args.experiment_id: - # Load from registry database - import sys as _sys - _stelaris_root = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), '..', '..', '..') - if _stelaris_root not in _sys.path: - _sys.path.insert(0, _stelaris_root) - from tools.registry.client import RegistryClient - - print('. . . Loading from Registry DB') - client = RegistryClient(args.registry_db) - episodes_data = client.get_episodes(args.experiment_id) - client.close() - - if not episodes_data: - print(f"No episodes found for {args.experiment_id}") - exit(1) - - # Build the same data structures the pkl path produces - episode_rewards_list = [] - episode_gsp_rewards_list = [] - terminals = [] - - for ep in episodes_data: - # Registry stores total_reward as a single float (averaged across robots) - # Replicate as if all robots had the same reward for compatibility - reward_val = ep.get("total_reward", 0) or 0 - episode_rewards_list.append([reward_val]) - gsp_val = ep.get("gsp_reward", 0) or 0 - episode_gsp_rewards_list.append([gsp_val]) - terminals.append(1 if ep.get("success") else 0) +# Try HDF5 first, fallback to pkl +import h5py + +exp_name = os.path.basename(args.data_path.rstrip('/')) +h5_path = os.path.join(args.data_path, exp_name + '.h5') + +if os.path.exists(h5_path): + print('. . . Loading from HDF5:', h5_path) + with h5py.File(h5_path, 'r') as f: + episodes = sorted([k for k in f.keys() if k.startswith('episode')]) + episode_rewards = [] + episode_gsp_rewards = [] + terminals = [] + for ep_key in episodes: + ep = f[ep_key] + rpr = list(ep.attrs.get('reward_per_robot', [0])) + episode_rewards.append(rpr) + gpr = list(ep.attrs.get('gsp_reward_per_robot', [0])) + episode_gsp_rewards.append(gpr) + terminals.append(1 if ep.attrs.get('success', False) else 0) - episode_rewards = np.array(episode_rewards_list) + episode_rewards = np.array(episode_rewards) terminals = np.array(terminals) - episode_gsp_rewards = np.array(episode_gsp_rewards_list)[:, 0] + episode_gsp_rewards = np.array(episode_gsp_rewards)[:, 0] episode_std = np.std(episode_rewards, axis=1) average_episode_rewards = np.average(episode_rewards, axis=1) - else: - # Original pkl loading path - print('. . . Loading Model Data') - file_names = [] - for file in os.listdir(data_path): - file_names.append(file) + # Fallback to pkl + print('. . . Loading from pkl files') + file_names = [f for f in os.listdir(data_path) if f.endswith('.pkl')] episode_rewards = [] - episode_gsps = [] episode_gsp_rewards = [] - episode_success_reward = [] - episode_success_index = [] - episode_failure_reward = [] - episode_failure_index = [] terminals = [] - last_10_axis = [] - last_10_rewards = [] - episode_run_times = [] - cumulative_episode_run_times = [0] - episode_time_steps = [] - - print('. . . Consolodating Model Data') - IL_flag = False - for i in range(len(file_names)-1): + for i in range(len(file_names)): name = 'Data_Episode_'+str(i)+'.pkl' - with open(data_path+name, 'rb') as f: + pkl_file = os.path.join(data_path, name) + if not os.path.exists(pkl_file): + continue + with open(pkl_file, 'rb') as f: data = pickle.load(f) - rewards = np.array(data['reward']) gsp_rewrads = np.array(data['gsp_reward']) - - robot_rewards = np.array([rewards[:, i] for i in range(rewards.shape[-1])]) - robot_gsp_rewards = np.array([gsp_rewrads[:, i] for i in range(gsp_rewrads.shape[-1])]) - episode_rewards.append([np.sum(robot_rewards[i]) for i in range(robot_rewards.shape[0])]) - episode_gsp_rewards.append([np.sum(robot_gsp_rewards[i]) for i in range(robot_gsp_rewards.shape[0])]) + robot_rewards = np.array([rewards[:, j] for j in range(rewards.shape[-1])]) + robot_gsp_rewards = np.array([gsp_rewrads[:, j] for j in range(gsp_rewrads.shape[-1])]) + episode_rewards.append([np.sum(robot_rewards[j]) for j in range(robot_rewards.shape[0])]) + episode_gsp_rewards.append([np.sum(robot_gsp_rewards[j]) for j in range(robot_gsp_rewards.shape[0])]) if rewards.shape[0] < 4500: terminals.append(1) else: @@ -99,7 +72,7 @@ episode_rewards = np.array(episode_rewards) terminals = np.array(terminals) - episode_gsp_rewards = np.array(episode_gsp_rewards)[:,0] + episode_gsp_rewards = np.array(episode_gsp_rewards)[:, 0] episode_std = np.std(episode_rewards, axis=1) average_episode_rewards = np.average(episode_rewards, axis=1) episode_index = np.arange(0, average_episode_rewards.shape[0], 1) From fc8959679f19af1af76c8824d0f3e2e23878fee2 Mon Sep 17 00:00:00 2001 From: jdbloom Date: Sat, 11 Apr 2026 18:27:09 -0400 Subject: [PATCH 7/8] fix(test): conditional import of HDF5Logger for CI without h5py Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/test_diagnostics/test_hdf5_logger.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_diagnostics/test_hdf5_logger.py b/tests/test_diagnostics/test_hdf5_logger.py index 3fc5a88..deb364d 100644 --- a/tests/test_diagnostics/test_hdf5_logger.py +++ b/tests/test_diagnostics/test_hdf5_logger.py @@ -13,7 +13,8 @@ pytestmark = pytest.mark.skipif(not HAS_H5PY, reason="h5py not installed") -from src.hdf5_logger import HDF5Logger +if HAS_H5PY: + from src.hdf5_logger import HDF5Logger class TestHDF5Logger: From 6fc660c818b03f8d439782c42b94d651934ce124 Mon Sep 17 00:00:00 2001 From: jdbloom Date: Sat, 11 Apr 2026 18:30:14 -0400 Subject: [PATCH 8/8] ci: install h5py so HDF5Logger tests run in CI Co-Authored-By: Claude Opus 4.6 (1M context) --- .github/workflows/unit-tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 725db7f..3506829 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -40,6 +40,7 @@ jobs: sed -i 's|path = "../GSP-RL"|path = "GSP-RL"|' pyproject.toml poetry lock poetry install --no-interaction + poetry run pip install h5py - name: Run unit tests run: |