Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions lib/ec2macosinit/instancehistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
)

Expand Down Expand Up @@ -41,6 +42,58 @@ func (h HistoryError) Error() string {
return h.err.Error()
}

// HistoryRecorder writes module results to the history file incrementally as
// each module completes. It is safe for concurrent use from multiple goroutines.
type HistoryRecorder struct {
mu sync.Mutex
instanceID string
historyPath string
historyFilename string
startTime time.Time
results []ModuleHistory
}

// NewHistoryRecorder creates a HistoryRecorder that will persist results to the
// given instance history path.
func NewHistoryRecorder(instanceID, historyPath, historyFilename string) *HistoryRecorder {
return &HistoryRecorder{
instanceID: instanceID,
historyPath: historyPath,
historyFilename: historyFilename,
startTime: time.Now(),
}
}

// Record appends a module result and flushes the full history to disk.
func (r *HistoryRecorder) Record(m *Module) error {
r.mu.Lock()
defer r.mu.Unlock()

r.results = append(r.results, ModuleHistory{
Key: m.generateHistoryKey(),
Success: m.Success,
})

return r.flush()
}

func (r *HistoryRecorder) flush() error {
history := History{
InstanceID: r.instanceID,
RunTime: r.startTime,
ModuleHistories: r.results,
Version: historyVersion,
}

historyBytes, err := json.Marshal(history)
if err != nil {
return fmt.Errorf("ec2macosinit: unable to marshal history: %w", err)
}

path := filepath.Join(r.historyPath, r.instanceID, r.historyFilename)
return safeWrite(path, historyBytes)
}

// GetInstanceHistory takes a path to instance history directory and a file name for history files and searches for
// any files that match. Then, for each file, it calls readHistoryFile() to read the file and add it to the
// InstanceHistory struct.
Expand Down
170 changes: 170 additions & 0 deletions lib/ec2macosinit/instancehistory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package ec2macosinit

import (
"encoding/json"
"os"
"path/filepath"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHistoryRecorder_Record(t *testing.T) {
dir := t.TempDir()
instanceID := "i-abc123"
require.NoError(t, os.MkdirAll(filepath.Join(dir, instanceID), 0755))

recorder := NewHistoryRecorder(instanceID, dir, "history.json")

m := &Module{
Name: "testmod",
Type: "command",
PriorityGroup: 1,
RunOnce: true,
Success: true,
}

err := recorder.Record(m)
require.NoError(t, err)

// Verify the history file was written
data, err := os.ReadFile(filepath.Join(dir, instanceID, "history.json"))
require.NoError(t, err)

var h History
require.NoError(t, json.Unmarshal(data, &h))

assert.Equal(t, instanceID, h.InstanceID)
assert.Equal(t, historyVersion, h.Version)
require.Len(t, h.ModuleHistories, 1)
assert.Equal(t, m.generateHistoryKey(), h.ModuleHistories[0].Key)
assert.True(t, h.ModuleHistories[0].Success)
}

func TestHistoryRecorder_RecordMultiple(t *testing.T) {
dir := t.TempDir()
instanceID := "i-multi456"
require.NoError(t, os.MkdirAll(filepath.Join(dir, instanceID), 0755))

recorder := NewHistoryRecorder(instanceID, dir, "history.json")

modules := []*Module{
{Name: "mod1", Type: "command", PriorityGroup: 1, RunOnce: true, Success: true},
{Name: "mod2", Type: "sshkeys", PriorityGroup: 1, RunPerBoot: true, Success: false},
{Name: "mod3", Type: "motd", PriorityGroup: 2, RunPerInstance: true, Success: true},
}

for _, m := range modules {
require.NoError(t, recorder.Record(m))
}

data, err := os.ReadFile(filepath.Join(dir, instanceID, "history.json"))
require.NoError(t, err)

var h History
require.NoError(t, json.Unmarshal(data, &h))

assert.Equal(t, instanceID, h.InstanceID)
require.Len(t, h.ModuleHistories, 3)

for i, m := range modules {
assert.Equal(t, m.generateHistoryKey(), h.ModuleHistories[i].Key)
assert.Equal(t, m.Success, h.ModuleHistories[i].Success)
}
}

func TestHistoryRecorder_RecordConcurrent(t *testing.T) {
dir := t.TempDir()
instanceID := "i-concurrent789"
require.NoError(t, os.MkdirAll(filepath.Join(dir, instanceID), 0755))

recorder := NewHistoryRecorder(instanceID, dir, "history.json")

modules := make([]*Module, 20)
for i := range modules {
modules[i] = &Module{
Name: "mod" + string(rune('A'+i)),
Type: "command",
PriorityGroup: 1,
RunPerBoot: true,
Success: true,
}
}

var wg sync.WaitGroup
for _, m := range modules {
wg.Add(1)
go func(m *Module) {
defer wg.Done()
assert.NoError(t, recorder.Record(m))
}(m)
}
wg.Wait()

data, err := os.ReadFile(filepath.Join(dir, instanceID, "history.json"))
require.NoError(t, err)

var h History
require.NoError(t, json.Unmarshal(data, &h))

assert.Equal(t, instanceID, h.InstanceID)
assert.Len(t, h.ModuleHistories, 20)
}

func TestHistoryRecorder_RecordFailedModule(t *testing.T) {
dir := t.TempDir()
instanceID := "i-fail000"
require.NoError(t, os.MkdirAll(filepath.Join(dir, instanceID), 0755))

recorder := NewHistoryRecorder(instanceID, dir, "history.json")

m := &Module{
Name: "failing",
Type: "userdata",
PriorityGroup: 2,
RunPerInstance: true,
Success: false,
}

require.NoError(t, recorder.Record(m))

data, err := os.ReadFile(filepath.Join(dir, instanceID, "history.json"))
require.NoError(t, err)

var h History
require.NoError(t, json.Unmarshal(data, &h))

require.Len(t, h.ModuleHistories, 1)
assert.False(t, h.ModuleHistories[0].Success)
}

func TestHistoryRecorder_IncrementalPersistence(t *testing.T) {
dir := t.TempDir()
instanceID := "i-incr111"
require.NoError(t, os.MkdirAll(filepath.Join(dir, instanceID), 0755))

recorder := NewHistoryRecorder(instanceID, dir, "history.json")
historyFile := filepath.Join(dir, instanceID, "history.json")

m1 := &Module{Name: "first", Type: "command", PriorityGroup: 1, RunOnce: true, Success: true}
require.NoError(t, recorder.Record(m1))

// After first record, file should exist with 1 entry
data, err := os.ReadFile(historyFile)
require.NoError(t, err)
var h1 History
require.NoError(t, json.Unmarshal(data, &h1))
assert.Len(t, h1.ModuleHistories, 1)

m2 := &Module{Name: "second", Type: "motd", PriorityGroup: 1, RunPerBoot: true, Success: true}
require.NoError(t, recorder.Record(m2))

// After second record, file should have 2 entries
data, err = os.ReadFile(historyFile)
require.NoError(t, err)
var h2 History
require.NoError(t, json.Unmarshal(data, &h2))
assert.Len(t, h2.ModuleHistories, 2)
}
15 changes: 7 additions & 8 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func run(baseDir string, c *ec2macosinit.InitConfig) {
}
c.Log.Info("Successfully gathered instance history")

// Create a history recorder to persist results incrementally
recorder := ec2macosinit.NewHistoryRecorder(c.IMDS.InstanceID, c.HistoryPath, c.HistoryFilename)

// Process each module by priority level
var aggregateFatal bool
var aggFatalModuleName string
Expand Down Expand Up @@ -140,6 +143,10 @@ func run(baseDir string, c *ec2macosinit.InitConfig) {
m.Success = true
c.Log.Infof("Skipping module [%s] (type: %s, group: %d) due to Run type setting\n", m.Name, m.Type, m.PriorityGroup)
}
// Record result incrementally so history is persisted even if a later module crashes
if err := recorder.Record(m); err != nil {
c.Log.Errorf("Error recording history for module [%s]: %s", m.Name, err)
}
wg.Done()
}(&c.ModulesByPriority[i][j], &c.InstanceHistory)
}
Expand All @@ -151,14 +158,6 @@ func run(baseDir string, c *ec2macosinit.InitConfig) {
}
}

// Write history file
c.Log.Infof("Writing instance history for instance %s...", c.IMDS.InstanceID)
err = c.WriteHistoryFile()
if err != nil {
c.Log.Fatalf(computeExitCode(c, 73), "Error writing instance history file: %s", err)
}
c.Log.Info("Successfully wrote instance history")

// If any module triggered an aggregate fatal, exit 1
if aggregateFatal {
c.Log.Fatalf(computeExitCode(c, 1), "Exiting after %s due to failure in module [%s] with FatalOnError set", time.Since(startTime).String(), aggFatalModuleName)
Expand Down
Loading