Skip to content
209 changes: 209 additions & 0 deletions pkg/beholder/beholderstore/durable_event_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Package beholderstore provides a Postgres-backed implementation of
// beholder.DurableEventStore. It is kept in a sibling package to pkg/beholder
// so that consumers of the beholder API (including builds targeting wasip1)
// do not transitively import lib/pq.
package beholderstore

import (
"context"
"fmt"
"strings"
"time"

"github.com/lib/pq"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

const chipDurableEventsTable = "cre.chip_durable_events"

// Store is a Postgres-backed implementation of beholder.DurableEventStore.
type Store struct {
ds sqlutil.DataSource
}

var (
_ beholder.DurableEventStore = (*Store)(nil)
_ beholder.DurableQueueObserver = (*Store)(nil)
_ beholder.BatchInserter = (*Store)(nil)
)

// New returns a Postgres-backed DurableEventStore bound to ds.
func New(ds sqlutil.DataSource) *Store {
return &Store{ds: ds}
}

func (s *Store) Insert(ctx context.Context, payload []byte) (int64, error) {
const q = `INSERT INTO ` + chipDurableEventsTable + ` (payload) VALUES ($1) RETURNING id`
var id int64
if err := s.ds.GetContext(ctx, &id, q, payload); err != nil {
return 0, fmt.Errorf("failed to insert chip durable event: %w", err)
}
return id, nil
}

func (s *Store) InsertBatch(ctx context.Context, payloads [][]byte) ([]int64, error) {
if len(payloads) == 0 {
return nil, nil
}
placeholders := make([]string, len(payloads))
args := make([]interface{}, len(payloads))
for i, p := range payloads {
placeholders[i] = fmt.Sprintf("($%d)", i+1)
args[i] = p
}
q := fmt.Sprintf(
"INSERT INTO %s (payload) VALUES %s RETURNING id",
chipDurableEventsTable,
strings.Join(placeholders, ","),
)

var ids []int64
if err := s.ds.SelectContext(ctx, &ids, q, args...); err != nil {
return nil, fmt.Errorf("failed to batch insert chip durable events: %w", err)
}
return ids, nil
}

func (s *Store) Delete(ctx context.Context, id int64) error {
const q = `DELETE FROM ` + chipDurableEventsTable + ` WHERE id = $1`
if _, err := s.ds.ExecContext(ctx, q, id); err != nil {
return fmt.Errorf("failed to delete chip durable event id=%d: %w", id, err)
}
return nil
}

func (s *Store) MarkDelivered(ctx context.Context, id int64) error {
const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = $1 AND delivered_at IS NULL`
if _, err := s.ds.ExecContext(ctx, q, id); err != nil {
return fmt.Errorf("failed to mark chip durable event delivered id=%d: %w", id, err)
}
return nil
}

func (s *Store) MarkDeliveredBatch(ctx context.Context, ids []int64) (int64, error) {
if len(ids) == 0 {
return 0, nil
}
const q = `UPDATE ` + chipDurableEventsTable + ` SET delivered_at = now() WHERE id = ANY($1) AND delivered_at IS NULL`
res, err := s.ds.ExecContext(ctx, q, pq.Array(ids))
if err != nil {
return 0, fmt.Errorf("failed to batch mark chip durable events delivered: %w", err)
}
n, _ := res.RowsAffected()
return n, nil
}

func (s *Store) PurgeDelivered(ctx context.Context, batchLimit int) (int64, error) {
if batchLimit <= 0 {
return 0, nil
}
const q = `
WITH picked AS (
SELECT id FROM ` + chipDurableEventsTable + `
WHERE delivered_at IS NOT NULL
ORDER BY delivered_at ASC
LIMIT $1
)
DELETE FROM ` + chipDurableEventsTable + ` AS t
USING picked WHERE t.id = picked.id`
res, err := s.ds.ExecContext(ctx, q, batchLimit)
if err != nil {
return 0, fmt.Errorf("failed to purge delivered chip durable events: %w", err)
}
n, err := res.RowsAffected()
if err != nil {
return 0, fmt.Errorf("purge delivered rows affected: %w", err)
}
return n, nil
}

func (s *Store) ListPending(ctx context.Context, createdBefore time.Time, limit int) ([]beholder.DurableEvent, error) {
const q = `
SELECT id, payload, created_at
FROM ` + chipDurableEventsTable + `
WHERE delivered_at IS NULL
AND created_at < $1
ORDER BY created_at ASC
LIMIT $2`

type row struct {
ID int64 `db:"id"`
Payload []byte `db:"payload"`
CreatedAt time.Time `db:"created_at"`
}

var rows []row
if err := s.ds.SelectContext(ctx, &rows, q, createdBefore, limit); err != nil {
return nil, fmt.Errorf("failed to list pending chip durable events: %w", err)
}

out := make([]beholder.DurableEvent, 0, len(rows))
for _, r := range rows {
out = append(out, beholder.DurableEvent{
ID: r.ID,
Payload: r.Payload,
CreatedAt: r.CreatedAt,
})
}
return out, nil
}

func (s *Store) DeleteExpired(ctx context.Context, ttl time.Duration) (int64, error) {
const q = `
WITH deleted AS (
DELETE FROM ` + chipDurableEventsTable + `
WHERE created_at <= now() - $1::interval
RETURNING id
)
SELECT count(*) FROM deleted`

var count int64
if err := s.ds.GetContext(ctx, &count, q, ttl.String()); err != nil {
return 0, fmt.Errorf("failed to delete expired chip durable events: %w", err)
}
return count, nil
}

type chipDurableQueueAgg struct {
Cnt int64 `db:"cnt"`
PayloadSum int64 `db:"payload_sum"`
MinCreated *time.Time `db:"min_created"`
}

// ObserveDurableQueue implements beholder.DurableQueueObserver for queue depth / age gauges.
func (s *Store) ObserveDurableQueue(ctx context.Context, eventTTL, nearExpiryLead time.Duration) (beholder.DurableQueueStats, error) {
const qAgg = `
SELECT
count(*)::bigint AS cnt,
coalesce(sum(octet_length(payload)), 0)::bigint AS payload_sum,
min(created_at) AS min_created
FROM ` + chipDurableEventsTable + `
WHERE delivered_at IS NULL`

var row chipDurableQueueAgg
if err := s.ds.GetContext(ctx, &row, qAgg); err != nil {
return beholder.DurableQueueStats{}, fmt.Errorf("durable queue aggregate: %w", err)
}
var st beholder.DurableQueueStats
st.Depth = row.Cnt
st.PayloadBytes = row.PayloadSum
if row.MinCreated != nil {
st.OldestPendingAge = time.Since(*row.MinCreated)
}
if eventTTL > 0 && nearExpiryLead > 0 && nearExpiryLead < eventTTL {
ttlSec := int64(eventTTL.Round(time.Second) / time.Second)
leadSec := int64(nearExpiryLead.Round(time.Second) / time.Second)
const qNear = `
SELECT count(*)::bigint
FROM ` + chipDurableEventsTable + `
WHERE delivered_at IS NULL
AND created_at >= now() - ($1::bigint * interval '1 second')
AND created_at < now() - (($1::bigint - $2::bigint) * interval '1 second')`
if err := s.ds.GetContext(ctx, &st.NearTTLCount, qNear, ttlSec, leadSec); err != nil {
return beholder.DurableQueueStats{}, fmt.Errorf("durable queue near-ttl: %w", err)
}
}
return st, nil
}
49 changes: 49 additions & 0 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type Client struct {
// Chip
Chip chipingress.Client

// durableEmitter persists and resends events to Chip when configured
durableEmitter *DurableEmitter

// Providers
LoggerProvider otellog.LoggerProvider
TracerProvider oteltrace.TracerProvider
Expand Down Expand Up @@ -356,6 +359,52 @@ func (c *Client) IsSignerSet() bool {
return c.lazySigner != nil && c.lazySigner.IsSet()
}

// SetupDurableEmitter replaces client.Emitter with a DualSourceEmitter whose Chip
// sink is a DurableEmitter backed by the supplied store. CloudEvents are persisted
// before async delivery to Chip ingress, so they survive process restarts and chip
// ingress outages.
//
// StartDurableEmitter must be called before emitting events.
func (c *Client) SetupDurableEmitter(store DurableEventStore, retransmit bool) error {
if c.Chip == nil {
return fmt.Errorf("chip ingress client is nil")
}
if store == nil {
return fmt.Errorf("durable emitter requires a non-nil DurableEventStore")
}

lggr := c.Config.ChipIngressLogger
if lggr == nil {
return fmt.Errorf("chip ingress logger is required for durable emitter setup")
}

durableEmitter, err := NewDurableEmitter(store, c.Chip, retransmit, DefaultDurableEmitterConfig(), lggr)
if err != nil {
return fmt.Errorf("failed to create durable emitter: %w", err)
}

otlpEmitter := NewMessageEmitter(c.MessageLoggerProvider.Logger("durable-emitter"))
dualEmitter, err := NewDualSourceEmitter(durableEmitter, otlpEmitter)
if err != nil {
return fmt.Errorf("failed to create dual source emitter: %w", err)
}

c.Emitter = dualEmitter
c.durableEmitter = durableEmitter

lggr.Infow("Durable emitter enabled — all CloudEvent sources use the durable Chip queue")
return nil
}

// StartDurableEmitter starts durable emitter. Close is handled transatively when Emitter is closed.
func (c *Client) StartDurableEmitter(ctx context.Context) error {
if c.durableEmitter == nil {
return fmt.Errorf("failed to start nil durable emitter; call SetupDurableEmitter first")
}
c.durableEmitter.Start(ctx)
return nil
}

func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) {
extraResources, err := sdkresource.New(
context.Background(),
Expand Down
37 changes: 19 additions & 18 deletions pkg/beholder/durable_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,12 @@ type insertResult struct {
type DurableEmitter struct {
store DurableEventStore
client chipingress.Client
// isHostProcess determines if the emitter runs retransmit and cleanup loops.
// Should be set to false when initialized inside LOOP plugins.
isHostProcess bool
cfg DurableEmitterConfig
log logger.Logger
// retransmit determines if the emitter runs the retransmit, expiry, and purge
// loops over the shared queue. Should be set to false when initialized inside
// LOOP plugins so a single host process owns reclamation and cleanup.
retransmit bool
cfg DurableEmitterConfig
log logger.Logger

metrics *durableEmitterMetrics
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DylanTinianov

Observability Comparison: DurableEmitter vs BatchEmitterService + batch.Client

Metrics Coverage

Concern BatchEmitterService + batch.Client DurableEmitter
Events sent (success) chip_ingress.events_sent per domain/entity beholder.durable_emitter.emit.success (count only, no domain/entity breakdown)
Events dropped/failed chip_ingress.events_dropped per domain/entity beholder.durable_emitter.emit.failure (no domain/entity)
Batch send requests chip_ingress.batch.send_requests_total with status=success/failure ❌ No per-RPC request counter
Request size (messages) chip_ingress.batch.request_size_messages histogram with max_batch_size attr ❌ Not tracked
Request size (bytes) chip_ingress.batch.request_size_bytes histogram with max_grpc_request_size attr ❌ Not tracked
Request latency chip_ingress.batch.request_latency_ms histogram with status ⚠️ Only via hooks (OnBatchPublish) — not exported as a metric instrument
Config info gauge chip_ingress.batch.config.info with all config attrs ❌ Not tracked
Queue depth ❌ Not tracked (fire-and-forget buffer) queueDepth / queueDepthMax gauges
Emit latency (caller-facing) ❌ Not tracked emit.duration + emit.total_duration histograms
DB store operations N/A (no DB) ✅ Instrumented store wrapper (insert/mark/purge durations)
Near-TTL / expiry N/A expiredPurged, NearTTLCount via ObserveDurableQueue
Publish batch events OK/err Implicit via sent/dropped counters publishBatchEvOK / publishBatchEvErr

Key Gaps in DurableEmitter

  1. No per-domain/entity attribution — BatchEmitterService tags every metric with domain + entity via metricAttrsFor(). DurableEmitter metrics are flat counters with no cardinality, making it impossible to identify which event source is failing.

  2. No request size observability — batch.Client records both message count and byte size histograms per send. DurableEmitter has no visibility into whether batches are approaching gRPC limits (which ties back to the missing size-splitting gap).

  3. No send request counter with status — batch.Client's send_requests_total with success/failure lets you compute error rate. DurableEmitter only logs failures; no metric-based alerting possible on publish RPC error rate.

  4. No config info metric — batch.Client emits a gauge with all configuration parameters (batch size, buffer size, timeouts, etc.) for runtime introspection. DurableEmitter has no equivalent — you can't verify running config from metrics alone.

  5. Latency only via hooks, not instrumentsOnBatchPublish callback provides latency to test code but doesn't register an OTel histogram. You can't dashboard or alert on publish latency without custom wiring.

Where DurableEmitter is Better

  • Queue depth visibility — critical for a persistence-backed system. You can alert on backlog growth.
  • Emit-path latency — tracks how long the caller blocks on DB insert, useful for detecting DB pressure.
  • Store-layer instrumentation — the metrics-instrumented store wrapper gives per-operation DB latency.

Logging

Concern batch.Client DurableEmitter
Publish failure Errorw("failed to publish batch") Warnw("PublishBatch failed, events will be retransmitted")
Buffer full Silent (returns error to caller) Warnw("batch publish channel full, relying on retransmit")
Retransmit activity N/A ✅ Logs enqueue counts, skipped, pending depth
Shutdown timeout Warnw("timed out waiting for shutdown") ❌ No timeout exists
Config at startup Via gauge metric only Via Infow log lines (coalescing, raw-codec)

Summary

BatchEmitterService has better transport-layer observability (request sizes, latency histograms, per-domain attribution, config gauges). DurableEmitter has better persistence-layer observability (queue depth, DB operation metrics, emit latency). If DurableEmitter composed over BatchEmitterService, you'd get both layers covered without duplication.


Expand Down Expand Up @@ -225,7 +226,7 @@ var _ Emitter = (*DurableEmitter)(nil)
func NewDurableEmitter(
store DurableEventStore,
client chipingress.Client,
isHostProcess bool,
retransmit bool,
cfg DurableEmitterConfig,
log logger.Logger,
) (*DurableEmitter, error) {
Expand All @@ -251,13 +252,13 @@ func NewDurableEmitter(
store = newMetricsInstrumentedStore(store, m)
}
d := &DurableEmitter{
store: store,
client: client,
isHostProcess: isHostProcess,
cfg: cfg,
log: log,
metrics: m,
stopCh: make(chan struct{}),
store: store,
client: client,
retransmit: retransmit,
cfg: cfg,
log: log,
metrics: m,
stopCh: make(chan struct{}),
}
if cp, ok := client.(grpcConnProvider); ok {
d.rawConn = cp.Conn()
Expand Down Expand Up @@ -302,7 +303,7 @@ func (d *DurableEmitter) Start(_ context.Context) {
insertWorkers = 4
}

if d.isHostProcess {
if d.retransmit {
d.wg.Go(d.retransmitLoop)
if !d.cfg.DisablePruning {
d.wg.Go(d.expiryLoop)
Expand Down Expand Up @@ -819,13 +820,13 @@ func (d *DurableEmitter) retransmitPending() {
return
}

d.retransmit(pending)
d.retransmitBatch(pending)
}

// retransmit enqueues pending DB rows to publishCh so the batch workers handle
// publishing. When rawConn is set, payloads are passed through without
// retransmitBatch enqueues pending DB rows to publishCh so the batch workers
// handle publishing. When rawConn is set, payloads are passed through without
// proto.Unmarshal — the batch workers use buildBatchBytes for the wire format.
func (d *DurableEmitter) retransmit(pending []DurableEvent) {
func (d *DurableEmitter) retransmitBatch(pending []DurableEvent) {
var enqueued int

for _, pe := range pending {
Expand Down
12 changes: 6 additions & 6 deletions pkg/beholder/durable_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestDurableEmitter_HooksPublishFailureSkipsMarkHook(t *testing.T) {
assert.Equal(t, int32(0), markCalls.Load())
}

func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) {
func TestDurableEmitter_NoRetransmitSkipsRetransmitAndExpiry(t *testing.T) {
store := NewMemDurableEventStore()
client := &testChipClient{}
client.setPublishErr(errors.New("chip unavailable"))
Expand All @@ -249,14 +249,14 @@ func TestDurableEmitter_NonHostProcessSkipsRetransmitAndExpiry(t *testing.T) {
return client.batchCount.Load() >= 1 && store.Len() == 1
}, 2*time.Second, 5*time.Millisecond, "initial PublishBatch should fail and leave the row")

// Several host-only ticks would have cleared or retried by now.
// Several retransmit-only ticks would have cleared or retried by now.
time.Sleep(250 * time.Millisecond)

assert.Equal(t, 1, store.Len(), "non-host must not run retransmit or expiry loops")
assert.Equal(t, int64(1), client.batchCount.Load(), "non-host must not schedule extra PublishBatch via retransmit")
assert.Equal(t, 1, store.Len(), "retransmit=false must not run retransmit or expiry loops")
assert.Equal(t, int64(1), client.batchCount.Load(), "retransmit=false must not schedule extra PublishBatch via retransmit")
}

func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T) {
func TestDurableEmitter_NoRetransmitStillDeliversViaBatchWorkers(t *testing.T) {
store := NewMemDurableEventStore()
client := &testChipClient{}

Expand All @@ -272,7 +272,7 @@ func TestDurableEmitter_NonHostProcessStillDeliversViaBatchWorkers(t *testing.T)

require.Eventually(t, func() bool {
return store.Len() == 0 && client.batchCount.Load() >= 1
}, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when isHostProcess is false")
}, 2*time.Second, 10*time.Millisecond, "batch publish workers must still run when retransmit is false")
}

func TestDurableEmitter_EmitPersistsAndPublishes(t *testing.T) {
Expand Down
Loading
Loading