Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 68 additions & 4 deletions bulker/bulkerapp/app/abstract_batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ type AbstractBatchConsumer struct {
//consumer can be paused between batches(idle) and also can be paused during loading batch to destination(not idle)
paused atomic.Bool
resumeChannel chan struct{}
//stopChannel signals the pause heartbeat loop to exit for suspend (vs.
//resume). It is unbuffered on purpose: _unpause() must block until the
//heartbeat has actually left its loop, so pauseOrSuspend can close the
//consumer without racing an in-flight ReadMessage (see _unpause).
stopChannel chan struct{}
//restarting guards against piling up overlapping restartConsumer calls
//from the pause heartbeat loop (see restartConsumerAsync).
restarting atomic.Bool

batchSizeFunc BatchSizesFunction
batchFunc BatchFunction
Expand Down Expand Up @@ -116,7 +124,12 @@ func NewAbstractBatchConsumer(repository *Repository, destinationId string, batc
producerConfig: producerConfig,
waitForMessages: time.Duration(config.BatchRunnerWaitForMessagesSec) * time.Second,
closed: make(chan struct{}),
resumeChannel: make(chan struct{}),
//buffered size 1: resume() can deposit the signal even if the
//heartbeat goroutine is mid-iteration (e.g. blocked in
//restartConsumer); the heartbeat picks it up on the next pass.
resumeChannel: make(chan struct{}, 1),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making resumeChannel buffered removes the rendezvous semantics that _unpause() currently relies on in pauseOrSuspend() (_unpause(); consumer.Close(); bc.consumer.Store(nil)). With an unbuffered channel, _unpause() blocks until the pause heartbeat loop receives the signal; now it can return immediately while the loop is still inside ReadMessage(). Closing the consumer in that window can produce a non-timeout error in the pause loop and trigger restartConsumerAsync(), which can recreate a consumer right after we intended to suspend it. This looks like a user-visible regression (suspend may turn into restart churn).

Can we keep non-blocking resume() behavior without weakening _unpause() synchronization (e.g. separate channels/paths for resume vs suspend-ack, or an explicit ack/wait primitive for suspend)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in b45169a with the separate-channel approach you suggested.

resume() keeps the buffered resumeChannel (non-blocking deposit, which is what fixed the Resume timeout). The suspend path now uses a dedicated unbuffered stopChannel: _unpause() sends on it, so the send only completes once the heartbeat has received it and is leaving its loop — i.e. no longer inside ReadMessage(). That restores the rendezvous pauseOrSuspend relies on, so consumer.Close() no longer races an active ReadMessage and can't trip restartConsumerAsync() into recreating the consumer we just suspended.

The heartbeat's stopChannel case only sets paused=false and breaks — no defensive Resume, since the caller is about to close the consumer anyway.

//unbuffered: suspend must rendezvous with the heartbeat (see field doc).
stopChannel: make(chan struct{}),
}
bc.idle.Store(true)
return bc, nil
Expand Down Expand Up @@ -406,6 +419,12 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) {
if !bc.paused.CompareAndSwap(false, true) {
return
}
//drain any stale resume signal left over from a prior cycle so the new
//heartbeat goroutine doesn't break out of its loop on the first pass.
select {
case <-bc.resumeChannel:
default:
}
bc.pauseKafkaConsumer()

safego.RunWithRestart(func() {
Expand All @@ -426,8 +445,26 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) {
select {
case <-bc.resumeChannel:
bc.paused.CompareAndSwap(true, false)
//Defensive: resume() may have called consumer.Resume on
//a consumer that was since replaced (e.g. by an in-flight
//restartConsumer). Re-resume the current one so the kafka
//state matches paused=false even after that race.
if currentConsumer := bc.consumer.Load(); currentConsumer != nil {
if parts, perr := currentConsumer.Assignment(); perr == nil {
_ = currentConsumer.Resume(parts)
}
}
bc.Debugf("Consumer resumed.")
break loop
case <-bc.stopChannel:
//Suspend path: the caller (pauseOrSuspend) is about to close
//the consumer, so we only stop heartbeating — no Resume.
//Because stopChannel is unbuffered, _unpause's send can only
//complete while this goroutine is waiting in this select, i.e.
//not inside ReadMessage, so Close cannot race an in-flight read.
bc.paused.CompareAndSwap(true, false)
bc.Debugf("Consumer heartbeat stopped for suspend.")
break loop
case <-pauseTicker.C:
}
}
Expand All @@ -452,7 +489,11 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) {
if kafkaErr.IsRetriable() {
time.Sleep(10 * time.Second)
} else {
bc.restartConsumer(nil)
//restartConsumer blocks for KafkaSessionTimeoutMs + 15s
//baseline per init attempt; running it synchronously
//here would starve the resumeChannel select and trip
//"Resume timeout" in resume() once 5 min elapses.
bc.restartConsumerAsync()
}
} else if message != nil {
bc.Debugf("Unexpected message on paused consumer: %v", message)
Expand All @@ -461,7 +502,7 @@ func (bc *AbstractBatchConsumer) pause(immediatePoll bool) {
if err != nil {
bc.errorMetric("ROLLBACK_ON_PAUSE_ERR")
bc.SystemErrorf("Failed to rollback offset on paused consumer: %v", err)
bc.restartConsumer(nil)
bc.restartConsumerAsync()
}
bc.pauseKafkaConsumer()
}
Expand Down Expand Up @@ -494,6 +535,22 @@ func (bc *AbstractBatchConsumer) initConsumer(force bool) (consumer *kafka.Consu
return consumer, nil
}

// restartConsumerAsync kicks off restartConsumer(nil) on its own goroutine
// and returns without waiting for it to finish.
//
// The restarting flag acts as a single-flight guard: only the caller that
// flips it from false to true launches the goroutine; any call made while
// a restart is still running is a no-op. The flag is cleared when
// restartConsumer returns, so a later call can schedule a new restart.
//
// Intended for callers that must not block, such as the pause heartbeat
// loop, where a synchronous restartConsumer would keep the loop away from
// its resumeChannel select and could surface as "Resume timeout" in
// resume().
//
// NOTE(review): the goroutine is started with a bare `go`, whereas the
// pause heartbeat is started via safego.RunWithRestart — confirm that an
// unrecovered panic inside restartConsumer is acceptable here.
func (bc *AbstractBatchConsumer) restartConsumerAsync() {
	if bc.restarting.CompareAndSwap(false, true) {
		go func() {
			// Release the guard even if restartConsumer returns early.
			defer bc.restarting.Store(false)
			bc.restartConsumer(nil)
		}()
	}
}

func (bc *AbstractBatchConsumer) restartConsumer(beforeInit func()) {
if bc.retired.Load() {
return
Expand Down Expand Up @@ -564,12 +621,19 @@ func (bc *AbstractBatchConsumer) rebalanceCallback(consumer *kafka.Consumer, eve
return nil
}

// _unpause stops the pause heartbeat for the suspend path. It sends on the
// unbuffered stopChannel so the send only completes once the heartbeat has
// received it and is leaving its loop — i.e. it is no longer inside
// ReadMessage. That rendezvous lets pauseOrSuspend close the consumer right
// after without racing the heartbeat into a non-retriable ReadMessage error
// (which would spuriously trigger restartConsumerAsync and recreate the
// consumer we intended to suspend).
func (bc *AbstractBatchConsumer) _unpause() {
if !bc.paused.Load() {
return
}
select {
case bc.resumeChannel <- struct{}{}:
case bc.stopChannel <- struct{}{}:
return
case <-time.After(time.Duration(bc.config.KafkaMaxPollIntervalMs) * time.Millisecond):
bc.errorMetric("resume_error")
Expand Down
Loading