Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,20 @@ type BinlogWriter struct {
logger Logger
}

func (b *BinlogWriter) Run() {
// Initialize allocates the event buffer channel and logger so that
// BufferBinlogEvents can be called safely before Run() starts in its own
// goroutine. Ferry.NewBinlogWriter calls this automatically; external callers
// that construct a BinlogWriter directly must call Initialize() before
// registering BufferBinlogEvents as an event listener.
func (b *BinlogWriter) Initialize() {
b.logger = LogWithField("tag", "binlog_writer")
b.binlogEventBuffer = make(chan DMLEvent, b.BatchSize)
}

func (b *BinlogWriter) Run() {
if b.binlogEventBuffer == nil {
panic("Initialize() has not been called prior to Run()")
}

batch := make([]DMLEvent, 0, b.BatchSize)
for {
Expand Down
4 changes: 3 additions & 1 deletion ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (f *Ferry) newBinlogStreamer(db *sql.DB, dbConf *DatabaseConfig, schemaRewr
func (f *Ferry) NewBinlogWriter() *BinlogWriter {
f.ensureInitialized()

return &BinlogWriter{
w := &BinlogWriter{
DB: f.TargetDB,
DatabaseRewrites: f.Config.DatabaseRewrites,
TableRewrites: f.Config.TableRewrites,
Expand All @@ -174,6 +174,8 @@ func (f *Ferry) NewBinlogWriter() *BinlogWriter {
ErrorHandler: f.ErrorHandler,
StateTracker: f.StateTracker,
}
w.Initialize()
return w
}

func (f *Ferry) NewBinlogWriterWithoutStateTracker() *BinlogWriter {
Expand Down
64 changes: 64 additions & 0 deletions test/go/binlog_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package test

import (
"fmt"
"testing"
"time"

"github.com/Shopify/ghostferry"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/stretchr/testify/require"
)

// stubDMLEvent satisfies the DMLEvent interface with no-op implementations.
// It is only used to exercise the BinlogWriter buffer without needing a real
// MySQL connection or a fully populated TableSchema.
type stubDMLEvent struct{}

func (e *stubDMLEvent) Database() string { return "db" }
func (e *stubDMLEvent) Table() string { return "tbl" }
func (e *stubDMLEvent) TableSchema() *ghostferry.TableSchema { return nil }
func (e *stubDMLEvent) AsSQLString(_, _ string) (string, error) { return "", nil }
func (e *stubDMLEvent) OldValues() ghostferry.RowData { return nil }
func (e *stubDMLEvent) NewValues() ghostferry.RowData { return nil }
func (e *stubDMLEvent) PaginationKey() (string, error) { return "", nil }
func (e *stubDMLEvent) BinlogPosition() mysql.Position { return mysql.Position{} }
func (e *stubDMLEvent) ResumableBinlogPosition() mysql.Position { return mysql.Position{} }
func (e *stubDMLEvent) Annotation() (string, error) { return "", nil }
func (e *stubDMLEvent) Timestamp() time.Time { return time.Time{} }

// TestBinlogWriterBufferBinlogEventsBeforeRun verifies that BufferBinlogEvents
// does not block when called before Run() has started in its own goroutine.
//
// Regression: BinlogWriter.Run() used to be the sole place that created the
// binlogEventBuffer channel. Ferry.Run() launches the writer and streamer
// goroutines concurrently (ferry.go:733-745). If the BinlogStreamer receives
// a row event and calls BufferBinlogEvents before the writer goroutine
// schedules and reaches the make() call, the send blocks forever on a nil
// channel, hanging the test (and in production, the entire run).
//
// The fix: BinlogWriter.Initialize() creates the channel eagerly, and
// Ferry.NewBinlogWriter() calls it before any goroutine is started.
func TestBinlogWriterBufferBinlogEventsBeforeRun(t *testing.T) {
w := &ghostferry.BinlogWriter{
BatchSize: 10,
}
w.Initialize()

done := make(chan error, 1)
go func() {
done <- w.BufferBinlogEvents([]ghostferry.DMLEvent{&stubDMLEvent{}})
}()

select {
case err := <-done:
require.NoError(t, err)
case <-time.After(2 * time.Second):
t.Fatal(fmt.Sprintf(
"BufferBinlogEvents blocked after Initialize() — nil channel race is not fixed",
))
}

// Drain the buffer so the goroutine above can exit cleanly.
w.Stop()
}
Loading