diff --git a/binlog_writer.go b/binlog_writer.go index bc4447e3..6292700e 100644 --- a/binlog_writer.go +++ b/binlog_writer.go @@ -25,9 +25,20 @@ type BinlogWriter struct { logger Logger } -func (b *BinlogWriter) Run() { +// Initialize allocates the event buffer channel and logger so that +// BufferBinlogEvents can be called safely before Run() starts in its own +// goroutine. Ferry.NewBinlogWriter calls this automatically; external callers +// that construct a BinlogWriter directly must call Initialize() before +// registering BufferBinlogEvents as an event listener. +func (b *BinlogWriter) Initialize() { b.logger = LogWithField("tag", "binlog_writer") b.binlogEventBuffer = make(chan DMLEvent, b.BatchSize) +} + +func (b *BinlogWriter) Run() { + if b.binlogEventBuffer == nil { + panic("Initialize() has not been called prior to Run()") + } batch := make([]DMLEvent, 0, b.BatchSize) for { diff --git a/ferry.go b/ferry.go index 3cb35d69..9666576c 100644 --- a/ferry.go +++ b/ferry.go @@ -162,7 +162,7 @@ func (f *Ferry) newBinlogStreamer(db *sql.DB, dbConf *DatabaseConfig, schemaRewr func (f *Ferry) NewBinlogWriter() *BinlogWriter { f.ensureInitialized() - return &BinlogWriter{ + w := &BinlogWriter{ DB: f.TargetDB, DatabaseRewrites: f.Config.DatabaseRewrites, TableRewrites: f.Config.TableRewrites, @@ -174,6 +174,8 @@ func (f *Ferry) NewBinlogWriter() *BinlogWriter { ErrorHandler: f.ErrorHandler, StateTracker: f.StateTracker, } + w.Initialize() + return w } func (f *Ferry) NewBinlogWriterWithoutStateTracker() *BinlogWriter { diff --git a/test/go/binlog_writer_test.go b/test/go/binlog_writer_test.go new file mode 100644 index 00000000..83b7ba08 --- /dev/null +++ b/test/go/binlog_writer_test.go @@ -0,0 +1,64 @@ +package test + +import ( + "fmt" + "testing" + "time" + + "github.com/Shopify/ghostferry" + "github.com/go-mysql-org/go-mysql/mysql" + "github.com/stretchr/testify/require" +) + +// stubDMLEvent satisfies the DMLEvent interface with no-op implementations. +// It is only used to exercise the BinlogWriter buffer without needing a real +// MySQL connection or a fully populated TableSchema. +type stubDMLEvent struct{} + +func (e *stubDMLEvent) Database() string { return "db" } +func (e *stubDMLEvent) Table() string { return "tbl" } +func (e *stubDMLEvent) TableSchema() *ghostferry.TableSchema { return nil } +func (e *stubDMLEvent) AsSQLString(_, _ string) (string, error) { return "", nil } +func (e *stubDMLEvent) OldValues() ghostferry.RowData { return nil } +func (e *stubDMLEvent) NewValues() ghostferry.RowData { return nil } +func (e *stubDMLEvent) PaginationKey() (string, error) { return "", nil } +func (e *stubDMLEvent) BinlogPosition() mysql.Position { return mysql.Position{} } +func (e *stubDMLEvent) ResumableBinlogPosition() mysql.Position { return mysql.Position{} } +func (e *stubDMLEvent) Annotation() (string, error) { return "", nil } +func (e *stubDMLEvent) Timestamp() time.Time { return time.Time{} } + +// TestBinlogWriterBufferBinlogEventsBeforeRun verifies that BufferBinlogEvents +// does not block when called before Run() has started in its own goroutine. +// +// Regression: BinlogWriter.Run() used to be the sole place that created the +// binlogEventBuffer channel. Ferry.Run() launches the writer and streamer +// goroutines concurrently (ferry.go:733-745). If the BinlogStreamer receives +// a row event and calls BufferBinlogEvents before the writer goroutine +// schedules and reaches the make() call, the send blocks forever on a nil +// channel, hanging the test (and in production, the entire run). +// +// The fix: BinlogWriter.Initialize() creates the channel eagerly, and +// Ferry.NewBinlogWriter() calls it before any goroutine is started. +func TestBinlogWriterBufferBinlogEventsBeforeRun(t *testing.T) { + w := &ghostferry.BinlogWriter{ + BatchSize: 10, + } + w.Initialize() + + done := make(chan error, 1) + go func() { + done <- w.BufferBinlogEvents([]ghostferry.DMLEvent{&stubDMLEvent{}}) + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(2 * time.Second): + t.Fatal(fmt.Sprintf( + "BufferBinlogEvents blocked after Initialize() — nil channel race is not fixed", + )) + } + + // Drain the buffer so the goroutine above can exit cleanly. + w.Stop() +}