Skip to content

Commit 56a1191

Browse files
committed
batch emit options
1 parent efed00d commit 56a1191

10 files changed

Lines changed: 98 additions & 24 deletions

File tree

pkg/beholder/beholdertest/beholder.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
1717
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
18+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
1819
)
1920

2021
const (
@@ -192,3 +193,15 @@ func (e *assertMessageEmitter) EmitMessage(_ context.Context, msg beholder.Messa
192193

193194
return nil
194195
}
196+
197+
func (e *assertMessageEmitter) BatchEmit(_ context.Context, messages []beholder.Message, _ ...beholder.BatchEmitOption) ([]*chipingress.PublishResult, error) {
198+
e.t.Helper()
199+
200+
e.mu.Lock()
201+
defer e.mu.Unlock()
202+
203+
e.msgs = append(e.msgs, messages...)
204+
205+
return nil, nil
206+
}
207+

pkg/beholder/chip_ingress_emitter.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ import (
66
"maps"
77

88
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
9+
"google.golang.org/protobuf/proto"
910
)
1011

1112
type ChipIngressEmitter struct {
1213
client chipingress.Client
1314
}
1415

16+
var _ Emitter = (*ChipIngressEmitter)(nil)
17+
1518
func NewChipIngressEmitter(client chipingress.Client) (Emitter, error) {
1619

1720
if client == nil {
@@ -26,36 +29,52 @@ func (c *ChipIngressEmitter) Close() error {
2629
}
2730

2831
func (c *ChipIngressEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
29-
return c.BatchEmit(ctx, NewMessage(body, attrKVs...))
32+
_, err := c.BatchEmit(ctx, []Message{
33+
NewMessage(body, attrKVs...),
34+
})
35+
return err
3036
}
3137

32-
func (c *ChipIngressEmitter) BatchEmit(ctx context.Context, messages ...Message) error {
38+
func (c *ChipIngressEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
39+
emitOpts := DefaultBatchEmitOptions
40+
for _, opt := range options {
41+
opt(&emitOpts)
42+
}
43+
3344
events := make([]chipingress.CloudEvent, len(messages))
3445
for i, msg := range messages {
3546
sourceDomain, entityType, err := ExtractSourceAndType(msg.Attrs)
3647
if err != nil {
37-
return err
48+
return nil, err
3849
}
3950

4051
event, err := chipingress.NewEvent(sourceDomain, entityType, msg.Body, msg.Attrs)
4152
if err != nil {
42-
return err
53+
return nil, err
4354
}
4455

4556
events[i] = event
4657
}
4758

4859
eventPb, err := chipingress.EventsToBatch(events)
4960
if err != nil {
50-
return fmt.Errorf("failed to convert event to proto: %w", err)
61+
return nil, fmt.Errorf("failed to convert event to proto: %w", err)
5162
}
5263

53-
_, err = c.client.PublishBatch(ctx, eventPb)
64+
eventPb.Options = &chipingress.PublishOptions{
65+
AllOrNothing: proto.Bool(emitOpts.AllOrNothing),
66+
}
67+
68+
response, err := c.client.PublishBatch(ctx, eventPb)
5469
if err != nil {
55-
return err
70+
return nil, err
71+
}
72+
73+
if response == nil {
74+
return nil, nil
5675
}
5776

58-
return nil
77+
return response.Results, nil
5978
}
6079

6180
// ExtractSourceAndType extracts source domain and entity from the attributes

pkg/beholder/chip_ingress_emitter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestChipIngressEmit(t *testing.T) {
4343
clientMock := mocks.NewClient(t)
4444

4545
clientMock.
46-
On("Publish", mock.Anything, mock.Anything).
46+
On("PublishBatch", mock.Anything, mock.Anything).
4747
Return(nil, nil)
4848

4949
emitter, err := beholder.NewChipIngressEmitter(clientMock)
@@ -69,7 +69,7 @@ func TestChipIngressEmit(t *testing.T) {
6969
clientMock := mocks.NewClient(t)
7070

7171
clientMock.
72-
On("Publish", mock.Anything, mock.Anything).
72+
On("PublishBatch", mock.Anything, mock.Anything).
7373
Return(nil, assert.AnError)
7474

7575
emitter, err := beholder.NewChipIngressEmitter(clientMock)

pkg/beholder/client.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,26 @@ import (
3030

3131
const defaultGRPCCompressor = "gzip"
3232

33+
type BatchEmitOptions struct {
34+
AllOrNothing bool
35+
}
36+
37+
var DefaultBatchEmitOptions = BatchEmitOptions{
38+
AllOrNothing: true,
39+
}
40+
41+
type BatchEmitOption = func(*BatchEmitOptions)
42+
43+
func WithAllOrNothing(v bool) BatchEmitOption {
44+
return func(o *BatchEmitOptions) {
45+
o.AllOrNothing = v
46+
}
47+
}
48+
3349
type Emitter interface {
3450
// Emit Sends message with bytes and attributes to OTel Collector
3551
Emit(ctx context.Context, body []byte, attrKVs ...any) error
36-
BatchEmit(ctx context.Context, messages ...Message) error
52+
BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error)
3753
io.Closer
3854
}
3955

pkg/beholder/dual_source_emitter.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"sync/atomic"
88

9+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
910
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1011
"github.com/smartcontractkit/chainlink-common/pkg/services"
1112
)
@@ -56,31 +57,34 @@ func (d *DualSourceEmitter) Close() error {
5657
}
5758

5859
func (d *DualSourceEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
59-
return d.BatchEmit(ctx, NewMessage(body, attrKVs...))
60+
_, err := d.BatchEmit(ctx, []Message{
61+
NewMessage(body, attrKVs...),
62+
})
63+
return err
6064
}
6165

62-
func (d *DualSourceEmitter) BatchEmit(ctx context.Context, messages ...Message) error {
66+
func (d *DualSourceEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
6367
// Emit via OTLP first
64-
if err := d.otelCollectorEmitter.BatchEmit(ctx, messages...); err != nil {
65-
return err
68+
if _, err := d.otelCollectorEmitter.BatchEmit(ctx, messages, options...); err != nil {
69+
return nil, err
6670
}
6771

6872
// Emit via chip ingress async
6973
if err := d.wg.TryAdd(1); err != nil {
70-
return err
74+
return nil, err
7175
}
7276
go func(ctx context.Context) {
7377
defer d.wg.Done()
7478
var cancel context.CancelFunc
7579
ctx, cancel = d.stopCh.Ctx(ctx)
7680
defer cancel()
7781

78-
if err := d.chipIngressEmitter.BatchEmit(ctx, messages...); err != nil {
82+
if _, err := d.chipIngressEmitter.BatchEmit(ctx, messages, options...); err != nil {
7983
// If the chip ingress emitter fails, we ONLY log the error
8084
// because we still want to send the data to the OTLP collector and not cause disruption
8185
d.log.Infof("failed to emit to chip ingress: %v", err)
8286
}
8387
}(context.WithoutCancel(ctx))
8488

85-
return nil
89+
return nil, nil
8690
}

pkg/beholder/dual_source_emitter_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stretchr/testify/require"
1010

1111
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
12+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
1213
)
1314

1415
func TestNewDualSourceEmitter(t *testing.T) {
@@ -89,3 +90,15 @@ func (m *mockEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) err
8990
}
9091
return nil
9192
}
93+
94+
func (m *mockEmitter) BatchEmit(ctx context.Context, messages []beholder.Message, _ ...beholder.BatchEmitOption) ([]*chipingress.PublishResult, error) {
95+
if m.emitFunc != nil {
96+
for _, msg := range messages {
97+
if err := m.emitFunc(ctx, msg.Body); err != nil {
98+
return nil, err
99+
}
100+
}
101+
}
102+
return nil, nil
103+
}
104+

pkg/beholder/message.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ func newAttributes(attrKVs ...any) Attributes {
8484
case Attributes:
8585
maps.Copy(a, t)
8686
i++
87+
case []any:
88+
// Treat a []any element as if its contents were passed directly.
89+
maps.Copy(a, newAttributes(t...))
90+
i++
8791
case string:
8892
if i+1 >= l {
8993
break

pkg/beholder/message_emitter.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package beholder
33
import (
44
"context"
55

6+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
67
otellog "go.opentelemetry.io/otel/log"
78
)
89

@@ -22,15 +23,18 @@ func (e messageEmitter) Close() error { return nil }
2223
// Emits logs the message, but does not wait for the message to be processed.
2324
// Open question: what are pros/cons for using use map[]any vs use otellog.KeyValue
2425
func (e messageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
25-
return e.BatchEmit(ctx, NewMessage(body, attrKVs...))
26+
_, err := e.BatchEmit(ctx, []Message{
27+
NewMessage(body, attrKVs...),
28+
})
29+
return err
2630
}
2731

28-
func (e messageEmitter) BatchEmit(ctx context.Context, messages ...Message) error {
32+
func (e messageEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
2933
for _, message := range messages {
3034
if err := message.Validate(); err != nil {
31-
return err
35+
return nil, err
3236
}
3337
e.messageLogger.Emit(ctx, message.OtelRecord())
3438
}
35-
return nil
39+
return nil, nil
3640
}

pkg/beholder/noop.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ func (noopMessageEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any)
109109
return nil
110110
}
111111

112-
func (noopMessageEmitter) BatchEmit(ctx context.Context, messages ...Message) error {
113-
return nil
112+
func (noopMessageEmitter) BatchEmit(ctx context.Context, messages []Message, options ...BatchEmitOption) ([]*chipingress.PublishResult, error) {
113+
return nil, nil
114114
}
115115

116116
func (noopMessageEmitter) EmitMessage(ctx context.Context, message Message) error {

pkg/chipingress/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type (
2222
PingResponse = pb.PingResponse
2323
PublishResponse = pb.PublishResponse
2424
PublishResult = pb.PublishResult
25+
PublishOptions = pb.PublishOptions
2526
StreamEventsRequest = pb.StreamEventsRequest
2627
StreamEventsResponse = pb.StreamEventsResponse
2728
)

0 commit comments

Comments
 (0)