diff --git a/bulker/bulkerapp/app/abstract_batch_consumer.go b/bulker/bulkerapp/app/abstract_batch_consumer.go index d30cff375..4edce4111 100644 --- a/bulker/bulkerapp/app/abstract_batch_consumer.go +++ b/bulker/bulkerapp/app/abstract_batch_consumer.go @@ -65,6 +65,14 @@ type AbstractBatchConsumer struct { //consumer can be paused between batches(idle) and also can be paused during loading batch to destination(not idle) paused atomic.Bool resumeChannel chan struct{} + //stopChannel signals the pause heartbeat loop to exit for suspend (vs. + //resume). It is unbuffered on purpose: _unpause() must block until the + //heartbeat has actually left its loop, so pauseOrSuspend can close the + //consumer without racing an in-flight ReadMessage (see _unpause). + stopChannel chan struct{} + //restarting guards against piling up overlapping restartConsumer calls + //from the pause heartbeat loop (see restartConsumerAsync). + restarting atomic.Bool batchSizeFunc BatchSizesFunction batchFunc BatchFunction @@ -116,7 +124,12 @@ func NewAbstractBatchConsumer(repository *Repository, destinationId string, batc producerConfig: producerConfig, waitForMessages: time.Duration(config.BatchRunnerWaitForMessagesSec) * time.Second, closed: make(chan struct{}), - resumeChannel: make(chan struct{}), + //buffered size 1: resume() can deposit the signal even if the + //heartbeat goroutine is mid-iteration (e.g. blocked in + //restartConsumer); the heartbeat picks it up on the next pass. + resumeChannel: make(chan struct{}, 1), + //unbuffered: suspend must rendezvous with the heartbeat (see field doc). + stopChannel: make(chan struct{}), } bc.idle.Store(true) return bc, nil @@ -406,6 +419,12 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) { if !bc.paused.CompareAndSwap(false, true) { return } + //drain any stale resume signal left over from a prior cycle so the new + //heartbeat goroutine doesn't break out of its loop on the first pass. 
+ select { + case <-bc.resumeChannel: + default: + } bc.pauseKafkaConsumer() safego.RunWithRestart(func() { @@ -426,8 +445,26 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) { select { case <-bc.resumeChannel: bc.paused.CompareAndSwap(true, false) + //Defensive: resume() may have called consumer.Resume on + //a consumer that was since replaced (e.g. by an in-flight + //restartConsumer). Re-resume the current one so the kafka + //state matches paused=false even after that race. + if currentConsumer := bc.consumer.Load(); currentConsumer != nil { + if parts, perr := currentConsumer.Assignment(); perr == nil { + _ = currentConsumer.Resume(parts) + } + } bc.Debugf("Consumer resumed.") break loop + case <-bc.stopChannel: + //Suspend path: the caller (pauseOrSuspend) is about to close + //the consumer, so we only stop heartbeating — no Resume. + //stopChannel is unbuffered: the send in _unpause completes only when + //this receive fires, so once that send has returned and Close runs + //the heartbeat is past ReadMessage and on its way out of the loop. + bc.paused.CompareAndSwap(true, false) + bc.Debugf("Consumer heartbeat stopped for suspend.") + break loop case <-pauseTicker.C: } } @@ -452,7 +489,11 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) { if kafkaErr.IsRetriable() { time.Sleep(10 * time.Second) } else { - bc.restartConsumer(nil) + //restartConsumer blocks for KafkaSessionTimeoutMs + 15s + //baseline per init attempt; running it synchronously + //here would starve the resumeChannel select and trip + //"Resume timeout" in resume() once 5 min elapses. 
+ bc.restartConsumerAsync() } } else if message != nil { bc.Debugf("Unexpected message on paused consumer: %v", message) @@ -461,7 +502,7 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) { if err != nil { bc.errorMetric("ROLLBACK_ON_PAUSE_ERR") bc.SystemErrorf("Failed to rollback offset on paused consumer: %v", err) - bc.restartConsumer(nil) + bc.restartConsumerAsync() } bc.pauseKafkaConsumer() } @@ -494,6 +535,35 @@ func (bc *AbstractBatchConsumer) initConsumer(force bool) (consumer *kafka.Consu return consumer, nil } +// restartConsumerAsync schedules restartConsumer to run in a separate +// goroutine, returning immediately. Re-entry is suppressed: if a restart +// is already in flight, the call is a no-op. Use from contexts that must +// not block — notably the pause heartbeat loop, where a synchronous +// restartConsumer can starve the resumeChannel select for longer than +// KafkaMaxPollIntervalMs and cause "Resume timeout" in resume(). +// +// A panic inside restartConsumer is recovered and reported here: the +// goroutine is started with a bare go statement, so an unrecovered panic +// would otherwise terminate the whole process. +// NOTE(review): a restart already in flight does not appear to be cancelled +// by the stopChannel rendezvous in _unpause — confirm it cannot recreate +// the consumer after a suspend has closed it. +func (bc *AbstractBatchConsumer) restartConsumerAsync() { + if !bc.restarting.CompareAndSwap(false, true) { + return + } + go func() { + //registered first, so the guard is released last — after the recover below. + defer bc.restarting.Store(false) + defer func() { + if r := recover(); r != nil { + bc.SystemErrorf("Panic during async consumer restart: %v", r) + } + }() + bc.restartConsumer(nil) + }() +} + func (bc *AbstractBatchConsumer) restartConsumer(beforeInit func()) { if bc.retired.Load() { return @@ -564,12 +634,19 @@ func (bc *AbstractBatchConsumer) rebalanceCallback(consumer *kafka.Consumer, eve return nil } +// _unpause stops the pause heartbeat for the suspend path. It sends on the +// unbuffered stopChannel so the send only completes once the heartbeat has +// received it and is leaving its loop — i.e. it is no longer inside +// ReadMessage. That rendezvous lets pauseOrSuspend close the consumer right +// after without racing the heartbeat into a non-retriable ReadMessage error +// (which would spuriously trigger restartConsumerAsync and recreate the +// consumer we intended to suspend). 
func (bc *AbstractBatchConsumer) _unpause() { if !bc.paused.Load() { return } select { - case bc.resumeChannel <- struct{}{}: + case bc.stopChannel <- struct{}{}: return case <-time.After(time.Duration(bc.config.KafkaMaxPollIntervalMs) * time.Millisecond): bc.errorMetric("resume_error")