From 129184edc2a3347e780f8bc7c69be2ec5babc4b6 Mon Sep 17 00:00:00 2001 From: Graham Goh Date: Wed, 20 May 2026 15:34:29 +1000 Subject: [PATCH 1/2] feat(cmd/pipeline): Run all changesets support --all Adding the ability to execute all changesets using the pipeline run command by introduing the --all flag. ``` # Run all changesets sequentially defined in the input file chainlink-deployments pipeline run \ --environment testnet \ --input-file inputs.yaml \ --all ``` JIRA: https://smartcontract-it.atlassian.net/browse/CLD-2430 --- .changeset/thick-coins-punch.md | 7 + engine/cld/commands/pipeline/run.go | 157 ++++++++++++--- engine/cld/commands/pipeline/run_test.go | 245 ++++++++++++++++++++++- engine/cld/pipeline/input/yaml.go | 2 +- engine/cld/pipeline/input/yaml_test.go | 2 +- 5 files changed, 377 insertions(+), 36 deletions(-) create mode 100644 .changeset/thick-coins-punch.md diff --git a/.changeset/thick-coins-punch.md b/.changeset/thick-coins-punch.md new file mode 100644 index 000000000..e803f3ff0 --- /dev/null +++ b/.changeset/thick-coins-punch.md @@ -0,0 +1,7 @@ +--- +"chainlink-deployments-framework": minor +--- + +feat(cmd/pipeline): Run all changesets support --all + +Adding the ability to execute all changesets using the pipeline run command by introducing the --all flag. diff --git a/engine/cld/commands/pipeline/run.go b/engine/cld/commands/pipeline/run.go index 6b7d09ca7..440772f2c 100644 --- a/engine/cld/commands/pipeline/run.go +++ b/engine/cld/commands/pipeline/run.go @@ -14,6 +14,7 @@ import ( fdeployment "github.com/smartcontractkit/chainlink-deployments-framework/deployment" "github.com/smartcontractkit/chainlink-deployments-framework/engine/cld/changeset" "github.com/smartcontractkit/chainlink-deployments-framework/engine/cld/commands/flags" + "github.com/smartcontractkit/chainlink-deployments-framework/engine/cld/domain" "github.com/smartcontractkit/chainlink-deployments-framework/engine/cld/environment" "github.com/smartcontractkit/chainlink-deployments-framework/engine/cld/pipeline/input" dprun "github.com/smartcontractkit/chainlink-deployments-framework/engine/cld/pipeline/run" @@ -22,10 +23,10 @@ import ( ) var ( - runShort = "Run a durable pipeline changeset" + runShort = "Run a pipeline changeset" runLong = ` - Run a durable pipeline changeset. + Run a pipeline changeset. This command applies a changeset against the specified environment, resolves any timelock proposals, and persists artifacts. @@ -33,23 +34,29 @@ var ( runExample = ` # Dry-run of changeset 0001_test_changeset in testnet - chainlink-deployments durable-pipeline run \ + chainlink-deployments pipeline run \ --environment testnet \ --changeset 0001_test_changeset \ --input-file inputs.yaml \ --dry-run # Run changeset by name with input file - chainlink-deployments durable-pipeline run \ + chainlink-deployments pipeline run \ --environment testnet \ --changeset 0001_test_changeset \ --input-file inputs.yaml # Run changeset by index position with array format input file. - chainlink-deployments durable-pipeline run \ + chainlink-deployments pipeline run \ --environment testnet \ --input-file inputs.yaml \ --changeset-index 0 + + # Run all changesets sequentially defined in the input file + chainlink-deployments pipeline run \ + --environment testnet \ + --input-file inputs.yaml \ + --all ` ) @@ -59,6 +66,7 @@ type runFlags struct { dryRun bool inputFile string changesetIndex int + all bool } func newRunCmd(cfg *Config) *cobra.Command { @@ -74,6 +82,7 @@ func newRunCmd(cfg *Config) *cobra.Command { dryRun: flags.MustBool(cmd.Flags().GetBool("dry-run")), inputFile: flags.MustString(cmd.Flags().GetString("input-file")), changesetIndex: flags.MustInt(cmd.Flags().GetInt("changeset-index")), + all: flags.MustBool(cmd.Flags().GetBool("all")), } return runRun(cmd, cfg, f) @@ -85,10 +94,13 @@ func newRunCmd(cfg *Config) *cobra.Command { cmd.Flags().StringP("changeset", "c", "", "changeset to apply by name") cmd.Flags().StringP("input-file", "i", "", "YAML input file name. Not the full path, just the name") cmd.Flags().IntP("changeset-index", "x", 0, "Index of changeset to run by position in array format input file") + cmd.Flags().BoolP("all", "a", false, "Run all changesets defined in the input file in order") _ = cmd.MarkFlagRequired("input-file") cmd.MarkFlagsMutuallyExclusive("changeset", "changeset-index") - cmd.MarkFlagsOneRequired("changeset", "changeset-index") + cmd.MarkFlagsMutuallyExclusive("changeset", "all") + cmd.MarkFlagsMutuallyExclusive("changeset-index", "all") + cmd.MarkFlagsOneRequired("changeset", "changeset-index", "all") return cmd } @@ -96,48 +108,140 @@ func newRunCmd(cfg *Config) *cobra.Command { func runRun(cmd *cobra.Command, cfg *Config, f runFlags) error { envdir := cfg.Domain.EnvDir(f.environment) artdir := envdir.ArtifactsDir() + deps := cfg.deps() - var actualChangesetName string + if f.all { + return runAllChangesets(cmd, cfg, f, artdir, deps) + } + + return runSingleChangeset(cmd, cfg, f, artdir, deps) +} + +// runSingleChangeset runs a single changeset specified by name or index. It sets the +// DURABLE_PIPELINE_INPUT env var for the changeset so that it can be accessed by the registry +// and resolvers, then loads the registry and applies the changeset. +func runSingleChangeset( + cmd *cobra.Command, + cfg *Config, + f runFlags, + artdir *domain.ArtifactsDir, + deps *Deps, +) error { + if err := artdir.SetDurablePipelines(strconv.FormatInt(time.Now().UnixNano(), 10)); err != nil { + return err + } + + var changesetName string if f.changeset != "" { - actualChangesetName = f.changeset + changesetName = f.changeset if err := input.PrepareInputForRunByName(f.inputFile, f.changeset, cfg.Domain, f.environment); err != nil { return fmt.Errorf("failed to parse input file: %w", err) } } else { var err error - actualChangesetName, err = input.PrepareInputForRunByIndex(f.inputFile, f.changesetIndex, cfg.Domain, f.environment) + changesetName, err = input.PrepareInputForRunByIndex(f.inputFile, f.changesetIndex, cfg.Domain, f.environment) if err != nil { return fmt.Errorf("failed to get changeset at index %d: %w", f.changesetIndex, err) } } - if err := artdir.SetDurablePipelines(strconv.FormatInt(time.Now().UnixNano(), 10)); err != nil { + registry, err := cfg.LoadChangesets(f.environment) + if err != nil { return err } - registry, err := cfg.LoadChangesets(f.environment) + indexStr := "" + if f.changeset == "" { + indexStr = fmt.Sprintf(" (at index %d)", f.changesetIndex) + } + cfg.Logger.Infof("Applying %s pipeline for changeset %s%s for environment: %s", + cfg.Domain, changesetName, indexStr, f.environment, + ) + + return applyChangeset(cmd, cfg, f.dryRun, f.environment, changesetName, registry, artdir, deps) +} + +// runAllChangesets runs all changesets defined in the input file sequentially. It sets the +// DURABLE_PIPELINE_INPUT env var for each changeset before applying, +// then loads the registry and applies each changeset in order. +func runAllChangesets( + cmd *cobra.Command, + cfg *Config, + f runFlags, + artdir *domain.ArtifactsDir, + deps *Deps, +) error { + dpYAML, err := input.ParseDurablePipelineYAML(f.inputFile, cfg.Domain, f.environment) if err != nil { - return err + return fmt.Errorf("failed to parse input file: %w", err) } - envOptions, err := dprun.ConfigureEnvironmentOptions(registry, actualChangesetName, f.dryRun, cfg.Logger) + changesets, err := input.GetAllChangesetsInOrder(dpYAML.Changesets) + if err != nil { + return fmt.Errorf("failed to read changesets from input file: %w", err) + } + if len(changesets) == 0 { + return errors.New("no changesets found in input file") + } + + cfg.Logger.Infof("Applying %s pipeline for all %d changesets for environment: %s", + cfg.Domain, len(changesets), f.environment, + ) + + for i, cs := range changesets { + cfg.Logger.Infof("[%d/%d] Applying changeset %s", i+1, len(changesets), cs.Name) + + if err := artdir.SetDurablePipelines(strconv.FormatInt(time.Now().UnixNano(), 10)); err != nil { + return err + } + + if err := input.SetChangesetEnvironmentVariable(cs.Name, cs.Data); err != nil { + return fmt.Errorf("changeset %s: failed to set input: %w", cs.Name, err) + } + + registry, err := cfg.LoadChangesets(f.environment) + if err != nil { + return fmt.Errorf("[%d/%d] changeset %s: failed to load changesets: %w", i+1, len(changesets), cs.Name, err) + } + + if err := applyChangeset(cmd, cfg, f.dryRun, f.environment, cs.Name, registry, artdir, deps); err != nil { + return fmt.Errorf("[%d/%d] changeset %s: %w", i+1, len(changesets), cs.Name, err) + } + } + + cfg.Logger.Infof("Successfully applied all %d changesets for environment: %s", len(changesets), f.environment) + + return nil +} + +func applyChangeset( + cmd *cobra.Command, + cfg *Config, + dryRun bool, + envName string, + changesetName string, + registry *changeset.ChangesetsRegistry, + artdir *domain.ArtifactsDir, + deps *Deps, +) error { + envOptions, err := dprun.ConfigureEnvironmentOptions(registry, changesetName, dryRun, cfg.Logger) if err != nil { return err } - regCfg, err := registry.GetConfigurations(actualChangesetName) + regCfg, err := registry.GetConfigurations(changesetName) if err != nil { - return fmt.Errorf("failed to get configurations for %s: %w", actualChangesetName, err) + return fmt.Errorf("failed to get configurations for %s: %w", changesetName, err) } if regCfg.ConfigResolver != nil { if cfg.ConfigResolverManager.NameOf(regCfg.ConfigResolver) == "" { - return fmt.Errorf("resolver for %s is not registered", actualChangesetName) + return fmt.Errorf("resolver for %s is not registered", changesetName) } } - reports, err := artdir.LoadOperationsReports(actualChangesetName) + reports, err := artdir.LoadOperationsReports(changesetName) if err != nil { return fmt.Errorf("failed to load operations report: %w", err) } @@ -147,23 +251,14 @@ func runRun(cmd *cobra.Command, cfg *Config, f runFlags) error { reporter := operations.NewMemoryReporter(operations.WithReports(reports)) envOptions = append(envOptions, environment.WithReporter(reporter)) - deps := cfg.deps() - env, err := deps.EnvironmentLoader(cmd.Context(), cfg.Domain, f.environment, envOptions...) + env, err := deps.EnvironmentLoader(cmd.Context(), cfg.Domain, envName, envOptions...) if err != nil { return err } - indexStr := "" - if f.changeset == "" { - indexStr = fmt.Sprintf(" (at index %d)", f.changesetIndex) - } - cfg.Logger.Infof("Applying %s durable pipeline for changeset %s%s for environment: %s\n", - cfg.Domain, actualChangesetName, indexStr, f.environment, - ) - - out, err := registry.Apply(actualChangesetName, env) + out, err := registry.Apply(changesetName, env) var saveErr error - if saveErr = dprun.SaveReports(reporter, originalReportsLen, cfg.Logger, artdir, actualChangesetName); saveErr != nil { + if saveErr = dprun.SaveReports(reporter, originalReportsLen, cfg.Logger, artdir, changesetName); saveErr != nil { cfg.Logger.Errorf("failed to save reports: %v", saveErr) } if err != nil { @@ -173,7 +268,7 @@ func runRun(cmd *cobra.Command, cfg *Config, f runFlags) error { return saveErr } - err = saveChangesetProposalMetadata(registry, actualChangesetName, out) + err = saveChangesetProposalMetadata(registry, changesetName, out) if err != nil { return fmt.Errorf("failed to save changeset proposal metadata: %w", err) } @@ -195,7 +290,7 @@ func runRun(cmd *cobra.Command, cfg *Config, f runFlags) error { } } - if err := artdir.SaveChangesetOutput(actualChangesetName, out); err != nil { + if err := artdir.SaveChangesetOutput(changesetName, out); err != nil { cfg.Logger.Errorf("failed to save changeset artifacts: %v", err) return err } diff --git a/engine/cld/commands/pipeline/run_test.go b/engine/cld/commands/pipeline/run_test.go index ad9d7a9b5..770eea38a 100644 --- a/engine/cld/commands/pipeline/run_test.go +++ b/engine/cld/commands/pipeline/run_test.go @@ -27,12 +27,15 @@ import ( // stubChangeset implements ChangeSetV2 for testing. type stubChangeset struct { - ApplyCalled bool - StubError error + ApplyCalled bool + CapturedInput any + StubError error } -func (s *stubChangeset) Apply(_ fdeployment.Environment, _ any) (fdeployment.ChangesetOutput, error) { +func (s *stubChangeset) Apply(_ fdeployment.Environment, input any) (fdeployment.ChangesetOutput, error) { s.ApplyCalled = true + s.CapturedInput = input + return fdeployment.ChangesetOutput{}, s.StubError } @@ -681,6 +684,242 @@ changesets: }}, proposal.Operations) } +// ----- --all flag tests ----- + +func preserveDurablePipelineInputEnv(t *testing.T) { + t.Helper() + t.Setenv("DURABLE_PIPELINE_INPUT", os.Getenv("DURABLE_PIPELINE_INPUT")) +} + +//nolint:paralleltest +func TestRunCmd_AllFlag_Success(t *testing.T) { + preserveDurablePipelineInputEnv(t) + env := "testnet" + testDomain := domain.NewDomain(t.TempDir(), "test") + + workspaceRoot := t.TempDir() + inputsDir := filepath.Join(workspaceRoot, "domains", testDomain.String(), env, "durable_pipelines", "inputs") + require.NoError(t, os.MkdirAll(inputsDir, 0o755)) + + yamlContent := `environment: testnet +domain: test +changesets: + - 0001_cs_first: + payload: {a: 1} + - 0002_cs_second: + payload: {b: 2} + - 0003_cs_third: + payload: {c: 3}` + yamlFileName := "input.yaml" + require.NoError(t, os.WriteFile(filepath.Join(inputsDir, yamlFileName), []byte(yamlContent), 0o600)) + + originalWd, _ := os.Getwd() + require.NoError(t, os.Chdir(workspaceRoot)) + t.Cleanup(func() { require.NoError(t, os.Chdir(originalWd)) }) + + stub1 := &stubChangeset{} + stub2 := &stubChangeset{} + stub3 := &stubChangeset{} + + loadChangesets := func(string) (*changeset.ChangesetsRegistry, error) { + reg := changeset.NewChangesetsRegistry() + reg.Add("0001_cs_first", changeset.Configure(stub1).WithEnvInput()) + reg.Add("0002_cs_second", changeset.Configure(stub2).WithEnvInput()) + reg.Add("0003_cs_third", changeset.Configure(stub3).WithEnvInput()) + + return reg, nil + } + + cfg := &Config{ + Logger: logger.Test(t), + Domain: testDomain, + LoadChangesets: loadChangesets, + ConfigResolverManager: fresolvers.NewConfigResolverManager(), + Deps: Deps{ + EnvironmentLoader: func(context.Context, domain.Domain, string, ...environment.LoadEnvironmentOption) (fdeployment.Environment, error) { + return fdeployment.Environment{}, nil + }, + }, + } + + cmd, err := NewCommand(cfg) + require.NoError(t, err) + + cmd.SetArgs([]string{ + "run", + "--environment", env, + "--input-file", yamlFileName, + "--all", + }) + + require.NoError(t, cmd.Execute()) + require.True(t, stub1.ApplyCalled) + require.True(t, stub2.ApplyCalled) + require.True(t, stub3.ApplyCalled) + require.Equal(t, map[string]any{"a": json.Number("1")}, stub1.CapturedInput) + require.Equal(t, map[string]any{"b": json.Number("2")}, stub2.CapturedInput) + require.Equal(t, map[string]any{"c": json.Number("3")}, stub3.CapturedInput) +} + +//nolint:paralleltest +func TestRunCmd_AllFlag_FailFastOnError(t *testing.T) { + preserveDurablePipelineInputEnv(t) + + env := "testnet" + testDomain := domain.NewDomain(t.TempDir(), "test") + + workspaceRoot := t.TempDir() + inputsDir := filepath.Join(workspaceRoot, "domains", testDomain.String(), env, "durable_pipelines", "inputs") + require.NoError(t, os.MkdirAll(inputsDir, 0o755)) + + yamlContent := `environment: testnet +domain: test +changesets: + - 0001_cs_first: + payload: {a: 1} + - 0002_cs_second: + payload: {b: 2} + - 0003_cs_third: + payload: {c: 3}` + yamlFileName := "input.yaml" + require.NoError(t, os.WriteFile(filepath.Join(inputsDir, yamlFileName), []byte(yamlContent), 0o600)) + + originalWd, _ := os.Getwd() + require.NoError(t, os.Chdir(workspaceRoot)) + t.Cleanup(func() { require.NoError(t, os.Chdir(originalWd)) }) + + stub1 := &stubChangeset{} + stub2 := &stubChangeset{StubError: errors.New("second changeset failed")} + stub3 := &stubChangeset{} + + loadChangesets := func(string) (*changeset.ChangesetsRegistry, error) { + reg := changeset.NewChangesetsRegistry() + reg.Add("0001_cs_first", changeset.Configure(stub1).WithEnvInput()) + reg.Add("0002_cs_second", changeset.Configure(stub2).WithEnvInput()) + reg.Add("0003_cs_third", changeset.Configure(stub3).WithEnvInput()) + + return reg, nil + } + + cfg := &Config{ + Logger: logger.Test(t), + Domain: testDomain, + LoadChangesets: loadChangesets, + ConfigResolverManager: fresolvers.NewConfigResolverManager(), + Deps: Deps{ + EnvironmentLoader: func(context.Context, domain.Domain, string, ...environment.LoadEnvironmentOption) (fdeployment.Environment, error) { + return fdeployment.Environment{}, nil + }, + }, + } + + cmd, err := NewCommand(cfg) + require.NoError(t, err) + + cmd.SetArgs([]string{ + "run", + "--environment", env, + "--input-file", yamlFileName, + "--all", + }) + + err = cmd.Execute() + require.ErrorContains(t, err, "[2/3] changeset 0002_cs_second: second changeset failed") + require.True(t, stub1.ApplyCalled) + require.True(t, stub2.ApplyCalled) + require.False(t, stub3.ApplyCalled, "third changeset should not run after second fails") +} + +//nolint:paralleltest +func TestRunCmd_AllFlag_MutuallyExclusiveWithChangeset(t *testing.T) { + preserveDurablePipelineInputEnv(t) + + cfg := &Config{ + Logger: logger.Test(t), + Domain: domain.NewDomain(t.TempDir(), "test"), + LoadChangesets: func(string) (*changeset.ChangesetsRegistry, error) { return changeset.NewChangesetsRegistry(), nil }, + ConfigResolverManager: fresolvers.NewConfigResolverManager(), + } + + cmd, err := NewCommand(cfg) + require.NoError(t, err) + + cmd.SetArgs([]string{ + "run", + "--environment", "testnet", + "--input-file", "input.yaml", + "--changeset", "0001_cs1", + "--all", + }) + + err = cmd.Execute() + require.ErrorContains(t, err, "if any flags in the group [changeset all] are set none of the others can be; [all changeset] were all set") +} + +//nolint:paralleltest +func TestRunCmd_AllFlag_MutuallyExclusiveWithIndex(t *testing.T) { + preserveDurablePipelineInputEnv(t) + + cfg := &Config{ + Logger: logger.Test(t), + Domain: domain.NewDomain(t.TempDir(), "test"), + LoadChangesets: func(string) (*changeset.ChangesetsRegistry, error) { return changeset.NewChangesetsRegistry(), nil }, + ConfigResolverManager: fresolvers.NewConfigResolverManager(), + } + + cmd, err := NewCommand(cfg) + require.NoError(t, err) + + cmd.SetArgs([]string{ + "run", + "--environment", "testnet", + "--input-file", "input.yaml", + "--changeset-index", "0", + "--all", + }) + + err = cmd.Execute() + require.ErrorContains(t, err, "if any flags in the group [changeset-index all] are set none of the others can be; [all changeset-index] were all set") +} + +//nolint:paralleltest +func TestRunCmd_AllFlag_InvalidInputFile(t *testing.T) { + preserveDurablePipelineInputEnv(t) + + testDomain := domain.NewDomain(t.TempDir(), "test") + workspaceRoot := t.TempDir() + require.NoError(t, os.MkdirAll(filepath.Join(workspaceRoot, "domains"), 0o755)) + + originalWd, _ := os.Getwd() + require.NoError(t, os.Chdir(workspaceRoot)) + t.Cleanup(func() { require.NoError(t, os.Chdir(originalWd)) }) + + cfg := &Config{ + Logger: logger.Test(t), + Domain: testDomain, + LoadChangesets: func(string) (*changeset.ChangesetsRegistry, error) { return changeset.NewChangesetsRegistry(), nil }, + ConfigResolverManager: fresolvers.NewConfigResolverManager(), + Deps: Deps{ + EnvironmentLoader: func(context.Context, domain.Domain, string, ...environment.LoadEnvironmentOption) (fdeployment.Environment, error) { + return fdeployment.Environment{}, nil + }, + }, + } + + cmd, err := NewCommand(cfg) + require.NoError(t, err) + + cmd.SetArgs([]string{ + "run", + "--environment", "testnet", + "--input-file", "nonexistent.yaml", + "--all", + }) + + err = cmd.Execute() + require.ErrorContains(t, err, "failed to parse input file") +} + // ----- shared test data ----- var testProposal = lo.Must(mcms.NewTimelockProposalBuilder(). diff --git a/engine/cld/pipeline/input/yaml.go b/engine/cld/pipeline/input/yaml.go index d80172146..ef9c1d1bc 100644 --- a/engine/cld/pipeline/input/yaml.go +++ b/engine/cld/pipeline/input/yaml.go @@ -108,7 +108,7 @@ func GetAllChangesetsInOrder(changesets any) ([]ChangesetItem, error) { data, ok := changesets.([]any) if !ok { - return nil, errors.New("invalid 'changesets' format for index access, expected array format") + return nil, errors.New("invalid 'changesets' format, expected array format") } for i, item := range data { diff --git a/engine/cld/pipeline/input/yaml_test.go b/engine/cld/pipeline/input/yaml_test.go index b0a35d437..57e862487 100644 --- a/engine/cld/pipeline/input/yaml_test.go +++ b/engine/cld/pipeline/input/yaml_test.go @@ -93,7 +93,7 @@ func TestGetAllChangesetsInOrder(t *testing.T) { { name: "invalid format", changesets: map[string]any{"x": 1}, - wantErr: "invalid 'changesets' format for index access, expected array format", + wantErr: "invalid 'changesets' format, expected array format", }, { name: "invalid array item type", From 2ab4d60e7b996cf18d0b6cec2c7b2f2e5f9163da Mon Sep 17 00:00:00 2001 From: Graham Goh Date: Wed, 20 May 2026 22:29:13 +1000 Subject: [PATCH 2/2] fix: perform datastore/ab merge after execution --- engine/cld/commands/pipeline/deps.go | 24 +++++ engine/cld/commands/pipeline/run.go | 36 ++++++- engine/cld/commands/pipeline/run_test.go | 131 +++++++++++++++++++++++ 3 files changed, 189 insertions(+), 2 deletions(-) diff --git a/engine/cld/commands/pipeline/deps.go b/engine/cld/commands/pipeline/deps.go index f298d2923..1ff40cd64 100644 --- a/engine/cld/commands/pipeline/deps.go +++ b/engine/cld/commands/pipeline/deps.go @@ -16,10 +16,24 @@ type EnvironmentLoaderFunc func( opts ...environment.LoadEnvironmentOption, ) (fdeployment.Environment, error) +// AddressBookMergerFunc merges a changeset's address book to the main address book. +type AddressBookMergerFunc func(envDir domain.EnvDir, name, timestamp string) error + +// DataStoreMergerFunc merges a changeset's datastore to the main datastore. +type DataStoreMergerFunc func(envDir domain.EnvDir, name, timestamp string) error + // Deps holds optional dependencies that can be overridden for testing. type Deps struct { // EnvironmentLoader loads a deployment environment. Default: environment.Load EnvironmentLoader EnvironmentLoaderFunc + + // AddressBookMerger merges a changeset's address book to the main address book. + // Default: envDir.MergeChangesetAddressBook + AddressBookMerger AddressBookMergerFunc + + // DataStoreMerger merges a changeset's datastore to the main datastore. + // Default: envDir.MergeChangesetDataStore + DataStoreMerger DataStoreMergerFunc } // DefaultEnvironmentLoader is used when Deps.EnvironmentLoader is nil. @@ -31,4 +45,14 @@ func (d *Deps) applyDefaults() { if d.EnvironmentLoader == nil { d.EnvironmentLoader = DefaultEnvironmentLoader } + if d.AddressBookMerger == nil { + d.AddressBookMerger = func(envDir domain.EnvDir, name, timestamp string) error { + return envDir.MergeChangesetAddressBook(name, timestamp) + } + } + if d.DataStoreMerger == nil { + d.DataStoreMerger = func(envDir domain.EnvDir, name, timestamp string) error { + return envDir.MergeChangesetDataStore(name, timestamp) + } + } } diff --git a/engine/cld/commands/pipeline/run.go b/engine/cld/commands/pipeline/run.go index 440772f2c..05c24518d 100644 --- a/engine/cld/commands/pipeline/run.go +++ b/engine/cld/commands/pipeline/run.go @@ -164,7 +164,9 @@ func runSingleChangeset( // runAllChangesets runs all changesets defined in the input file sequentially. It sets the // DURABLE_PIPELINE_INPUT env var for each changeset before applying, -// then loads the registry and applies each changeset in order. +// then loads the registry and applies each changeset in order. After each changeset, +// the address book and datastore artifacts are merged into the main state so that +// the next changeset loads a fully up-to-date environment. func runAllChangesets( cmd *cobra.Command, cfg *Config, @@ -192,7 +194,8 @@ func runAllChangesets( for i, cs := range changesets { cfg.Logger.Infof("[%d/%d] Applying changeset %s", i+1, len(changesets), cs.Name) - if err := artdir.SetDurablePipelines(strconv.FormatInt(time.Now().UnixNano(), 10)); err != nil { + timestamp := strconv.FormatInt(time.Now().UnixNano(), 10) + if err := artdir.SetDurablePipelines(timestamp); err != nil { return err } @@ -208,6 +211,10 @@ func runAllChangesets( if err := applyChangeset(cmd, cfg, f.dryRun, f.environment, cs.Name, registry, artdir, deps); err != nil { return fmt.Errorf("[%d/%d] changeset %s: %w", i+1, len(changesets), cs.Name, err) } + + if err := mergeArtifacts(cfg, deps, f.environment, cs.Name, timestamp); err != nil { + return fmt.Errorf("[%d/%d] changeset %s: merge failed: %w", i+1, len(changesets), cs.Name, err) + } } cfg.Logger.Infof("Successfully applied all %d changesets for environment: %s", len(changesets), f.environment) @@ -215,6 +222,31 @@ func runAllChangesets( return nil } +// mergeArtifacts merges the address book and datastore artifacts produced by a changeset +// into the main state. Called between changesets in the --all flow so that each +// subsequent changeset loads a fully up-to-date environment. +func mergeArtifacts( + cfg *Config, + deps *Deps, + envName string, + changesetName string, + timestamp string, +) error { + envDir := cfg.Domain.EnvDir(envName) + + if err := deps.AddressBookMerger(envDir, changesetName, timestamp); err != nil { + return fmt.Errorf("address book merge: %w", err) + } + cfg.Logger.Infof("Merged address book for changeset %s", changesetName) + + if err := deps.DataStoreMerger(envDir, changesetName, timestamp); err != nil { + return fmt.Errorf("datastore merge: %w", err) + } + cfg.Logger.Infof("Merged datastore for changeset %s", changesetName) + + return nil +} + func applyChangeset( cmd *cobra.Command, cfg *Config, diff --git a/engine/cld/commands/pipeline/run_test.go b/engine/cld/commands/pipeline/run_test.go index 770eea38a..2bccb51a7 100644 --- a/engine/cld/commands/pipeline/run_test.go +++ b/engine/cld/commands/pipeline/run_test.go @@ -739,6 +739,8 @@ changesets: EnvironmentLoader: func(context.Context, domain.Domain, string, ...environment.LoadEnvironmentOption) (fdeployment.Environment, error) { return fdeployment.Environment{}, nil }, + AddressBookMerger: func(domain.EnvDir, string, string) error { return nil }, + DataStoreMerger: func(domain.EnvDir, string, string) error { return nil }, }, } @@ -810,6 +812,8 @@ changesets: EnvironmentLoader: func(context.Context, domain.Domain, string, ...environment.LoadEnvironmentOption) (fdeployment.Environment, error) { return fdeployment.Environment{}, nil }, + AddressBookMerger: func(domain.EnvDir, string, string) error { return nil }, + DataStoreMerger: func(domain.EnvDir, string, string) error { return nil }, }, } @@ -830,6 +834,133 @@ changesets: require.False(t, stub3.ApplyCalled, "third changeset should not run after second fails") } +//nolint:paralleltest +func TestRunCmd_AllFlag_MergeCalledAfterEachChangeset(t *testing.T) { + preserveDurablePipelineInputEnv(t) + env := "testnet" + testDomain := domain.NewDomain(t.TempDir(), "test") + + workspaceRoot := t.TempDir() + inputsDir := filepath.Join(workspaceRoot, "domains", testDomain.String(), env, "durable_pipelines", "inputs") + require.NoError(t, os.MkdirAll(inputsDir, 0o755)) + + yamlContent := `environment: testnet +domain: test +changesets: + - 0001_cs_first: + payload: {a: 1} + - 0002_cs_second: + payload: {b: 2} + - 0003_cs_third: + payload: {c: 3}` + yamlFileName := "input.yaml" + require.NoError(t, os.WriteFile(filepath.Join(inputsDir, yamlFileName), []byte(yamlContent), 0o600)) + + originalWd, _ := os.Getwd() + require.NoError(t, os.Chdir(workspaceRoot)) + t.Cleanup(func() { require.NoError(t, os.Chdir(originalWd)) }) + + loadChangesets := func(string) (*changeset.ChangesetsRegistry, error) { + reg := changeset.NewChangesetsRegistry() + reg.Add("0001_cs_first", changeset.Configure(&stubChangeset{}).WithEnvInput()) + reg.Add("0002_cs_second", changeset.Configure(&stubChangeset{}).WithEnvInput()) + reg.Add("0003_cs_third", changeset.Configure(&stubChangeset{}).WithEnvInput()) + return reg, nil + } + + var abMergedNames []string + var dsMergedNames []string + + cfg := &Config{ + Logger: logger.Test(t), + Domain: testDomain, + LoadChangesets: loadChangesets, + ConfigResolverManager: fresolvers.NewConfigResolverManager(), + Deps: Deps{ + EnvironmentLoader: func(context.Context, domain.Domain, string, ...environment.LoadEnvironmentOption) (fdeployment.Environment, error) { + return fdeployment.Environment{}, nil + }, + AddressBookMerger: func(_ domain.EnvDir, name, _ string) error { + abMergedNames = append(abMergedNames, name) + return nil + }, + DataStoreMerger: func(_ domain.EnvDir, name, _ string) error { + dsMergedNames = append(dsMergedNames, name) + return nil + }, + }, + } + + cmd, err := NewCommand(cfg) + require.NoError(t, err) + + cmd.SetArgs([]string{"run", "--environment", env, "--input-file", yamlFileName, "--all"}) + require.NoError(t, cmd.Execute()) + + want := []string{"0001_cs_first", "0002_cs_second", "0003_cs_third"} + require.Equal(t, want, abMergedNames, "address book merge should be called for each changeset in order") + require.Equal(t, want, dsMergedNames, "datastore merge should be called for each changeset in order") +} + +//nolint:paralleltest +func TestRunCmd_AllFlag_MergeFailureStopsRun(t *testing.T) { + preserveDurablePipelineInputEnv(t) + env := "testnet" + testDomain := domain.NewDomain(t.TempDir(), "test") + + workspaceRoot := t.TempDir() + inputsDir := filepath.Join(workspaceRoot, "domains", testDomain.String(), env, "durable_pipelines", "inputs") + require.NoError(t, os.MkdirAll(inputsDir, 0o755)) + + yamlContent := `environment: testnet +domain: test +changesets: + - 0001_cs_first: + payload: {a: 1} + - 0002_cs_second: + payload: {b: 2}` + yamlFileName := "input.yaml" + require.NoError(t, os.WriteFile(filepath.Join(inputsDir, yamlFileName), []byte(yamlContent), 0o600)) + + originalWd, _ := os.Getwd() + require.NoError(t, os.Chdir(workspaceRoot)) + t.Cleanup(func() { require.NoError(t, os.Chdir(originalWd)) }) + + stub2 := &stubChangeset{} + + loadChangesets := func(string) (*changeset.ChangesetsRegistry, error) { + reg := changeset.NewChangesetsRegistry() + reg.Add("0001_cs_first", changeset.Configure(&stubChangeset{}).WithEnvInput()) + reg.Add("0002_cs_second", changeset.Configure(stub2).WithEnvInput()) + return reg, nil + } + + cfg := &Config{ + Logger: logger.Test(t), + Domain: testDomain, + LoadChangesets: loadChangesets, + ConfigResolverManager: fresolvers.NewConfigResolverManager(), + Deps: Deps{ + EnvironmentLoader: func(context.Context, domain.Domain, string, ...environment.LoadEnvironmentOption) (fdeployment.Environment, error) { + return fdeployment.Environment{}, nil + }, + AddressBookMerger: func(domain.EnvDir, string, string) error { + return errors.New("address book merge failed") + }, + DataStoreMerger: func(domain.EnvDir, string, string) error { return nil }, + }, + } + + cmd, err := NewCommand(cfg) + require.NoError(t, err) + + cmd.SetArgs([]string{"run", "--environment", env, "--input-file", yamlFileName, "--all"}) + + err = cmd.Execute() + require.ErrorContains(t, err, "[1/2] changeset 0001_cs_first: merge failed: address book merge: address book merge failed") + require.False(t, stub2.ApplyCalled, "second changeset should not run after merge failure") +} + //nolint:paralleltest func TestRunCmd_AllFlag_MutuallyExclusiveWithChangeset(t *testing.T) { preserveDurablePipelineInputEnv(t)