From 8193468f6d4c864da388102c9cd3a9eddb9d77b2 Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Wed, 15 Apr 2026 10:45:33 +0200 Subject: [PATCH 1/2] fix: initialise BinlogWriter channel before Run() to prevent nil-chan hang MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ferry.Run() launches BinlogWriter.Run() and BinlogStreamer.Run() in concurrent goroutines (ferry.go:733-745). Previously, the event buffer channel was created inside BinlogWriter.Run(), so if the BinlogStreamer received a row event and called BufferBinlogEvents() before the writer goroutine was scheduled, the send would block forever on a nil channel, hanging the test (observed as TestUpdateBinlogSelectCopy timing out after 10 minutes with goroutine state "chan send (nil chan)"). Introduce BinlogWriter.Initialize() — mirroring the existing pattern on BatchWriter — that allocates the channel and logger eagerly. Call it from Ferry.NewBinlogWriter() so the writer is ready for buffering before any goroutine starts. Add a nil-channel guard in Run() as a defensive fallback for callers that construct BinlogWriter directly. Add TestBinlogWriterBufferBinlogEventsBeforeRun to give deterministic, sub-second coverage of the race without relying on the slow integration test timeout. --- binlog_writer.go | 14 +++++++- ferry.go | 4 ++- test/go/binlog_writer_test.go | 64 +++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 test/go/binlog_writer_test.go diff --git a/binlog_writer.go b/binlog_writer.go index bc4447e3..4b19b931 100644 --- a/binlog_writer.go +++ b/binlog_writer.go @@ -25,9 +25,21 @@ type BinlogWriter struct { logger Logger } -func (b *BinlogWriter) Run() { +// Initialize allocates the event buffer channel and logger so that +// BufferBinlogEvents can be called safely before Run() starts in its own +// goroutine. 
Ferry.NewBinlogWriter calls this automatically; external callers +// that construct a BinlogWriter directly must call Initialize() before +// registering BufferBinlogEvents as an event listener. +func (b *BinlogWriter) Initialize() { b.logger = LogWithField("tag", "binlog_writer") b.binlogEventBuffer = make(chan DMLEvent, b.BatchSize) +} + +func (b *BinlogWriter) Run() { + if b.binlogEventBuffer == nil { + // Defensive fallback: caller forgot Initialize(), behave as before. + b.Initialize() + } batch := make([]DMLEvent, 0, b.BatchSize) for { diff --git a/ferry.go b/ferry.go index 3cb35d69..9666576c 100644 --- a/ferry.go +++ b/ferry.go @@ -162,7 +162,7 @@ func (f *Ferry) newBinlogStreamer(db *sql.DB, dbConf *DatabaseConfig, schemaRewr func (f *Ferry) NewBinlogWriter() *BinlogWriter { f.ensureInitialized() - return &BinlogWriter{ + w := &BinlogWriter{ DB: f.TargetDB, DatabaseRewrites: f.Config.DatabaseRewrites, TableRewrites: f.Config.TableRewrites, @@ -174,6 +174,8 @@ func (f *Ferry) NewBinlogWriter() *BinlogWriter { ErrorHandler: f.ErrorHandler, StateTracker: f.StateTracker, } + w.Initialize() + return w } func (f *Ferry) NewBinlogWriterWithoutStateTracker() *BinlogWriter { diff --git a/test/go/binlog_writer_test.go b/test/go/binlog_writer_test.go new file mode 100644 index 00000000..83b7ba08 --- /dev/null +++ b/test/go/binlog_writer_test.go @@ -0,0 +1,64 @@ +package test + +import ( + "fmt" + "testing" + "time" + + "github.com/Shopify/ghostferry" + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/stretchr/testify/require" +) + +// stubDMLEvent satisfies the DMLEvent interface with no-op implementations. +// It is only used to exercise the BinlogWriter buffer without needing a real +// MySQL connection or a fully populated TableSchema. 
+type stubDMLEvent struct{} + +func (e *stubDMLEvent) Database() string { return "db" } +func (e *stubDMLEvent) Table() string { return "tbl" } +func (e *stubDMLEvent) TableSchema() *ghostferry.TableSchema { return nil } +func (e *stubDMLEvent) AsSQLString(_, _ string) (string, error) { return "", nil } +func (e *stubDMLEvent) OldValues() ghostferry.RowData { return nil } +func (e *stubDMLEvent) NewValues() ghostferry.RowData { return nil } +func (e *stubDMLEvent) PaginationKey() (string, error) { return "", nil } +func (e *stubDMLEvent) BinlogPosition() mysql.Position { return mysql.Position{} } +func (e *stubDMLEvent) ResumableBinlogPosition() mysql.Position { return mysql.Position{} } +func (e *stubDMLEvent) Annotation() (string, error) { return "", nil } +func (e *stubDMLEvent) Timestamp() time.Time { return time.Time{} } + +// TestBinlogWriterBufferBinlogEventsBeforeRun verifies that BufferBinlogEvents +// does not block when called before Run() has started in its own goroutine. +// +// Regression: BinlogWriter.Run() used to be the sole place that created the +// binlogEventBuffer channel. Ferry.Run() launches the writer and streamer +// goroutines concurrently (ferry.go:733-745). If the BinlogStreamer receives +// a row event and calls BufferBinlogEvents before the writer goroutine +// is scheduled and reaches the make() call, the send blocks forever on a nil +// channel, hanging the test (and in production, the entire run). +// +// The fix: BinlogWriter.Initialize() creates the channel eagerly, and +// Ferry.NewBinlogWriter() calls it before any goroutine is started. 
+func TestBinlogWriterBufferBinlogEventsBeforeRun(t *testing.T) { + w := &ghostferry.BinlogWriter{ + BatchSize: 10, + } + w.Initialize() + + done := make(chan error, 1) + go func() { + done <- w.BufferBinlogEvents([]ghostferry.DMLEvent{&stubDMLEvent{}}) + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal(fmt.Sprintf( + "BufferBinlogEvents blocked after Initialize() — nil channel race is not fixed", + )) + } + + // The goroutine above has already delivered its result via done, so it does not depend on this call; Stop() is presumably only writer teardown (confirm against BinlogWriter.Stop). + w.Stop() +} From a7cfed6bc00986fac7ede886f133db2a1eede4cd Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Wed, 15 Apr 2026 15:12:09 +0200 Subject: [PATCH 2/2] Panic when binlog writer was not initialized --- binlog_writer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/binlog_writer.go b/binlog_writer.go index 4b19b931..6292700e 100644 --- a/binlog_writer.go +++ b/binlog_writer.go @@ -37,8 +37,7 @@ func (b *BinlogWriter) Initialize() { func (b *BinlogWriter) Run() { if b.binlogEventBuffer == nil { - // Defensive fallback: caller forgot Initialize(), behave as before. - b.Initialize() + panic("Initialize() has not been called prior to Run()") } batch := make([]DMLEvent, 0, b.BatchSize)