diff --git a/GPR_library.py b/GPR_library.py new file mode 100644 index 0000000..bb45ef4 --- /dev/null +++ b/GPR_library.py @@ -0,0 +1,333 @@ +# Distributed under the MIT License. +# See LICENSE.txt for details. + +# Gaussian Process Regression Machine Learning Function Library +# contains all functions necessary to run the GPR Model +# used to predict better low-eccentricity orbital parameter initial guesses +# functions: +# 1. normalize_data +# 2. denormalize_predictions +# 3. omega_and_adot +# 4. omegaAndAdot +# 5. polynomial_fit_with_confidence +# 6. GPRegressionModel (class) +# 7. train_gpr_model +# 8. predict_with_gpr_model +# 9. run_gpr_pipeline +# 10. train_model_and_eigenvalue_analysis +# 11. loo_predictions +# 12. parse_test_runs +# 13. apply_gpr_corrections +# 14. save_gpr_corrected +# 15. loo_crossval +# 16. plot_loo_residuals + +import argparse + +import gpytorch +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd + +# Imports +import torch + + +### ML functions +# ExactGP uses an infinite number of basis functions; GP is non-parametric and models functions globally; +# limited only by training points +class GPRegressionModel(gpytorch.models.ExactGP): + """ + Exact GP with a mixture of RBF and Matern kernels, a linear mean function, + and normalization capabilities for inputs and outputs. + + Args: + train_x (torch.Tensor): Training input data + train_y (torch.Tensor): Training targets + likelihood (gpytorch.likelihoods.GaussianLikelihood): Likelihood for the model + """ + + def __init__(self, train_x, train_y, likelihood): + super(GPRegressionModel, self).__init__(train_x, train_y, likelihood) + + # Supports all dimensions (ie GPR can be run from 1-8 dimensions) + input_dim = train_x.shape[1] if train_x.dim() > 1 else 1 + + # Define base kernels - use a mixture of the RBF and Matern kernels + self.rbf_kernel = gpytorch.kernels.RBFKernel(ard_num_dims=input_dim) + self.matern_kernel = gpytorch.kernels.MaternKernel( + nu=2.5, ard_num_dims=input_dim + ) + + # Wrap each kernel with a scale kernel - introduces learnable scaling factor + self.scaled_rbf = gpytorch.kernels.ScaleKernel(self.rbf_kernel) + self.scaled_matern = gpytorch.kernels.ScaleKernel(self.matern_kernel) + + # Combine kernels - the sum of the kernels allows the model to capture more complex + # behavior than either kernel alone would + self.covar_module = self.scaled_rbf + self.scaled_matern + + # Mean function - use linear mean instead of default 0 mean + # Remove hardcoding of model to expect 1D inputs and therefore, matrix mismatch if you pass 2D inputs + self.mean_module = gpytorch.means.LinearMean(input_size=input_dim) + + # Normalization parameters - store the mean and std of the inputs and outputs + self.input_mean = None + self.input_std = None + self.output_mean = None + self.output_std = None + + def set_normalization(self, input_mean, input_std, output_mean, output_std): + """ + Store normalization parameters in the model. + """ + self.input_mean = input_mean + self.input_std = input_std + self.output_mean = output_mean + self.output_std = output_std + + def normalize_input(self, X): + """ + Normalize input using stored parameters. Scale input to zero mean and unit variance. + """ + return (X - self.input_mean) / self.input_std + + def denormalize_output(self, Y_normalized): + """ + Denormalize output using stored parameters (converts normalized output back to original scale). + """ + return (Y_normalized * self.output_std) + self.output_mean + + def forward(self, x, normalize_input=False): + """ + Forward pass with optional input normalization. + + Args: + x (torch.Tensor): Input data + normalize_input (bool): If True, normalize x using stored parameters. + + Returns: + gpytorch.distributions.MultivariateNormal: Distribution for the input. + """ + if normalize_input: + x = self.normalize_input(x) + mean_x = self.mean_module(x) + covar_x = self.covar_module(x) + return gpytorch.distributions.MultivariateNormal(mean_x, covar_x) + + +# GPR training function +def train_gpr_model(raw_X, raw_Y): + """ + Train a GPR model with normalization parameters stored in the model. + + Args: + raw_X (numpy.ndarray): Raw input data + raw_Y (numpy.ndarray): Raw output data + + Returns: + GPRegressionModel: Trained model with the normalization parameters stored + gpytorch.likelihoods.GaussianLikelihood: Likelihood for the model + """ + # Compute normalization parameters + input_mean = raw_X.mean( + axis=0 + ) # (D, ) needed for multidimensions to avoid computing a single scalar mean for all columns combined + input_std = raw_X.std( + axis=0 + ) # (D, ) needed for multidimensions to avoid computing a single scalar mean for all columns combined + output_mean = raw_Y.mean() + output_std = raw_Y.std() + + # Normalize data column-wise; needed for all dimension > 1 + normalized_X = (raw_X - input_mean) / input_std + normalized_Y = (raw_Y - output_mean) / output_std + + # Ensure X is proper dimension + if normalized_X.ndim == 1: + normalized_X = normalized_X.reshape(-1, 1) + + # Convert to PyTorch tensors + train_X = torch.from_numpy(normalized_X).float() + train_Y = torch.from_numpy(normalized_Y).float() + + # Define the likelihood and the model + likelihood = gpytorch.likelihoods.GaussianLikelihood() + model = GPRegressionModel(train_X, train_Y, likelihood) + + # Store normalization parameters in the model + model.set_normalization(input_mean, input_std, output_mean, output_std) + + # Set model to training mode + model.train() + likelihood.train() + + # Use Adam optimizer with learning rate + optimizer = torch.optim.Adam(model.parameters(), lr=0.05) + + # Add learning rate scheduler + scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( + optimizer, + mode="min", + factor=0.5, # Reduce LR by a factor of 0.5 when triggered + patience=5, # Wait 5 epochs before reducing LR if there is no improvement + ) + + # Loss function + mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model) + + # Track the best model to prevent overfitting + best_loss = float("inf") # Initialize best loss as infinity + best_state = None # Placeholder for best model state + + # Training loop for 200 iterations + for i in range(200): + # Ensure model is in training mode at the start of each iteration + model.train() + likelihood.train() + + # Reset gradients from the previous iteration + optimizer.zero_grad() + + # Compute model output + output = model(train_X) + + # Compute negative log-likelihood loss + loss = -mll(output, train_Y) + + # Backpropagation: compute gradients of loss wrt model parameters + loss.backward() + + # Update model parameters + optimizer.step() + + # Update and adjust the learning rate + scheduler.step(loss) + + # Save the best model parameters (if current loss is the lowest) + if loss.item() < best_loss: + best_loss = loss.item() + best_state = model.state_dict().copy() + + # Load the best model state after training + if best_state is not None: + model.load_state_dict(best_state) + + return model, likelihood + + +# GPR prediction function +def predict_with_gpr_model(raw_X, model, likelihood): + """ + Predict using the GPR model with stored normalization parameters. + + Args: + raw_X (numpy.ndarray): Raw input data. + model (GPRegressionModel): Trained model. + likelihood (gpytorch.likelihoods.GaussianLikelihood): Likelihood + for the model. + + Returns: + numpy.ndarray: Predicted mean (denormalized). + numpy.ndarray: Predicted standard deviation (denormalized). + """ + # Normalize the input using the model's stored parameters + normalized_X = (raw_X - model.input_mean) / model.input_std + X_tensor = torch.from_numpy(normalized_X).float() + + # Set the model and likelihood to evaluation mode + model.eval() + likelihood.eval() + + # Make predictions + with torch.no_grad(): + observed_pred = likelihood(model(X_tensor)) + + # Denormalize the predictions + mean_normalized = observed_pred.mean.numpy() + stddev_normalized = observed_pred.variance.sqrt().numpy() + + mean_denormalized = model.denormalize_output(mean_normalized) + stddev_denormalized = stddev_normalized * model.output_std + + return mean_denormalized, stddev_denormalized + + +# GPR pipeline function - runs the entire process - including training, predicting, plotting - and outputs performance metrics +# This function encompasses previous functions defined above: train_gpr_model and predict_with_gpr_model and runs them together +def run_gpr_pipeline(X, Y, target_name="target", plot=True, silent=False): + """ + Train a GPR model on (X, Y), predict on X, plot, and report metrics for a given target. + + Args: + X (np.ndarray): Input data. + Y (np.ndarray): Target output deltas. + target_name (str): For labeling plots & output. + plot: whether to produce correlation plots. + silent: whether to suppress print statements entirely. + + Returns: + model + likelihood + Y_pred + uncertainties + """ + + # Train GPR model + model, likelihood = train_gpr_model(X, Y) + + # Make predictions + Y_pred, uncertainties = predict_with_gpr_model(X, model, likelihood) + + # If specified, create correlation plot and compute metrics + if plot: + plt.figure(figsize=(8, 6)) + plt.scatter(Y, Y_pred, alpha=0.6, s=20) + + # Make perfect correlation line (y = x) + min_val = min(Y.min(), Y_pred.min()) + max_val = max(Y.max(), Y_pred.max()) + plt.plot( + [min_val, max_val], + [min_val, max_val], + "r--", + lw=2, + label="Perfect Correlation", + ) + + # Labels and formatting + plt.xlabel(f"ΔTrue {target_name}", fontsize=12) + plt.ylabel(f"GPR Predicted Δ{target_name}", fontsize=12) + plt.title( + f"GPR Predictions vs True Values ({target_name})", fontsize=14 + ) + plt.grid(True, alpha=0.3) + plt.legend() + + # Calculate and display metrics + corr = np.corrcoef(Y, Y_pred)[0, 1] + r2 = corr**2 + rmse = np.sqrt(np.mean((Y - Y_pred) ** 2)) + mae = np.mean(np.abs(Y - Y_pred)) + metrics_text = f"R² = {r2:.4f}\nRMSE = {rmse:.8f}\nMAE = {mae:.8f}" + plt.text( + 0.95, + 0.05, + metrics_text, + transform=plt.gca().transAxes, + fontsize=12, + bbox=dict(boxstyle="round", facecolor="white", alpha=0.8), + verticalalignment="bottom", + horizontalalignment="right", + ) + + plt.tight_layout() + plt.show() + + if not silent: + # Print performance metrics regardless of plotting + print(f"R²: goal: > 0.95 excellent, > 0.90 good, < 0.70 poor") + print(f"RMSE goal: < 1 % of target range, lower is better") + print(f"MAE goal: < 1 % of target range, lower is better") + + return model, likelihood, Y_pred diff --git a/TestGPRLibrary.py b/TestGPRLibrary.py new file mode 100644 index 0000000..b5c8321 --- /dev/null +++ b/TestGPRLibrary.py @@ -0,0 +1,204 @@ +# Distributed under the MIT License. +# See LICENSE.txt for details. + +# Unit test for GPR_library.py +# Generate a simple, synthetic regression problem and run the pipeline +# Assert that the shapes are reasonable, uncertainties are positive, and +# that the mean predictions are within a reasonable range of the true values + +import shutil +import unittest +from pathlib import Path + +import numpy as np + +from GPR_library import ( + predict_with_gpr_model, + run_gpr_pipeline, + train_gpr_model, +) + + +class TestGPRLibrary(unittest.TestCase): + # Set up and prepare test directory and file paths + def setUp(self): + self.test_dir = Path("SimulationSupport") / "GPR_library_tests" + + # Clean up any existing test directory and create a new one + shutil.rmtree(self.test_dir, ignore_errors=True) + self.test_dir.mkdir(parents=True, exist_ok=True) + + self.X, self.Y = self.make_data() + + # Clean up and remove test directory after tests are done + def tearDown(self): + shutil.rmtree(self.test_dir) + + @staticmethod + def make_data(n=50, noise_lev=0.05): + """ + Generate synthetic regression dataset - + points sampled from a noisy sine curve: + y = sin(5 * x) + Gaussian noise. + """ + # Random number generator with a fixed seed 0; gets the same random data every time + rng = np.random.default_rng(0) + + # Inputs with shape (n, 1) + X = np.linspace(0, 2 * np.pi, n).reshape(-1, 1) + + # Used flattened X with shape (n,) to evaluate the function + y = np.sin(5 * X.reshape(-1)) + + # Add n Gaussian noise samples with mean 0 and variance 1 + noise = noise_lev * rng.standard_normal(n) + + Y = y + noise # Observed data is y = sin(5x) + Gaussian noise + + return X, Y + + def test_train_predict_and_uncertainty(self): + """ + Check that: + - train_gpr_model and predict with gpr_model run and + return predictions of the correct shape + - uncertainties are positive and non-trivial + """ + X, Y = self.X, self.Y # fake dataset + + # Train the GPR model + model, likelihood = train_gpr_model(X, Y) + + # Make predictions + Y_pred, Y_std = predict_with_gpr_model(X, model, likelihood) + + # Check shapes + self.assertEqual( + Y_pred.shape, + Y.shape, + f"Expected prediction shape {Y.shape}, got {Y_pred.shape}", + ) + + self.assertEqual( + Y_std.shape, + Y.shape, + f"Expected standard deviation shape {Y.shape}, got {Y_std.shape}", + ) + + # Check uncertainties are positive + self.assertTrue( + np.all(Y_std >= 0), + f"Some of the predicted standard deviations are negative", + ) + + # Check that uncertainties are nontrivial + self.assertTrue(np.any(Y_std > 1e-10), "All uncertainties are zero") + + def test_run_gpr_pipeline(self): + """ + Test the full GPR pipeline function. + Check that outputs are well correlated with true values. + """ + X, Y = self.X, self.Y + + # Run the full GPR pipeline + model, likelihood, Y_pred = run_gpr_pipeline( + X, Y, target_name="test", plot=False, silent=True + ) + + corr = np.corrcoef(Y, Y_pred)[0, 1] + + self.assertGreater( + corr, 0.9, f"Expected correlation > 0.9, got {corr:.3f}" + ) + + def test_normalization_stored_and_applied(self): + """ + Test that the normalization parameters are stored and applied correctly: + - input_mean and input_std have correct shapes + - normalized inputs, X, have mean of ~0 and std of ~1 per feature + """ + X, Y = self.X, self.Y + model, likelihood = train_gpr_model(X, Y) + + # Check shapes of stored statistics + exp_shape = (X.shape[1],) + + self.assertEqual( + model.input_mean.shape, + exp_shape, + f"Expected input_mean shape {exp_shape}, got" + f" {model.input_mean.shape}", + ) + + self.assertEqual( + model.input_std.shape, + exp_shape, + f"Expected input_std shape {exp_shape}, got" + f" {model.input_std.shape}", + ) + + # Recreate normalized X using the stored statistics + normalized_X = (X - model.input_mean) / model.input_std + + # Column-wise mean and std + col_means = normalized_X.mean(axis=0) + col_stds = normalized_X.std(axis=0) + + # Check that each mean is close to 0 with an allowed tolerance of 0.1 + self.assertTrue( + np.allclose(col_means, 0, atol=1e-1), + f"Expected means ~0, got {col_means}", + ) + + # Check that each std is close to 1 with an allowed tolerance of 0.1 + self.assertTrue( + np.allclose(col_stds, 1.0, atol=1e-1), + f"Expected stddevs ~1, got {col_stds}", + ) + + def test_output_denorm(self): + """ + Test that the stored output_mean and output_std are consistent with + denormalize_output, ie that it is correctly undoing the normalization. + """ + X, Y = self.X, self.Y + model, likelihood = train_gpr_model(X, Y) + + # Sanity check on stored output statistics + self.assertIsInstance( + model.output_mean, + (float, np.floating), + "Expected output mean to be a float, got" + f" {type(model.output_mean)}", + ) + + self.assertIsInstance( + model.output_std, + (float, np.floating), + "Expected output standard deviation to be a float, got" + f" {type(model.output_std)}", + ) + + self.assertGreater( + model.output_std, + 0.0, + "output standard deviation should be positive", + ) + + # Test normalization on a small subset of Y + Y_subset = Y[:5] + Y_normalized = (Y_subset - model.output_mean) / model.output_std + Y_denormalized = model.denormalize_output(Y_normalized) + + # Check that the original Y is recovered + self.assertTrue( + np.allclose(Y_denormalized, Y_subset, atol=1e-8), + f"Expected normalization {Y_subset}, got {Y_denormalized}." + f" Normalization failed.\nOriginal: {Y_subset}, Denormalized:" + f" {Y_denormalized}", + ) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/src/SimulationSupport/PlotSpecEccentricityControl.py b/src/SimulationSupport/PlotSpecEccentricityControl.py new file mode 100644 index 0000000..dd85eb5 --- /dev/null +++ b/src/SimulationSupport/PlotSpecEccentricityControl.py @@ -0,0 +1,1345 @@ +#!/usr/bin/env python + +# Distributed under the MIT License. +# See LICENSE.txt for details. + +import logging +import os +import re +from collections import defaultdict +from pathlib import Path +from typing import Optional, Sequence, Union + +import click +import matplotlib.colors as mcolors +import matplotlib.lines as mlines +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import h5py +import json +import matplotlib.patheffects as path_effects +from adjustText import adjust_text +from matplotlib.ticker import MaxNLocator, ScalarFormatter +from matplotlib.patches import Ellipse, FancyArrowPatch +from mpl_toolkits.axes_grid1.inset_locator import inset_axes + +logger = logging.getLogger(__name__) + +# Style helpers + +# Line styles and markers are cycled over however many run types the user supplies. +# Styles are assigned in the order run types appear in the data. +_LINESTYLE_CYCLE = ["-", "--", ":", "-.", (0, (3, 1, 1, 1))] +_MARKER_CYCLE = ["o", "D", "s", "^", "v", "P", "X", "*"] +_MARKERSIZE_CYCLE = [5, 4, 5, 5, 5, 5, 5, 7 ] + +def build_run_type_styles(run_types: Sequence) -> dict: + """ + Build per-run-type style dicts from whatever run type labels the user + has assigned in their RunType column. + + Styles are assigned in the order the run types appear. User controls all naming. + + Args: + run_types: iterable of user supplied run type label strings. + + Returns: + Dict of dicts keyed by run type, each containing keys + 'linestyle', 'marker', 'markersize', and 'zorder'. + """ + unique = list(dict.fromkeys(run_types)) + return { + rt: { + "linestyle": _LINESTYLE_CYCLE[i % len(_LINESTYLE_CYCLE)], + "marker": _MARKER_CYCLE[i % len(_MARKER_CYCLE)], + "markersize": _MARKERSIZE_CYCLE[i % len(_MARKERSIZE_CYCLE)], + "zorder": i + 1, + } + for i, rt in enumerate(unique) + } + +def build_case_color_map(cases: Sequence) -> dict: + """Assign a distinct colour to each unique case, using the tab10 palette. + + Args: + cases: iterable of case identifiers. + + Returns: + Dict mapping each case identifier to a matplotlib colour string. + """ + unique = list(dict.fromkeys(cases)) + palette = plt.cm.tab10.colors + return {c: palette[i % len(palette)] for i, c in enumerate(unique)} + + +# Formatting helpers +def sci_offset_formatter() -> ScalarFormatter: + """ + Return a ScalarFormatter that always uses scientific offset notation. + """ + fmt = ScalarFormatter(useMathText=True) + fmt.set_scientific(True) + fmt.set_powerlimits((0, 0)) + return fmt + +def with_alpha(color: str, alpha: float): + """ + Return the color as an RGBA tuple with the given opacity. + + Args: + color: any matplotlib color string. + alpha: opacity in [0, 1]. + + Returns: + RGBA tuple. + """ + r, g, b, _ = mcolors.to_rgba(color) + return (r, g, b, alpha) + +def save_pdf(name: str, width: float = 3.4, fig=None): + """ + Save a figure as a PDF with fixed width of 3.4 and auto-increment the filename so old plots do not get + overwritten. Call after plt.show() + + Args: + name: base filename. + width: target width in inches (defaults to 3.4 for PRD column width). + fig: figure to save (defaults to the current figure). + """ + i = 1 + filename = f"{name}_{i:02d}.pdf" + while os.path.exists(filename): + i += 1 + filename = f"{name}_{i:02d}.pdf" + + if fig is None: + fig = plt.gcf() + # Get current figure and resize + orig_width, orig_height = fig.get_size_inches() + # Keep aspect ratio by computing proportional height + new_height = orig_height * (width / orig_width) + # Resize to PRD width + fig.set_size_inches(width, new_height) + # Save + fig.savefig(filename, format="pdf", bbox_inches="tight") + print(f"Saved figure as {filename}") + +# HDF5 parsing helpers +def extract_params_from_lines(lines: list, include_massratio: bool = True): + """ + Extract MassRatio, Omega0, adot0, and D0 from their respective Params.input lines. + + Args: + lines: list of strings from Params.input file. + include_massratio (bool): condition to determine whether or not to extract MassRatio. + + Returns: + Tuple (massratio, omega, adot, d0) of floats, or (None, …) on error. + """ + try: + # Find relevant lines for each parameter + mass_ratio_line = next((l for l in lines if "$MassRatio" in l), None) + omega_line = next((l for l in lines if "$Omega0" in l), None) + adot_line = next((l for l in lines if "$adot0" in l), None) + d0_line = next((l for l in lines if "$D0" in l), None) + + # Raise error if any are missing + if None in [mass_ratio_line, omega_line, adot_line, d0_line]: + raise ValueError("Required parameters not found") + + # Extract numeric values from the strings + massratio = float(mass_ratio_line.split("=")[1].strip().rstrip(";")) + omega = float(omega_line.split("=")[1].strip().rstrip(";")) + adot = float(adot_line.split("=")[1].strip().rstrip(";")) + d0 = float(d0_line.split("=")[1].strip().rstrip(";")) + + return massratio, omega, adot, d0 + + except Exception as e: + print(f"Error extracting parameters: {e}") + return None, None, None, None + +def filter_param_lines(content: str) -> list: + """ + Return only the lines in the input file that contain the relevant parameters. + + Args: + content: text of a Params.input file. + + Returns: + List of matching lines. + """ + return [ + line for line in content.splitlines() + if any(key in line for key in ("$MassRatio", "$Omega0", "$adot0", "$D0")) + ] + + +def read_dataset(hdf, path: str) -> str: + """ + Read and decode the string content of the dataset from the HDF5 file. + + Args: + hdf (h5py.File): open h5py.File object. + path (str): dataset path inside the file. + + Returns: + Decoded UTF-8 string. + """ + return hdf[path][()].decode("utf-8") + +# Group by simulation name and eccentricity level - for example, EccRedTest**2025*** and Ecc* +def group_by_sim_and_ecc(paths: list, suffix: str) -> dict: + """ + Group HDF5 dataset paths by simulation name and eccentricity level. + Only include paths ending with the specified suffix (eg. "Params.input") + + Args: + paths: all dataset paths in the HDF5 file. + suffix (str): file suffix to match, e.g. 'Params.input'. + + Returns: + Dict mapping simulation name to a list of (ecc_level, path) tuples. + """ + # Create a dictionary that maps each sim to a list of (ecc, path) pairs + grouped = defaultdict(list) + # Loop through each path and keep only the ones that end with the given suffix + for path in paths: + if path.endswith(suffix): + # Split path string into components and only process paths with at least 3 components + parts = path.split("/") + # Expect format SimName/EccX/... + if len(parts) >= 3 and re.match(r"Ecc\d+", parts[1]): + # Ensure eccentricity directory exists + sim = parts[0] + try: + ecc = int(parts[1][3:]) # strip "Ecc" and convert to int + grouped[sim].append((ecc, path)) + except ValueError: + pass + return grouped + + +def process_h5_file(file_path: str) -> pd.DataFrame: + """ + Extract orbital parameter (Omega0, adot0, separation, mass ratio, eccentricity) values + from all Ecc folders across all simulations in an HDF5 file. + + Args: + file_path: path to the HDF5 file. + + Returns: + DataFrame with columns [Sim, MassRatio, EccLevel, Omega0, Adot0, + Initial Separation, Eccentricity] and one row per eccentricity level. + """ + + data_all_iterations = [] + + with h5py.File(file_path, "r") as hdf: + all_paths = [] + + # Recursively collect all dataset paths in the file + def collect_paths(name, obj): + if isinstance(obj, h5py.Dataset): + all_paths.append(name) + + hdf.visititems(collect_paths) + + # Group dataset by simulation and eccentricity level + grouped_params = group_by_sim_and_ecc(all_paths, "Params.input") + + for sim, entries in grouped_params.items(): + # Sort entries in order of ascending eccentricity level (Ecc0, Ecc1, ...) + entries.sort() + + # Iterate through all eccentricity levels and extract data + for ecc, path in entries: + try: + # Read Params.input content and extract relevaznt parameters + content = read_dataset(hdf, path) + lines = filter_param_lines(content) + massratio, omega, adot, d0 = extract_params_from_lines(lines) + + ecc_file = ( + f"{sim}/Ecc{ecc}/Ev/JoinedForEcc/Fit_F2cos2_SS.dat" + ) + if ecc_file in all_paths: + ecc_lines = read_dataset(hdf, ecc_file).splitlines() + ecc_value = float( + ecc_lines[-1].split()[-1].strip().rstrip(";") + ) + else: + ecc_value = None + + # Store data in dictionary with one row per iteration + data_all_iterations.append({ + "Sim": sim, + "MassRatio": massratio, + "EccLevel": ecc, + "Omega0": omega, + "Adot0": adot, + "Initial Separation": d0, + "Eccentricity": ecc_value, + }) + + except Exception as e: + print( + f"Error in Params.input for {sim} in {file_path}: {e}" + ) + + return pd.DataFrame(data_all_iterations) + +# JSON parsing helpers +def find_metadata( + paths: list, + chosen_lev: str, + suffix: str = "metadata.json", + root: Path = None, +) -> dict: + """ + Find metadata.json files at a chosen Lev directory for each sim folder. + + Args: + paths: all relative file paths. + chosen_lev: which Lev directory to select, e.g. 'Lev3' or 'Lev4'. + Must be provided - no default assumed. + suffix: filename to match. + root: absolute root path to prepend. + + Returns: + Dict mapping sim name to a list of absolute Paths. + """ + grouped = defaultdict(list) + for path in paths: + if path.endswith(suffix): + parts = path.split("/") + if len(parts) >= 3 and parts[1] == chosen_lev: + sim = parts[0] + grouped[sim].append(root / Path(path)) + return grouped + +def process_json_files(metadata_paths: dict) -> pd.DataFrame: + """ + Read each metadata.json file and extract parameters into a DataFrame. + + Args: + metadata_paths: dict mapping sim name to a list of absolute Paths. + + Returns: + DataFrame with orbital parameters extracted from each JSON file. + """ + + data_all = [] + for sim in sorted(metadata_paths.keys(), key=lambda s: int(s)): + for path in metadata_paths[sim]: + try: + with open(path, "r") as f: + data = json.load(f) + + spin1 = data.get("initial_dimensionless_spin1", [None, None, None]) + spin2 = data.get("initial_dimensionless_spin2", [None, None, None]) + + mass1 = data.get("initial_mass1", None) + mass2 = data.get("initial_mass2", 1) + + data_all.append({ + "Sim": data.get("simulation_name", sim), + "MassRatio": mass1 / mass2 if mass1 is not None else None, + "Initial Separation": data.get("initial_separation", None), + "Omega0": data.get("initial_orbital_frequency", None), + "Adot0": data.get("initial_adot", None), + "S1x": spin1[0], "S1y": spin1[1], "S1z": spin1[2], + "S2x": spin2[0], "S2y": spin2[1], "S2z": spin2[2], + "Eccentricity": data.get("reference_eccentricity", None), + }) + except Exception as e: + print(f"Error reading {path}: {e}") + + return pd.DataFrame(data_all) + +# Tolerance ellipse helpers +def choose_inset_anchor( + ax, + x: np.ndarray, + y: np.ndarray, + w: float = 0.35, + h: float = 0.5, + pad_x: float = 0.02, + pad_y: float = 0.02, +): + """ + Choose an inset anchor (x0, y0, w, h) in Axes coords that overlaps the + fewest data points. + + Args: + ax: parent Axes object. + x: data x-coordinates of existing points. + y: data y-coordinates of existing points. + w: inset width in Axes fraction. + h: inset height in Axes fraction. + pad_x: horizontal padding from Axes edge. + pad_y: vertical padding from Axes edge. + + Returns: + Tuple (x0, y0, w, h) in Axes fraction coordinates. + """ + pts = ax.transAxes.inverted().transform( + ax.transData.transform(np.c_[x, y]) + ) + px, py = pts[:, 0], pts[:, 1] + + candidates = [ + (pad_x, pad_y), + (1 - w - pad_x, pad_y), + (pad_x, 1 - h - pad_y), + (1 - w - pad_x, 1 - h - pad_y), + ] + best = candidates[0] + best_score = None + for x0, y0 in candidates: + in_box = ( + (px >= x0) & (px <= x0 + w) + & (py >= y0) & (py <= y0 + h) + ) + score = in_box.sum() + if best_score is None or score < best_score: + best_score = score + best = (x0, y0) + return best[0], best[1], w, h + +def add_tolerance_ellipse_fixed( + ax, + df_case: pd.DataFrame, + case_id: str, + run_type: str, + dOmega: float, + dAdot: float, + case_col: str = "CaseLetter", + facecolor: str = "hotpink", + edgecolor: str = "hotpink", + alpha: float = 0.10, + lw: float = 0.8, + zorder: Optional[int] = None, + label: Optional[str] = None, +): + """ + Draw a single tolerance ellipse centered on the final iteration point. + + Args: + ax: Axes to draw on. + df_case: per-iteration DataFrame for the case. + case_id: case letter, e.g. 'A'. + run_type: run type label set by the user in the RunType column. + dOmega: half-width of the ellipse in Omega0 units. + dAdot: half-height of the ellipse in Adot0 units. + case_col: column name that holds the case letter. + facecolor: fill colour. + edgecolor: edge colour. + alpha: fill opacity. + lw: edge line width. + zorder: drawing order. + label: legend label. + + Returns: + The Ellipse patch, or None if no data is found. + """ + d = df_case.copy() + d[case_col] = d[case_col].astype(str) + d = d[(d[case_col] == str(case_id)) & (d["RunType"] == run_type)] + if d.empty: + return None + + row_final = d.sort_values("EccLevel").iloc[-1] + x_final = float(row_final["Omega0"]) + y_final = float(row_final["Adot0"]) + + fc = mcolors.to_rgba(facecolor, 0.12) + ec = mcolors.to_rgba(edgecolor, 0.62) + + ell = Ellipse( + (x_final, y_final), + width=2 * dOmega, + height=2 * dAdot, + facecolor=fc, + edgecolor=ec, + linewidth=lw, + zorder=zorder, + label=label, + ) + ax.add_patch(ell) + return ell + + +def max_unapplied_deltas( + df_case: pd.DataFrame, + df_next: pd.DataFrame, + case_id: str, + run_types: list, + case_col: str = "CaseLetter", +): + """ + Compute the maximum unapplied correction size across run types. + + Args: + df_case: per-iteration DataFrame. + df_next: DataFrame of next-step parameters with columns + [Case, RunType, Omega0_new, Adot0_new]. + case_id: case letter. + run_types: list of run type strings to consider. + case_col: column name that holds the case letter. + + Returns: + Tuple (dOmega_max, dAdot_max), or None if no data is found. + """ + d = df_case.copy() + d[case_col] = d[case_col].astype(str) + out = [] + + for rt in run_types: + dd = d[(d[case_col] == str(case_id)) & (d["RunType"] == rt)] + if dd.empty: + continue + row_final = dd.sort_values("EccLevel").iloc[-1] + x_final = float(row_final["Omega0"]) + y_final = float(row_final["Adot0"]) + + hit = df_next[ + (df_next["Case"].astype(str) == str(case_id)) + & (df_next["RunType"].astype(str) == rt) + ] + if hit.empty: + continue + + x_new = float(hit.iloc[0]["Omega0_new"]) + y_new = float(hit.iloc[0]["Adot0_new"]) + out.append((abs(x_new - x_final), abs(y_new - y_final))) + + if not out: + return None + + dOmega_max = max(v[0] for v in out) + dAdot_max = max(v[1] for v in out) + return dOmega_max, dAdot_max + + +def add_concentric_tolerance_from_next_step( + ax, + df_case: pd.DataFrame, + df_next: pd.DataFrame, + case_id: str, + run_type: str, + case_col: str = "CaseLetter", + base_color: str = "hotpink", + multipliers: tuple = (3, 2, 1), + fill_alphas: tuple = (0.05, 0.08, 0.12), + edge_alphas: tuple = (0.20, 0.35, 0.55), + lw: tuple = (1.0, 1.1, 1.2), + zorder: int = 1, + min_frac: Optional[float] = None, +) -> list: + """ + Draw concentric tolerance ellipses centered at the final executed point. + The ellipse sizes are set by the unapplied correction |next - final| in (Omega, Adot). + + Args: + ax: Axes to draw on. + df_case: per-iteration DataFrame. + df_next: DataFrame of next-step parameters. + case_id: case letter. + run_type: run type string. + case_col: column name that holds the case letter. + base_color: colour for all ellipses. + multipliers: scale factors for the three ellipses. + fill_alphas: fill opacities for each ellipse. + edge_alphas: edge opacities for each ellipse. + lw: line widths for each ellipse. + zorder: drawing order (placed behind lines and markers). + min_frac: if set, enforce a minimum ellipse size as a fraction of the current Axes range. + + Returns: + List of Ellipse patches that were added. + """ + d = df_case.copy() + d[case_col] = d[case_col].astype(str) + d = d[(d[case_col] == str(case_id)) & (d["RunType"] == run_type)] + if d.empty: + return [] + + row_final = d.sort_values("EccLevel").iloc[-1] + x_final = float(row_final["Omega0"]) + y_final = float(row_final["Adot0"]) + + df_next2 = df_next.copy() + df_next2["Case"] = df_next2["Case"].astype(str) + df_next2["RunType"] = df_next2["RunType"].astype(str) + hit = df_next2[ + (df_next2["Case"] == str(case_id)) + & (df_next2["RunType"] == str(run_type)) + ] + if hit.empty: + return [] + + x_new = float(hit.iloc[0]["Omega0_new"]) + y_new = float(hit.iloc[0]["Adot0_new"]) + dOmega = abs(x_new - x_final) + dAdot = abs(y_new - y_final) + + if min_frac is not None: + xspan = ax.get_xlim()[1] - ax.get_xlim()[0] + yspan = ax.get_ylim()[1] - ax.get_ylim()[0] + if xspan > 0: + dOmega = max(dOmega, min_frac * xspan) + if yspan > 0: + dAdot = max(dAdot, min_frac * yspan) + + ells = [] + for m, fa, ea, lw_i in zip(multipliers, fill_alphas, edge_alphas, lw): + ell = Ellipse( + (x_final, y_final), + width=2 * (m * dOmega), + height=2 * (m * dAdot), + facecolor=with_alpha(base_color, fa), + edgecolor=with_alpha(base_color, ea), + linewidth=lw_i, + zorder=zorder, + ) + ax.add_patch(ell) + ells.append(ell) + + return ells + +# Plotting functions +def plot_spec_eccentricity_control( + df_all: pd.DataFrame, + sim_name: str, + ax=None, + color: str = "black", + label: Optional[str] = None, + linestyle: str = "-", + linewidth: Optional[float] = None, + run_type: Optional[str] = None, + marker: Optional[str] = None, + markersize: Optional[float] = None, + zorder: int = 10, + adot_mode: str = "raw", +): + """ + Plot the trajectory through (Omega0, Adot0) space for a single + simulation across eccentricity reduction iterations. Optionally overlay + eccentricity contours. Can plot Adot0 in its true format, absolute value, + or log absolute value. + + Key differences from PlotEccentricityControl.py: + 1. Handles cases with fewer than 3 iterations (skips contour map, + still plots points and path) because many simulations need only 2 iterations. + 2. Draws directional arrows along the trajectory. + 3. Annotates each point with its iteration number. + + \f + Arguments: + df_all: DataFrame containing per-iteration simulation data. + Must include columns [Sim, Omega0, Adot0, EccLevel, Eccentricity]. + sim_name: simulation name to filter and plot. + ax: Axis to plot on; If None, creates a new figure. + color: color of line/points for the simulation. + label: legend label. + linestyle: matplotlib line style string. + linewidth: line width. + run_type: run type label for this simulation that was set by the user in the RunType column. + marker: matplotlib marker string. + markersize: marker size in points. + zorder: base drawing order; arrows and annotations are drawn above. + adot_mode: how to display Adot0 — 'raw' (default), 'abs' (absolute + value), or 'logabs' (log10 of absolute value). + + Returns: + fig (matplotlib.figure.Figure): Figure containing the plot + ax (matplotlib.axes.Axes): Axis + """ + # Filter simulation data to only include the specified simulation + sim_data = df_all[df_all["Sim"] == sim_name].copy() + # Ensure iterations are plotted in order + sim_data.sort_values("EccLevel", inplace=True) + + if sim_data.empty: + print(f"No data found for simulation {sim_name}") + return ax + + # Transform Adot0 according to requested mode + if adot_mode == "abs": + sim_data["Adot0_plot"] = np.abs(sim_data["Adot0"].values.astype(float)) + elif adot_mode == "logabs": + safe_abs = np.abs(sim_data["Adot0"].values.astype(float)) + sim_data["Adot0_plot"] = np.where(safe_abs > 0, np.log10(safe_abs), np.nan) + else: + sim_data["Adot0_plot"] = sim_data["Adot0"] + + xvals = sim_data["Omega0"].to_numpy() + yvals = sim_data["Adot0_plot"].to_numpy() + + if ax is None: + _, ax = plt.subplots(figsize=(8, 3)) + + marker_alpha = 0.5 + + # Plot trajectory + ax.plot( + xvals, yvals, + color=color, + linestyle=linestyle, + linewidth=linewidth, + marker=marker, + markersize=markersize, + markerfacecolor=with_alpha(color, marker_alpha), + markeredgecolor=color, + markeredgewidth=0.4, + label=label, + zorder=zorder + 1, + ) + + # Directional arrows along the trajectory + for (x0, y0), (x1, y1) in zip( + zip(xvals[:-1], yvals[:-1]), zip(xvals[1:], yvals[1:]) + ): + ax.add_patch(FancyArrowPatch( + (x0, y0), (x1, y1), + arrowstyle="-|>", + mutation_scale=9, + linewidth=0, + edgecolor=color, + facecolor=color, + zorder=zorder + 2, + )) + + # Iteration number annotations + for it, x, y in zip(sim_data["EccLevel"].astype(int), xvals, yvals): + ax.annotate( + f"{it}", + (x, y), + xycoords="data", + ha="center", + va="center", + fontsize=4, + color="black", + zorder=zorder + 200, + clip_on=True, + ) + + ax.relim() + ax.autoscale_view() + return ax + +# Option 1: Eccentricity vs iteration number +def plot_eccentricity_vs_iteration( + df_all, + case_col: str, + output_path: str = "eccentricity_vs_iteration.pdf", + run_type_legend_labels=None, + case_to_color=None, + case_display_labels=None, + y_min: float = 1e-4, +): + """ + Plot eccentricity versus iteration number for all simulations. + + Each line represents one simulation. Line style and marker encode the + run type; colour encodes the case. No run type naming convention is + assumed — the user controls all labeling in the RunType column. + + \f + Arguments: + df_all: per-iteration DataFrame. Must contain columns + [Sim, EccLevel, Eccentricity, RunType] plus the column + named by case_col. The RunType column must be populated + by the user before calling this function. + case_col: column that identifies the physical case for each row; used to assign colours. + output_path: path for the saved PDF. + run_type_legend_labels: optional dict mapping RunType values to custom display labels in the legend, e.g. + {'baseline': 'PN (2025)', 'GPR': 'GPR (2025)'}. If None, raw RunType values are used. + case_to_color: optional dict mapping case identifiers to matplotlib + colour strings. If None, colours are assigned automatically from tab10. + case_display_labels: optional dict mapping case identifiers to display labels for the case legend, e.g. + {'111': 'A', '271': 'B'}. If None, raw identifiers are used. + y_min: lower y-axis limit (default 1e-4). + + Returns: + fig (matplotlib.figure.Figure): Figure containing the plot + ax (matplotlib.axes.Axes): Axis + """ + run_types_present = list(dict.fromkeys(df_all["RunType"].dropna().values)) + rt_styles = build_run_type_styles(run_types_present) + + if case_to_color is None: + all_cases = list(dict.fromkeys(df_all[case_col].dropna().values)) + case_to_color = build_case_color_map(all_cases) + + legend_label = run_type_legend_labels or {} + display_labels = case_display_labels or {} + + fig, ax = plt.subplots(figsize=(3.4, 3)) + + for sim_name, df_sim in df_all.sort_values("EccLevel").groupby("Sim"): + case = df_sim[case_col].iloc[0] + rt = df_sim["RunType"].iloc[0] + color = case_to_color.get(case, "black") + s = rt_styles.get(rt, rt_styles[next(iter(rt_styles))]) + + ax.plot( + df_sim["EccLevel"], + df_sim["Eccentricity"], + marker=s["marker"], + markersize=s["markersize"], + zorder=s["zorder"], + linestyle=s["linestyle"], + linewidth=0.65, + color=color, + label="_nolegend_", + markeredgecolor="white", + markeredgewidth=0.05, + ) + + ax.set_yscale("log") + ax.set_ylim(y_min, None) + ax.set_xlabel("Iteration number", fontsize=8.5) + ax.set_ylabel("Eccentricity", fontsize=8.5) + ax.set_title("Eccentricity reduction across iterations", fontsize=10) + ax.tick_params(axis="both", which="major", labelsize=7) + + # Run type legend + run_type_handles = [ + mlines.Line2D( + [], [], color="black", + linestyle=rt_styles[rt]["linestyle"], + marker=rt_styles[rt]["marker"], + markersize=rt_styles[rt]["markersize"], + linewidth=0.65, + label=legend_label.get(rt, rt), + ) + for rt in run_types_present + ] + type_legend = ax.legend( + handles=run_type_handles, + title="Initial Guess Type", + loc="upper right", + bbox_to_anchor=(0.85, 1), + title_fontsize=7, fontsize=6, + handlelength=4, handletextpad=0.6, + frameon=False, + ) + ax.add_artist(type_legend) + + # Case color legend + cases_in_data = list(dict.fromkeys(df_all[case_col].dropna().values)) + cases_sorted = sorted(cases_in_data, key=lambda c: display_labels.get(c, str(c))) + case_handles = [ + mlines.Line2D( + [], [], color=case_to_color.get(c, "black"), + marker="s", markersize=3, linestyle="None", + label=display_labels.get(c, str(c)), + ) + for c in cases_sorted + ] + ax.legend( + handles=case_handles, title="Case", + loc="upper right", bbox_to_anchor=(1, 1), + title_fontsize=7, fontsize=6, handletextpad=0, frameon=False, + ) + + fig.tight_layout() + fig.savefig(output_path) + return fig + + +# Option 2: trajectory plot — one case per panel +def plot_trajectories( + df_all: pd.DataFrame, + case_col: str, + cases_to_plot: Sequence, + drop_by_panel: dict, + new_params: pd.DataFrame, + output_path: str = "trajectories.pdf", + inset_tail_counts: Optional[dict] = None, + run_type_legend_labels: Optional[dict] = None, + case_to_color: Optional[dict] = None, +): + """ + Plot (Omega0, Adot0) trajectories with zoom insets, making one panel per case. + + Arguments: + df_all: full per-iteration DataFrame. Must contain columns + [Sim, Omega0, Adot0, EccLevel, Eccentricity, RunType] + plus the column named. The RunType column must be populated + before calling the function. + case_col: column in df_all that identifies the physical case, + e.g. 'Case' or 'SimID'. Each unique value becomes one panel. + cases_to_plot: ordered sequence of values from case_col to display; one panel each. + drop_by_panel: dict mapping a case value to a set of other case values + to exclude from that panel. Pass an empty dict to show + all data in every panel. + new_params: DataFrame of next-step parameters for tolerance ellipses, + with columns [, RunType, Omega0_new, Adot0_new]. + Pass an empty DataFrame to skip the ellipses. + output_path: path for the saved PDF. + inset_tail_counts: optional dict controlling how many tail iterations + each run type contributes to the inset zoom region, e.g. + {'PN': 1, 'GPR': None} where None means all + iterations. Defaults to all iterations for every run type. + run_type_legend_labels: optional dict mapping RunType values to custom + display labels in the legend, e.g. + {'PN': 'PN (2025)', 'GPR run': 'GPR (2025)'}. + If None, the raw RunType values are used as labels. + case_to_color: optional dict mapping case identifiers to matplotlib + colour strings, e.g. {'caseA': 'red', 'caseB': 'blue'}. + If None, colours are assigned automatically from tab10. + + Returns: + fig (matplotlib.figure.Figure): Figure containing the plot + ax (matplotlib.axes.Axes): Axis + """ + n = len(cases_to_plot) + fig, axes = plt.subplots(1, n, figsize=(n * 8 / 3, 3), dpi=300, sharey=False) + fig.suptitle("Eccentricity Trajectory through Parameter Space", fontsize=10) + if n == 1: + axes = [axes] + + # Build style maps + run_types_present = list(dict.fromkeys(df_all["RunType"].dropna().values)) + rt_styles = build_run_type_styles(run_types_present) + + # Color map — use user supplied map or build one automatically + if case_to_color is None: + all_cases = list(dict.fromkeys(df_all[case_col].dropna().values)) + case_to_color = build_case_color_map(all_cases) + + # Track which cases actually appear across all panels for the legend + cases_used: set = set() + + for ax, case_id in zip(axes, cases_to_plot): + drops = drop_by_panel.get(case_id, set()) + df_case = df_all[(df_all[case_col] == case_id) & (~df_all[case_col].isin(drops))].copy() + cases_used |= set(df_case[case_col].unique()) + + for sim_name in sorted(df_case["Sim"].unique()): + rt = df_case.loc[df_case["Sim"] == sim_name, "RunType"].iloc[0] + color = case_to_color.get(case_id, "black") + s = rt_styles.get(rt, rt_styles[next(iter(rt_styles))]) + + plot_spec_eccentricity_control( + df_case, sim_name, + ax=ax, + color=color, + adot_mode="raw", + linestyle=s["linestyle"], + run_type=rt, + marker=s["marker"], + linewidth=0.6, + markersize=s["markersize"], + zorder=s["zorder"], + label="_nolegend_", + ) + + # Main axis limits + xmin, xmax = df_case["Omega0"].min(), df_case["Omega0"].max() + ymin, ymax = df_case["Adot0"].min(), df_case["Adot0"].max() + + dx = 0.05 * (xmax - xmin) or 1e-6 + dy = 0.05 * (ymax - ymin) or 1e-6 + + ax.set_xlim(xmin - dx, xmax + dx) + ax.set_ylim(ymin - dy, ymax + dy) + + # Build inset zoom region: per-run-type tail counts let the user + # control how many iterations each run type contributes to the zoom. + # None means all iterations; a positive int means that many tail rows. + zoom_parts = [] + for rt in run_types_present: + df_rt = df_case[df_case["RunType"] == rt] + if df_rt.empty: + continue + tail_n = (inset_tail_counts or {}).get(rt, None) + if tail_n is not None: + df_rt = df_rt.sort_values("EccLevel").groupby("Sim").tail(tail_n) + zoom_parts.append(df_rt) + df_zoom = pd.concat(zoom_parts, ignore_index=True) if zoom_parts else pd.DataFrame() + + if not df_zoom.empty: + x_all = df_case["Omega0"].to_numpy() + y_all = df_case["Adot0"].to_numpy() + x0, y0, w, h = choose_inset_anchor(ax, x_all, y_all, w=0.55, h=0.5, pad_x=0.02, pad_y=0.02) + x_offset= 0.08 + y_offset = 0.09 # (axes fraction; can tweak) + x0 = min(x0 + x_offset, 1 - w - 0.01) + y0 = min(y0 + y_offset, 1 - h - 0.01) + + axins = inset_axes( + ax, width="100%", height="100%", + bbox_to_anchor=(x0, y0, w, h), + bbox_transform=ax.transAxes, + loc="lower left", + borderpad=0.0, + ) + axins.set_facecolor("white") + axins.patch.set_alpha(0.92) + for spine in axins.spines.values(): + spine.set_linewidth(0.5) + + # Plot onto inset + for sim_name in sorted(df_case["Sim"].unique()): + rt = df_case.loc[df_case["Sim"] == sim_name, "RunType"].iloc[0] + color = case_to_color.get(case_id, "black") + s = rt_styles.get(rt, rt_styles[next(iter(rt_styles))]) + plot_spec_eccentricity_control( + df_case, sim_name, + ax=axins, + color=color, + adot_mode="raw", + linestyle=s["linestyle"], + linewidth=0.6, + marker=s["marker"], + markersize=s["markersize"], + zorder=s["zorder"], + run_type=rt, + label="_nolegend_", + ) + + # Inset limits + xin_min, xin_max = df_zoom["Omega0"].min(), df_zoom["Omega0"].max() + yin_min, yin_max = df_zoom["Adot0"].min(), df_zoom["Adot0"].max() + + xrange = (xin_max - xin_min) or 1e-6 + yrange = (yin_max - yin_min) or 1e-6 + + pad_top = 1.8 * yrange # space on top + pad_bottom = 0.2 * yrange # space on bottom + pad_left = 0.4 * xrange # space on left + pad_right = 0.3 * xrange # space on right + + axins.set_xlim(xin_min - pad_left, xin_max + pad_right) + axins.set_ylim(yin_min - pad_bottom, yin_max + pad_top) + + axins.xaxis.set_major_locator(MaxNLocator(3)) + axins.yaxis.set_major_locator(MaxNLocator(3)) + axins.xaxis.set_major_formatter(sci_offset_formatter()) + axins.yaxis.set_major_formatter(sci_offset_formatter()) + axins.tick_params(which="major", width=0.4, length=2, + labelsize=5, direction="in") + axins.grid(False) + for offset_text in ( + axins.xaxis.get_offset_text(), axins.yaxis.get_offset_text() + ): + offset_text.set_fontsize(5) + axins.xaxis.get_offset_text().set_x(1) + axins.xaxis.get_offset_text().set_y(1.15) + + # Bold annotation and highlighted marker on the final iteration + # of each run type in the inset + for sim_name in sorted(df_case["Sim"].unique()): + for rt in run_types_present: + df_rt = df_case[ + (df_case["Sim"] == sim_name) & + (df_case["RunType"] == rt) + ] + if df_rt.empty: + continue + + last = df_rt.sort_values("EccLevel").iloc[-1] + s = rt_styles.get(rt, rt_styles[next(iter(rt_styles))]) + pt_color = case_to_color.get( + last.get(case_col, case_id), "black" + ) + axins.annotate( + f"{int(last['EccLevel'])}", + (last["Omega0"], last["Adot0"]), + xycoords="data", + textcoords="offset points", + xytext=(0, 0), + ha="center", va="center", + fontsize=4, fontweight="bold", + color="black", zorder=300, clip_on=True, + ) + axins.plot( + last["Omega0"], last["Adot0"], + marker=s["marker"], + linestyle="None", + markersize=s["markersize"], + markerfacecolor=pt_color, + markeredgecolor=pt_color, + markeredgewidth=0.5, + zorder=0, + ) + + # Tolerance ellipses (only get drawn when the next-step params are provided) + if not new_params.empty: + for rt in df_case["RunType"].dropna().unique(): + add_concentric_tolerance_from_next_step( + axins, + df_case=df_case, + df_next=new_params, + case_id=case_id, + run_type=rt, + case_col=case_col, + base_color="hotpink", + multipliers=(3, 2, 1), + fill_alphas=(0.03, 0.06, 0.10), + edge_alphas=(0.18, 0.28, 0.45), + lw=(0.8, 0.9, 1.0), + zorder=1, + min_frac=None, + ) + + # Main axis formatting + ax.grid(True, alpha=0.35) + ax.set_xlabel(r"$\Omega_0$", fontsize=8) + if ax is axes[0]: + ax.set_ylabel(r"$\dot{a}_0$", fontsize=8) + ax.xaxis.set_major_locator(MaxNLocator(6)) + ax.yaxis.set_major_locator(MaxNLocator(6)) + ax.xaxis.set_major_formatter(sci_offset_formatter()) + ax.yaxis.set_major_formatter(sci_offset_formatter()) + ax.tick_params(axis="both", which="major", labelsize=7) + for offset_text in ( + ax.xaxis.get_offset_text(), ax.yaxis.get_offset_text() + ): + offset_text.set_fontsize(7) + ax.xaxis.get_offset_text().set_x(1) + ax.xaxis.get_offset_text().set_y(0) + + fig.canvas.draw() + + for ax in axes: + for i, lab in enumerate(ax.get_xticklabels()): + if i % 2 == 1: + lab.set_visible(False) + + # Legends on the middle (or only) axis + ax_legend = axes[len(axes) // 2] + + # Run type legend — use custom display labels if provided, otherwise + # fall back to the raw RunType values from the data + legend_label = run_type_legend_labels or {} + run_type_handles = [ + mlines.Line2D( + [], [], color="black", + linestyle=rt_styles[rt]["linestyle"], + marker=rt_styles[rt]["marker"], + markersize=rt_styles[rt]["markersize"], + linewidth=0.7, + label=legend_label.get(rt, rt), + ) + for rt in run_types_present + ] + type_legend = ax_legend.legend( + handles=run_type_handles, + title="Initial Guess Type", + loc="upper left", + bbox_to_anchor=(0.15, 1), + fontsize=7, title_fontsize=7, + handlelength=4, handletextpad=0.6, + frameon=False, + ) + ax_legend.add_artist(type_legend) + + # Case color legend — only show cases that actually appeared in the panels + cases_in_plot = [c for c in cases_to_plot if c in cases_used] + + case_handles = [ + mlines.Line2D([], [], color=case_to_color[c], marker="s", + linestyle="None", markersize=4, label=str(c)) + for c in cases_in_plot + ] + if case_handles: + ax_legend.legend( + handles=case_handles, title="Case", + loc="upper left", bbox_to_anchor=(0, 1), + fontsize=7, title_fontsize=7, handletextpad=0, frameon=False, + ) + + fig.tight_layout() + fig.savefig(output_path) + return fig + +# Option 3: Iteration counts scatter plot +def plot_iteration_counts( + df: pd.DataFrame, + annotate: str = "iterations", + title: Optional[str] = None, +): + """ + Scatter plot showing the number of eccentricity reduction iterations + per simulation in (MassRatio, Initial Separation) space. + + Each point represents one simulation. Marker size encodes the iteration + count; colour encodes the simulation name. + + \f + Arguments: + df: per-iteration simulation data with columns + [Sim, EccLevel, MassRatio, Initial Separation]. + annotate: 'iterations' to label each point with its count, or 'sim' + to label with the simulation name. + title: plot title; if None, default is used. + + Returns: + fig (matplotlib.figure.Figure): Figure containing the plot + ax (matplotlib.axes.Axes): Axis + """ + required_cols = {"Sim", "EccLevel", "MassRatio", "Initial Separation"} + missing = required_cols - set(df.columns) + if missing: + raise ValueError(f"Missing required columns: {missing}") + + counts_df = ( + df.groupby("Sim").agg( + Iterations=("EccLevel", "count"), + MassRatio=("MassRatio", "first"), + InitialSeparation=("Initial Separation", "first"), + ) + .reset_index() + ) + counts_df["Iterations"] -= 1 # EccLevel 0 is the initial guess + + sizes = 80 * counts_df["Iterations"] + unique_sims = counts_df["Sim"].unique() + color_map = dict(zip(unique_sims, plt.cm.tab20.colors[: len(unique_sims)])) + colors = counts_df["Sim"].map(color_map) + + fig, ax = plt.subplots(figsize=(6, 5)) + ax.scatter( + counts_df["MassRatio"], + counts_df["InitialSeparation"], + c=colors, + s=sizes, + alpha=0.5, + zorder=10, + ) + + texts = [] + for _, row in counts_df.iterrows(): + label = ( + str(row["Iterations"]) if annotate == "iterations" else row["Sim"] + ) + txt = ax.text( + row["MassRatio"], row["InitialSeparation"], label, + fontsize=8, color=color_map[row["Sim"]], zorder=20, + path_effects=[ + path_effects.withStroke(linewidth=1.2, foreground="black") + ], + ) + texts.append(txt) + + adjust_text( + texts, + expand_points=(1.2, 1.4), + arrowprops=dict(arrowstyle="-", color="gray", lw=0.5), + ax=ax, + ) + + ax.set_xlabel("Mass Ratio") + ax.set_ylabel("Initial Separation") + ax.set_title( + title if title is not None + else "Number of Eccentricity Reduction Iterations" + ) + ax.grid(True, zorder=0) + + handles = [ + ax.scatter([], [], color=[c], s=60, alpha=0.7) + for c in color_map.values() + ] + ax.legend( + handles, list(color_map.keys()), + bbox_to_anchor=(1, 1), loc="upper left", + fontsize=10, title="Simulations", + ) + + return fig + +# CLIs +@click.command( + name="spec-eccentricity-control", + help=plot_spec_eccentricity_control.__doc__, +) +@click.argument( + "h5_files", + nargs=-1, + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True), +) +@click.option( + "--output", "-o", + default="trajectories.pdf", + show_default=True, + help="Path for the output PDF.", +) +@click.option( + "--lev", + required=True, + help=( + "Lev directory to read metadata from, e.g. 'Lev3' or 'Lev4'. " + "Must match a directory present in the HDF5 file structure. " + ), +) +@click.option( + "--run-type-col", + default="RunType", + show_default=True, + help=( + "Column in the loaded DataFrame that identifies the run type for " + "each simulation (e.g. 'GPR', 'PN'). This column must be " + "present in the data before calling the plotting functions. " + "Defaults to 'RunType'." + ), +) +@click.option( + "--case-col", + default="Sim", + show_default=True, + help=( + "Column to use as the case identifier for panel grouping. " + "Defaults to 'Sim' (one panel per simulation). Change to any " + "other column present in the data, e.g. 'MassRatio'." + ), +) +def plot_spec_eccentricity_control_command( + h5_files, output, lev, run_type_col, case_col +): + """Load HDF5 files and produce the trajectory panel plot. + + Before running, ensure that your DataFrame has a column identifying the run + type for each simulation (e.g. 'GPR', 'PN'). + """ + if not h5_files: + raise click.UsageError("Provide at least one HDF5 file.") + + frames = [process_h5_file(f) for f in h5_files] + df_iter = pd.concat(frames, ignore_index=True) + + for col in (run_type_col, case_col): + if col not in df_iter.columns: + raise click.UsageError( + f"Column '{col}' not found in the loaded data. " + f"Available columns: {list(df_iter.columns)}" + ) + + if df_iter[run_type_col].isna().all(): + raise click.UsageError( + f"Column '{run_type_col}' is empty. " + "Please populate it with run type labels (e.g. 'GPR', " + "'PN') before running." + ) + + cases_to_plot = sorted(df_iter[case_col].dropna().unique()) + + plot_trajectories( + df_iter, + case_col=case_col, + cases_to_plot=cases_to_plot, + drop_by_panel={}, + new_params=pd.DataFrame(), + output_path=output, + ) + logger.info("Saved %s", output) + + +if __name__ == "__main__": + plot_spec_eccentricity_control_command(help_option_names=["-h", "--help"]) \ No newline at end of file diff --git a/src/SimulationSupport/gpr.py b/src/SimulationSupport/gpr.py new file mode 100644 index 0000000..5f3d494 --- /dev/null +++ b/src/SimulationSupport/gpr.py @@ -0,0 +1,318 @@ +# Distributed under the MIT License. +# See LICENSE.txt for details. + +""" +Gaussian Process Regression Machine Learning Function Library. +Contains all functions necessary to run the GPR Model used to predict better +low-eccentricity orbital parameter initial guesses. +""" + +import argparse + +import gpytorch +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import torch + +""" +ML functions: + ExactGP uses an infinite number of basis functions; GP is non-parametric and models functions globally; + limited only by training points +""" + + +class GPRegressionModel(gpytorch.models.ExactGP): + """ + Exact GP with a mixture of RBF and Matern kernels, a linear mean function, + and normalization capabilities for inputs and outputs. + + Args: + train_x (torch.Tensor): Training input data + train_y (torch.Tensor): Training targets + likelihood (gpytorch.likelihoods.GaussianLikelihood): Likelihood for the model + """ + + def __init__(self, train_x, train_y, likelihood): + super(GPRegressionModel, self).__init__(train_x, train_y, likelihood) + + # Supports all dimensions (ie GPR can be run from 1-8 dimensions) + input_dim = train_x.shape[1] if train_x.dim() > 1 else 1 + + # Define base kernels - use a mixture of the RBF and Matern kernels + # Smoothness parameter of Matern Kernel is ν=5/2 (nu=2.5); chosen to + # allow GPR to model curvature and capture both smooth global trends (RBF) + # and local variations (Matern) + self.rbf_kernel = gpytorch.kernels.RBFKernel(ard_num_dims=input_dim) + self.matern_kernel = gpytorch.kernels.MaternKernel( + nu=2.5, ard_num_dims=input_dim + ) + + # Wrap each kernel with a scale kernel - introduces learnable scaling factor + self.scaled_rbf = gpytorch.kernels.ScaleKernel(self.rbf_kernel) + self.scaled_matern = gpytorch.kernels.ScaleKernel(self.matern_kernel) + + # Combine kernels - the sum of the kernels allows the model to capture more complex + # behavior than either kernel alone would + self.covar_module = self.scaled_rbf + self.scaled_matern + + # Mean function - use linear mean instead of default 0 mean + # Remove hardcoding of model to expect 1D inputs and therefore, matrix mismatch if you pass 2D inputs + self.mean_module = gpytorch.means.LinearMean(input_size=input_dim) + + # Normalization parameters - store the mean and std of the inputs and outputs + self.input_mean = None + self.input_std = None + self.output_mean = None + self.output_std = None + + def set_normalization(self, input_mean, input_std, output_mean, output_std): + """ + Store normalization parameters in the model. + """ + self.input_mean = input_mean + self.input_std = input_std + self.output_mean = output_mean + self.output_std = output_std + + def normalize_input(self, X): + """ + Normalize input using stored parameters. Scale input to zero mean and unit variance. + """ + return (X - self.input_mean) / self.input_std + + def denormalize_output(self, Y_normalized): + """ + Denormalize output using stored parameters (converts normalized output back to original scale). + """ + return (Y_normalized * self.output_std) + self.output_mean + + def forward(self, x, normalize_input=False): + """ + Forward pass with optional input normalization. + + Args: + x (torch.Tensor): Input data + normalize_input (bool): If True, normalize x using stored parameters. + + Returns: + gpytorch.distributions.MultivariateNormal: Distribution for the input. + """ + if normalize_input: + x = self.normalize_input(x) + mean_x = self.mean_module(x) + covar_x = self.covar_module(x) + return gpytorch.distributions.MultivariateNormal(mean_x, covar_x) + + +# GPR training function +def train_gpr_model(raw_X, raw_Y): + """ + Train a GPR model with normalization parameters stored in the model. + + Args: + raw_X (numpy.ndarray): Raw input data + raw_Y (numpy.ndarray): Raw output data + + Returns: + GPRegressionModel: Trained model with the normalization parameters stored + gpytorch.likelihoods.GaussianLikelihood: Likelihood for the model + """ + # Compute normalization parameters + input_mean = raw_X.mean(axis=0) + input_std = raw_X.std(axis=0) + output_mean = raw_Y.mean() + output_std = raw_Y.std() + + # Normalize data column-wise; needed for all dimension > 1 + normalized_X = (raw_X - input_mean) / input_std + normalized_Y = (raw_Y - output_mean) / output_std + + # Ensure X is proper dimension + if normalized_X.ndim == 1: + normalized_X = normalized_X.reshape(-1, 1) + + # Convert to PyTorch tensors + train_X = torch.from_numpy(normalized_X).float() + train_Y = torch.from_numpy(normalized_Y).float() + + # Define the likelihood and the model + likelihood = gpytorch.likelihoods.GaussianLikelihood() + model = GPRegressionModel(train_X, train_Y, likelihood) + + # Store normalization parameters in the model + model.set_normalization(input_mean, input_std, output_mean, output_std) + + # Set model to training mode + model.train() + likelihood.train() + + # Use Adam optimizer with learning rate + optimizer = torch.optim.Adam(model.parameters(), lr=0.05) + + # Add learning rate scheduler + scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( + optimizer, + mode="min", + factor=0.5, # Reduce LR by a factor of 0.5 when triggered + patience=5, # Wait 5 epochs before reducing LR if there is no improvement + ) + + # Loss function + mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model) + + # Track the best model to prevent overfitting + best_loss = float("inf") # Initialize best loss as infinity + best_state = None # Placeholder for best model state + + # Training loop for 200 iterations + for i in range(200): + # Ensure model is in training mode at the start of each iteration + model.train() + likelihood.train() + + # Reset gradients from the previous iteration + optimizer.zero_grad() + + # Compute model output + output = model(train_X) + + # Compute negative log-likelihood loss + loss = -mll(output, train_Y) + + # Backpropagation: compute gradients of loss wrt model parameters + loss.backward() + + # Update model parameters + optimizer.step() + + # Update and adjust the learning rate + scheduler.step(loss) + + # Save the best model parameters (if current loss is the lowest) + if loss.item() < best_loss: + best_loss = loss.item() + best_state = model.state_dict().copy() + + # Load the best model state after training + if best_state is not None: + model.load_state_dict(best_state) + + return model, likelihood + + +# GPR prediction function +def predict_with_gpr_model(raw_X, model, likelihood): + """ + Predict using the GPR model with stored normalization parameters. + + Args: + raw_X (numpy.ndarray): Raw input data. + model (GPRegressionModel): Trained model. + likelihood (gpytorch.likelihoods.GaussianLikelihood): Likelihood + for the model. + + Returns: + numpy.ndarray: Predicted mean (denormalized). + numpy.ndarray: Predicted standard deviation (denormalized). + """ + # Normalize the input using the model's stored parameters + normalized_X = (raw_X - model.input_mean) / model.input_std + X_tensor = torch.from_numpy(normalized_X).float() + + # Set the model and likelihood to evaluation mode + model.eval() + likelihood.eval() + + # Make predictions + with torch.no_grad(): + observed_pred = likelihood(model(X_tensor)) + + # Denormalize the predictions + mean_normalized = observed_pred.mean.numpy() + stddev_normalized = observed_pred.variance.sqrt().numpy() + + mean_denormalized = model.denormalize_output(mean_normalized) + stddev_denormalized = stddev_normalized * model.output_std + + return mean_denormalized, stddev_denormalized + + +# GPR pipeline function - runs the entire process - including training, predicting, plotting - and outputs performance metrics +# This function encompasses previous functions defined above: train_gpr_model and predict_with_gpr_model and runs them together +def run_gpr_pipeline(X, Y, target_name="target", plot=True, silent=False): + """ + Train a GPR model on (X, Y), predict on X, plot, and report metrics for a given target. + + Args: + X (np.ndarray): Input data. + Y (np.ndarray): Target output deltas. + target_name (str): For labeling plots & output. + plot (bool): whether to produce correlation plots. + silent (bool): whether to suppress print statements entirely. + + Returns: + model + likelihood + Y_pred + uncertainties + """ + + # Train GPR model + model, likelihood = train_gpr_model(X, Y) + + # Make predictions + Y_pred, uncertainties = predict_with_gpr_model(X, model, likelihood) + + # If specified, create correlation plot and compute metrics + if plot: + plt.figure(figsize=(8, 6)) + plt.scatter(Y, Y_pred, alpha=0.6, s=20) + + # Make perfect correlation line (y = x) + min_val = min(Y.min(), Y_pred.min()) + max_val = max(Y.max(), Y_pred.max()) + plt.plot( + [min_val, max_val], + [min_val, max_val], + "r--", + lw=2, + label="Perfect Correlation", + ) + + # Labels and formatting + plt.xlabel(f"ΔTrue {target_name}", fontsize=12) + plt.ylabel(f"GPR Predicted Δ{target_name}", fontsize=12) + plt.title( + f"GPR Predictions vs True Values ({target_name})", fontsize=14 + ) + plt.grid(True, alpha=0.3) + plt.legend() + + # Calculate and display metrics + corr = np.corrcoef(Y, Y_pred)[0, 1] + r2 = corr**2 + rmse = np.sqrt(np.mean((Y - Y_pred) ** 2)) + mae = np.mean(np.abs(Y - Y_pred)) + metrics_text = f"R² = {r2:.4f}\nRMSE = {rmse:.8f}\nMAE = {mae:.8f}" + plt.text( + 0.95, + 0.05, + metrics_text, + transform=plt.gca().transAxes, + fontsize=12, + bbox=dict(boxstyle="round", facecolor="white", alpha=0.8), + verticalalignment="bottom", + horizontalalignment="right", + ) + + plt.tight_layout() + plt.show() + + if not silent: + # Print performance metrics regardless of plotting + print(f"R²: goal: > 0.95 excellent, > 0.90 good, < 0.70 poor") + print(f"RMSE goal: < 1 % of target range, lower is better") + print(f"MAE goal: < 1 % of target range, lower is better") + + return model, likelihood, Y_pred diff --git a/tests/test_PlotSpecEccentricityControl.py b/tests/test_PlotSpecEccentricityControl.py new file mode 100644 index 0000000..3ad94e9 --- /dev/null +++ b/tests/test_PlotSpecEccentricityControl.py @@ -0,0 +1,434 @@ +""" +Unit tests for PlotSpecEccentricityControl.py. + +Run with: + python -m pytest test_PlotSpecEccentricityControl.py -v +or: + python -m unittest test_PlotSpecEccentricityControl -v +""" + +import unittest +import numpy as np +import pandas as pd +import matplotlib +matplotlib.use("Agg") +import matplotlib.pyplot as plt +import pytest +from matplotlib.patches import Ellipse + +from SimulationSupport.PlotSpecEccentricityControl import ( + build_run_type_styles, + build_case_color_map, + extract_params_from_lines, + filter_param_lines, + group_by_sim_and_ecc, + plot_spec_eccentricity_control, + plot_eccentricity_vs_iteration, + plot_trajectories, + plot_iteration_counts, + add_tolerance_ellipse_fixed, + choose_inset_anchor, +) + +""" +Make dummy data. All test classes call make_df() in order to get a consistent +DataFrame that mirrors the structure of process_h5_file(). Using a fixed seed +allows the random noise to be identical in every test run. +""" +def make_df( + cases=("A", "B"), + run_types=("PN", "GPR"), + n_iter=4, + seed=42, +): + """Return a minimal DataFrame.""" + rng = np.random.default_rng(seed) + omega_base = {"A": 0.01642 , "B": 0.0146} + adot_base = {"A": -2.3968e-05, "B": -2.3699e-05} + mass_ratio = {"A": 8.0 , "B": 5.3139} + separation = {"A":14.5189 , "B": 15.8556} + + rows = [] + for case in cases: + for rt in run_types: + # Simulation name mirrors the convention used in the SpEC pipeline + sim = f"Sim_{case}_{rt.replace(' ', '_')}" + omega = omega_base[case] + adot = adot_base[case] + for lvl in range(n_iter): + # Make eccentricity decay with each iteration to mimic SpEC runs + ecc_base = {"A": 0.0731, "B": 0.0518} + ecc = ecc_base[case] * (0.3 ** lvl) * abs(1 + 0.05 * rng.normal()) + rows.append({ + "Sim": sim, + "Case": case, + "MassRatio": mass_ratio[case], + "EccLevel": lvl, + "Omega0": omega + lvl * 1e-5 * rng.normal(), + "Adot0": adot + lvl * 1e-7 * rng.normal(), + "Initial Separation": separation[case], + "Eccentricity": ecc, + "RunType": rt, + }) + return pd.DataFrame(rows) + +# Style helpers +# No figures returned on purpose +class TestBuildRunTypeStyles(unittest.TestCase): + def test_returns_all_run_types(self): + # Every run type should appear as a key in the output. + rts = ["PN", "GPR", "custom"] + styles = build_run_type_styles(rts) + self.assertEqual(set(styles.keys()), set(rts)) + + def test_required_keys_present(self): + # Each style dict should contain the four keys used by the plotting functions + styles = build_run_type_styles(["A", "B"]) + for s in styles.values(): + for key in ("linestyle", "marker", "markersize", "zorder"): + self.assertIn(key, s) + + def test_duplicate_run_types_deduplicated(self): + # Ensure that passing the same label twice does not create duplicate entries + styles = build_run_type_styles(["A", "A", "B"]) + self.assertEqual(len(styles), 2) + + def test_zorder_increases_with_index(self): + # Ensure that run types are drawn on top of each other in sequential order + rts = ["first", "second", "third"] + styles = build_run_type_styles(rts) + zorders = [styles[rt]["zorder"] for rt in rts] + self.assertEqual(zorders, sorted(zorders)) + +class TestBuildCaseColorMap(unittest.TestCase): + def test_returns_all_cases(self): + cases = ["A", "B", "C"] + color_map = build_case_color_map(cases) + self.assertEqual(set(color_map.keys()), set(cases)) + + def test_colors_are_distinct(self): + # Give each case a unique color so trajectories are distinguishable + cases = ["A", "B", "C", "D"] + color_map = build_case_color_map(cases) + colors = list(color_map.values()) + self.assertEqual(len(colors), len(set(map(str, colors)))) + +# HDF5 parsing helpers +# Use fake Params.input text so no real HDF5 file is necessary +class TestExtractParamsFromLines(unittest.TestCase): + def _make_lines(self, mass=5.3139, omega=0.0146, adot=-2.3699e-05, d0=15.8556): + """ + Return a minimal Params.input line list with known values. + """ + return [ + f"$MassRatio = {mass};", + f"$Omega0 = {omega};", + f"$adot0 = {adot};", + f"$D0 = {d0};", + ] + + def test_extracts_all_four_values(self): + lines = self._make_lines() + mr, omega, adot, d0 = extract_params_from_lines(lines) + self.assertAlmostEqual(mr, 5.3139) + self.assertAlmostEqual(omega, 0.0146) + self.assertAlmostEqual(adot, -2.3699e-05) + self.assertAlmostEqual(d0, 15.8556) + + def test_returns_none_on_missing_param(self): + # If any required parameter is missing, return None + # for all values + lines = ["$Omega0 = 0.0146;", "$adot0 = -2.3699e-05;", "$D0 = 15.8556;"] + result = extract_params_from_lines(lines) + self.assertIsNone(result[0]) + + def test_returns_none_on_empty_input(self): + result = extract_params_from_lines([]) + self.assertTrue(all(v is None for v in result)) + +class TestFilterParamLines(unittest.TestCase): + def test_keeps_relevant_lines(self): + # Ensure that only the lines containing one of the four parameter keys survive + content = "$MassRatio = 5.3139;\n$Omega0 = 0.0146;\nIgnoredLine;\n$adot0 = -2.3699e-05;\n$D0 = 15.8556;" + lines = filter_param_lines(content) + self.assertEqual(len(lines), 4) + + def test_excludes_irrelevant_lines(self): + content = "NotAParam = 5;\nAlsoIrrelevant;" + lines = filter_param_lines(content) + self.assertEqual(lines, []) + +class TestGroupBySimAndEcc(unittest.TestCase): + def _make_paths(self): + return [ + "SimA/Ecc0/Lev3/Params.input", + "SimA/Ecc1/Lev3/Params.input", + "SimB/Ecc0/Lev3/Params.input", + "SimB/Ecc0/Lev3/OtherFile.dat", # ignore if wrong suffix is used + "ShortPath/Params.input", # ignore if path is not fully specified + ] + + def test_groups_correctly(self): + # Ensure that the numeric levels are extracted correctly + grouped = group_by_sim_and_ecc(self._make_paths(), "Params.input") + self.assertIn("SimA", grouped) + self.assertIn("SimB", grouped) + + def test_correct_ecc_levels(self): + grouped = group_by_sim_and_ecc(self._make_paths(), "Params.input") + ecc_levels_A = [ecc for ecc, _ in grouped["SimA"]] + self.assertEqual(sorted(ecc_levels_A), [0, 1]) + + def test_ignores_wrong_suffix(self): + grouped = group_by_sim_and_ecc(self._make_paths(), "Params.input") + all_paths = [p for entries in grouped.values() for _, p in entries] + self.assertFalse(any("OtherFile" in p for p in all_paths)) + + def test_ignores_bad_paths(self): + # Ensure that paths without an EccN component are skipped + grouped = group_by_sim_and_ecc(self._make_paths(), "Params.input") + self.assertNotIn("ShortadPath", grouped) + +# Trajectory Plot +class TestPlotSpecEccentricityControl(unittest.TestCase): + def setUp(self): + self.df = make_df() + + def tearDown(self): + # Close figures after each test + plt.close("all") + + def test_missing_sim_returns_ax_unchanged(self): + # Ensure that an unrecognized sim results in nothing, instead of crashing + _, ax = plt.subplots() + result = plot_spec_eccentricity_control(self.df, "NoSim", ax=ax) + self.assertIs(result, ax) + self.assertEqual(len(ax.lines), 0) + + def test_adot_mode_abs(self): + # Ensure that when in 'abs' mode, every y-value is non-negative + ax = plot_spec_eccentricity_control( + self.df, "Sim_A_PN", adot_mode="abs" + ) + ydata = ax.lines[0].get_ydata() + self.assertTrue(np.all(ydata >= 0)) + + def test_adot_mode_logabs(self): + ax = plot_spec_eccentricity_control( + self.df, "Sim_A_PN", adot_mode="logabs" + ) + ydata = ax.lines[0].get_ydata() + self.assertTrue(np.all(ydata < 0)) # log of small numbers is negative + + def test_correct_number_of_points(self): + # Ensure that there is one point per EccLevel iteration + n_iter = 4 + ax = plot_spec_eccentricity_control( + self.df, "Sim_A_PN", adot_mode="raw" + ) + xdata = ax.lines[0].get_xdata() + self.assertEqual(len(xdata), n_iter) + + def test_uses_supplied_axes(self): + _, ax_in = plt.subplots() + ax_out = plot_spec_eccentricity_control( + self.df, "Sim_A_PN", ax=ax_in + ) + self.assertIs(ax_out, ax_in) + +# Test Option 1: Eccentricity vs iteration plot +# Uses pytest's tmp_path fixture so the functions can write a file to a temporary directory +# that pytest creates and cleans up +class TestPlotEccentricityVsIteration: + + def setup_method(self): + self.df = make_df() + + def teardown_method(self): + plt.close("all") + + def test_y_axis_is_log(self, tmp_path): + # Ensure y-axis is logarithmic + fig = plot_eccentricity_vs_iteration( + self.df, case_col="Case", + output_path=str(tmp_path / "out.pdf"), + ) + assert fig.axes[0].get_yscale() == "log" + + def test_number_of_lines_matches_sims(self, tmp_path): + # Ensure that one line is drawn per simulation + n_sims = self.df["Sim"].nunique() + fig = plot_eccentricity_vs_iteration( + self.df, case_col="Case", + output_path=str(tmp_path / "out.pdf"), + ) + assert len(fig.axes[0].lines) == n_sims + + def test_custom_run_type_legend_labels(self, tmp_path): + # Ensure that passing a label-override dict does not raise or alter + # the figure type + labels = {"PN": "PN (2025)", "GPR": "GPR (2025)"} + fig = plot_eccentricity_vs_iteration( + self.df, case_col="Case", + output_path=str(tmp_path / "out.pdf"), + run_type_legend_labels=labels, + ) + assert isinstance(fig, plt.Figure) + + def test_missing_required_column_raises(self, tmp_path): + # Ensure that if dataframe is missing "Eccentricity" the plot does not get created + bad_df = self.df.drop(columns=["Eccentricity"]) + with pytest.raises((KeyError, ValueError)): + plot_eccentricity_vs_iteration( + bad_df, case_col="Case", + output_path=str(tmp_path / "out.pdf"), + ) + +# Test Option 2: Eccentricity Trajectory Through Parameter Space +class TestPlotTrajectories: + def setup_method(self): + self.df = make_df() + + def teardown_method(self): + plt.close("all") + + def test_one_panel_per_case(self, tmp_path): + # Ensure that figure contains at least one Axes per case + # (inset axes also get created) + cases = ["A", "B"] + fig = plot_trajectories( + self.df, case_col="Case", + cases_to_plot=cases, + drop_by_panel={}, + new_params=pd.DataFrame(), + output_path=str(tmp_path / "out.pdf"), + ) + main_axes = [ax for ax in fig.axes if not ax.get_label().startswith("inset")] + assert len(main_axes) >= len(cases) + + def test_single_case_produces_figure(self, tmp_path): + fig = plot_trajectories( + self.df, case_col="Case", + cases_to_plot=["A"], + drop_by_panel={}, + new_params=pd.DataFrame(), + output_path=str(tmp_path / "out.pdf"), + ) + assert isinstance(fig, plt.Figure) + + def test_missing_required_column_raises(self, tmp_path): + bad_df = self.df.drop(columns=["Omega0"]) + with pytest.raises((KeyError, ValueError)): + plot_trajectories( + bad_df, case_col="Case", + cases_to_plot=["A"], + drop_by_panel={}, + new_params=pd.DataFrame(), + output_path=str(tmp_path / "out.pdf"), + ) + + def test_tolerance_ellipses_with_new_params(self, tmp_path): + # Ensure that providing next step parameters draws the tolerance ellipses + new_params = pd.DataFrame([ + {"Case": "A", "RunType": "PN", + "Omega0_new": 0.0161, "Adot0_new": -1.4e-5}, + ]) + fig = plot_trajectories( + self.df, case_col="Case", + cases_to_plot=["A"], + drop_by_panel={}, + new_params=new_params, + output_path=str(tmp_path / "out.pdf"), + ) + assert isinstance(fig, plt.Figure) + +# Test Option 3: Iteration counts scatter plot +class TestPlotIterationCounts(unittest.TestCase): + def setUp(self): + self.df = make_df() + + def tearDown(self): + plt.close("all") + + def test_one_point_per_sim(self): + # Ensure that each simulation makes a single scatter point + # and that marker size encodes the iteration count + n_sims = self.df["Sim"].nunique() + fig = plot_iteration_counts(self.df) + ax = fig.axes[0] + scatter = ax.collections[0] + self.assertEqual(len(scatter.get_offsets()), n_sims) + + def test_missing_column_raises(self): + bad_df = self.df.drop(columns=["MassRatio"]) + with self.assertRaises(ValueError): + plot_iteration_counts(bad_df) + + def test_annotate_sim_mode(self): + # Ensure that "sim" annotation labels each point with the simulation name + # instead of the iteration count + fig = plot_iteration_counts(self.df, annotate="sim") + ax = fig.axes[0] + text_labels = {t.get_text() for t in ax.texts} + sim_names = set(self.df["Sim"].unique()) + self.assertTrue(sim_names.issubset(text_labels)) + +# Tolerance ellipse helpers +class TestAddToleranceEllipseFixed(unittest.TestCase): + + def setUp(self): + self.df = make_df(cases=("A",), run_types=("PN",)) + _, self.ax = plt.subplots() + + def tearDown(self): + plt.close("all") + + def test_returns_ellipse_for_valid_data(self): + ell = add_tolerance_ellipse_fixed( + self.ax, self.df, + case_id="A", run_type="PN", + dOmega=1e-5, dAdot=1e-7, + case_col="Case", + ) + self.assertIsInstance(ell, Ellipse) + + def test_returns_none_for_missing_case(self): + # Ensure that if no data matches the case, the function returns None + result = add_tolerance_ellipse_fixed( + self.ax, self.df, + case_id="Z", # does not exist in DataFrame + run_type="PN", + dOmega=1e-5, dAdot=1e-7, + case_col="Case", + ) + self.assertIsNone(result) + +# Inset anchor helper +class TestChooseInsetAnchor(unittest.TestCase): + + def tearDown(self): + plt.close("all") + + def test_returns_four_floats(self): + # Ensure that the function returns (x0, y0, width, height) in Axes + # fraction coordinates + _, ax = plt.subplots() + ax.plot([0, 1], [0, 1]) + result = choose_inset_anchor(ax, np.array([0, 1]), np.array([0, 1])) + self.assertEqual(len(result), 4) + self.assertTrue(all(isinstance(v, float) for v in result)) + + def test_anchor_within_axes_bounds(self): + # Ensure that the inset box fits inside the Axes + _, ax = plt.subplots() + ax.plot([0, 1], [0, 1]) + x0, y0, w, h = choose_inset_anchor( + ax, np.array([0.5]), np.array([0.5]), w=0.35, h=0.5 + ) + self.assertGreaterEqual(x0, 0) + self.assertGreaterEqual(y0, 0) + self.assertLessEqual(x0 + w, 1.1) # small tolerance for padding + self.assertLessEqual(y0 + h, 1.1) # small tolerance for padding + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/test_gpr.py b/tests/test_gpr.py new file mode 100644 index 0000000..8aa1fd3 --- /dev/null +++ b/tests/test_gpr.py @@ -0,0 +1,205 @@ +# Distributed under the MIT License. +# See LICENSE.txt for details. + +""" +Unit test for gpr.py + Generate a simple, synthetic regression problem and run the pipeline + Assert that the shapes are reasonable, uncertainties are positive, and + that the mean predictions are within a reasonable range of the true values +""" + +import shutil +import unittest +from pathlib import Path + +import numpy as np +from gpr import ( + predict_with_gpr_model, + run_gpr_pipeline, + train_gpr_model, +) + + +class TestGPRLibrary(unittest.TestCase): + # Set up and prepare test directory and file paths + def setUp(self): + self.test_dir = Path("SimulationSupport") / "gpr_library_tests" + + # Clean up any existing test directory and create a new one + shutil.rmtree(self.test_dir, ignore_errors=True) + self.test_dir.mkdir(parents=True, exist_ok=True) + + self.X, self.Y = self.make_data() + + # Clean up and remove test directory after tests are done + def tearDown(self): + shutil.rmtree(self.test_dir) + + @staticmethod + def make_data(n=50, noise_lev=0.05): + """ + Generate synthetic regression dataset - + points sampled from a noisy sine curve: + y = sin(5 * x) + Gaussian noise. + """ + # Random number generator with a fixed seed 0; gets the same random data every time + rng = np.random.default_rng(0) + + # Inputs with shape (n, 1) + X = np.linspace(0, 2 * np.pi, n).reshape(-1, 1) + + # Used flattened X with shape (n,) to evaluate the function + y = np.sin(5 * X.reshape(-1)) + + # Add n Gaussian noise samples with mean 0 and variance 1 + noise = noise_lev * rng.standard_normal(n) + + Y = y + noise # Observed data is y = sin(5x) + Gaussian noise + + return X, Y + + def test_train_predict_and_uncertainty(self): + """ + Check that: + - train_gpr_model and predict_with_gpr_model run and + return predictions of the correct shape + - uncertainties are positive and non-trivial + """ + X, Y = self.X, self.Y # fake dataset + + # Train the GPR model + model, likelihood = train_gpr_model(X, Y) + + # Make predictions + Y_pred, Y_std = predict_with_gpr_model(X, model, likelihood) + + # Check shapes + self.assertEqual( + Y_pred.shape, + Y.shape, + f"Expected prediction shape {Y.shape}, got {Y_pred.shape}", + ) + + self.assertEqual( + Y_std.shape, + Y.shape, + f"Expected standard deviation shape {Y.shape}, got {Y_std.shape}", + ) + + # Check uncertainties are positive + self.assertTrue( + np.all(Y_std >= 0), + f"Some of the predicted standard deviations are negative", + ) + + # Check that uncertainties are nontrivial + self.assertTrue(np.any(Y_std > 1e-10), "All uncertainties are zero") + + def test_run_gpr_pipeline(self): + """ + Test the full GPR pipeline function. + Check that outputs are well correlated with true values. + """ + X, Y = self.X, self.Y + + # Run the full GPR pipeline + model, likelihood, Y_pred = run_gpr_pipeline( + X, Y, target_name="test", plot=False, silent=True + ) + + corr = np.corrcoef(Y, Y_pred)[0, 1] + + self.assertGreater( + corr, 0.9, f"Expected correlation > 0.9, got {corr:.3f}" + ) + + def test_normalization_stored_and_applied(self): + """ + Test that the normalization parameters are stored and applied correctly: + - input_mean and input_std have correct shapes + - normalized inputs, X, have mean of ~0 and std of ~1 per feature + """ + X, Y = self.X, self.Y + model, likelihood = train_gpr_model(X, Y) + + # Check shapes of stored statistics + exp_shape = (X.shape[1],) + + self.assertEqual( + model.input_mean.shape, + exp_shape, + f"Expected input_mean shape {exp_shape}, got" + f" {model.input_mean.shape}", + ) + + self.assertEqual( + model.input_std.shape, + exp_shape, + f"Expected input_std shape {exp_shape}, got" + f" {model.input_std.shape}", + ) + + # Recreate normalized X using the stored statistics + normalized_X = (X - model.input_mean) / model.input_std + + # Column-wise mean and std + col_means = normalized_X.mean(axis=0) + col_stds = normalized_X.std(axis=0) + + # Check that each mean is close to 0 with an allowed tolerance of 0.1 + self.assertTrue( + np.allclose(col_means, 0, atol=1e-1), + f"Expected means ~0, got {col_means}", + ) + + # Check that each std is close to 1 with an allowed tolerance of 0.1 + self.assertTrue( + np.allclose(col_stds, 1.0, atol=1e-1), + f"Expected stddevs ~1, got {col_stds}", + ) + + def test_output_denorm(self): + """ + Test that the stored output_mean and output_std are consistent with + denormalize_output, ie that it is correctly undoing the normalization. + """ + X, Y = self.X, self.Y + model, likelihood = train_gpr_model(X, Y) + + # Sanity check on stored output statistics + self.assertIsInstance( + model.output_mean, + (float, np.floating), + "Expected output mean to be a float, got" + f" {type(model.output_mean)}", + ) + + self.assertIsInstance( + model.output_std, + (float, np.floating), + "Expected output standard deviation to be a float, got" + f" {type(model.output_std)}", + ) + + self.assertGreater( + model.output_std, + 0.0, + "output standard deviation should be positive", + ) + + # Test normalization on a small subset of Y + Y_subset = Y[:5] + Y_normalized = (Y_subset - model.output_mean) / model.output_std + Y_denormalized = model.denormalize_output(Y_normalized) + + # Check that the original Y is recovered + self.assertTrue( + np.allclose(Y_denormalized, Y_subset, atol=1e-8), + f"Expected normalization {Y_subset}, got {Y_denormalized}." + f" Normalization failed.\nOriginal: {Y_subset}, Denormalized:" + f" {Y_denormalized}", + ) + + +if __name__ == "__main__": + unittest.main(verbosity=2)