From 15323902151572dba8dafb67fde6200762a2db0e Mon Sep 17 00:00:00 2001 From: Ildar Nurislamov Date: Thu, 21 May 2026 17:55:59 +0400 Subject: [PATCH 1/2] fix(kafka): eliminate Resume timeout race in batch consumer pause/heartbeat resume() and the pause-heartbeat goroutine coordinate via an unbuffered resumeChannel, so a resume signal can only be delivered while the heartbeat is parked in its select. The heartbeat loop runs restartConsumer synchronously on non-retriable ReadMessage errors; restartConsumer blocks for KafkaSessionTimeoutMs+15s per init attempt (default 60s baseline) and loops if init keeps failing. With unhealthy brokers this stalls the heartbeat past KafkaMaxPollIntervalMs (5 min), triggering `failed to resume kafka consumer: Resume timeout` in resume() and the analogous failure in _unpause(). Two defense-in-depth fixes: - Buffer resumeChannel to size 1. resume() can deposit the signal even while the heartbeat is mid-iteration; the heartbeat picks it up on its next pass. pause() drains any stale signal on entry so a new cycle doesn't inherit one. The signal handler also re-resumes the current consumer in case it was replaced between resume()'s call and the heartbeat picking up the signal. - Run restartConsumer asynchronously when invoked from the heartbeat via new restartConsumerAsync, guarded by an atomic restarting flag to prevent overlapping restart attempts. The synchronous entrypoint is preserved for other call sites that must block until the new consumer is up. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../bulkerapp/app/abstract_batch_consumer.go | 47 +++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/bulker/bulkerapp/app/abstract_batch_consumer.go b/bulker/bulkerapp/app/abstract_batch_consumer.go index d30cff375..7f59bfac5 100644 --- a/bulker/bulkerapp/app/abstract_batch_consumer.go +++ b/bulker/bulkerapp/app/abstract_batch_consumer.go @@ -65,6 +65,9 @@ type AbstractBatchConsumer struct { //consumer can be paused between batches(idle) and also can be paused during loading batch to destination(not idle) paused atomic.Bool resumeChannel chan struct{} + //restarting guards against piling up overlapping restartConsumer calls + //from the pause heartbeat loop (see restartConsumerAsync). + restarting atomic.Bool batchSizeFunc BatchSizesFunction batchFunc BatchFunction @@ -116,7 +119,10 @@ func NewAbstractBatchConsumer(repository *Repository, destinationId string, batc producerConfig: producerConfig, waitForMessages: time.Duration(config.BatchRunnerWaitForMessagesSec) * time.Second, closed: make(chan struct{}), - resumeChannel: make(chan struct{}), + //buffered size 1: resume() can deposit the signal even if the + //heartbeat goroutine is mid-iteration (e.g. blocked in + //restartConsumer); the heartbeat picks it up on the next pass. + resumeChannel: make(chan struct{}, 1), } bc.idle.Store(true) return bc, nil @@ -406,6 +412,12 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) { if !bc.paused.CompareAndSwap(false, true) { return } + //drain any stale resume signal left over from a prior cycle so the new + //heartbeat goroutine doesn't break out of its loop on the first pass. + select { + case <-bc.resumeChannel: + default: + } bc.pauseKafkaConsumer() safego.RunWithRestart(func() { @@ -426,6 +438,15 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) { select { case <-bc.resumeChannel: bc.paused.CompareAndSwap(true, false) + //Defensive: resume() may have called consumer.Resume on + //a consumer that was since replaced (e.g. by an in-flight + //restartConsumer). Re-resume the current one so the kafka + //state matches paused=false even after that race. + if currentConsumer := bc.consumer.Load(); currentConsumer != nil { + if parts, perr := currentConsumer.Assignment(); perr == nil { + _ = currentConsumer.Resume(parts) + } + } bc.Debugf("Consumer resumed.") break loop case <-pauseTicker.C: @@ -452,7 +473,11 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) { if kafkaErr.IsRetriable() { time.Sleep(10 * time.Second) } else { - bc.restartConsumer(nil) + //restartConsumer blocks for KafkaSessionTimeoutMs + 15s + //baseline per init attempt; running it synchronously + //here would starve the resumeChannel select and trip + //"Resume timeout" in resume() once 5 min elapses. + bc.restartConsumerAsync() } } else if message != nil { bc.Debugf("Unexpected message on paused consumer: %v", message) @@ -461,7 +486,7 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) { if err != nil { bc.errorMetric("ROLLBACK_ON_PAUSE_ERR") bc.SystemErrorf("Failed to rollback offset on paused consumer: %v", err) - bc.restartConsumer(nil) + bc.restartConsumerAsync() } bc.pauseKafkaConsumer() } @@ -494,6 +519,22 @@ func (bc *AbstractBatchConsumer) initConsumer(force bool) (consumer *kafka.Consu return consumer, nil } +// restartConsumerAsync schedules restartConsumer to run in a separate +// goroutine, returning immediately. Re-entry is suppressed: if a restart +// is already in flight, the call is a no-op. Use from contexts that must +// not block — notably the pause heartbeat loop, where a synchronous +// restartConsumer can starve the resumeChannel select for longer than +// KafkaMaxPollIntervalMs and cause "Resume timeout" in resume(). +func (bc *AbstractBatchConsumer) restartConsumerAsync() { + if !bc.restarting.CompareAndSwap(false, true) { + return + } + go func() { + defer bc.restarting.Store(false) + bc.restartConsumer(nil) + }() +} + func (bc *AbstractBatchConsumer) restartConsumer(beforeInit func()) { if bc.retired.Load() { return From b45169a5939be723a58f15a9829c0bff45c660c2 Mon Sep 17 00:00:00 2001 From: Ildar Nurislamov Date: Thu, 28 May 2026 12:19:20 +0400 Subject: [PATCH 2/2] fix(kafka): separate suspend-stop signal from resume to keep rendezvous Buffering resumeChannel fixed the Resume timeout but weakened _unpause(): the buffered send returned immediately, so pauseOrSuspend could Close the consumer while the heartbeat was still in ReadMessage. The resulting non-retriable error tripped restartConsumerAsync and recreated a consumer we intended to suspend. Add a dedicated unbuffered stopChannel for the suspend path. _unpause() now sends on it, so the send only completes once the heartbeat has left its loop (no longer in ReadMessage), restoring the rendezvous Close relies on. resume() keeps using the buffered resumeChannel, preserving the non-blocking resume that fixed the timeout. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../bulkerapp/app/abstract_batch_consumer.go | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/bulker/bulkerapp/app/abstract_batch_consumer.go b/bulker/bulkerapp/app/abstract_batch_consumer.go index 7f59bfac5..4edce4111 100644 --- a/bulker/bulkerapp/app/abstract_batch_consumer.go +++ b/bulker/bulkerapp/app/abstract_batch_consumer.go @@ -65,6 +65,11 @@ type AbstractBatchConsumer struct { //consumer can be paused between batches(idle) and also can be paused during loading batch to destination(not idle) paused atomic.Bool resumeChannel chan struct{} + //stopChannel signals the pause heartbeat loop to exit for suspend (vs. + //resume). It is unbuffered on purpose: _unpause() must block until the + //heartbeat has actually left its loop, so pauseOrSuspend can close the + //consumer without racing an in-flight ReadMessage (see _unpause). + stopChannel chan struct{} //restarting guards against piling up overlapping restartConsumer calls //from the pause heartbeat loop (see restartConsumerAsync). restarting atomic.Bool @@ -123,6 +128,8 @@ func NewAbstractBatchConsumer(repository *Repository, destinationId string, batc //heartbeat goroutine is mid-iteration (e.g. blocked in //restartConsumer); the heartbeat picks it up on the next pass. resumeChannel: make(chan struct{}, 1), + //unbuffered: suspend must rendezvous with the heartbeat (see field doc). + stopChannel: make(chan struct{}), } bc.idle.Store(true) return bc, nil @@ -449,6 +456,15 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) { } bc.Debugf("Consumer resumed.") break loop + case <-bc.stopChannel: + //Suspend path: the caller (pauseOrSuspend) is about to close + //the consumer, so we only stop heartbeating — no Resume. + //Because stopChannel is unbuffered, _unpause is still blocked + //on the send here, guaranteeing the consumer is no longer in + //ReadMessage when Close runs. + bc.paused.CompareAndSwap(true, false) + bc.Debugf("Consumer heartbeat stopped for suspend.") + break loop case <-pauseTicker.C: } } @@ -605,12 +621,19 @@ func (bc *AbstractBatchConsumer) rebalanceCallback(consumer *kafka.Consumer, eve return nil } +// _unpause stops the pause heartbeat for the suspend path. It sends on the +// unbuffered stopChannel so the send only completes once the heartbeat has +// received it and is leaving its loop — i.e. it is no longer inside +// ReadMessage. That rendezvous lets pauseOrSuspend close the consumer right +// after without racing the heartbeat into a non-retriable ReadMessage error +// (which would spuriously trigger restartConsumerAsync and recreate the +// consumer we intended to suspend). func (bc *AbstractBatchConsumer) _unpause() { if !bc.paused.Load() { return } select { - case bc.resumeChannel <- struct{}{}: + case bc.stopChannel <- struct{}{}: return case <-time.After(time.Duration(bc.config.KafkaMaxPollIntervalMs) * time.Millisecond): bc.errorMetric("resume_error")