diff --git a/go.mod b/go.mod index a400ba39dc..1928984e64 100644 --- a/go.mod +++ b/go.mod @@ -86,6 +86,8 @@ require ( sigs.k8s.io/yaml v1.4.0 ) +replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress + require ( github.com/apache/arrow-go/v18 v18.3.1 // indirect github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect diff --git a/go.sum b/go.sum index 50114fe85a..249d03a4c3 100644 --- a/go.sum +++ b/go.sum @@ -266,8 +266,6 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260514104516-a827acdffe43 h1:9vjqB+iNqwyazVoVjR1rozHXTeRYyeggavt3Q4sbNrg= diff --git a/pkg/beholder/durable_emitter.go b/pkg/beholder/durable_emitter.go index 222ea4c368..fb85727f89 100644 --- a/pkg/beholder/durable_emitter.go +++ b/pkg/beholder/durable_emitter.go @@ -259,10 +259,6 @@ func NewDurableEmitter( metrics: m, stopCh: make(chan struct{}), } - if cp, ok := client.(grpcConnProvider); ok { - d.rawConn = cp.Conn() - log.Infow("DurableEmitter: raw-codec batch publishing enabled (zero-copy protowire)") - } if cfg.InsertBatchSize > 0 { 
if bi, ok := store.(BatchInserter); ok { d.batchInserter = bi @@ -757,6 +753,7 @@ func (d *DurableEmitter) flushBatchRaw(ctx context.Context, batch []publishWork) // standard gRPC client. Used as fallback when rawConn is not available. func (d *DurableEmitter) flushBatchTyped(ctx context.Context, batch []publishWork) error { events := make([]*chipingress.CloudEventPb, len(batch)) + eventIDtoQueueID := make(map[string]int64, len(batch)) for i, w := range batch { if w.event != nil { events[i] = w.event @@ -767,10 +764,55 @@ func (d *DurableEmitter) flushBatchTyped(ctx context.Context, batch []publishWor } events[i] = ev } + eventIDtoQueueID[events[i].Id] = w.id + } + + batchPb := &chipingress.CloudEventBatch{ + Events: events, + Options: &chipingress.PublishOptions{ + AllowPartialSuccess: new(true), + }, + } + response, err := d.client.PublishBatch(ctx, batchPb) + if err != nil { + return fmt.Errorf("PublishBatch RPC failed: %w", err) + } + + groupByError := make(map[string][]int64) + for _, result := range response.Results { + if result.Error == nil { + continue + } + msg := fmt.Sprintf("%s (code %d) - %s", result.Error.Message, result.Error.Code, result.Error.Details) + id, exists := eventIDtoQueueID[result.EventId] + if !exists { + d.log.Warnw("PublishBatch returned invalid event", "event_id", result.EventId) + continue + } + groupByError[msg] = append(groupByError[msg], id) + } + if len(groupByError) > 0 { + if err := d.reportPartialFailures(ctx, groupByError); err != nil { + return fmt.Errorf("publish errors reporting failed: %w", err) + } } - batchPb := &chipingress.CloudEventBatch{Events: events} - _, err := d.client.PublishBatch(ctx, batchPb) - return err + + return nil +} + +func (d *DurableEmitter) reportPartialFailures(ctx context.Context, errGroups map[string][]int64) error { + var errList []error + + for msg, ids := range errGroups { + if err := d.store.MarkFailedBatch(ctx, msg, ids); err != nil { + errList = append(errList, fmt.Errorf("mark failed batch 
for message %q ids %v: %w", msg, ids, err)) + } + } + + if len(errList) > 0 { + return errors.Join(errList...) + } + return nil } func (d *DurableEmitter) retransmitLoop() { diff --git a/pkg/beholder/durable_emitter_test.go b/pkg/beholder/durable_emitter_test.go index ad96b53071..dc77ebbd5f 100644 --- a/pkg/beholder/durable_emitter_test.go +++ b/pkg/beholder/durable_emitter_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" + statuspb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" @@ -905,6 +906,267 @@ func TestIntegration_GRPCConnection(t *testing.T) { assert.Equal(t, "test-entity", received.Type) } +// ---------- Partial-success helpers ---------- + +// partialSuccessClient is a chipingress.Client whose PublishBatch delegates result +// construction to a caller-supplied function so tests can return per-event errors +// without a real gRPC server. +type partialSuccessClient struct { + chipingress.NoopClient + batchCount atomic.Int64 + mu sync.Mutex + // resultFn is called with the batch events and returns the Results slice to + // embed in the PublishResponse. Nil means an empty (fully-successful) response. + resultFn func(events []*chipingress.CloudEventPb) []*chipingress.PublishResult +} + +func (c *partialSuccessClient) PublishBatch(_ context.Context, b *chipingress.CloudEventBatch, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) { + c.batchCount.Add(1) + c.mu.Lock() + fn := c.resultFn + c.mu.Unlock() + resp := &chipingress.PublishResponse{} + if fn != nil && b != nil { + resp.Results = fn(b.Events) + } + return resp, nil +} + +// trackingMarkFailedStore wraps MemDurableEventStore and records every +// MarkFailedBatch call. 
Setting failMarkErr makes MarkFailedBatch return that
+// error, which causes flushBatchTyped to propagate the error back to flushBatch,
+// which then skips MarkDeliveredBatch — leaving all events pending for retransmit.
+type trackingMarkFailedStore struct {
+	*MemDurableEventStore
+	mu          sync.Mutex
+	failedCalls []markFailedCall
+	failMarkErr error
+}
+
+// markFailedCall captures the arguments of a single MarkFailedBatch invocation.
+type markFailedCall struct {
+	msg string
+	ids []int64
+}
+
+// newTrackingMarkFailedStore builds a tracking store on top of a fresh
+// MemDurableEventStore; failMarkErr is the value MarkFailedBatch will return.
+func newTrackingMarkFailedStore(failMarkErr error) *trackingMarkFailedStore {
+	tracker := &trackingMarkFailedStore{failMarkErr: failMarkErr}
+	tracker.MemDurableEventStore = NewMemDurableEventStore()
+	return tracker
+}
+
+// MarkFailedBatch records the call with a private copy of ids, so the record is
+// unaffected if the caller reuses the slice, and returns the configured error.
+func (s *trackingMarkFailedStore) MarkFailedBatch(_ context.Context, msg string, ids []int64) error {
+	idsCopy := append(make([]int64, 0, len(ids)), ids...)
+	call := markFailedCall{msg: msg, ids: idsCopy}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.failedCalls = append(s.failedCalls, call)
+	return s.failMarkErr
+}
+
+// getFailedCalls returns a snapshot of the MarkFailedBatch calls recorded so far.
+func (s *trackingMarkFailedStore) getFailedCalls() []markFailedCall {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	snapshot := make([]markFailedCall, 0, len(s.failedCalls))
+	return append(snapshot, s.failedCalls...)
+}
+
+// ---------- Partial-success tests ----------
+
+// TestDurableEmitter_PartialSuccess_CallsMarkFailedBatch asserts that when
+// PublishBatch returns a per-event error result, MarkFailedBatch is called with
+// the corresponding queue ID. After successful reporting, MarkDeliveredBatch is
+// still called for all IDs, so the store drains to zero (the at-most-once-per-epoch
+// semantics are preserved; retransmit is not needed because the reporting succeeded).
+func TestDurableEmitter_PartialSuccess_CallsMarkFailedBatch(t *testing.T) {
+	// The fake server rejects every non-nil event in the batch with InvalidArgument.
+	rejectAll := func(events []*chipingress.CloudEventPb) []*chipingress.PublishResult {
+		out := make([]*chipingress.PublishResult, 0, len(events))
+		for _, ev := range events {
+			if ev != nil {
+				rejection := &statuspb.Status{
+					Code:    int32(codes.InvalidArgument),
+					Message: "schema validation failed",
+				}
+				out = append(out, &chipingress.PublishResult{EventId: ev.Id, Error: rejection})
+			}
+		}
+		return out
+	}
+	store := newTrackingMarkFailedStore(nil)
+	client := &partialSuccessClient{resultFn: rejectAll}
+
+	cfg := DefaultDurableEmitterConfig()
+	cfg.RetransmitInterval = time.Hour // disable retransmit so nothing auto-clears
+	cfg.DisablePruning = true
+	em := newTestDurableEmitter(t, store, client, &cfg)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer em.Close()
+
+	require.NoError(t, em.Emit(ctx, []byte("partial-fail"), testEmitAttrs()...))
+
+	// Expected sequence: the error result is reported through MarkFailedBatch,
+	// then MarkDeliveredBatch covers all IDs, which empties the store.
+	drained := func() bool { return store.Len() == 0 }
+	require.Eventually(t, drained, 2*time.Second, 10*time.Millisecond, "event should be removed after MarkDeliveredBatch even on partial failure")
+
+	recorded := store.getFailedCalls()
+	require.Len(t, recorded, 1, "MarkFailedBatch must be called once for the failed result")
+	require.Len(t, recorded[0].ids, 1, "MarkFailedBatch must receive the failing event's queue ID")
+	assert.Contains(t, recorded[0].msg, "schema validation failed", "MarkFailedBatch message must contain the server error")
+}
+
+// TestDurableEmitter_PartialSuccess_EmptyErrorMessage verifies the partial-failure
+// path handles a PublishResult whose Error.Message is the empty string (the format
+// string still produces a non-empty key, so MarkFailedBatch must still be called).
+func TestDurableEmitter_PartialSuccess_EmptyErrorMessage(t *testing.T) {
+	// The fake server fails every non-nil event with codes.Internal and no text.
+	failWithoutText := func(events []*chipingress.CloudEventPb) []*chipingress.PublishResult {
+		out := make([]*chipingress.PublishResult, 0, len(events))
+		for _, ev := range events {
+			if ev != nil {
+				blank := &statuspb.Status{
+					Code:    int32(codes.Internal),
+					Message: "", // deliberately empty
+				}
+				out = append(out, &chipingress.PublishResult{EventId: ev.Id, Error: blank})
+			}
+		}
+		return out
+	}
+	store := newTrackingMarkFailedStore(nil)
+	client := &partialSuccessClient{resultFn: failWithoutText}
+
+	cfg := DefaultDurableEmitterConfig()
+	cfg.RetransmitInterval = time.Hour
+	cfg.DisablePruning = true
+	em := newTestDurableEmitter(t, store, client, &cfg)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer em.Close()
+
+	require.NoError(t, em.Emit(ctx, []byte("empty-msg-fail"), testEmitAttrs()...))
+
+	drained := func() bool { return store.Len() == 0 }
+	require.Eventually(t, drained, 2*time.Second, 10*time.Millisecond, "event should be removed after MarkDeliveredBatch")
+
+	recorded := store.getFailedCalls()
+	require.Len(t, recorded, 1, "MarkFailedBatch must be called even when Error.Message is empty")
+	require.Len(t, recorded[0].ids, 1)
+	// With an empty message the grouping key is " (code 13) - []", which is still
+	// non-empty, so the failure is grouped and reported like any other.
+	assert.Contains(t, recorded[0].msg, "code 13", "message should embed the grpc code even when text is empty")
+}
+
+// TestDurableEmitter_PartialSuccess_MarkFailedBatchStoreError_EventRemainsForRetransmit
+// covers the case where MarkFailedBatch itself returns an error. In that scenario
+// reportPartialFailures propagates the error back through flushBatchTyped →
+// flushBatch, which logs a warning and returns without calling MarkDeliveredBatch,
+// leaving all events pending so the retransmit loop can redeliver them.
+func TestDurableEmitter_PartialSuccess_MarkFailedBatchStoreError_EventRemainsForRetransmit(t *testing.T) {
+	storeErr := errors.New("db write failed")
+	store := newTrackingMarkFailedStore(storeErr)
+	client := &partialSuccessClient{
+		resultFn: func(events []*chipingress.CloudEventPb) []*chipingress.PublishResult {
+			results := make([]*chipingress.PublishResult, 0, len(events))
+			for _, ev := range events {
+				if ev == nil {
+					continue
+				}
+				results = append(results, &chipingress.PublishResult{
+					EventId: ev.Id,
+					Error: &statuspb.Status{
+						Code:    int32(codes.Unavailable),
+						Message: "downstream unavailable",
+					},
+				})
+			}
+			return results
+		},
+	}
+
+	cfg := DefaultDurableEmitterConfig()
+	cfg.DisablePruning = true
+	cfg.RetransmitInterval = time.Hour // prevent retransmit from confusing assertions
+	em := newTestDurableEmitter(t, store, client, &cfg)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	em.Start(ctx)
+	defer em.Close()
+
+	require.NoError(t, em.Emit(ctx, []byte("store-err"), testEmitAttrs()...))
+
+	// Wait until the failure report has been attempted, not merely until the
+	// publish started: batchCount is incremented on entry to PublishBatch, while
+	// MarkFailedBatch is only invoked after PublishBatch has returned. Waiting on
+	// batchCount alone would let the assertions below race with the emitter.
+	require.Eventually(t, func() bool {
+		return client.batchCount.Load() >= 1 && len(store.getFailedCalls()) >= 1
+	}, 2*time.Second, 10*time.Millisecond, "PublishBatch and MarkFailedBatch must each be called at least once")
+
+	// MarkFailedBatch returned an error → flushBatchTyped returned an error →
+	// MarkDeliveredBatch was never called → event remains in the store.
+	assert.Equal(t, 1, store.Len(), "event must remain pending when MarkFailedBatch fails")
+
+	calls := store.getFailedCalls()
+	require.Len(t, calls, 1, "MarkFailedBatch must have been attempted")
+	require.Len(t, calls[0].ids, 1)
+}
+
+// TestDurableEmitter_PartialSuccess_UnknownEventID_LoggedAndSkipped verifies that
+// a PublishResult referencing an event ID absent from the published batch is
+// skipped after a warning is logged. The remaining batch events are still marked
+// delivered and removed from the store.
+func TestDurableEmitter_PartialSuccess_UnknownEventID_LoggedAndSkipped(t *testing.T) { + store := newTrackingMarkFailedStore(nil) + client := &partialSuccessClient{ + resultFn: func(_ []*chipingress.CloudEventPb) []*chipingress.PublishResult { + // Return an error result for an ID that was not in the batch. + return []*chipingress.PublishResult{ + { + EventId: "non-existent-event-id-xyz", + Error: &statuspb.Status{ + Code: int32(codes.NotFound), + Message: "event not found", + }, + }, + } + }, + } + + cfg := DefaultDurableEmitterConfig() + cfg.DisablePruning = true + cfg.RetransmitInterval = time.Hour + em := newTestDurableEmitter(t, store, client, &cfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + em.Start(ctx) + defer em.Close() + + require.NoError(t, em.Emit(ctx, []byte("unknown-id"), testEmitAttrs()...)) + + // Unknown result ID → not added to groupByError → MarkFailedBatch not called → + // MarkDeliveredBatch called for all IDs → store empties. + require.Eventually(t, func() bool { + return store.Len() == 0 + }, 2*time.Second, 10*time.Millisecond, "event should still be delivered when result has unknown event ID") + + calls := store.getFailedCalls() + assert.Empty(t, calls, "MarkFailedBatch must NOT be called when all error results have unknown event IDs") +} + // MemDurableEventStore is an in-memory DurableEventStore for unit tests. type MemDurableEventStore struct { mu sync.Mutex @@ -1000,6 +1262,11 @@ func (m *MemDurableEventStore) ListPending(_ context.Context, createdBefore time return result, nil } +func (m *MemDurableEventStore) MarkFailedBatch(_ context.Context, _ string, _ []int64) error { + // In-memory store has no persistent failure state; events remain pending for retransmit. 
+ return nil +} + func (m *MemDurableEventStore) DeleteExpired(_ context.Context, ttl time.Duration) (int64, error) { m.mu.Lock() defer m.mu.Unlock() diff --git a/pkg/beholder/durable_event_store.go b/pkg/beholder/durable_event_store.go index cba937c657..61c2c10249 100644 --- a/pkg/beholder/durable_event_store.go +++ b/pkg/beholder/durable_event_store.go @@ -63,6 +63,10 @@ type DurableEventStore interface { ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]DurableEvent, error) // DeleteExpired removes events older than ttl and returns the count deleted. DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) + // MarkFailedBatch records a delivery failure for a batch of events, annotating + // each row with the provided errorMessage. Failed events remain pending so the + // retransmit loop can attempt redelivery. + MarkFailedBatch(ctx context.Context, errorMessage string, ids []int64) error } // metricsInstrumentedStore wraps DurableEventStore to record store operation metrics. @@ -72,6 +76,7 @@ type metricsInstrumentedStore struct { } var _ DurableEventStore = (*metricsInstrumentedStore)(nil) + var _ DurableQueueObserver = (*metricsInstrumentedStore)(nil) func newMetricsInstrumentedStore(inner DurableEventStore, m *durableEmitterMetrics) DurableEventStore { @@ -148,3 +153,10 @@ func (s *metricsInstrumentedStore) InsertBatch(ctx context.Context, payloads [][ s.m.recordStoreOp(ctx, "insert_batch", time.Since(t0), err) return ids, err } + +func (s *metricsInstrumentedStore) MarkFailedBatch(ctx context.Context, errorMessage string, ids []int64) error { + t0 := time.Now() + err := s.inner.MarkFailedBatch(ctx, errorMessage, ids) + s.m.recordStoreOp(ctx, "mark_failed_batch", time.Since(t0), err) + return err +}