Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ require (
sigs.k8s.io/yaml v1.4.0
)

replace github.com/smartcontractkit/chainlink-common/pkg/chipingress => ./pkg/chipingress
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waiting for #2031 to be merged

Comment thread
tarcisiozf marked this conversation as resolved.

require (
github.com/apache/arrow-go/v18 v18.3.1 // indirect
github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 49 additions & 7 deletions pkg/beholder/durable_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,6 @@ func NewDurableEmitter(
metrics: m,
stopCh: make(chan struct{}),
}
Comment thread
tarcisiozf marked this conversation as resolved.
if cp, ok := client.(grpcConnProvider); ok {
d.rawConn = cp.Conn()
log.Infow("DurableEmitter: raw-codec batch publishing enabled (zero-copy protowire)")
}
if cfg.InsertBatchSize > 0 {
if bi, ok := store.(BatchInserter); ok {
d.batchInserter = bi
Expand Down Expand Up @@ -757,6 +753,7 @@ func (d *DurableEmitter) flushBatchRaw(ctx context.Context, batch []publishWork)
// standard gRPC client. Used as fallback when rawConn is not available.
func (d *DurableEmitter) flushBatchTyped(ctx context.Context, batch []publishWork) error {
events := make([]*chipingress.CloudEventPb, len(batch))
eventIDtoQueueID := make(map[string]int64, len(batch))
for i, w := range batch {
if w.event != nil {
events[i] = w.event
Expand All @@ -767,10 +764,55 @@ func (d *DurableEmitter) flushBatchTyped(ctx context.Context, batch []publishWor
}
events[i] = ev
}
eventIDtoQueueID[events[i].Id] = w.id
}

batchPb := &chipingress.CloudEventBatch{
Events: events,
Options: &chipingress.PublishOptions{
AllowPartialSuccess: new(true),
Comment thread
tarcisiozf marked this conversation as resolved.
},
}
response, err := d.client.PublishBatch(ctx, batchPb)
if err != nil {
return fmt.Errorf("PublishBatch RPC failed: %w", err)
}

groupByError := make(map[string][]int64)
for _, result := range response.Results {
Comment thread
tarcisiozf marked this conversation as resolved.
if result.Error == nil {
continue
}
msg := fmt.Sprintf("%s (code %d) - %s", result.Error.Message, result.Error.Code, result.Error.Details)
id, exists := eventIDtoQueueID[result.EventId]
if !exists {
d.log.Warnw("PublishBatch returned invalid event", "event_id", result.EventId)
continue
}
groupByError[msg] = append(groupByError[msg], id)
}
if len(groupByError) > 0 {
if err := d.reportPartialFailures(ctx, groupByError); err != nil {
return fmt.Errorf("publish errors reporting failed: %w", err)
}
}
batchPb := &chipingress.CloudEventBatch{Events: events}
_, err := d.client.PublishBatch(ctx, batchPb)
return err

return nil
}

func (d *DurableEmitter) reportPartialFailures(ctx context.Context, errGroups map[string][]int64) error {
var errList []error

for msg, ids := range errGroups {
if err := d.store.MarkFailedBatch(ctx, msg, ids); err != nil {
errList = append(errList, fmt.Errorf("mark failed batch for message %q ids %v: %w", msg, ids, err))
}
}

if len(errList) > 0 {
return errors.Join(errList...)
}
return nil
}

func (d *DurableEmitter) retransmitLoop() {
Expand Down
267 changes: 267 additions & 0 deletions pkg/beholder/durable_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -905,6 +906,267 @@ func TestIntegration_GRPCConnection(t *testing.T) {
assert.Equal(t, "test-entity", received.Type)
}

// ---------- Partial-success helpers ----------

// partialSuccessClient is a chipingress.Client whose PublishBatch delegates result
// construction to a caller-supplied function so tests can return per-event errors
// without a real gRPC server.
type partialSuccessClient struct {
chipingress.NoopClient
batchCount atomic.Int64
mu sync.Mutex
// resultFn is called with the batch events and returns the Results slice to
// embed in the PublishResponse. Nil means an empty (fully-successful) response.
resultFn func(events []*chipingress.CloudEventPb) []*chipingress.PublishResult
}

func (c *partialSuccessClient) PublishBatch(_ context.Context, b *chipingress.CloudEventBatch, _ ...grpc.CallOption) (*chipingress.PublishResponse, error) {
c.batchCount.Add(1)
c.mu.Lock()
fn := c.resultFn
c.mu.Unlock()
resp := &chipingress.PublishResponse{}
if fn != nil && b != nil {
resp.Results = fn(b.Events)
}
return resp, nil
}

// trackingMarkFailedStore wraps MemDurableEventStore and records every
// MarkFailedBatch call. Setting failMarkErr makes MarkFailedBatch return that
// error, which causes flushBatchTyped to propagate the error back to flushBatch,
// which then skips MarkDeliveredBatch — leaving all events pending for retransmit.
type trackingMarkFailedStore struct {
*MemDurableEventStore
mu sync.Mutex
failedCalls []markFailedCall
failMarkErr error
}

type markFailedCall struct {
msg string
ids []int64
}

func newTrackingMarkFailedStore(failMarkErr error) *trackingMarkFailedStore {
return &trackingMarkFailedStore{
MemDurableEventStore: NewMemDurableEventStore(),
failMarkErr: failMarkErr,
}
}

func (s *trackingMarkFailedStore) MarkFailedBatch(_ context.Context, msg string, ids []int64) error {
s.mu.Lock()
defer s.mu.Unlock()
cp := make([]int64, len(ids))
copy(cp, ids)
s.failedCalls = append(s.failedCalls, markFailedCall{msg: msg, ids: cp})
return s.failMarkErr
}

func (s *trackingMarkFailedStore) getFailedCalls() []markFailedCall {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]markFailedCall, len(s.failedCalls))
copy(out, s.failedCalls)
return out
}

// ---------- Partial-success tests ----------

// TestDurableEmitter_PartialSuccess_CallsMarkFailedBatch asserts that when
// PublishBatch returns a per-event error result, MarkFailedBatch is called with
// the corresponding queue ID. After successful reporting, MarkDeliveredBatch is
// still called for all IDs, so the store drains to zero (the at-most-once-per-epoch
// semantics are preserved; retransmit is not needed because the reporting succeeded).
func TestDurableEmitter_PartialSuccess_CallsMarkFailedBatch(t *testing.T) {
store := newTrackingMarkFailedStore(nil)
client := &partialSuccessClient{
resultFn: func(events []*chipingress.CloudEventPb) []*chipingress.PublishResult {
results := make([]*chipingress.PublishResult, 0, len(events))
for _, ev := range events {
if ev == nil {
continue
}
results = append(results, &chipingress.PublishResult{
EventId: ev.Id,
Error: &statuspb.Status{
Code: int32(codes.InvalidArgument),
Message: "schema validation failed",
},
})
}
return results
},
}

cfg := DefaultDurableEmitterConfig()
cfg.DisablePruning = true
cfg.RetransmitInterval = time.Hour // disable retransmit so nothing auto-clears
em := newTestDurableEmitter(t, store, client, &cfg)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
em.Start(ctx)
defer em.Close()

require.NoError(t, em.Emit(ctx, []byte("partial-fail"), testEmitAttrs()...))

// After flushBatch: MarkFailedBatch called for the error result, then
// MarkDeliveredBatch called for all IDs → store empties.
require.Eventually(t, func() bool {
return store.Len() == 0
}, 2*time.Second, 10*time.Millisecond, "event should be removed after MarkDeliveredBatch even on partial failure")

calls := store.getFailedCalls()
require.Len(t, calls, 1, "MarkFailedBatch must be called once for the failed result")
require.Len(t, calls[0].ids, 1, "MarkFailedBatch must receive the failing event's queue ID")
assert.Contains(t, calls[0].msg, "schema validation failed", "MarkFailedBatch message must contain the server error")
}

// TestDurableEmitter_PartialSuccess_EmptyErrorMessage verifies the partial-failure
// path handles a PublishResult whose Error.Message is the empty string (the format
// string still produces a non-empty key, so MarkFailedBatch must still be called).
func TestDurableEmitter_PartialSuccess_EmptyErrorMessage(t *testing.T) {
store := newTrackingMarkFailedStore(nil)
client := &partialSuccessClient{
resultFn: func(events []*chipingress.CloudEventPb) []*chipingress.PublishResult {
results := make([]*chipingress.PublishResult, 0, len(events))
for _, ev := range events {
if ev == nil {
continue
}
results = append(results, &chipingress.PublishResult{
EventId: ev.Id,
Error: &statuspb.Status{
Code: int32(codes.Internal),
Message: "", // deliberately empty
},
})
}
return results
},
}

cfg := DefaultDurableEmitterConfig()
cfg.DisablePruning = true
cfg.RetransmitInterval = time.Hour
em := newTestDurableEmitter(t, store, client, &cfg)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
em.Start(ctx)
defer em.Close()

require.NoError(t, em.Emit(ctx, []byte("empty-msg-fail"), testEmitAttrs()...))

require.Eventually(t, func() bool {
return store.Len() == 0
}, 2*time.Second, 10*time.Millisecond, "event should be removed after MarkDeliveredBatch")

calls := store.getFailedCalls()
require.Len(t, calls, 1, "MarkFailedBatch must be called even when Error.Message is empty")
require.Len(t, calls[0].ids, 1)
// The formatted key is " (code 13) - []" — non-empty, so grouped correctly.
assert.Contains(t, calls[0].msg, "code 13", "message should embed the grpc code even when text is empty")
}

// TestDurableEmitter_PartialSuccess_MarkFailedBatchStoreError_EventRemainsForRetransmit
// covers the case where MarkFailedBatch itself returns an error. In that scenario
// reportPartialFailures propagates the error back through flushBatchTyped →
// flushBatch, which logs a warning and returns without calling MarkDeliveredBatch,
// leaving all events pending so the retransmit loop can redeliver them.
func TestDurableEmitter_PartialSuccess_MarkFailedBatchStoreError_EventRemainsForRetransmit(t *testing.T) {
storeErr := errors.New("db write failed")
store := newTrackingMarkFailedStore(storeErr)
client := &partialSuccessClient{
resultFn: func(events []*chipingress.CloudEventPb) []*chipingress.PublishResult {
results := make([]*chipingress.PublishResult, 0, len(events))
for _, ev := range events {
if ev == nil {
continue
}
results = append(results, &chipingress.PublishResult{
EventId: ev.Id,
Error: &statuspb.Status{
Code: int32(codes.Unavailable),
Message: "downstream unavailable",
},
})
}
return results
},
}

cfg := DefaultDurableEmitterConfig()
cfg.DisablePruning = true
cfg.RetransmitInterval = time.Hour // prevent retransmit from confusing assertions
em := newTestDurableEmitter(t, store, client, &cfg)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
em.Start(ctx)
defer em.Close()

require.NoError(t, em.Emit(ctx, []byte("store-err"), testEmitAttrs()...))

// Wait until the batch publish attempt has run.
require.Eventually(t, func() bool {
return client.batchCount.Load() >= 1
}, 2*time.Second, 10*time.Millisecond, "PublishBatch must be called at least once")

// MarkFailedBatch returned an error → flushBatchTyped returned an error →
// MarkDeliveredBatch was never called → event remains in the store.
assert.Equal(t, 1, store.Len(), "event must remain pending when MarkFailedBatch fails")

calls := store.getFailedCalls()
require.Len(t, calls, 1, "MarkFailedBatch must have been attempted")
require.Len(t, calls[0].ids, 1)
}

// TestDurableEmitter_PartialSuccess_UnknownEventID_LoggedAndSkipped verifies that
// a PublishResult referencing an event ID absent from the published batch is silently
// skipped (logged as a warning). The remaining batch events are still marked
// delivered and removed from the store.
func TestDurableEmitter_PartialSuccess_UnknownEventID_LoggedAndSkipped(t *testing.T) {
store := newTrackingMarkFailedStore(nil)
client := &partialSuccessClient{
resultFn: func(_ []*chipingress.CloudEventPb) []*chipingress.PublishResult {
// Return an error result for an ID that was not in the batch.
return []*chipingress.PublishResult{
{
EventId: "non-existent-event-id-xyz",
Error: &statuspb.Status{
Code: int32(codes.NotFound),
Message: "event not found",
},
},
}
},
}

cfg := DefaultDurableEmitterConfig()
cfg.DisablePruning = true
cfg.RetransmitInterval = time.Hour
em := newTestDurableEmitter(t, store, client, &cfg)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
em.Start(ctx)
defer em.Close()

require.NoError(t, em.Emit(ctx, []byte("unknown-id"), testEmitAttrs()...))

// Unknown result ID → not added to groupByError → MarkFailedBatch not called →
// MarkDeliveredBatch called for all IDs → store empties.
require.Eventually(t, func() bool {
return store.Len() == 0
}, 2*time.Second, 10*time.Millisecond, "event should still be delivered when result has unknown event ID")

calls := store.getFailedCalls()
assert.Empty(t, calls, "MarkFailedBatch must NOT be called when all error results have unknown event IDs")
}

// MemDurableEventStore is an in-memory DurableEventStore for unit tests.
type MemDurableEventStore struct {
mu sync.Mutex
Expand Down Expand Up @@ -1000,6 +1262,11 @@ func (m *MemDurableEventStore) ListPending(_ context.Context, createdBefore time
return result, nil
}

func (m *MemDurableEventStore) MarkFailedBatch(_ context.Context, _ string, _ []int64) error {
// In-memory store has no persistent failure state; events remain pending for retransmit.
return nil
}

func (m *MemDurableEventStore) DeleteExpired(_ context.Context, ttl time.Duration) (int64, error) {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
Loading
Loading