From 716b147b46ac99052a6e60529eba3489001633de Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Tue, 19 May 2026 17:21:41 -0700 Subject: [PATCH 01/13] (fix): bound capability call max concurrency --- pkg/workflows/wasm/host/execution.go | 19 ++++++++++++ .../wasm/host/execution_await_order_test.go | 1 + pkg/workflows/wasm/host/module.go | 29 ++++++++++++------- pkg/workflows/wasm/host/module_test.go | 1 + 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/pkg/workflows/wasm/host/execution.go b/pkg/workflows/wasm/host/execution.go index ec9fd1bbfd..67f6854d3b 100644 --- a/pkg/workflows/wasm/host/execution.go +++ b/pkg/workflows/wasm/host/execution.go @@ -21,6 +21,7 @@ type execution[T any] struct { ctx context.Context capabilityResponses map[int32]<-chan *sdkpb.CapabilityResponse secretsResponses map[int32]<-chan *secretsResponse + pendingCallsSem chan struct{} lock sync.RWMutex module *module executor ExecutionHelper @@ -38,12 +39,21 @@ type execution[T any] struct { // channel and storing each channel with a unique identifier for future // retrieval on await. func (e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRequest) error { + // Acquire semaphore slot before spawning goroutine to bound concurrency. + select { + case e.pendingCallsSem <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + ch := make(chan *sdkpb.CapabilityResponse, 1) e.lock.Lock() defer e.lock.Unlock() e.capabilityResponses[req.CallbackId] = ch go func() { + defer func() { <-e.pendingCallsSem }() + resp, err := e.executor.CallCapability(ctx, req) if err != nil { @@ -95,12 +105,21 @@ type secretsResponse struct { } func (e *execution[T]) getSecretsAsync(ctx context.Context, req *sdkpb.GetSecretsRequest) error { + // Acquire semaphore slot before spawning goroutine to bound concurrency. 
+ select { + case e.pendingCallsSem <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + ch := make(chan *secretsResponse, 1) e.lock.Lock() defer e.lock.Unlock() e.secretsResponses[req.CallbackId] = ch go func() { + defer func() { <-e.pendingCallsSem }() + resp, err := e.executor.GetSecrets(ctx, req) sr := &secretsResponse{responses: resp, err: err} diff --git a/pkg/workflows/wasm/host/execution_await_order_test.go b/pkg/workflows/wasm/host/execution_await_order_test.go index 29084302a2..ff4323dcd7 100644 --- a/pkg/workflows/wasm/host/execution_await_order_test.go +++ b/pkg/workflows/wasm/host/execution_await_order_test.go @@ -69,6 +69,7 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { exec := &execution[*sdkpb.ExecutionResult]{ ctx: t.Context(), capabilityResponses: make(map[int32]<-chan *sdkpb.CapabilityResponse), + pendingCallsSem: make(chan struct{}, 100), executor: stub, } diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index 07b42a00eb..fe180679e2 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -42,6 +42,7 @@ var ( defaultMinMemoryMBs = uint64(128) DefaultInitialFuel = uint64(100_000_000) defaultMaxFetchRequests = 5 + defaultMaxPendingCalls = 100 defaultMaxCompressedBinarySize = 20 * 1024 * 1024 // 20 MB defaultMaxDecompressedBinarySize = 100 * 1024 * 1024 // 100 MB defaultMaxResponseSizeBytes = 5 * 1024 * 1024 // 5 MB @@ -61,16 +62,19 @@ type DeterminismConfig struct { Seed int64 } type ModuleConfig struct { - TickInterval time.Duration - Timeout *time.Duration - MaxMemoryMBs uint64 - MinMemoryMBs uint64 - MemoryLimiter limits.BoundLimiter[config.Size] // supersedes Max/MinMemoryMBs if set - InitialFuel uint64 - Logger logger.Logger - IsUncompressed bool - Fetch func(ctx context.Context, req *FetchRequest) (*FetchResponse, error) - MaxFetchRequests int + TickInterval time.Duration + Timeout *time.Duration + MaxMemoryMBs uint64 + MinMemoryMBs uint64 + 
MemoryLimiter limits.BoundLimiter[config.Size] // supersedes Max/MinMemoryMBs if set + InitialFuel uint64 + Logger logger.Logger + IsUncompressed bool + Fetch func(ctx context.Context, req *FetchRequest) (*FetchResponse, error) + MaxFetchRequests int + // MaxPendingCalls bounds the number of concurrent in-flight capability call + // goroutines per execution. Additional calls block until a slot is freed. + MaxPendingCalls int MaxCompressedBinarySize uint64 MaxCompressedBinaryLimiter limits.BoundLimiter[config.Size] // supersedes MaxCompressedBinarySize if set MaxDecompressedBinarySize uint64 @@ -192,6 +196,10 @@ func NewModule(ctx context.Context, modCfg *ModuleConfig, binary []byte, opts .. modCfg.MaxFetchRequests = defaultMaxFetchRequests } + if modCfg.MaxPendingCalls == 0 { + modCfg.MaxPendingCalls = defaultMaxPendingCalls + } + if modCfg.Labeler == nil { modCfg.Labeler = &unimplementedMessageEmitter{} } @@ -693,6 +701,7 @@ func runWasm[I, O proto.Message]( ctx: ctxWithTimeout, capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{}, secretsResponses: map[int32]<-chan *secretsResponse{}, + pendingCallsSem: make(chan struct{}, m.cfg.MaxPendingCalls), module: m, executor: helper, donSeed: donSeed, diff --git a/pkg/workflows/wasm/host/module_test.go b/pkg/workflows/wasm/host/module_test.go index ff7307ca79..25f7dacab9 100644 --- a/pkg/workflows/wasm/host/module_test.go +++ b/pkg/workflows/wasm/host/module_test.go @@ -633,6 +633,7 @@ func Test_CallAwaitRace(t *testing.T) { exec := &execution[*wasmpb.ExecutionResult]{ module: m, capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{}, + pendingCallsSem: make(chan struct{}, 100), ctx: t.Context(), executor: mockExecHelper, } From c9695d0a647b71d226b924d4da04960324ba2efb Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Tue, 19 May 2026 17:36:09 -0700 Subject: [PATCH 02/13] Match engine CapabilityConcurrencyLimit --- pkg/workflows/wasm/host/module.go | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index fe180679e2..676cd32915 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -42,7 +42,7 @@ var ( defaultMinMemoryMBs = uint64(128) DefaultInitialFuel = uint64(100_000_000) defaultMaxFetchRequests = 5 - defaultMaxPendingCalls = 100 + defaultMaxPendingCalls = 30 // matches engine CapabilityConcurrencyLimit default defaultMaxCompressedBinarySize = 20 * 1024 * 1024 // 20 MB defaultMaxDecompressedBinarySize = 100 * 1024 * 1024 // 100 MB defaultMaxResponseSizeBytes = 5 * 1024 * 1024 // 5 MB From 8f688adb9d1164d68c1efbd99b4399b92e777830 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Tue, 19 May 2026 17:39:59 -0700 Subject: [PATCH 03/13] Changes from review --- pkg/workflows/wasm/host/execution_await_order_test.go | 2 +- pkg/workflows/wasm/host/module.go | 2 +- pkg/workflows/wasm/host/module_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/workflows/wasm/host/execution_await_order_test.go b/pkg/workflows/wasm/host/execution_await_order_test.go index ff4323dcd7..77380baa1f 100644 --- a/pkg/workflows/wasm/host/execution_await_order_test.go +++ b/pkg/workflows/wasm/host/execution_await_order_test.go @@ -69,7 +69,7 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { exec := &execution[*sdkpb.ExecutionResult]{ ctx: t.Context(), capabilityResponses: make(map[int32]<-chan *sdkpb.CapabilityResponse), - pendingCallsSem: make(chan struct{}, 100), + pendingCallsSem: make(chan struct{}, defaultMaxPendingCalls), executor: stub, } diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index 676cd32915..bf3e09a37b 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -42,7 +42,7 @@ var ( defaultMinMemoryMBs = uint64(128) DefaultInitialFuel = uint64(100_000_000) defaultMaxFetchRequests = 5 - defaultMaxPendingCalls = 30 // matches 
engine CapabilityConcurrencyLimit default + defaultMaxPendingCalls = 30 // matches engine CapabilityConcurrencyLimit default defaultMaxCompressedBinarySize = 20 * 1024 * 1024 // 20 MB defaultMaxDecompressedBinarySize = 100 * 1024 * 1024 // 100 MB defaultMaxResponseSizeBytes = 5 * 1024 * 1024 // 5 MB diff --git a/pkg/workflows/wasm/host/module_test.go b/pkg/workflows/wasm/host/module_test.go index 25f7dacab9..4c2178c2b3 100644 --- a/pkg/workflows/wasm/host/module_test.go +++ b/pkg/workflows/wasm/host/module_test.go @@ -633,7 +633,7 @@ func Test_CallAwaitRace(t *testing.T) { exec := &execution[*wasmpb.ExecutionResult]{ module: m, capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{}, - pendingCallsSem: make(chan struct{}, 100), + pendingCallsSem: make(chan struct{}, defaultMaxPendingCalls), ctx: t.Context(), executor: mockExecHelper, } From b932fc673c0abb84dd19c3a500065e50931cfdf6 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Tue, 19 May 2026 18:00:09 -0700 Subject: [PATCH 04/13] release in awaitCapabilites instead of callCapAsync --- pkg/workflows/wasm/host/execution.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/workflows/wasm/host/execution.go b/pkg/workflows/wasm/host/execution.go index 67f6854d3b..20f5abd5e0 100644 --- a/pkg/workflows/wasm/host/execution.go +++ b/pkg/workflows/wasm/host/execution.go @@ -52,8 +52,6 @@ func (e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRe e.capabilityResponses[req.CallbackId] = ch go func() { - defer func() { <-e.pendingCallsSem }() - resp, err := e.executor.CallCapability(ctx, req) if err != nil { @@ -92,6 +90,7 @@ func (e *execution[T]) awaitCapabilities(ctx context.Context, acr *sdkpb.AwaitCa } delete(e.capabilityResponses, callId) + <-e.pendingCallsSem } return &sdkpb.AwaitCapabilitiesResponse{ @@ -118,8 +117,6 @@ func (e *execution[T]) getSecretsAsync(ctx context.Context, req *sdkpb.GetSecret e.secretsResponses[req.CallbackId] = ch go func() { - 
defer func() { <-e.pendingCallsSem }() - resp, err := e.executor.GetSecrets(ctx, req) sr := &secretsResponse{responses: resp, err: err} @@ -155,6 +152,7 @@ func (e *execution[T]) awaitSecrets(ctx context.Context, acr *sdkpb.AwaitSecrets } delete(e.secretsResponses, callId) + <-e.pendingCallsSem } return &sdkpb.AwaitSecretsResponse{ From b4f1a72c9ce863773bcf1dc6432dd9d69a99a5f0 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Wed, 20 May 2026 09:14:01 -0700 Subject: [PATCH 05/13] Use ResourcePoolLimiter for limit control & observability --- pkg/workflows/wasm/host/execution.go | 34 ++++++++++++------- .../wasm/host/execution_await_order_test.go | 4 ++- pkg/workflows/wasm/host/module.go | 8 ++++- pkg/workflows/wasm/host/module_test.go | 4 ++- 4 files changed, 34 insertions(+), 16 deletions(-) diff --git a/pkg/workflows/wasm/host/execution.go b/pkg/workflows/wasm/host/execution.go index 20f5abd5e0..a03ac1fa45 100644 --- a/pkg/workflows/wasm/host/execution.go +++ b/pkg/workflows/wasm/host/execution.go @@ -11,6 +11,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" wfpb "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" ) @@ -21,7 +22,8 @@ type execution[T any] struct { ctx context.Context capabilityResponses map[int32]<-chan *sdkpb.CapabilityResponse secretsResponses map[int32]<-chan *secretsResponse - pendingCallsSem chan struct{} + pendingCallsLimiter limits.ResourcePoolLimiter[int] + pendingCallsFree map[int32]func() lock sync.RWMutex module *module executor ExecutionHelper @@ -39,17 +41,17 @@ type execution[T any] struct { // channel and storing each channel with a unique identifier for future // retrieval on await. 
func (e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRequest) error { - // Acquire semaphore slot before spawning goroutine to bound concurrency. - select { - case e.pendingCallsSem <- struct{}{}: - case <-ctx.Done(): - return ctx.Err() + // Acquire a slot from the pool limiter to bound concurrency. + free, err := e.pendingCallsLimiter.Wait(ctx, 1) + if err != nil { + return err } ch := make(chan *sdkpb.CapabilityResponse, 1) e.lock.Lock() defer e.lock.Unlock() e.capabilityResponses[req.CallbackId] = ch + e.pendingCallsFree[req.CallbackId] = free go func() { resp, err := e.executor.CallCapability(ctx, req) @@ -90,7 +92,10 @@ func (e *execution[T]) awaitCapabilities(ctx context.Context, acr *sdkpb.AwaitCa } delete(e.capabilityResponses, callId) - <-e.pendingCallsSem + if free, ok := e.pendingCallsFree[callId]; ok { + free() + delete(e.pendingCallsFree, callId) + } } return &sdkpb.AwaitCapabilitiesResponse{ @@ -104,17 +109,17 @@ type secretsResponse struct { } func (e *execution[T]) getSecretsAsync(ctx context.Context, req *sdkpb.GetSecretsRequest) error { - // Acquire semaphore slot before spawning goroutine to bound concurrency. - select { - case e.pendingCallsSem <- struct{}{}: - case <-ctx.Done(): - return ctx.Err() + // Acquire a slot from the pool limiter to bound concurrency. 
+ free, err := e.pendingCallsLimiter.Wait(ctx, 1) + if err != nil { + return err } ch := make(chan *secretsResponse, 1) e.lock.Lock() defer e.lock.Unlock() e.secretsResponses[req.CallbackId] = ch + e.pendingCallsFree[req.CallbackId] = free go func() { resp, err := e.executor.GetSecrets(ctx, req) @@ -152,7 +157,10 @@ func (e *execution[T]) awaitSecrets(ctx context.Context, acr *sdkpb.AwaitSecrets } delete(e.secretsResponses, callId) - <-e.pendingCallsSem + if free, ok := e.pendingCallsFree[callId]; ok { + free() + delete(e.pendingCallsFree, callId) + } } return &sdkpb.AwaitSecretsResponse{ diff --git a/pkg/workflows/wasm/host/execution_await_order_test.go b/pkg/workflows/wasm/host/execution_await_order_test.go index 77380baa1f..faaa741d67 100644 --- a/pkg/workflows/wasm/host/execution_await_order_test.go +++ b/pkg/workflows/wasm/host/execution_await_order_test.go @@ -10,6 +10,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/emptypb" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" wfpb "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" ) @@ -69,7 +70,8 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { exec := &execution[*sdkpb.ExecutionResult]{ ctx: t.Context(), capabilityResponses: make(map[int32]<-chan *sdkpb.CapabilityResponse), - pendingCallsSem: make(chan struct{}, defaultMaxPendingCalls), + pendingCallsLimiter: limits.GlobalResourcePoolLimiter(defaultMaxPendingCalls), + pendingCallsFree: map[int32]func(){}, executor: stub, } diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index bf3e09a37b..e40d1ed98d 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -75,6 +75,7 @@ type ModuleConfig struct { // MaxPendingCalls bounds the number of concurrent in-flight capability call // goroutines per execution. 
Additional calls block until a slot is freed. MaxPendingCalls int + PendingCallsLimiter limits.ResourcePoolLimiter[int] // supersedes MaxPendingCalls if set MaxCompressedBinarySize uint64 MaxCompressedBinaryLimiter limits.BoundLimiter[config.Size] // supersedes MaxCompressedBinarySize if set MaxDecompressedBinarySize uint64 @@ -200,6 +201,10 @@ func NewModule(ctx context.Context, modCfg *ModuleConfig, binary []byte, opts .. modCfg.MaxPendingCalls = defaultMaxPendingCalls } + if modCfg.PendingCallsLimiter == nil { + modCfg.PendingCallsLimiter = limits.GlobalResourcePoolLimiter(modCfg.MaxPendingCalls) + } + if modCfg.Labeler == nil { modCfg.Labeler = &unimplementedMessageEmitter{} } @@ -701,7 +706,8 @@ func runWasm[I, O proto.Message]( ctx: ctxWithTimeout, capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{}, secretsResponses: map[int32]<-chan *secretsResponse{}, - pendingCallsSem: make(chan struct{}, m.cfg.MaxPendingCalls), + pendingCallsLimiter: m.cfg.PendingCallsLimiter, + pendingCallsFree: map[int32]func(){}, module: m, executor: helper, donSeed: donSeed, diff --git a/pkg/workflows/wasm/host/module_test.go b/pkg/workflows/wasm/host/module_test.go index 4c2178c2b3..92dad8259f 100644 --- a/pkg/workflows/wasm/host/module_test.go +++ b/pkg/workflows/wasm/host/module_test.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/utils/matches" wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb" sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" @@ -633,7 +634,8 @@ func Test_CallAwaitRace(t *testing.T) { exec := &execution[*wasmpb.ExecutionResult]{ module: m, capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{}, - pendingCallsSem: make(chan struct{}, defaultMaxPendingCalls), + pendingCallsLimiter: 
limits.GlobalResourcePoolLimiter[int](defaultMaxPendingCalls), + pendingCallsFree: map[int32]func(){}, ctx: t.Context(), executor: mockExecHelper, } From 868f60931712bacda1020a74d2b7203f985a6f7b Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Wed, 20 May 2026 09:18:31 -0700 Subject: [PATCH 06/13] Add negative number guard to MaxPendingCalls --- pkg/workflows/wasm/host/module.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index e40d1ed98d..92329db1a0 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -201,6 +201,10 @@ func NewModule(ctx context.Context, modCfg *ModuleConfig, binary []byte, opts .. modCfg.MaxPendingCalls = defaultMaxPendingCalls } + if modCfg.MaxPendingCalls < 0 { + return nil, fmt.Errorf("MaxPendingCalls must be positive, got %d", modCfg.MaxPendingCalls) + } + if modCfg.PendingCallsLimiter == nil { modCfg.PendingCallsLimiter = limits.GlobalResourcePoolLimiter(modCfg.MaxPendingCalls) } From b4cb3d41d3fa85e00f2d51db81235349853e13b4 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Wed, 20 May 2026 09:46:37 -0700 Subject: [PATCH 07/13] defer free() --- pkg/workflows/wasm/host/execution.go | 15 ++++----------- .../wasm/host/execution_await_order_test.go | 1 - pkg/workflows/wasm/host/module.go | 7 +++---- pkg/workflows/wasm/host/module_test.go | 1 - 4 files changed, 7 insertions(+), 17 deletions(-) diff --git a/pkg/workflows/wasm/host/execution.go b/pkg/workflows/wasm/host/execution.go index a03ac1fa45..a12a6ceb33 100644 --- a/pkg/workflows/wasm/host/execution.go +++ b/pkg/workflows/wasm/host/execution.go @@ -23,7 +23,6 @@ type execution[T any] struct { capabilityResponses map[int32]<-chan *sdkpb.CapabilityResponse secretsResponses map[int32]<-chan *secretsResponse pendingCallsLimiter limits.ResourcePoolLimiter[int] - pendingCallsFree map[int32]func() lock sync.RWMutex module *module executor ExecutionHelper @@ -51,9 +50,10 @@ func 
(e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRe e.lock.Lock() defer e.lock.Unlock() e.capabilityResponses[req.CallbackId] = ch - e.pendingCallsFree[req.CallbackId] = free go func() { + defer free() + resp, err := e.executor.CallCapability(ctx, req) if err != nil { @@ -92,10 +92,6 @@ func (e *execution[T]) awaitCapabilities(ctx context.Context, acr *sdkpb.AwaitCa } delete(e.capabilityResponses, callId) - if free, ok := e.pendingCallsFree[callId]; ok { - free() - delete(e.pendingCallsFree, callId) - } } return &sdkpb.AwaitCapabilitiesResponse{ @@ -119,9 +115,10 @@ func (e *execution[T]) getSecretsAsync(ctx context.Context, req *sdkpb.GetSecret e.lock.Lock() defer e.lock.Unlock() e.secretsResponses[req.CallbackId] = ch - e.pendingCallsFree[req.CallbackId] = free go func() { + defer free() + resp, err := e.executor.GetSecrets(ctx, req) sr := &secretsResponse{responses: resp, err: err} @@ -157,10 +154,6 @@ func (e *execution[T]) awaitSecrets(ctx context.Context, acr *sdkpb.AwaitSecrets } delete(e.secretsResponses, callId) - if free, ok := e.pendingCallsFree[callId]; ok { - free() - delete(e.pendingCallsFree, callId) - } } return &sdkpb.AwaitSecretsResponse{ diff --git a/pkg/workflows/wasm/host/execution_await_order_test.go b/pkg/workflows/wasm/host/execution_await_order_test.go index faaa741d67..14de0d0b65 100644 --- a/pkg/workflows/wasm/host/execution_await_order_test.go +++ b/pkg/workflows/wasm/host/execution_await_order_test.go @@ -71,7 +71,6 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { ctx: t.Context(), capabilityResponses: make(map[int32]<-chan *sdkpb.CapabilityResponse), pendingCallsLimiter: limits.GlobalResourcePoolLimiter(defaultMaxPendingCalls), - pendingCallsFree: map[int32]func(){}, executor: stub, } diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index 92329db1a0..89fa84227a 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ 
-72,9 +72,9 @@ type ModuleConfig struct { IsUncompressed bool Fetch func(ctx context.Context, req *FetchRequest) (*FetchResponse, error) MaxFetchRequests int - // MaxPendingCalls bounds the number of concurrent in-flight capability call - // goroutines per execution. Additional calls block until a slot is freed. - MaxPendingCalls int + // MaxPendingCalls bounds concurrent in-flight capability calls per workflow. + MaxPendingCalls int + // When PendingCallsLimiter is set, it enforces a separate pending calls pool per workflow ID. PendingCallsLimiter limits.ResourcePoolLimiter[int] // supersedes MaxPendingCalls if set MaxCompressedBinarySize uint64 MaxCompressedBinaryLimiter limits.BoundLimiter[config.Size] // supersedes MaxCompressedBinarySize if set @@ -711,7 +711,6 @@ func runWasm[I, O proto.Message]( capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{}, secretsResponses: map[int32]<-chan *secretsResponse{}, pendingCallsLimiter: m.cfg.PendingCallsLimiter, - pendingCallsFree: map[int32]func(){}, module: m, executor: helper, donSeed: donSeed, diff --git a/pkg/workflows/wasm/host/module_test.go b/pkg/workflows/wasm/host/module_test.go index 92dad8259f..9fa89efcbc 100644 --- a/pkg/workflows/wasm/host/module_test.go +++ b/pkg/workflows/wasm/host/module_test.go @@ -635,7 +635,6 @@ func Test_CallAwaitRace(t *testing.T) { module: m, capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{}, pendingCallsLimiter: limits.GlobalResourcePoolLimiter[int](defaultMaxPendingCalls), - pendingCallsFree: map[int32]func(){}, ctx: t.Context(), executor: mockExecHelper, } From 57c4609e1d3b99b2f86eb041acc6471e91d4ee59 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Wed, 20 May 2026 10:00:08 -0700 Subject: [PATCH 08/13] Clean up comments & gofmt --- pkg/workflows/wasm/host/module.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index 
89fa84227a..e11e3358a9 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -72,10 +72,13 @@ type ModuleConfig struct { IsUncompressed bool Fetch func(ctx context.Context, req *FetchRequest) (*FetchResponse, error) MaxFetchRequests int - // MaxPendingCalls bounds concurrent in-flight capability calls per workflow. + // MaxPendingCalls is the fallback limit used to construct a default + // GlobalResourcePoolLimiter when PendingCallsLimiter is nil. MaxPendingCalls int - // When PendingCallsLimiter is set, it enforces a separate pending calls pool per workflow ID. - PendingCallsLimiter limits.ResourcePoolLimiter[int] // supersedes MaxPendingCalls if set + // PendingCallsLimiter bounds concurrent in-flight capability and secrets + // calls. When scoped (e.g. ScopeWorkflow), each workflow ID gets its own + // pool; when global/unscoped, the limit is shared across all callers. + PendingCallsLimiter limits.ResourcePoolLimiter[int] MaxCompressedBinarySize uint64 MaxCompressedBinaryLimiter limits.BoundLimiter[config.Size] // supersedes MaxCompressedBinarySize if set MaxDecompressedBinarySize uint64 @@ -197,15 +200,13 @@ func NewModule(ctx context.Context, modCfg *ModuleConfig, binary []byte, opts .. 
modCfg.MaxFetchRequests = defaultMaxFetchRequests } - if modCfg.MaxPendingCalls == 0 { - modCfg.MaxPendingCalls = defaultMaxPendingCalls - } - - if modCfg.MaxPendingCalls < 0 { - return nil, fmt.Errorf("MaxPendingCalls must be positive, got %d", modCfg.MaxPendingCalls) - } - if modCfg.PendingCallsLimiter == nil { + if modCfg.MaxPendingCalls == 0 { + modCfg.MaxPendingCalls = defaultMaxPendingCalls + } + if modCfg.MaxPendingCalls < 0 { + return nil, fmt.Errorf("MaxPendingCalls must be positive, got %d", modCfg.MaxPendingCalls) + } modCfg.PendingCallsLimiter = limits.GlobalResourcePoolLimiter(modCfg.MaxPendingCalls) } From 9b3c80c81ec08923d772aacc5cfd499873c1d1e0 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Fri, 22 May 2026 10:55:43 -0700 Subject: [PATCH 09/13] Use cresettings --- .../wasm/host/execution_await_order_test.go | 3 +- .../wasm/host/execution_semaphore_test.go | 286 ++++++++++++++++++ pkg/workflows/wasm/host/module.go | 21 +- pkg/workflows/wasm/host/module_test.go | 3 +- 4 files changed, 304 insertions(+), 9 deletions(-) create mode 100644 pkg/workflows/wasm/host/execution_semaphore_test.go diff --git a/pkg/workflows/wasm/host/execution_await_order_test.go b/pkg/workflows/wasm/host/execution_await_order_test.go index 14de0d0b65..f250dd69fc 100644 --- a/pkg/workflows/wasm/host/execution_await_order_test.go +++ b/pkg/workflows/wasm/host/execution_await_order_test.go @@ -10,6 +10,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/emptypb" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" wfpb "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" @@ -70,7 +71,7 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) { exec := &execution[*sdkpb.ExecutionResult]{ ctx: t.Context(), capabilityResponses: make(map[int32]<-chan 
*sdkpb.CapabilityResponse), - pendingCallsLimiter: limits.GlobalResourcePoolLimiter(defaultMaxPendingCalls), + pendingCallsLimiter: limits.GlobalResourcePoolLimiter(cresettings.Default.PerWorkflow.CapabilityConcurrencyLimit.DefaultValue), executor: stub, } diff --git a/pkg/workflows/wasm/host/execution_semaphore_test.go b/pkg/workflows/wasm/host/execution_semaphore_test.go new file mode 100644 index 0000000000..00e1ab7a85 --- /dev/null +++ b/pkg/workflows/wasm/host/execution_semaphore_test.go @@ -0,0 +1,286 @@ +package host + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + wfpb "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" +) + +// slowCapStub delays CallCapability by a configurable duration and counts in-flight calls. 
+type slowCapStub struct { + delay time.Duration + inflight atomic.Int32 + peakLoad atomic.Int32 + callCount atomic.Int32 +} + +func (s *slowCapStub) CallCapability(_ context.Context, _ *sdkpb.CapabilityRequest) (*sdkpb.CapabilityResponse, error) { + s.callCount.Add(1) + cur := s.inflight.Add(1) + for { + peak := s.peakLoad.Load() + if cur <= peak || s.peakLoad.CompareAndSwap(peak, cur) { + break + } + } + time.Sleep(s.delay) + s.inflight.Add(-1) + + payload, _ := anypb.New(&emptypb.Empty{}) + return &sdkpb.CapabilityResponse{ + Response: &sdkpb.CapabilityResponse_Payload{Payload: payload}, + }, nil +} + +func (s *slowCapStub) GetSecrets(context.Context, *sdkpb.GetSecretsRequest) ([]*sdkpb.SecretResponse, error) { + return nil, nil +} +func (s *slowCapStub) GetWorkflowExecutionID() string { return "test-exec" } +func (s *slowCapStub) GetNodeTime() time.Time { return time.Now() } +func (s *slowCapStub) GetDONTime() (time.Time, error) { return time.Now(), nil } +func (s *slowCapStub) EmitUserLog(string) error { return nil } +func (s *slowCapStub) EmitUserMetric(context.Context, *wfpb.WorkflowUserMetric) error { + return nil +} + +var _ ExecutionHelper = (*slowCapStub)(nil) + +func newTestExec(maxPending int, stub ExecutionHelper) *execution[*sdkpb.ExecutionResult] { + return &execution[*sdkpb.ExecutionResult]{ + ctx: context.Background(), + capabilityResponses: make(map[int32]<-chan *sdkpb.CapabilityResponse), + secretsResponses: make(map[int32]<-chan *secretsResponse), + pendingCallsLimiter: limits.GlobalResourcePoolLimiter[int](maxPending), + executor: stub, + } +} + +// TestSemaphore_BackpressureBlocksCallN proves that call N+1 blocks when +// N == MaxPendingCalls and nothing has been awaited yet. +func TestSemaphore_BackpressureBlocksCallN(t *testing.T) { + t.Parallel() + const max = 5 + + // Use a delay longer than the check window so goroutines hold their slots. 
+ stub := &slowCapStub{delay: 5 * time.Second} + exec := newTestExec(max, stub) + + ctx := t.Context() + + // Fill semaphore. + for i := int32(0); i < max; i++ { + require.NoError(t, exec.callCapAsync(ctx, &sdkpb.CapabilityRequest{CallbackId: i})) + } + + // Next call should block. + blocked := make(chan struct{}) + go func() { + _ = exec.callCapAsync(ctx, &sdkpb.CapabilityRequest{CallbackId: max}) + close(blocked) + }() + + select { + case <-blocked: + t.Fatal("call max+1 did not block; semaphore backpressure broken") + case <-time.After(200 * time.Millisecond): + // expected — still blocked + } + + // Await the first call to free a slot. + resp, err := exec.awaitCapabilities(ctx, &sdkpb.AwaitCapabilitiesRequest{Ids: []int32{0}}) + require.NoError(t, err) + require.Len(t, resp.Responses, 1) + + // Now the blocked call should proceed. + select { + case <-blocked: + // success + case <-time.After(2 * time.Second): + t.Fatal("call max+1 did not unblock after await freed a slot") + } +} + +// TestSemaphore_HighThroughputBounded issues many calls in batches, +// awaiting each batch before the next. Peak in-flight goroutines must never +// exceed MaxPendingCalls. 
+func TestSemaphore_HighThroughputBounded(t *testing.T) { + t.Parallel() + const max = 10 + const batches = 50 + const callsPerBatch = max + + stub := &slowCapStub{delay: 1 * time.Millisecond} + exec := newTestExec(max, stub) + + ctx := t.Context() + var callId int32 + + for b := 0; b < batches; b++ { + ids := make([]int32, callsPerBatch) + for i := 0; i < callsPerBatch; i++ { + ids[i] = callId + require.NoError(t, exec.callCapAsync(ctx, &sdkpb.CapabilityRequest{CallbackId: callId})) + callId++ + } + resp, err := exec.awaitCapabilities(ctx, &sdkpb.AwaitCapabilitiesRequest{Ids: ids}) + require.NoError(t, err) + require.Len(t, resp.Responses, callsPerBatch) + } + + assert.LessOrEqual(t, int(stub.peakLoad.Load()), max, + "peak in-flight goroutines exceeded MaxPendingCalls") + assert.Equal(t, int32(batches*callsPerBatch), stub.callCount.Load()) +} + +// TestSemaphore_ContextCancelUnblocksCall proves that a blocked callCapAsync +// returns ctx.Err() when the context is cancelled. +func TestSemaphore_ContextCancelUnblocksCall(t *testing.T) { + t.Parallel() + const max = 2 + + stub := &slowCapStub{delay: 5 * time.Second} // very slow, won't finish + exec := newTestExec(max, stub) + + ctx, cancel := context.WithCancel(t.Context()) + + // Fill semaphore. + for i := int32(0); i < max; i++ { + require.NoError(t, exec.callCapAsync(ctx, &sdkpb.CapabilityRequest{CallbackId: i})) + } + + // Next call will block on semaphore. + var callErr error + done := make(chan struct{}) + go func() { + callErr = exec.callCapAsync(ctx, &sdkpb.CapabilityRequest{CallbackId: max}) + close(done) + }() + + // Cancel context. + cancel() + + select { + case <-done: + require.ErrorIs(t, callErr, context.Canceled) + case <-time.After(2 * time.Second): + t.Fatal("callCapAsync did not unblock after context cancel") + } +} + +// TestSemaphore_SlotsRecycledCorrectly ensures that after many await cycles, +// the semaphore is back to its full capacity and new calls can proceed. 
+func TestSemaphore_SlotsRecycledCorrectly(t *testing.T) { + t.Parallel() + const max = 5 + const rounds = 100 + + stub := &slowCapStub{delay: 0} + exec := newTestExec(max, stub) + + ctx := t.Context() + + for r := 0; r < rounds; r++ { + ids := make([]int32, max) + for i := int32(0); i < max; i++ { + id := int32(r*max) + i + ids[i] = id + require.NoError(t, exec.callCapAsync(ctx, &sdkpb.CapabilityRequest{CallbackId: id})) + } + _, err := exec.awaitCapabilities(ctx, &sdkpb.AwaitCapabilitiesRequest{Ids: ids}) + require.NoError(t, err) + } + + // After all rounds, all slots should be available again. + avail, err := exec.pendingCallsLimiter.Available(ctx) + require.NoError(t, err) + assert.Equal(t, max, avail, + "limiter still has occupied slots after all awaits completed") +} + +// TestSemaphore_MapCleanedOnAwait verifies the capabilityResponses map +// doesn't leak entries. +func TestSemaphore_MapCleanedOnAwait(t *testing.T) { + t.Parallel() + const max = 10 + const total = 200 + + stub := &slowCapStub{delay: 0} + exec := newTestExec(max, stub) + + ctx := t.Context() + + for i := int32(0); i < total; i += max { + ids := make([]int32, max) + for j := int32(0); j < max; j++ { + id := i + j + ids[j] = id + require.NoError(t, exec.callCapAsync(ctx, &sdkpb.CapabilityRequest{CallbackId: id})) + } + _, err := exec.awaitCapabilities(ctx, &sdkpb.AwaitCapabilitiesRequest{Ids: ids}) + require.NoError(t, err) + } + + exec.lock.RLock() + mapLen := len(exec.capabilityResponses) + exec.lock.RUnlock() + + assert.Equal(t, 0, mapLen, "capabilityResponses map leaked %d entries", mapLen) +} + +// TestSemaphore_ConcurrentCallAndAwait exercises concurrent callers issuing +// callCapAsync from multiple goroutines while others await, simulating the +// real engine dispatching multiple workflow executions. 
+func TestSemaphore_ConcurrentCallAndAwait(t *testing.T) { + t.Parallel() + const max = 10 + const workers = 20 + const callsPerWorker = 50 + + stub := &slowCapStub{delay: 10 * time.Microsecond} + // Each worker gets its own execution (like real CRE — one per WASM invocation). + // We want to prove that WITHIN a single execution, concurrent isn't needed because + // WASM is single-threaded. But let's stress the shared semaphore anyway. + exec := newTestExec(max, stub) + + ctx := t.Context() + var wg sync.WaitGroup + + // Simulate sequential call-then-await pattern from a single WASM thread + // (the real case). We run it in parallel workers to stress-test the lock. + for w := 0; w < workers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for i := 0; i < callsPerWorker; i++ { + id := int32(workerID*callsPerWorker + i) + err := exec.callCapAsync(ctx, &sdkpb.CapabilityRequest{CallbackId: id}) + if err != nil { + return + } + _, err = exec.awaitCapabilities(ctx, &sdkpb.AwaitCapabilitiesRequest{Ids: []int32{id}}) + if err != nil { + return + } + } + }(w) + } + + wg.Wait() + + assert.LessOrEqual(t, int(stub.peakLoad.Load()), max) + assert.Equal(t, int32(workers*callsPerWorker), stub.callCount.Load()) + avail, err := exec.pendingCallsLimiter.Available(context.Background()) + require.NoError(t, err) + assert.Equal(t, max, avail) +} diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index e11e3358a9..d676727889 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -25,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/settings" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" dagsdk "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" 
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm" @@ -42,7 +43,6 @@ var ( defaultMinMemoryMBs = uint64(128) DefaultInitialFuel = uint64(100_000_000) defaultMaxFetchRequests = 5 - defaultMaxPendingCalls = 30 // matches engine CapabilityConcurrencyLimit default defaultMaxCompressedBinarySize = 20 * 1024 * 1024 // 20 MB defaultMaxDecompressedBinarySize = 100 * 1024 * 1024 // 100 MB defaultMaxResponseSizeBytes = 5 * 1024 * 1024 // 5 MB @@ -200,14 +200,21 @@ func NewModule(ctx context.Context, modCfg *ModuleConfig, binary []byte, opts .. modCfg.MaxFetchRequests = defaultMaxFetchRequests } + if modCfg.MaxPendingCalls < 0 { + return nil, fmt.Errorf("MaxPendingCalls must be positive, got %d", modCfg.MaxPendingCalls) + } + + if modCfg.PendingCallsLimiter == nil && modCfg.MaxPendingCalls > 0 { + modCfg.PendingCallsLimiter = limits.GlobalResourcePoolLimiter(modCfg.MaxPendingCalls) + } + if modCfg.PendingCallsLimiter == nil { - if modCfg.MaxPendingCalls == 0 { - modCfg.MaxPendingCalls = defaultMaxPendingCalls - } - if modCfg.MaxPendingCalls < 0 { - return nil, fmt.Errorf("MaxPendingCalls must be positive, got %d", modCfg.MaxPendingCalls) + lf := limits.Factory{Logger: modCfg.Logger} + var err error + modCfg.PendingCallsLimiter, err = limits.MakeResourcePoolLimiter(lf, cresettings.Default.PerWorkflow.CapabilityConcurrencyLimit) + if err != nil { + return nil, fmt.Errorf("failed to make pending calls limiter: %w", err) } - modCfg.PendingCallsLimiter = limits.GlobalResourcePoolLimiter(modCfg.MaxPendingCalls) } if modCfg.Labeler == nil { diff --git a/pkg/workflows/wasm/host/module_test.go b/pkg/workflows/wasm/host/module_test.go index 9fa89efcbc..7aa4343621 100644 --- a/pkg/workflows/wasm/host/module_test.go +++ b/pkg/workflows/wasm/host/module_test.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/logger" + 
"github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/utils/matches" wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb" @@ -634,7 +635,7 @@ func Test_CallAwaitRace(t *testing.T) { exec := &execution[*wasmpb.ExecutionResult]{ module: m, capabilityResponses: map[int32]<-chan *sdkpb.CapabilityResponse{}, - pendingCallsLimiter: limits.GlobalResourcePoolLimiter[int](defaultMaxPendingCalls), + pendingCallsLimiter: limits.GlobalResourcePoolLimiter(cresettings.Default.PerWorkflow.CapabilityConcurrencyLimit.DefaultValue), ctx: t.Context(), executor: mockExecHelper, } From d08588b166b0c13967c9b0db93c486e98eea44e1 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Fri, 22 May 2026 10:59:15 -0700 Subject: [PATCH 10/13] Cleanup --- pkg/workflows/wasm/host/module.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go index d676727889..cce932515a 100644 --- a/pkg/workflows/wasm/host/module.go +++ b/pkg/workflows/wasm/host/module.go @@ -72,9 +72,6 @@ type ModuleConfig struct { IsUncompressed bool Fetch func(ctx context.Context, req *FetchRequest) (*FetchResponse, error) MaxFetchRequests int - // MaxPendingCalls is the fallback limit used to construct a default - // GlobalResourcePoolLimiter when PendingCallsLimiter is nil. - MaxPendingCalls int // PendingCallsLimiter bounds concurrent in-flight capability and secrets // calls. When scoped (e.g. ScopeWorkflow), each workflow ID gets its own // pool; when global/unscoped, the limit is shared across all callers. @@ -200,14 +197,6 @@ func NewModule(ctx context.Context, modCfg *ModuleConfig, binary []byte, opts .. 
modCfg.MaxFetchRequests = defaultMaxFetchRequests } - if modCfg.MaxPendingCalls < 0 { - return nil, fmt.Errorf("MaxPendingCalls must be positive, got %d", modCfg.MaxPendingCalls) - } - - if modCfg.PendingCallsLimiter == nil && modCfg.MaxPendingCalls > 0 { - modCfg.PendingCallsLimiter = limits.GlobalResourcePoolLimiter(modCfg.MaxPendingCalls) - } - if modCfg.PendingCallsLimiter == nil { lf := limits.Factory{Logger: modCfg.Logger} var err error From 6d318f5bd233404cc9998b0d89379c062edb09f7 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Fri, 22 May 2026 11:19:57 -0700 Subject: [PATCH 11/13] Give wasm tests an actual PendingCallsLimiter in ModuleConfig --- pkg/workflows/wasm/host/wasm_nodag_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/workflows/wasm/host/wasm_nodag_test.go b/pkg/workflows/wasm/host/wasm_nodag_test.go index 917692b523..d7461b0e7a 100644 --- a/pkg/workflows/wasm/host/wasm_nodag_test.go +++ b/pkg/workflows/wasm/host/wasm_nodag_test.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/protoc/pkg/test_capabilities/basictrigger" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" wfpb "github.com/smartcontractkit/chainlink-protos/workflows/go/v2" @@ -313,8 +314,9 @@ func Test_NoDAG_EmitMetricDisabled(t *testing.T) { func defaultNoDAGModCfg(t testing.TB) *ModuleConfig { return &ModuleConfig{ - Logger: logger.Test(t), - IsUncompressed: true, + Logger: logger.Test(t), + IsUncompressed: true, + PendingCallsLimiter: limits.GlobalResourcePoolLimiter(cresettings.Default.PerWorkflow.CapabilityConcurrencyLimit.DefaultValue), } } From fbee7a66ad5710b71b2ce527e8980308bedd13ef Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Fri, 22 May 2026 12:24:17 -0700 Subject: [PATCH 12/13] 
fix test race --- pkg/workflows/wasm/host/execution_semaphore_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/workflows/wasm/host/execution_semaphore_test.go b/pkg/workflows/wasm/host/execution_semaphore_test.go index 00e1ab7a85..50300149d6 100644 --- a/pkg/workflows/wasm/host/execution_semaphore_test.go +++ b/pkg/workflows/wasm/host/execution_semaphore_test.go @@ -202,9 +202,12 @@ func TestSemaphore_SlotsRecycledCorrectly(t *testing.T) { } // After all rounds, all slots should be available again. - avail, err := exec.pendingCallsLimiter.Available(ctx) - require.NoError(t, err) - assert.Equal(t, max, avail, + // Goroutines release slots via defer after the channel send, so allow a + // brief window for the last batch of defers to execute. + assert.Eventually(t, func() bool { + avail, err := exec.pendingCallsLimiter.Available(ctx) + return err == nil && avail == max + }, time.Second, 5*time.Millisecond, "limiter still has occupied slots after all awaits completed") } From f51eeb0571255c4f71d56b352235d5b3f6415f63 Mon Sep 17 00:00:00 2001 From: Justin Kaseman Date: Fri, 22 May 2026 14:39:27 -0700 Subject: [PATCH 13/13] fix another racing test --- .../wasm/host/execution_semaphore_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/workflows/wasm/host/execution_semaphore_test.go b/pkg/workflows/wasm/host/execution_semaphore_test.go index 50300149d6..ed7c8720df 100644 --- a/pkg/workflows/wasm/host/execution_semaphore_test.go +++ b/pkg/workflows/wasm/host/execution_semaphore_test.go @@ -262,11 +262,9 @@ func TestSemaphore_ConcurrentCallAndAwait(t *testing.T) { // Simulate sequential call-then-await pattern from a single WASM thread // (the real case). We run it in parallel workers to stress-test the lock. 
for w := 0; w < workers; w++ { - wg.Add(1) - go func(workerID int) { - defer wg.Done() + wg.Go(func() { for i := 0; i < callsPerWorker; i++ { - id := int32(workerID*callsPerWorker + i) + id := int32(w*callsPerWorker + i) err := exec.callCapAsync(ctx, &sdkpb.CapabilityRequest{CallbackId: id}) if err != nil { return @@ -276,14 +274,16 @@ func TestSemaphore_ConcurrentCallAndAwait(t *testing.T) { return } } - }(w) + }) } wg.Wait() assert.LessOrEqual(t, int(stub.peakLoad.Load()), max) assert.Equal(t, int32(workers*callsPerWorker), stub.callCount.Load()) - avail, err := exec.pendingCallsLimiter.Available(context.Background()) - require.NoError(t, err) - assert.Equal(t, max, avail) + assert.Eventually(t, func() bool { + avail, err := exec.pendingCallsLimiter.Available(context.Background()) + return err == nil && avail == max + }, time.Second, 5*time.Millisecond, + "limiter still has occupied slots after all awaits completed") }