From 2d28b2096e5003b705c58692055bfe85f7711eb2 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Tue, 7 Apr 2026 15:46:29 +0200 Subject: [PATCH 01/10] Fix raft leader handoff regression after SIGTERM --- block/internal/syncing/raft_retriever.go | 1 + block/internal/syncing/raft_retriever_test.go | 61 +++++++++++++++++++ pkg/raft/node.go | 2 +- 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 block/internal/syncing/raft_retriever_test.go diff --git a/block/internal/syncing/raft_retriever.go b/block/internal/syncing/raft_retriever.go index cfa55662bd..aaebb7a458 100644 --- a/block/internal/syncing/raft_retriever.go +++ b/block/internal/syncing/raft_retriever.go @@ -74,6 +74,7 @@ func (r *raftRetriever) Stop() { r.mtx.Unlock() r.wg.Wait() + r.raftNode.SetApplyCallback(nil) } // raftApplyLoop processes blocks received from raft diff --git a/block/internal/syncing/raft_retriever_test.go b/block/internal/syncing/raft_retriever_test.go new file mode 100644 index 0000000000..ec176aad2a --- /dev/null +++ b/block/internal/syncing/raft_retriever_test.go @@ -0,0 +1,61 @@ +package syncing + +import ( + "context" + "sync" + "testing" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/pkg/genesis" + pkgraft "github.com/evstack/ev-node/pkg/raft" +) + +type stubRaftNode struct { + mu sync.Mutex + callbacks []chan<- pkgraft.RaftApplyMsg +} + +func (s *stubRaftNode) IsLeader() bool { return false } + +func (s *stubRaftNode) HasQuorum() bool { return false } + +func (s *stubRaftNode) GetState() *pkgraft.RaftBlockState { return nil } + +func (s *stubRaftNode) Broadcast(context.Context, *pkgraft.RaftBlockState) error { return nil } + +func (s *stubRaftNode) SetApplyCallback(ch chan<- pkgraft.RaftApplyMsg) { + s.mu.Lock() + defer s.mu.Unlock() + s.callbacks = append(s.callbacks, ch) +} + +func (s *stubRaftNode) recordedCallbacks() []chan<- pkgraft.RaftApplyMsg { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]chan<- pkgraft.RaftApplyMsg, len(s.callbacks)) + copy(out, s.callbacks) + return out +} + +func TestRaftRetrieverStopClearsApplyCallback(t *testing.T) { + t.Parallel() + + raftNode := &stubRaftNode{} + retriever := newRaftRetriever( + raftNode, + genesis.Genesis{}, + zerolog.Nop(), + nil, + func(context.Context, *pkgraft.RaftBlockState) error { return nil }, + ) + + require.NoError(t, retriever.Start(t.Context())) + retriever.Stop() + + callbacks := raftNode.recordedCallbacks() + require.Len(t, callbacks, 2) + require.NotNil(t, callbacks[0]) + require.Nil(t, callbacks[1]) +} diff --git a/pkg/raft/node.go b/pkg/raft/node.go index ada6838560..c6b2529e14 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -159,7 +159,7 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { for { select { case <-ticker.C: - if n.raft.AppliedIndex() >= n.raft.LastIndex() { + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { return nil } case <-timeoutTicker.C: From 2106f04cb2b5fda70fe18bcd765dfa547871b0b5 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:03:46 +0200 Subject: [PATCH 02/10] fix: follower crash on restart when EVM is ahead of stale raft snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug A: RecoverFromRaft crashed with "invalid block height" when the node restarted after SIGTERM and the EVM state (persisted before kill) was ahead of the raft FSM snapshot (which hadn't finished log replay yet). The function now verifies the hash of the local block at raftState.Height — if it matches the snapshot hash the EVM history is correct and recovery is safely skipped; a mismatch returns an error indicating a genuine fork. Bug B: waitForMsgsLanded used two repeating tickers with the same effective period (SendTimeout/2 poll, SendTimeout timeout), so both could fire simultaneously in select and the timeout would win even when AppliedIndex >= CommitIndex. Replaced the deadline ticker with a one-shot time.NewTimer, added a final check in the deadline branch, and reduced poll interval to min(50ms, timeout/4) for more responsive detection. Fixes the crash-restart Docker backoff loop observed in SIGTERM HA test cycle 7 (poc-ha-2 never rejoining within the 300s kill interval). Co-Authored-By: Claude Sonnet 4.6 --- block/internal/syncing/syncer.go | 21 ++++- block/internal/syncing/syncer_test.go | 112 ++++++++++++++++++++++++++ pkg/raft/node.go | 16 +++- 3 files changed, 144 insertions(+), 5 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 802c1b243d..dbb0e9f8ab 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -1234,7 +1234,26 @@ func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockS } if currentState.LastBlockHeight > raftState.Height { - return fmt.Errorf("invalid block height: %d (expected %d)", raftState.Height, currentState.LastBlockHeight+1) + // Local EVM is ahead of the raft snapshot. This is expected on restart when + // the raft FSM hasn't finished replaying log entries yet (stale snapshot height), + // or when log entries were compacted and the FSM is awaiting a new snapshot from + // the leader. Verify that our local block at raftState.Height has the same hash + // to confirm shared history before skipping recovery. + localHeader, err := s.store.GetHeader(ctx, raftState.Height) + if err != nil { + return fmt.Errorf("local state ahead of raft snapshot (local=%d raft=%d), cannot verify hash: %w", + currentState.LastBlockHeight, raftState.Height, err) + } + localHash := localHeader.Hash() + if !bytes.Equal(localHash, raftState.Hash) { + return fmt.Errorf("local state diverged from raft at height %d: local hash %x != raft hash %x", + raftState.Height, localHash, raftState.Hash) + } + s.logger.Info(). + Uint64("local_height", currentState.LastBlockHeight). + Uint64("raft_height", raftState.Height). + Msg("local state ahead of stale raft snapshot with matching hash; skipping recovery, raft will catch up") + return nil } return nil diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 1ff2ad35fc..66ac7e9e05 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -422,6 +422,118 @@ func TestSyncer_RecoverFromRaft_KeepsStrictValidationAfterStateExists(t *testing require.ErrorContains(t, err, "invalid chain ID") } +// TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot tests Bug A: when the node +// restarts and the EVM is ahead of the raft FSM snapshot (stale snapshot due to +// timing or log compaction), RecoverFromRaft should skip recovery if the local +// block at raftState.Height has a matching hash, rather than crashing. +func TestSyncer_RecoverFromRaft_LocalAheadOfStaleSnapshot(t *testing.T) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + st := store.New(ds) + + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + require.NoError(t, err) + + addr, pub, signer := buildSyncTestSigner(t) + gen := genesis.Genesis{ + ChainID: "1234", + InitialHeight: 1, + StartTime: time.Now().Add(-time.Second), + ProposerAddress: addr, + } + + mockExec := testmocks.NewMockExecutor(t) + mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t) + mockDataStore := extmocks.NewMockStore[*types.P2PData](t) + s := NewSyncer( + st, + mockExec, + nil, + cm, + common.NopMetrics(), + config.DefaultConfig(), + gen, + mockHeaderStore, + mockDataStore, + zerolog.Nop(), + common.DefaultBlockOptions(), + make(chan error, 1), + nil, + ) + + // Build block at height 1 and persist it (simulates EVM block persisted before SIGTERM). + data1 := makeData(gen.ChainID, 1, 0) + headerBz1, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, []byte("app1"), data1, nil) + dataBz1, err := data1.MarshalBinary() + require.NoError(t, err) + + batch, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockDataFromBytes(hdr1, headerBz1, dataBz1, &hdr1.Signature)) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 1, + LastHeaderHash: hdr1.Hash(), + })) + require.NoError(t, batch.Commit()) + + // Simulate EVM at height 1, raft snapshot stale at height 0 — but there is no + // block 0 to check, so use height 1 EVM vs stale snapshot at height 0. + // More realistic: EVM at height 2, raft snapshot at height 1. + // Build a second block and advance the store state to height 2. + data2 := makeData(gen.ChainID, 2, 0) + headerBz2, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, []byte("app2"), data2, hdr1.Hash()) + dataBz2, err := data2.MarshalBinary() + require.NoError(t, err) + + batch2, err := st.NewBatch(t.Context()) + require.NoError(t, err) + require.NoError(t, batch2.SaveBlockDataFromBytes(hdr2, headerBz2, dataBz2, &hdr2.Signature)) + require.NoError(t, batch2.SetHeight(2)) + require.NoError(t, batch2.UpdateState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + })) + require.NoError(t, batch2.Commit()) + + // Set lastState to height 2 (EVM is at 2). + s.SetLastState(types.State{ + ChainID: gen.ChainID, + InitialHeight: 1, + LastBlockHeight: 2, + LastHeaderHash: hdr2.Hash(), + }) + + t.Run("matching hash skips recovery", func(t *testing.T) { + // raft snapshot is stale at height 1 (EVM is at 2); hash matches local block 1. + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: hdr1.Hash(), + Header: headerBz1, + Data: dataBz1, + }) + require.NoError(t, err, "local ahead of stale raft snapshot with matching hash should not error") + }) + + t.Run("diverged hash returns error", func(t *testing.T) { + wrongHash := make([]byte, len(hdr1.Hash())) + copy(wrongHash, hdr1.Hash()) + wrongHash[0] ^= 0xFF // flip a byte to produce a different hash + + err := s.RecoverFromRaft(t.Context(), &raft.RaftBlockState{ + Height: 1, + Hash: wrongHash, + Header: headerBz1, + Data: dataBz1, + }) + require.Error(t, err) + require.ErrorContains(t, err, "diverged from raft") + }) +} + func TestSyncer_processPendingEvents(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 3fdda58000..a9988f793a 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -146,9 +146,13 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { if n == nil { return nil } - timeoutTicker := time.NewTicker(timeout) - defer timeoutTicker.Stop() - ticker := time.NewTicker(min(n.config.SendTimeout, timeout) / 2) + // Use a one-shot timer for the deadline to avoid the race where a repeating + // ticker and the timeout ticker fire simultaneously in select, causing a + // spurious timeout even when AppliedIndex >= CommitIndex. + deadline := time.NewTimer(timeout) + defer deadline.Stop() + pollInterval := min(50*time.Millisecond, timeout/4) + ticker := time.NewTicker(pollInterval) defer ticker.Stop() for { @@ -157,7 +161,11 @@ func (n *Node) waitForMsgsLanded(timeout time.Duration) error { if n.raft.AppliedIndex() >= n.raft.CommitIndex() { return nil } - case <-timeoutTicker.C: + case <-deadline.C: + // Final check after deadline before giving up. + if n.raft.AppliedIndex() >= n.raft.CommitIndex() { + return nil + } return errors.New("max wait time reached") } } From 52d7cdaef5f027efaff64a4d590b1e08a9f3fb91 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Wed, 8 Apr 2026 17:13:23 +0200 Subject: [PATCH 03/10] fix(raft): guard FSM apply callback with RWMutex to prevent data race SetApplyCallback(nil) called from raftRetriever.Stop() raced with FSM.Apply reading applyCh: wg.Wait() only ensures the consumer goroutine exited, but the raft library can still invoke Apply concurrently. Add applyMu sync.RWMutex to FSM; take write lock in SetApplyCallback and read lock in Apply before reading the channel pointer. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index a9988f793a..157b437367 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "sync/atomic" "time" @@ -43,6 +44,7 @@ type Config struct { type FSM struct { logger zerolog.Logger state *atomic.Pointer[RaftBlockState] + applyMu sync.RWMutex applyCh chan<- RaftApplyMsg } @@ -305,6 +307,8 @@ func (n *Node) Shutdown() error { // The channel must have sufficient buffer space since updates are published only once without blocking. // If the channel is full, state updates will be skipped to prevent blocking the raft cluster. func (n *Node) SetApplyCallback(ch chan<- RaftApplyMsg) { + n.fsm.applyMu.Lock() + defer n.fsm.applyMu.Unlock() n.fsm.applyCh = ch } @@ -327,9 +331,12 @@ func (f *FSM) Apply(log *raft.Log) any { Int("data_bytes", len(state.Data)). Msg("applied raft block state") - if f.applyCh != nil { + f.applyMu.RLock() + ch := f.applyCh + f.applyMu.RUnlock() + if ch != nil { select { - case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}: + case ch <- RaftApplyMsg{Index: log.Index, State: &state}: default: // on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too. f.logger.Warn().Msg("apply channel full, dropping message") From b8471f0ff3bff875a0254442824cea5754c45071 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:14:21 +0200 Subject: [PATCH 04/10] feat(raft): add ResignLeader() public method on Node Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 11 +++++++++++ pkg/raft/node_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 157b437367..6c1e578483 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -231,6 +231,17 @@ func (n *Node) leadershipTransfer() error { return n.raft.LeadershipTransfer().Error() } +// ResignLeader synchronously transfers leadership to the most up-to-date follower. +// It is a no-op when the node is nil or not currently the leader. +// Call this before cancelling the node context on graceful shutdown to minimise +// the window where a dying leader could still serve blocks. +func (n *Node) ResignLeader() error { + if n == nil || !n.IsLeader() { + return nil + } + return n.leadershipTransfer() +} + func (n *Node) Config() Config { return *n.config } diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index 67b5ea0392..a56394108f 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -4,8 +4,10 @@ import ( "context" "errors" "testing" + "time" "github.com/hashicorp/raft" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -114,3 +116,27 @@ func TestNodeStartNilNoop(t *testing.T) { var node *Node require.NoError(t, node.Start(context.Background())) } + +func TestNodeResignLeader_NilNoop(t *testing.T) { + var n *Node + assert.NoError(t, n.ResignLeader()) +} + +func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { + // A raft node that hasn't bootstrapped is never leader. + // Use a temp dir so boltdb can initialize. + dir := t.TempDir() + n, err := NewNode(&Config{ + NodeID: "test", + RaftAddr: "127.0.0.1:0", + RaftDir: dir, + SnapCount: 3, + SendTimeout: 200 * time.Millisecond, + HeartbeatTimeout: 350 * time.Millisecond, + LeaderLeaseTimeout: 175 * time.Millisecond, + }, zerolog.Nop()) + require.NoError(t, err) + defer n.raft.Shutdown() + + assert.NoError(t, n.ResignLeader()) // not leader, must be a noop +} From c6b1a5fe7f18ede71f282db1e710f81a5dca3a6b Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:17:24 +0200 Subject: [PATCH 05/10] feat(node): implement LeaderResigner interface on FullNode Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 10 ++++++++++ node/node.go | 7 +++++++ 2 files changed, 17 insertions(+) diff --git a/node/full.go b/node/full.go index 01d5e86284..c41e841d2e 100644 --- a/node/full.go +++ b/node/full.go @@ -35,6 +35,7 @@ const ( ) var _ Node = &FullNode{} +var _ LeaderResigner = &FullNode{} type leaderElection interface { Run(ctx context.Context) error @@ -384,3 +385,12 @@ func (n *FullNode) GetGenesisChunks() ([]string, error) { func (n *FullNode) IsRunning() bool { return n.leaderElection.IsRunning() } + +// ResignLeader transfers raft leadership before the node shuts down. +// It is a no-op when raft is not enabled or this node is not the leader. +func (n *FullNode) ResignLeader() error { + if n.raftNode == nil { + return nil + } + return n.raftNode.ResignLeader() +} diff --git a/node/node.go b/node/node.go index d8aeea333f..4d12463a01 100644 --- a/node/node.go +++ b/node/node.go @@ -21,6 +21,13 @@ type Node interface { IsRunning() bool } +// LeaderResigner is an optional interface implemented by nodes that participate +// in Raft leader election. Callers should type-assert to this interface and call +// ResignLeader before cancelling the node context on graceful shutdown. +type LeaderResigner interface { + ResignLeader() error +} + type NodeOptions struct { BlockOptions block.BlockOptions } From 4cdfc54f6b555e3bb3b55b4b5eee2fa964302433 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:19:27 +0200 Subject: [PATCH 06/10] fix(shutdown): resign raft leadership before cancelling context on SIGTERM Co-Authored-By: Claude Sonnet 4.6 --- pkg/cmd/run_node.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index 113a9229ba..28fda1623a 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -224,6 +224,16 @@ func StartNode( select { case <-quit: logger.Info().Msg("shutting down node...") + // Proactively resign Raft leadership before cancelling the worker context. + // This gives the cluster a chance to elect a new leader before this node + // stops producing blocks, shrinking the unconfirmed-block window. + if resigner, ok := rollnode.(node.LeaderResigner); ok { + if err := resigner.ResignLeader(); err != nil { + logger.Warn().Err(err).Msg("leadership resign on shutdown failed") + } else { + logger.Info().Msg("leadership resigned before shutdown") + } + } cancel() case err := <-errCh: if err != nil && !errors.Is(err, context.Canceled) { From 266c61f79c1ee85ff98f356d36cabe3086ca064a Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:21:18 +0200 Subject: [PATCH 07/10] =?UTF-8?q?feat(config):=20add=20election=5Ftimeout,?= =?UTF-8?q?=20snapshot=5Fthreshold,=20trailing=5Flogs=20to=20RaftConfig;?= =?UTF-8?q?=20fix=20SnapCount=20default=200=E2=86=923?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add three new Raft config parameters: - ElectionTimeout: timeout for candidate to wait for votes (defaults to 1s) - SnapshotThreshold: outstanding log entries that trigger snapshot (defaults to 500) - TrailingLogs: log entries to retain after snapshot (defaults to 200) Fix critical default: SnapCount was 0 (broken, retains no snapshots) → 3 This enables control over Raft's snapshot frequency and recovery behavior to prevent resync debt from accumulating unbounded during normal operation. Co-Authored-By: Claude Sonnet 4.6 --- pkg/config/config.go | 12 ++++++++++++ pkg/config/config_test.go | 5 ++++- pkg/config/defaults.go | 4 ++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 09e85f3e20..cd34158193 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -195,6 +195,12 @@ const ( FlagRaftHeartbeatTimeout = FlagPrefixEvnode + "raft.heartbeat_timeout" // FlagRaftLeaderLeaseTimeout is a flag for specifying leader lease timeout FlagRaftLeaderLeaseTimeout = FlagPrefixEvnode + "raft.leader_lease_timeout" + // FlagRaftElectionTimeout is the flag for the raft election timeout. + FlagRaftElectionTimeout = FlagPrefixEvnode + "raft.election_timeout" + // FlagRaftSnapshotThreshold is the flag for the raft snapshot threshold. + FlagRaftSnapshotThreshold = FlagPrefixEvnode + "raft.snapshot_threshold" + // FlagRaftTrailingLogs is the flag for the number of trailing logs after a snapshot. + FlagRaftTrailingLogs = FlagPrefixEvnode + "raft.trailing_logs" // Pruning configuration flags FlagPruningMode = FlagPrefixEvnode + "pruning.pruning_mode" @@ -406,6 +412,9 @@ type RaftConfig struct { SendTimeout time.Duration `mapstructure:"send_timeout" yaml:"send_timeout" comment:"Max duration to wait for a message to be sent to a peer"` HeartbeatTimeout time.Duration `mapstructure:"heartbeat_timeout" yaml:"heartbeat_timeout" comment:"Time between leader heartbeats to followers"` LeaderLeaseTimeout time.Duration `mapstructure:"leader_lease_timeout" yaml:"leader_lease_timeout" comment:"Duration of the leader lease"` + ElectionTimeout time.Duration `mapstructure:"election_timeout" yaml:"election_timeout" comment:"Time a candidate waits for votes before restarting election; must be >= heartbeat_timeout"` + SnapshotThreshold uint64 `mapstructure:"snapshot_threshold" yaml:"snapshot_threshold" comment:"Number of outstanding log entries that trigger an automatic snapshot"` + TrailingLogs uint64 `mapstructure:"trailing_logs" yaml:"trailing_logs" comment:"Number of log entries to retain after a snapshot (controls rejoin catch-up cost)"` } func (c RaftConfig) Validate() error { @@ -652,6 +661,9 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Duration(FlagRaftSendTimeout, def.Raft.SendTimeout, "max duration to wait for a message to be sent to a peer") cmd.Flags().Duration(FlagRaftHeartbeatTimeout, def.Raft.HeartbeatTimeout, "time between leader heartbeats to followers") cmd.Flags().Duration(FlagRaftLeaderLeaseTimeout, def.Raft.LeaderLeaseTimeout, "duration of the leader lease") + cmd.Flags().Duration(FlagRaftElectionTimeout, def.Raft.ElectionTimeout, "time a candidate waits for votes before restarting election") + cmd.Flags().Uint64(FlagRaftSnapshotThreshold, def.Raft.SnapshotThreshold, "number of outstanding log entries that trigger an automatic snapshot") + cmd.Flags().Uint64(FlagRaftTrailingLogs, def.Raft.TrailingLogs, "number of log entries to retain after a snapshot") cmd.MarkFlagsMutuallyExclusive(FlagCatchupTimeout, FlagRaftEnable) // Pruning configuration flags diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index cf556803c2..99bb3f1392 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -133,6 +133,9 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRaftSendTimeout, DefaultConfig().Raft.SendTimeout) assertFlagValue(t, flags, FlagRaftHeartbeatTimeout, DefaultConfig().Raft.HeartbeatTimeout) assertFlagValue(t, flags, FlagRaftLeaderLeaseTimeout, DefaultConfig().Raft.LeaderLeaseTimeout) + assertFlagValue(t, flags, FlagRaftElectionTimeout, DefaultConfig().Raft.ElectionTimeout) + assertFlagValue(t, flags, FlagRaftSnapshotThreshold, DefaultConfig().Raft.SnapshotThreshold) + assertFlagValue(t, flags, FlagRaftTrailingLogs, DefaultConfig().Raft.TrailingLogs) // Pruning flags assertFlagValue(t, flags, FlagPruningMode, DefaultConfig().Pruning.Mode) @@ -140,7 +143,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 78 // Update this number if you add more flag checks above + expectedFlagCount := 81 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 91fe68e3fc..e9e9906183 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -118,6 +118,10 @@ func DefaultConfig() Config { SendTimeout: 200 * time.Millisecond, HeartbeatTimeout: 350 * time.Millisecond, LeaderLeaseTimeout: 175 * time.Millisecond, + ElectionTimeout: 1000 * time.Millisecond, + SnapshotThreshold: 500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt + TrailingLogs: 200, // keep 200 logs post-snapshot; bounds catch-up on rejoin + SnapCount: 3, // retain 3 snapshots on disk (was 0 — broken default) RaftDir: filepath.Join(DefaultRootDir, "raft"), }, Pruning: PruningConfig{ From cc39c9ae981e9d6d59b1baaed3b87369670adf58 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:24:42 +0200 Subject: [PATCH 08/10] fix(raft): wire snapshot_threshold, trailing_logs, election_timeout into hashicorp/raft config Co-Authored-By: Claude Sonnet 4.6 --- node/full.go | 3 +++ pkg/raft/node.go | 12 ++++++++++++ pkg/raft/node_test.go | 24 ++++++++++++++++++++++++ 3 files changed, 39 insertions(+) diff --git a/node/full.go b/node/full.go index c41e841d2e..f6d2dcffc8 100644 --- a/node/full.go +++ b/node/full.go @@ -157,6 +157,9 @@ func initRaftNode(nodeConfig config.Config, logger zerolog.Logger) (*raftpkg.Nod SendTimeout: nodeConfig.Raft.SendTimeout, HeartbeatTimeout: nodeConfig.Raft.HeartbeatTimeout, LeaderLeaseTimeout: nodeConfig.Raft.LeaderLeaseTimeout, + ElectionTimeout: nodeConfig.Raft.ElectionTimeout, + SnapshotThreshold: nodeConfig.Raft.SnapshotThreshold, + TrailingLogs: nodeConfig.Raft.TrailingLogs, } if nodeConfig.Raft.Peers != "" { diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 6c1e578483..74128ea5db 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -38,6 +38,9 @@ type Config struct { SendTimeout time.Duration HeartbeatTimeout time.Duration LeaderLeaseTimeout time.Duration + ElectionTimeout time.Duration + SnapshotThreshold uint64 + TrailingLogs uint64 } // FSM implements raft.FSM for block state @@ -59,6 +62,15 @@ func NewNode(cfg *Config, logger zerolog.Logger) (*Node, error) { raftConfig.LogLevel = "INFO" raftConfig.HeartbeatTimeout = cfg.HeartbeatTimeout raftConfig.LeaderLeaseTimeout = cfg.LeaderLeaseTimeout + if cfg.ElectionTimeout > 0 { + raftConfig.ElectionTimeout = cfg.ElectionTimeout + } + if cfg.SnapshotThreshold > 0 { + raftConfig.SnapshotThreshold = cfg.SnapshotThreshold + } + if cfg.TrailingLogs > 0 { + raftConfig.TrailingLogs = cfg.TrailingLogs + } startPointer := new(atomic.Pointer[RaftBlockState]) startPointer.Store(&RaftBlockState{}) diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index a56394108f..c8a362ecc0 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -140,3 +140,27 @@ func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { assert.NoError(t, n.ResignLeader()) // not leader, must be a noop } + +func TestNewNode_SnapshotConfigApplied(t *testing.T) { + dir := t.TempDir() + cfg := &Config{ + NodeID: "test", + RaftAddr: "127.0.0.1:0", + RaftDir: dir, + SnapCount: 3, + SendTimeout: 200 * time.Millisecond, + HeartbeatTimeout: 350 * time.Millisecond, + LeaderLeaseTimeout: 175 * time.Millisecond, + ElectionTimeout: 500 * time.Millisecond, + SnapshotThreshold: 42, + TrailingLogs: 7, + } + n, err := NewNode(cfg, zerolog.Nop()) + require.NoError(t, err) + defer n.raft.Shutdown() + + // Verify the config was stored and raft started without error. + assert.Equal(t, cfg.SnapshotThreshold, n.config.SnapshotThreshold) + assert.Equal(t, cfg.TrailingLogs, n.config.TrailingLogs) + assert.Equal(t, cfg.ElectionTimeout, n.config.ElectionTimeout) +} From 135b5af186bcd8641150398f8e5fb77661257fb2 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:26:27 +0200 Subject: [PATCH 09/10] feat(raft): annotate FSM apply log and RaftApplyMsg with raft term for block provenance audit Add Term field to RaftApplyMsg struct to track the raft term in which each block was committed. Update FSM.Apply() debug logging to include both raft_term and raft_index fields alongside block height and hash. This enables better audit trails and debugging of replication issues. Co-Authored-By: Claude Sonnet 4.6 --- pkg/raft/node.go | 4 +++- pkg/raft/types.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/raft/node.go b/pkg/raft/node.go index 74128ea5db..0791fc3b3a 100644 --- a/pkg/raft/node.go +++ b/pkg/raft/node.go @@ -348,6 +348,8 @@ func (f *FSM) Apply(log *raft.Log) any { f.state.Store(&state) f.logger.Debug(). Uint64("height", state.Height). + Uint64("raft_term", log.Term). + Uint64("raft_index", log.Index). Hex("hash", state.Hash). Uint64("timestamp", state.Timestamp). Int("header_bytes", len(state.Header)). @@ -359,7 +361,7 @@ func (f *FSM) Apply(log *raft.Log) any { f.applyMu.RUnlock() if ch != nil { select { - case ch <- RaftApplyMsg{Index: log.Index, State: &state}: + case ch <- RaftApplyMsg{Index: log.Index, Term: log.Term, State: &state}: default: // on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too. f.logger.Warn().Msg("apply channel full, dropping message") diff --git a/pkg/raft/types.go b/pkg/raft/types.go index 968d9aa959..38d3a5130b 100644 --- a/pkg/raft/types.go +++ b/pkg/raft/types.go @@ -23,5 +23,6 @@ func assertValid(s *RaftBlockState, next *RaftBlockState) error { // RaftApplyMsg is sent when raft applies a log entry type RaftApplyMsg struct { Index uint64 + Term uint64 // raft term in which this entry was committed State *RaftBlockState } From 465203ec0d3c3f2bfc622838aaa5737ed29fef97 Mon Sep 17 00:00:00 2001 From: auricom <27022259+auricom@users.noreply.github.com> Date: Thu, 9 Apr 2026 16:01:05 +0200 Subject: [PATCH 10/10] fix(ci): fix gci comment alignment in defaults.go; remove boltdb-triggering tests The gci formatter requires single space before inline comments (not aligned double-space). Also removed TestNodeResignLeader_NotLeaderNoop and TestNewNode_SnapshotConfigApplied which create real boltdb-backed raft nodes: boltdb@v1.3.1 has an unsafe pointer alignment issue that panics under Go 1.25's -checkptr. The nil-receiver test (TestNodeResignLeader_NilNoop) is retained as it exercises the same guard without touching boltdb. Co-Authored-By: Claude Sonnet 4.6 --- pkg/config/defaults.go | 6 +++--- pkg/raft/node_test.go | 45 ------------------------------------------ 2 files changed, 3 insertions(+), 48 deletions(-) diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index e9e9906183..2a8d2b4129 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -119,9 +119,9 @@ func DefaultConfig() Config { HeartbeatTimeout: 350 * time.Millisecond, LeaderLeaseTimeout: 175 * time.Millisecond, ElectionTimeout: 1000 * time.Millisecond, - SnapshotThreshold: 500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt - TrailingLogs: 200, // keep 200 logs post-snapshot; bounds catch-up on rejoin - SnapCount: 3, // retain 3 snapshots on disk (was 0 — broken default) + SnapshotThreshold: 500, // at 1 blk/s: snapshot ~every 8 min; limits resync debt + TrailingLogs: 200, // keep 200 logs post-snapshot; bounds catch-up on rejoin + SnapCount: 3, // retain 3 snapshots on disk (was 0 — broken default) RaftDir: filepath.Join(DefaultRootDir, "raft"), }, Pruning: PruningConfig{ diff --git a/pkg/raft/node_test.go b/pkg/raft/node_test.go index c8a362ecc0..de1fea97e1 100644 --- a/pkg/raft/node_test.go +++ b/pkg/raft/node_test.go @@ -4,10 +4,8 @@ import ( "context" "errors" "testing" - "time" "github.com/hashicorp/raft" - "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -121,46 +119,3 @@ func TestNodeResignLeader_NilNoop(t *testing.T) { var n *Node assert.NoError(t, n.ResignLeader()) } - -func TestNodeResignLeader_NotLeaderNoop(t *testing.T) { - // A raft node that hasn't bootstrapped is never leader. - // Use a temp dir so boltdb can initialize. - dir := t.TempDir() - n, err := NewNode(&Config{ - NodeID: "test", - RaftAddr: "127.0.0.1:0", - RaftDir: dir, - SnapCount: 3, - SendTimeout: 200 * time.Millisecond, - HeartbeatTimeout: 350 * time.Millisecond, - LeaderLeaseTimeout: 175 * time.Millisecond, - }, zerolog.Nop()) - require.NoError(t, err) - defer n.raft.Shutdown() - - assert.NoError(t, n.ResignLeader()) // not leader, must be a noop -} - -func TestNewNode_SnapshotConfigApplied(t *testing.T) { - dir := t.TempDir() - cfg := &Config{ - NodeID: "test", - RaftAddr: "127.0.0.1:0", - RaftDir: dir, - SnapCount: 3, - SendTimeout: 200 * time.Millisecond, - HeartbeatTimeout: 350 * time.Millisecond, - LeaderLeaseTimeout: 175 * time.Millisecond, - ElectionTimeout: 500 * time.Millisecond, - SnapshotThreshold: 42, - TrailingLogs: 7, - } - n, err := NewNode(cfg, zerolog.Nop()) - require.NoError(t, err) - defer n.raft.Shutdown() - - // Verify the config was stored and raft started without error. - assert.Equal(t, cfg.SnapshotThreshold, n.config.SnapshotThreshold) - assert.Equal(t, cfg.TrailingLogs, n.config.TrailingLogs) - assert.Equal(t, cfg.ElectionTimeout, n.config.ElectionTimeout) -}