From fddcb86b64821c6fca8f682067310678662cd563 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Thu, 14 May 2026 17:36:47 -0400 Subject: [PATCH 1/4] pkg/chipingress/batch: add OTel metrics, fix shutdown, harden tests Add observability metrics to the batch client using OpenTelemetry: - send_requests_total (counter with status=success|failure attribute) - request_size_messages (histogram with batch_size attribute) - request_size_bytes (histogram with max_grpc_request_size_bytes attribute) - request_latency_ms (histogram with status attribute) - config.info (gauge recording batch configuration at startup) Shutdown improvements: - Close the underlying chipingress.Client in Stop() - Use a standalone timeout context for the shutdown drain so it is not cancelled prematurely by close(stopCh) - Remove closeOnce guard from client.Close (shutdownOnce already serialises) Bug fixes: - Pass caller-provided ctx to recordConfig instead of context.Background() - Remove redundant send_failures_total counter; send_requests_total with its status label already captures failure counts - Remove misleading comment on maxGRPCRequestSize default (10MB matches the chip-ingress server MaxRecvMsgSize, not the client-side 16MB maxMessageSize used for MaxCallRecvMsgSize) Test improvements: - Replace nil client in 17 test call sites with mocks.NewClient(t); nil is not a valid chipingress.Client and would panic on Stop() - Add WithMaxGRPCRequestSize option and oversize-event metrics test - Add config.info gauge assertion --- pkg/chipingress/batch/client.go | 267 ++++++++++-- pkg/chipingress/batch/client_test.go | 607 ++++++++++++++++++++++++++- pkg/chipingress/client_test.go | 10 + pkg/chipingress/go.mod | 4 +- 4 files changed, 832 insertions(+), 56 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index e02704b26b..e585682d12 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -3,12 +3,16 @@ package batch import ( "context" "errors" + "fmt" "strconv" "sync" "sync/atomic" "time" cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -29,6 +33,7 @@ type seqnumKey struct { type Client struct { client chipingress.Client batchSize int + maxGRPCRequestSize int cloneEvent bool maxConcurrentSends chan struct{} batchInterval time.Duration @@ -42,6 +47,20 @@ type Client struct { batcherDone chan struct{} cancelBatcher context.CancelFunc counters sync.Map // map[seqnumKey]*atomic.Uint64 for per-(source,type) seqnum, cleared on Stop() + + metrics batchClientMetrics +} + +type batchClientMetrics struct { + sendRequestsTotal otelmetric.Int64Counter + requestSizeMessages otelmetric.Int64Histogram + requestSizeBytes otelmetric.Int64Histogram + requestLatencyMS otelmetric.Float64Histogram + configInfo otelmetric.Int64Gauge + batchSizeAttr otelmetric.MeasurementOption + maxGRPCReqSizeAttr otelmetric.MeasurementOption + successStatusAttr otelmetric.MeasurementOption + failureStatusAttr otelmetric.MeasurementOption } // Opt is a functional option for configuring the batch Client. @@ -53,6 +72,7 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { client: client, log: zap.NewNop().Sugar(), batchSize: 10, + maxGRPCRequestSize: 10 * 1024 * 1024, cloneEvent: true, maxConcurrentSends: make(chan struct{}, 1), messageBuffer: make(chan *messageWithCallback, 200), @@ -68,11 +88,19 @@ func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) { opt(c) } + var err error + c.metrics, err = newBatchClientMetrics() + if err != nil { + return nil, err + } + return c, nil } // Start begins processing messages from the queue and sending them in batches func (b *Client) Start(ctx context.Context) { + b.metrics.recordConfig(ctx, b) + // Create a cancellable context for the batcher batcherCtx, cancel := context.WithCancel(ctx) b.cancelBatcher = cancel @@ -110,34 +138,45 @@ func (b *Client) Start(ctx context.Context) { // Forcibly shutdowns down after timeout if not completed. func (b *Client) Stop() { b.shutdownOnce.Do(func() { - ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout) + // Use a standalone timeout context so the shutdown wait isn't cancelled + // by close(b.stopCh) below. + ctx, cancel := context.WithTimeout(context.Background(), b.shutdownTimeout) defer cancel() - if b.cancelBatcher != nil { + started := b.cancelBatcher != nil + if started { b.cancelBatcher() } close(b.stopCh) - done := make(chan struct{}) - go func() { - <-b.batcherDone - for range cap(b.maxConcurrentSends) { - b.maxConcurrentSends <- struct{}{} - } - // wait for all callbacks to complete - b.callbackWg.Wait() - close(done) - }() + // Only wait for the batcher goroutine when Start() was called; + // otherwise batcherDone is never closed and we'd block until timeout. + if started { + done := make(chan struct{}) + go func() { + <-b.batcherDone + for range cap(b.maxConcurrentSends) { + b.maxConcurrentSends <- struct{}{} + } + // wait for all callbacks to complete + b.callbackWg.Wait() + close(done) + }() - select { - case <-done: - // All successfully shutdown - case <-ctx.Done(): // timeout or context cancelled - b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout) + select { + case <-done: + // All successfully shutdown + case <-ctx.Done(): // timeout or context cancelled + b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout) + } } // Release per-stream seqnum state to avoid unbounded growth from high-cardinality source/type values. b.clearCounters() + + if err := b.client.Close(); err != nil { + b.log.Warnw("failed to close chip ingress client", "error", err) + } }) } @@ -222,30 +261,80 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) go func() { defer func() { <-b.maxConcurrentSends }() - // this is specifically to prevent long running network calls - ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout) - defer cancel() - events := make([]*chipingress.CloudEventPb, len(messages)) - for i, msg := range messages { - events[i] = msg.event - } - _, err := b.client.PublishBatch(ctxTimeout, &chipingress.CloudEventBatch{Events: events}) - if err != nil { - b.log.Errorw("failed to publish batch", "error", err) - } - // the callbacks are placed in their own goroutine to not block releasing the semaphore - // we use a wait group, to ensure all callbacks are completed if .Stop() is called. - b.callbackWg.Go(func() { - for _, msg := range messages { - if msg.callback != nil { - msg.callback(err) - } + for _, batchMessages := range splitMessagesByRequestSize(messages, b.maxGRPCRequestSize) { + batchReq, batchBytes := newBatchRequest(batchMessages) + if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize { + err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize) + b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, 0, false) + b.log.Errorw("failed to publish batch", "error", err) + b.completeBatchCallbacks(batchMessages, err) + continue + } + + // this is specifically to prevent long running network calls + ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout) + startedAt := time.Now() + _, err := b.client.PublishBatch(ctxTimeout, batchReq) + cancel() + + b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, time.Since(startedAt), err == nil) + if err != nil { + b.log.Errorw("failed to publish batch", "error", err) } - }) + b.completeBatchCallbacks(batchMessages, err) + } }() } +func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err error) { + callbackMessages, callbackErr := messages, err + // the callbacks are placed in their own goroutine to not block releasing the semaphore + // we use a wait group, to ensure all callbacks are completed if .Stop() is called. + b.callbackWg.Go(func() { + for _, msg := range callbackMessages { + if msg.callback != nil { + msg.callback(callbackErr) + } + } + }) +} + +func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback { + if len(messages) == 0 { + return nil + } + if maxRequestSize <= 0 { + return [][]*messageWithCallback{messages} + } + + var batches [][]*messageWithCallback + current := make([]*messageWithCallback, 0, len(messages)) + for _, msg := range messages { + candidate := append(current, msg) + _, candidateBytes := newBatchRequest(candidate) + if len(current) > 0 && candidateBytes > maxRequestSize { + batches = append(batches, current) + current = []*messageWithCallback{msg} + continue + } + current = candidate + } + if len(current) > 0 { + batches = append(batches, current) + } + return batches +} + +func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) { + events := make([]*chipingress.CloudEventPb, len(messages)) + for i, msg := range messages { + events[i] = msg.event + } + batchReq := &chipingress.CloudEventBatch{Events: events} + return batchReq, proto.Size(batchReq) +} + // WithBatchSize sets the number of messages to accumulate before sending a batch func WithBatchSize(batchSize int) Opt { return func(c *Client) { @@ -253,6 +342,13 @@ func WithBatchSize(batchSize int) Opt { } } +// WithMaxGRPCRequestSize sets the max gRPC request size in bytes used for metric comparison attributes. +func WithMaxGRPCRequestSize(maxReqSize int) Opt { + return func(c *Client) { + c.maxGRPCRequestSize = maxReqSize + } +} + // WithEventClone controls whether QueueMessage clones events before stamping seqnum and buffering. // Defaults to true for safety when caller reuses event pointers. func WithEventClone(clone bool) Opt { @@ -302,3 +398,102 @@ func WithLogger(log *zap.SugaredLogger) Opt { c.log = log } } + +func newBatchClientMetrics() (batchClientMetrics, error) { + meter := otel.Meter("chipingress/batch_client") + sendRequestsTotal, err := meter.Int64Counter( + "chip_ingress.batch.send_requests_total", + otelmetric.WithDescription("Total PublishBatch requests sent by batch client"), + otelmetric.WithUnit("{request}"), + ) + if err != nil { + return batchClientMetrics{}, err + } + requestSizeMessages, err := meter.Int64Histogram( + "chip_ingress.batch.request_size_messages", + otelmetric.WithDescription("PublishBatch request size measured in number of events"), + otelmetric.WithUnit("{event}"), + ) + if err != nil { + return batchClientMetrics{}, err + } + requestSizeBytes, err := meter.Int64Histogram( + "chip_ingress.batch.request_size_bytes", + otelmetric.WithDescription("PublishBatch request size measured in bytes"), + otelmetric.WithUnit("By"), + ) + if err != nil { + return batchClientMetrics{}, err + } + requestLatencyMS, err := meter.Float64Histogram( + "chip_ingress.batch.request_latency_ms", + otelmetric.WithDescription("PublishBatch end-to-end latency in milliseconds"), + otelmetric.WithUnit("ms"), + ) + if err != nil { + return batchClientMetrics{}, err + } + configInfo, err := meter.Int64Gauge( + "chip_ingress.batch.config.info", + otelmetric.WithDescription("Batch client configuration info metric"), + otelmetric.WithUnit("{info}"), + ) + if err != nil { + return batchClientMetrics{}, err + } + + return batchClientMetrics{ + sendRequestsTotal: sendRequestsTotal, + + requestSizeMessages: requestSizeMessages, + requestSizeBytes: requestSizeBytes, + requestLatencyMS: requestLatencyMS, + configInfo: configInfo, + successStatusAttr: otelmetric.WithAttributeSet(attribute.NewSet( + attribute.String("status", "success"), + )), + failureStatusAttr: otelmetric.WithAttributeSet(attribute.NewSet( + attribute.String("status", "failure"), + )), + }, nil +} + +func (m *batchClientMetrics) recordConfig(ctx context.Context, c *Client) { + m.batchSizeAttr = otelmetric.WithAttributeSet(attribute.NewSet( + attribute.Int("max_batch_size", c.batchSize), + )) + m.maxGRPCReqSizeAttr = otelmetric.WithAttributeSet(attribute.NewSet( + attribute.Int("max_grpc_request_size_bytes", c.maxGRPCRequestSize), + )) + m.configInfo.Record(ctx, 1, otelmetric.WithAttributes( + attribute.Int("max_batch_size", c.batchSize), + attribute.Int("message_buffer_size", cap(c.messageBuffer)), + attribute.Int("max_concurrent_sends", cap(c.maxConcurrentSends)), + attribute.Int64("batch_interval_ms", c.batchInterval.Milliseconds()), + attribute.Int64("max_publish_timeout_ms", c.maxPublishTimeout.Milliseconds()), + attribute.Int64("shutdown_timeout_ms", c.shutdownTimeout.Milliseconds()), + attribute.Bool("clone_event", c.cloneEvent), + attribute.Int("max_grpc_request_size_bytes", c.maxGRPCRequestSize), + )) +} + +func (m *batchClientMetrics) recordSend(ctx context.Context, messageCount int, requestBytes int, latency time.Duration, success bool) { + statusAttr := m.successStatusAttr + if !success { + statusAttr = m.failureStatusAttr + } + m.sendRequestsTotal.Add(ctx, 1, statusAttr) + + messageSizeOpts := []otelmetric.RecordOption{} + if m.batchSizeAttr != nil { + messageSizeOpts = append(messageSizeOpts, m.batchSizeAttr) + } + requestSizeOpts := []otelmetric.RecordOption{} + if m.maxGRPCReqSizeAttr != nil { + requestSizeOpts = append(requestSizeOpts, m.maxGRPCReqSizeAttr) + } + + m.requestSizeMessages.Record(ctx, int64(messageCount), messageSizeOpts...) + m.requestSizeBytes.Record(ctx, int64(requestBytes), requestSizeOpts...) + m.requestLatencyMS.Record(ctx, float64(latency)/float64(time.Millisecond), statusAttr) +} diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 7f8c356fb3..4578fc5bb8 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -4,13 +4,20 @@ import ( "context" "sort" "strconv" + "strings" "sync" "testing" "time" + cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/chipingress" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" @@ -18,49 +25,93 @@ import ( func TestNewBatchClient(t *testing.T) { t.Run("NewBatchClient", func(t *testing.T) { - client, err := NewBatchClient(nil) + client, err := NewBatchClient(mocks.NewClient(t)) require.NoError(t, err) assert.NotNil(t, client) }) t.Run("WithBatchSize", func(t *testing.T) { - client, err := NewBatchClient(nil, WithBatchSize(100)) + client, err := NewBatchClient(mocks.NewClient(t), WithBatchSize(100)) require.NoError(t, err) assert.Equal(t, 100, client.batchSize) }) t.Run("WithEventClone", func(t *testing.T) { - client, err := NewBatchClient(nil) + client, err := NewBatchClient(mocks.NewClient(t)) require.NoError(t, err) assert.True(t, client.cloneEvent) - client, err = NewBatchClient(nil, WithEventClone(false)) + client, err = NewBatchClient(mocks.NewClient(t), WithEventClone(false)) require.NoError(t, err) assert.False(t, client.cloneEvent) }) t.Run("WithMaxConcurrentSends", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMaxConcurrentSends(10)) + client, err := NewBatchClient(mocks.NewClient(t), WithMaxConcurrentSends(10)) require.NoError(t, err) assert.Equal(t, 10, cap(client.maxConcurrentSends)) }) t.Run("WithBatchInterval", func(t *testing.T) { - client, err := NewBatchClient(nil, WithBatchInterval(100*time.Millisecond)) + client, err := NewBatchClient(mocks.NewClient(t), WithBatchInterval(100*time.Millisecond)) require.NoError(t, err) assert.Equal(t, 100*time.Millisecond, client.batchInterval) }) t.Run("WithMessageBuffer", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(1000)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(1000)) require.NoError(t, err) assert.Equal(t, 1000, cap(client.messageBuffer)) }) + + t.Run("records failure metrics when request exceeds configured max grpc size", func(t *testing.T) { + reader, restore := useTestMeterProvider(t) + defer restore() + + const maxGRPCSize = 2048 + + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + // PublishBatch should NOT be called — the oversized event is rejected before sending. + + client, err := NewBatchClient( + mockClient, + WithBatchSize(1), + WithBatchInterval(time.Second), + WithMessageBuffer(10), + WithMaxGRPCRequestSize(maxGRPCSize), + ) + require.NoError(t, err) + client.Start(t.Context()) + + err = client.QueueMessage(&chipingress.CloudEventPb{ + Id: strings.Repeat("x", maxGRPCSize*2), + Source: "platform", + Type: "MetricOversizeFailure", + }, nil) + require.NoError(t, err) + + // Stop flushes the batch which triggers the oversize rejection path. + client.Stop() + rm := collectResourceMetrics(t, reader) + + reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total") + reqSum, ok := reqTotal.Data.(metricdata.Sum[int64]) + require.True(t, ok) + failureReq := mustInt64SumPointWithAttr(t, reqSum, "status", "failure") + assert.GreaterOrEqual(t, failureReq.Value, int64(1)) + + reqSize := mustMetric(t, rm, "chip_ingress.batch.request_size_bytes") + reqSizeHist, ok := reqSize.Data.(metricdata.Histogram[int64]) + require.True(t, ok) + reqSizePoint := mustInt64HistogramPointWithIntAttr(t, reqSizeHist, "max_grpc_request_size_bytes", maxGRPCSize) + assert.GreaterOrEqual(t, reqSizePoint.Count, uint64(1)) + }) } func TestQueueMessage(t *testing.T) { t.Run("successfully queues a message", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(5)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(5)) require.NoError(t, err) event := &chipingress.CloudEventPb{ @@ -81,7 +132,7 @@ func TestQueueMessage(t *testing.T) { }) t.Run("drops message if buffer is full", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(1)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(1)) require.NoError(t, err) require.NotNil(t, client) @@ -103,7 +154,7 @@ func TestQueueMessage(t *testing.T) { }) t.Run("handles nil event", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(5)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(5)) require.NoError(t, err) err = client.QueueMessage(nil, nil) @@ -115,6 +166,7 @@ func TestQueueMessage(t *testing.T) { func TestSendBatch(t *testing.T) { t.Run("successfully sends a batch", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) mockClient. @@ -154,6 +206,7 @@ func TestSendBatch(t *testing.T) { t.Run("doesn't publish empty batch", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() client, err := NewBatchClient(mockClient, WithMessageBuffer(5)) require.NoError(t, err) @@ -165,6 +218,7 @@ func TestSendBatch(t *testing.T) { t.Run("sends multiple batches successfully", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) callCount := 0 @@ -207,11 +261,118 @@ func TestSendBatch(t *testing.T) { mockClient.AssertExpectations(t) }) + + t.Run("splits oversized batch by max gRPC request size", func(t *testing.T) { + events := []*chipingress.CloudEventPb{ + largeTestEvent("test-id-1"), + largeTestEvent("test-id-2"), + largeTestEvent("test-id-3"), + largeTestEvent("test-id-4"), + largeTestEvent("test-id-5"), + } + maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: events[:2]}) + require.LessOrEqual(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:1]}), maxRequestSize) + require.Greater(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:3]}), maxRequestSize) + + mockClient := mocks.NewClient(t) + done := make(chan struct{}) + callbackDone := make(chan error, len(events)) + var mu sync.Mutex + var publishedIDs []string + var publishedSizes []int + + mockClient. + On("PublishBatch", + mock.Anything, + mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool { + return len(batch.Events) > 0 && proto.Size(batch) <= maxRequestSize + }), + ). + Return(&chipingress.PublishResponse{}, nil). + Run(func(args mock.Arguments) { + batch := args.Get(1).(*chipingress.CloudEventBatch) + mu.Lock() + for _, event := range batch.Events { + publishedIDs = append(publishedIDs, event.Id) + } + publishedSizes = append(publishedSizes, proto.Size(batch)) + if len(publishedIDs) == len(events) { + close(done) + } + mu.Unlock() + }). + Times(3) + + client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize)) + require.NoError(t, err) + + messages := make([]*messageWithCallback, 0, len(events)) + for _, event := range events { + messages = append(messages, &messageWithCallback{ + event: event, + callback: func(err error) { + callbackDone <- err + }, + }) + } + + client.sendBatch(t.Context(), messages) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for split batches to be sent") + } + for range events { + select { + case err := <-callbackDone: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timeout waiting for split batch callback") + } + } + + assert.Equal(t, []string{"test-id-1", "test-id-2", "test-id-3", "test-id-4", "test-id-5"}, publishedIDs) + for _, size := range publishedSizes { + assert.LessOrEqual(t, size, maxRequestSize) + } + mockClient.AssertExpectations(t) + }) + + t.Run("doesn't publish a single event over max gRPC request size", func(t *testing.T) { + mockClient := mocks.NewClient(t) + callbackDone := make(chan error, 1) + event := largeTestEvent("oversized-id") + maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{event}}) - 1 + + client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize)) + require.NoError(t, err) + + client.sendBatch(t.Context(), []*messageWithCallback{ + { + event: event, + callback: func(err error) { + callbackDone <- err + }, + }, + }) + + select { + case err := <-callbackDone: + require.Error(t, err) + assert.Contains(t, err.Error(), "exceeds max gRPC request size") + case <-time.After(time.Second): + t.Fatal("timeout waiting for oversized batch callback") + } + + mockClient.AssertNotCalled(t, "PublishBatch", mock.Anything, mock.Anything) + }) } func TestStart(t *testing.T) { t.Run("batch size trigger", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) mockClient. @@ -254,6 +415,7 @@ func TestStart(t *testing.T) { t.Run("timeout trigger", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) mockClient. @@ -291,6 +453,7 @@ func TestStart(t *testing.T) { t.Run("context cancellation flushes pending batch", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) mockClient. @@ -334,6 +497,7 @@ func TestStart(t *testing.T) { t.Run("stop flushes pending batch", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) mockClient. @@ -372,6 +536,7 @@ func TestStart(t *testing.T) { t.Run("no flush when batch is empty", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() client, err := NewBatchClient(mockClient, WithBatchSize(10), WithBatchInterval(5*time.Second)) require.NoError(t, err) @@ -390,6 +555,7 @@ func TestStart(t *testing.T) { t.Run("multiple batches via size trigger", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) callCount := 0 @@ -438,6 +604,7 @@ func TestStart(t *testing.T) { func TestCallbacks(t *testing.T) { t.Run("callback invoked on successful send", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) callbackDone := make(chan error, 1) @@ -488,6 +655,7 @@ func TestCallbacks(t *testing.T) { t.Run("callback receives error on failed send", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) callbackDone := make(chan error, 1) expectedErr := assert.AnError @@ -540,6 +708,7 @@ func TestCallbacks(t *testing.T) { t.Run("nil callback works without panic", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) mockClient. @@ -581,6 +750,7 @@ func TestCallbacks(t *testing.T) { t.Run("multiple messages with different callbacks", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) callback1Done := make(chan error, 1) callback2Done := make(chan error, 1) @@ -661,6 +831,7 @@ func TestCallbacks(t *testing.T) { t.Run("callback invoked for timeout-triggered batch", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) callbackDone := make(chan error, 1) @@ -711,6 +882,7 @@ func TestCallbacks(t *testing.T) { t.Run("callback invoked for size-triggered batch", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) callbackDone := make(chan error, 1) @@ -765,6 +937,7 @@ func TestCallbacks(t *testing.T) { t.Run("callbacks invoked on context cancellation", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() done := make(chan struct{}) callbackDone := make(chan error, 1) @@ -821,8 +994,42 @@ func TestCallbacks(t *testing.T) { } func TestStop(t *testing.T) { + t.Run("close underlying client only once when enabled", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Once() + + client, err := NewBatchClient(mockClient) + require.NoError(t, err) + + client.Stop() + client.Stop() + client.Stop() + }) + + t.Run("Stop without Start returns promptly", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Once() + + client, err := NewBatchClient(mockClient, WithShutdownTimeout(10*time.Second)) + require.NoError(t, err) + + done := make(chan struct{}) + go func() { + client.Stop() + close(done) + }() + + select { + case <-done: + // Stop returned without waiting for shutdownTimeout + case <-time.After(time.Second): + t.Fatal("Stop blocked; likely waiting on batcherDone that was never closed") + } + }) + t.Run("can call Stop multiple times without panic", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() client, err := NewBatchClient(mockClient, WithBatchSize(10)) require.NoError(t, err) @@ -837,6 +1044,12 @@ func TestStop(t *testing.T) { t.Run("QueueMessage returns error after Stop", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{}, nil). + Maybe() + client, err := NewBatchClient(mockClient, WithBatchSize(10)) require.NoError(t, err) @@ -853,7 +1066,7 @@ func TestStop(t *testing.T) { }, nil) require.NoError(t, err) - // Stop the client + // Stop the client — drains any buffered messages client.Stop() // Queue message after stop - should fail @@ -868,6 +1081,7 @@ func TestStop(t *testing.T) { t.Run("clears seqnum counters on Stop", func(t *testing.T) { mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() client, err := NewBatchClient(mockClient, WithBatchSize(10)) require.NoError(t, err) @@ -894,9 +1108,21 @@ func countCounters(counters *sync.Map) int { return n } +func largeTestEvent(id string) *chipingress.CloudEventPb { + return &chipingress.CloudEventPb{ + Id: id, + Source: "test-source", + Type: "test.event.type", + SpecVersion: "1.0", + Data: &cepb.CloudEvent_BinaryData{ + BinaryData: []byte("0123456789abcdefghijklmnopqrstuvwxyz"), + }, + } +} + func TestSeqnum(t *testing.T) { t.Run("dropped messages consume seqnum and create detectable gaps", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(1)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(1)) require.NoError(t, err) first := &chipingress.CloudEventPb{Id: "id-1", Source: "domain-a", Type: "entity-x"} @@ -923,7 +1149,7 @@ func TestSeqnum(t *testing.T) { }) t.Run("reusing event pointer preserves queued seqnum snapshots", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(2)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(2)) require.NoError(t, err) event := &chipingress.CloudEventPb{Id: "id-1", Source: "domain-a", Type: "entity-x"} @@ -943,7 +1169,7 @@ func TestSeqnum(t *testing.T) { }) t.Run("reusing event pointer can overwrite queued seqnum when clone disabled", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(2), WithEventClone(false)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(2), WithEventClone(false)) require.NoError(t, err) event := &chipingress.CloudEventPb{Id: "id-1", Source: "domain-a", Type: "entity-x"} @@ -963,7 +1189,7 @@ func TestSeqnum(t *testing.T) { }) t.Run("stamps sequential seqnum for same source+type", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(10)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(10)) require.NoError(t, err) events := []*chipingress.CloudEventPb{ @@ -988,7 +1214,7 @@ func TestSeqnum(t *testing.T) { }) t.Run("independent counters per source+type pair", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(10)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(10)) require.NoError(t, err) // Queue events with different source+type combinations @@ -1024,7 +1250,7 @@ func TestSeqnum(t *testing.T) { }) t.Run("source and type values containing separator do not collide", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(10)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(10)) require.NoError(t, err) events := []*chipingress.CloudEventPb{ @@ -1052,7 +1278,7 @@ func TestSeqnum(t *testing.T) { }) t.Run("concurrent access produces unique seqnums", func(t *testing.T) { - client, err := NewBatchClient(nil, WithMessageBuffer(1000)) + client, err := NewBatchClient(mocks.NewClient(t), WithMessageBuffer(1000)) require.NoError(t, err) const numGoroutines = 50 @@ -1109,3 +1335,348 @@ func TestSeqnum(t *testing.T) { } }) } + +func TestBatchClient_Metrics(t *testing.T) { + t.Run("records success path metrics", func(t *testing.T) { + reader, restore := useTestMeterProvider(t) + defer restore() + + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + done := make(chan struct{}) + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{}, nil). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient( + mockClient, + WithBatchSize(1), + WithBatchInterval(time.Second), + WithMessageBuffer(10), + WithMaxGRPCRequestSize(2048), + ) + require.NoError(t, err) + client.Start(t.Context()) + + err = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "metric-success", + Source: "platform", + Type: "MetricSuccess", + }, nil) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for PublishBatch") + } + + client.Stop() + rm := collectResourceMetrics(t, reader) + + reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total") + reqSum, ok := reqTotal.Data.(metricdata.Sum[int64]) + require.True(t, ok) + successPoint := mustInt64SumPointWithAttr(t, reqSum, "status", "success") + assert.GreaterOrEqual(t, successPoint.Value, int64(1)) + + msgSize := mustMetric(t, rm, "chip_ingress.batch.request_size_messages") + msgSizeHist, ok := msgSize.Data.(metricdata.Histogram[int64]) + require.True(t, ok) + msgSizePoint := mustInt64HistogramPointWithIntAttr(t, msgSizeHist, "max_batch_size", 1) + assert.GreaterOrEqual(t, msgSizePoint.Count, uint64(1)) + + reqSize := mustMetric(t, rm, "chip_ingress.batch.request_size_bytes") + reqSizeHist, ok := reqSize.Data.(metricdata.Histogram[int64]) + require.True(t, ok) + reqSizePoint := mustInt64HistogramPointWithIntAttr(t, reqSizeHist, "max_grpc_request_size_bytes", 2048) + assert.GreaterOrEqual(t, reqSizePoint.Count, uint64(1)) + + latency := mustMetric(t, rm, "chip_ingress.batch.request_latency_ms") + latencyHist, ok := latency.Data.(metricdata.Histogram[float64]) + require.True(t, ok) + latencyPoint := mustFloat64HistogramPointWithAttr(t, latencyHist, "status", "success") + assert.GreaterOrEqual(t, latencyPoint.Count, uint64(1)) + + config := mustMetric(t, rm, "chip_ingress.batch.config.info") + configGauge, ok := config.Data.(metricdata.Gauge[int64]) + require.True(t, ok) + require.NotEmpty(t, configGauge.DataPoints) + assert.Equal(t, int64(1), configGauge.DataPoints[0].Value) + assert.True(t, hasIntAttr(configGauge.DataPoints[0].Attributes, "max_batch_size", 1)) + assert.True(t, hasIntAttr(configGauge.DataPoints[0].Attributes, "max_grpc_request_size_bytes", 2048)) + }) + + t.Run("records failure counters and latency", func(t *testing.T) { + reader, restore := useTestMeterProvider(t) + defer restore() + + mockClient := mocks.NewClient(t) + mockClient.EXPECT().Close().Return(nil).Maybe() + done := make(chan struct{}) + mockClient. + On("PublishBatch", mock.Anything, mock.Anything). + Return(&chipingress.PublishResponse{}, assert.AnError). + Run(func(_ mock.Arguments) { close(done) }). + Once() + + client, err := NewBatchClient(mockClient, WithBatchSize(1), WithMessageBuffer(10)) + require.NoError(t, err) + client.Start(t.Context()) + + err = client.QueueMessage(&chipingress.CloudEventPb{ + Id: "metric-failure", + Source: "platform", + Type: "MetricFailure", + }, nil) + require.NoError(t, err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for PublishBatch") + } + + client.Stop() + rm := collectResourceMetrics(t, reader) + + reqTotal := mustMetric(t, rm, "chip_ingress.batch.send_requests_total") + reqSum, ok := reqTotal.Data.(metricdata.Sum[int64]) + require.True(t, ok) + failureReq := mustInt64SumPointWithAttr(t, reqSum, "status", "failure") + assert.GreaterOrEqual(t, failureReq.Value, int64(1)) + + latency := mustMetric(t, rm, "chip_ingress.batch.request_latency_ms") + latencyHist, ok := latency.Data.(metricdata.Histogram[float64]) + require.True(t, ok) + failureLatency := mustFloat64HistogramPointWithAttr(t, latencyHist, "status", "failure") + assert.GreaterOrEqual(t, failureLatency.Count, uint64(1)) + }) +} + +func TestSplitMessagesByRequestSize(t *testing.T) { + t.Run("empty messages returns nil", func(t *testing.T) { + result := splitMessagesByRequestSize(nil, 1024) + assert.Nil(t, result) + }) + + t.Run("zero max request size returns single batch", func(t *testing.T) { + msgs := []*messageWithCallback{ + {event: largeTestEvent("a")}, + {event: largeTestEvent("b")}, + } + result := splitMessagesByRequestSize(msgs, 0) + require.Len(t, result, 1) + assert.Len(t, result[0], 2) + }) + + t.Run("negative max request size returns single batch", func(t *testing.T) { + msgs := []*messageWithCallback{ + {event: largeTestEvent("a")}, + } + result := splitMessagesByRequestSize(msgs, -1) + require.Len(t, result, 1) + assert.Len(t, result[0], 1) + }) + + t.Run("all messages fit returns single batch", func(t *testing.T) { + msgs := []*messageWithCallback{ + {event: largeTestEvent("a")}, + {event: largeTestEvent("b")}, + } + allBatch := &chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{msgs[0].event, msgs[1].event}} + result := splitMessagesByRequestSize(msgs, proto.Size(allBatch)+100) + require.Len(t, result, 1) + assert.Len(t, result[0], 2) + }) + + t.Run("each message in its own batch when tight limit", func(t *testing.T) { + msgs := []*messageWithCallback{ + {event: largeTestEvent("a")}, + {event: largeTestEvent("b")}, + {event: largeTestEvent("c")}, + } + singleBatch := &chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{msgs[0].event}} + // Set limit to exactly fit one event but not two. + result := splitMessagesByRequestSize(msgs, proto.Size(singleBatch)) + require.Len(t, result, 3) + for _, batch := range result { + assert.Len(t, batch, 1) + } + }) +} + +func BenchmarkSendBatch(b *testing.B) { + b.Run("no splitting (maxGRPCRequestSize=0)", func(b *testing.B) { + client, err := NewBatchClient( + &chipingress.NoopClient{}, + WithBatchSize(100), + WithMessageBuffer(b.N*100+10), + WithBatchInterval(time.Hour), + ) + if err != nil { + b.Fatal(err) + } + client.Start(context.Background()) + defer client.Stop() + + msgs := make([]*messageWithCallback, 100) + for i := range msgs { + msgs[i] = &messageWithCallback{ + event: &chipingress.CloudEventPb{ + Id: strconv.Itoa(i), + Source: "bench", + Type: "bench.event", + }, + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + client.sendBatch(context.Background(), msgs) + } + }) + + b.Run("with splitting (maxGRPCRequestSize=512)", func(b *testing.B) { + client, err := NewBatchClient( + &chipingress.NoopClient{}, + WithBatchSize(100), + WithMessageBuffer(b.N*100+10), + WithBatchInterval(time.Hour), + WithMaxGRPCRequestSize(512), + ) + if err != nil { + b.Fatal(err) + } + client.Start(context.Background()) + defer client.Stop() + + msgs := make([]*messageWithCallback, 100) + for i := range msgs { + msgs[i] = &messageWithCallback{ + event: &chipingress.CloudEventPb{ + Id: strconv.Itoa(i), + Source: "bench", + Type: "bench.event", + }, + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + client.sendBatch(context.Background(), msgs) + } + }) +} + +func BenchmarkBatchClient_QueueMessage(b *testing.B) { + client, err := NewBatchClient( + &chipingress.NoopClient{}, + WithBatchSize(b.N+1), + WithMessageBuffer(b.N+10), + WithBatchInterval(time.Hour), + ) + if err != nil { + b.Fatal(err) + } + client.Start(context.Background()) + defer client.Stop() + + payload := &chipingress.CloudEventPb{ + Source: "bench", + Type: "bench.event", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + payload.Id = strconv.Itoa(i) + if err := client.QueueMessage(payload, nil); err != nil { + b.Fatal(err) + } + } +} + +func useTestMeterProvider(t *testing.T) (*sdkmetric.ManualReader, func()) { + t.Helper() + prev := otel.GetMeterProvider() + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + otel.SetMeterProvider(provider) + return reader, func() { + require.NoError(t, provider.Shutdown(t.Context())) + otel.SetMeterProvider(prev) + } +} + +func collectResourceMetrics(t *testing.T, reader *sdkmetric.ManualReader) metricdata.ResourceMetrics { + t.Helper() + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &rm)) + return rm +} + +func mustMetric(t *testing.T, rm metricdata.ResourceMetrics, name string) metricdata.Metrics { + t.Helper() + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + t.Fatalf("metric %q not found", name) + return metricdata.Metrics{} +} + +func mustInt64SumPointWithAttr(t *testing.T, sum metricdata.Sum[int64], key, want string) metricdata.DataPoint[int64] { + t.Helper() + for _, dp := range sum.DataPoints { + if hasStringAttr(dp.Attributes, key, want) { + return dp + } + } + t.Fatalf("sum datapoint with %s=%s not found", key, want) + return metricdata.DataPoint[int64]{} +} + +func mustInt64HistogramPointWithIntAttr(t *testing.T, hist metricdata.Histogram[int64], key string, want int) metricdata.HistogramDataPoint[int64] { + t.Helper() + for _, dp := range hist.DataPoints { + if hasIntAttr(dp.Attributes, key, want) { + return dp + } + } + t.Fatalf("histogram datapoint with %s=%d not found", key, want) + return metricdata.HistogramDataPoint[int64]{} +} + +func mustFloat64HistogramPointWithAttr(t *testing.T, hist metricdata.Histogram[float64], key, want string) metricdata.HistogramDataPoint[float64] { + t.Helper() + for _, dp := range hist.DataPoints { + if hasStringAttr(dp.Attributes, key, want) { + return dp + } + } + t.Fatalf("histogram datapoint with %s=%s not found", key, want) + return metricdata.HistogramDataPoint[float64]{} +} + +func hasStringAttr(set attribute.Set, key, want string) bool { + for _, kv := range set.ToSlice() { + if string(kv.Key) == key { + return kv.Value.AsString() == want + } + } + return false +} + +func hasIntAttr(set attribute.Set, key string, want int) bool { + for _, kv := range set.ToSlice() { + if string(kv.Key) == key { + return int(kv.Value.AsInt64()) == want + } + } + return false +} diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index 6b259460a6..e4d892fdbf 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -86,6 +86,16 @@ func TestClient(t *testing.T) { assert.Empty(t, result) }) + t.Run("Close", func(t *testing.T) { + rawClient, err := NewClient("localhost:8080") + require.NoError(t, err) + + client, ok := rawClient.(*client) + require.True(t, ok) + + require.NoError(t, client.Close()) + }) + } func TestNewEvent(t *testing.T) { diff --git a/pkg/chipingress/go.mod b/pkg/chipingress/go.mod index 0730579539..925c12c493 100644 --- a/pkg/chipingress/go.mod +++ b/pkg/chipingress/go.mod @@ -8,7 +8,9 @@ require ( github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 + go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/metric v1.43.0 + go.opentelemetry.io/otel/sdk/metric v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 go.uber.org/zap v1.27.0 google.golang.org/grpc v1.79.3 @@ -26,9 +28,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel v1.43.0 // indirect go.opentelemetry.io/otel/sdk v1.43.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.48.0 // indirect golang.org/x/sys v0.42.0 // indirect From 5d7752cc6cd41b5eca951ae2c5590eeaf7c17075 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Fri, 15 May 2026 09:32:27 -0400 Subject: [PATCH 2/4] pkg/chipingress/batch: preserve caller context for send metrics --- pkg/chipingress/batch/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/chipingress/batch/client.go b/pkg/chipingress/batch/client.go index e585682d12..691bb91174 100644 --- a/pkg/chipingress/batch/client.go +++ b/pkg/chipingress/batch/client.go @@ -266,7 +266,7 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) batchReq, batchBytes := newBatchRequest(batchMessages) if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize { err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize) - b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, 0, false) + b.metrics.recordSend(ctx, len(batchMessages), batchBytes, 0, false) b.log.Errorw("failed to publish batch", "error", err) b.completeBatchCallbacks(batchMessages, err) continue @@ -278,7 +278,7 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) _, err := b.client.PublishBatch(ctxTimeout, batchReq) cancel() - b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, time.Since(startedAt), err == nil) + b.metrics.recordSend(ctx, len(batchMessages), batchBytes, time.Since(startedAt), err == nil) if err != nil { b.log.Errorw("failed to publish batch", "error", err) } From 384abb626e18caa6d215d72c8d838e6e04041498 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Fri, 15 May 2026 09:40:29 -0400 Subject: [PATCH 3/4] pkg/chipingress: use testing contexts in tests --- pkg/chipingress/batch/client_test.go | 16 ++++++++-------- pkg/chipingress/client_test.go | 16 +++++++++------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/pkg/chipingress/batch/client_test.go b/pkg/chipingress/batch/client_test.go index 4578fc5bb8..899cadd5cf 100644 --- a/pkg/chipingress/batch/client_test.go +++ b/pkg/chipingress/batch/client_test.go @@ -248,9 +248,9 @@ func TestSendBatch(t *testing.T) { {event: &chipingress.CloudEventPb{Id: "batch3-id-1", Source: "test-source", Type: "test.event.type"}}, } - client.sendBatch(context.Background(), batch1) - client.sendBatch(context.Background(), batch2) - client.sendBatch(context.Background(), batch3) + client.sendBatch(t.Context(), batch1) + client.sendBatch(t.Context(), batch2) + client.sendBatch(t.Context(), batch3) // wait for the internal goroutines to complete select { @@ -1519,7 +1519,7 @@ func BenchmarkSendBatch(b *testing.B) { if err != nil { b.Fatal(err) } - client.Start(context.Background()) + client.Start(b.Context()) defer client.Stop() msgs := make([]*messageWithCallback, 100) @@ -1535,7 +1535,7 @@ func BenchmarkSendBatch(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - client.sendBatch(context.Background(), msgs) + client.sendBatch(b.Context(), msgs) } }) @@ -1550,7 +1550,7 @@ func BenchmarkSendBatch(b *testing.B) { if err != nil { b.Fatal(err) } - client.Start(context.Background()) + client.Start(b.Context()) defer client.Stop() msgs := make([]*messageWithCallback, 100) @@ -1566,7 +1566,7 @@ func BenchmarkSendBatch(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - client.sendBatch(context.Background(), msgs) + client.sendBatch(b.Context(), msgs) } }) } @@ -1581,7 +1581,7 @@ func BenchmarkBatchClient_QueueMessage(b *testing.B) { if err != nil { b.Fatal(err) } - client.Start(context.Background()) + client.Start(b.Context()) defer client.Stop() payload := &chipingress.CloudEventPb{ diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index e4d892fdbf..1299d2ce1e 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -71,7 +71,7 @@ func TestClient(t *testing.T) { require.NoError(t, err) // Test Ping returns success - pingResp, err := client.Ping(context.Background(), &pb.EmptyRequest{}) + pingResp, err := client.Ping(t.Context(), &pb.EmptyRequest{}) require.NoError(t, err) assert.NotNil(t, pingResp) assert.Equal(t, "pong", pingResp.Message) @@ -80,7 +80,7 @@ func TestClient(t *testing.T) { schemas := []*pb.Schema{ {Subject: "test", Schema: `{"test":"value"}`, Format: 1}, } - result, err := client.RegisterSchemas(context.Background(), schemas...) + result, err := client.RegisterSchemas(t.Context(), schemas...) require.NoError(t, err) assert.NotNil(t, result) assert.Empty(t, result) @@ -673,8 +673,9 @@ func TestNewClientWithTLS(t *testing.T) { func TestClient_RegisterSchemas(t *testing.T) { t.Run("successfully registers schemas", func(t *testing.T) { mockClient := mocks.NewClient(t) + ctx := t.Context() mockClient.EXPECT().RegisterSchema( - context.Background(), + ctx, &pb.RegisterSchemaRequest{ Schemas: []*pb.Schema{ {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, @@ -698,15 +699,16 @@ func TestClient_RegisterSchemas(t *testing.T) { {Subject: "schema2", Schema: `{"type":"record","name":"Test2","fields":[{"name":"field2"}]}`, Format: 2}, } - result, err := client.RegisterSchemas(context.Background(), schemas...) + result, err := client.RegisterSchemas(ctx, schemas...) require.NoError(t, err) assert.Equal(t, map[string]int{"schema1": 1, "schema2": 2}, result) }) - t.Run("returns error when registration fails", func(t *testing.T) { + t.Run("returns error when registration fails", func(t *testing.T) { mockClient := mocks.NewClient(t) + ctx := t.Context() mockClient.EXPECT().RegisterSchema( - context.Background(), + ctx, &pb.RegisterSchemaRequest{ Schemas: []*pb.Schema{ {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, @@ -723,7 +725,7 @@ func TestClient_RegisterSchemas(t *testing.T) { {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, } - result, err := client.RegisterSchemas(context.Background(), schemas...) + result, err := client.RegisterSchemas(ctx, schemas...) assert.Nil(t, result) assert.EqualError(t, err, "failed to register schema: registration failed") }) From fa82ee2493bf88f584f32effb4eea9def0f95dd5 Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Fri, 15 May 2026 11:23:59 -0400 Subject: [PATCH 4/4] pkg/chipingress: fix client test formatting --- pkg/chipingress/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index 1299d2ce1e..12d567d0d1 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -704,7 +704,7 @@ func TestClient_RegisterSchemas(t *testing.T) { assert.Equal(t, map[string]int{"schema1": 1, "schema2": 2}, result) }) - t.Run("returns error when registration fails", func(t *testing.T) { + t.Run("returns error when registration fails", func(t *testing.T) { mockClient := mocks.NewClient(t) ctx := t.Context() mockClient.EXPECT().RegisterSchema(