Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
- docker

env:
- DOCKER_COMPOSE_VERSION=1.21.2 RELEASE_BRANCH=master
- DOCKER_COMPOSE_VERSION=1.21.2 RELEASE_BRANCH=master CI=true

install:
# Installing Docker Compose
Expand All @@ -22,6 +22,9 @@ install:
# Installing and Starting MySQL
- .travisci/start-mysql.sh

before_script:
- ulimit -n 16384

script:
- make test

Expand Down
51 changes: 41 additions & 10 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"crypto/tls"
sqlorig "database/sql"
"fmt"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"time"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/sirupsen/logrus"
Expand All @@ -24,12 +25,13 @@ type BinlogStreamer struct {

TableSchema TableSchemaCache

binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
lastStreamedBinlogPosition mysql.Position
targetBinlogPosition mysql.Position
lastProcessedEventTime time.Time
lastLagMetricEmittedTime time.Time
binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer
lastStreamedBinlogPosition mysql.Position
lastResumableBinlogPosition mysql.Position
targetBinlogPosition mysql.Position
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to do this now, but this was brought up the other day: Since we will eventually start streaming binlog from the target for verification, maybe we should rename this to "stopAtBinlogPosition", or something to that effect.

I just wanted to mention this so we don't forget about it.

lastProcessedEventTime time.Time
lastLagMetricEmittedTime time.Time

stopRequested bool

Expand Down Expand Up @@ -98,6 +100,7 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPositio
}

s.lastStreamedBinlogPosition = startFromBinlogPosition
s.lastResumableBinlogPosition = startFromBinlogPosition

s.logger.WithFields(logrus.Fields{
"file": s.lastStreamedBinlogPosition.Name,
Expand All @@ -122,7 +125,6 @@ func (s *BinlogStreamer) Run() {
}()

s.logger.Info("starting binlog streamer")

for !s.stopRequested || (s.stopRequested && s.lastStreamedBinlogPosition.Compare(s.targetBinlogPosition) < 0) {
var ev *replication.BinlogEvent
var timedOut bool
Expand Down Expand Up @@ -153,6 +155,9 @@ func (s *BinlogStreamer) Run() {
case *replication.RotateEvent:
// This event is needed because we need to update the last successful
// binlog position.
s.lastResumableBinlogPosition.Pos = uint32(e.Position)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we set the .Name here as well? This is a RotateEvent, so the file should change, right? That would get rid of the slightly cryptic s.lastResumableBinlogPosition.Name line below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following up on our conversation, we decided to set the name here and in the ConnectBinlogStreamerToMysqlFrom above 👍

s.lastResumableBinlogPosition.Name = string(e.NextLogName)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, I thought we agreed we can't resume from a RotateEvent's position?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only place where MySQL replication gives us the file name of the binlog. This value is therefore cached until the next Rotate event, which would give us the next file name. So we have to set this filename here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, why can't we use a separate currentFilename variable to cache the current filename?

The current filename needs to be clearly separated from the resumable position.


s.lastStreamedBinlogPosition.Pos = uint32(e.Position)
s.lastStreamedBinlogPosition.Name = string(e.NextLogName)
s.logger.WithFields(logrus.Fields{
Expand All @@ -165,7 +170,6 @@ func (s *BinlogStreamer) Run() {
s.logger.WithError(err).Error("failed to handle rows event")
s.ErrorHandler.Fatal("binlog_streamer", err)
}

s.updateLastStreamedPosAndTime(ev)
case *replication.FormatDescriptionEvent:
// This event has a LogPos = 0, presumably because this is the first
Expand All @@ -183,6 +187,33 @@ func (s *BinlogStreamer) Run() {
// with empty GenericEvent structs.
// So there's no way for us to handle this.
continue
case *replication.XIDEvent, *replication.GTIDEvent:
// With regard to DMLs, we see (at least) the following sequence
// of events in the binlog stream:
//
// - GTIDEvent <- START of transaction
// - QueryEvent
// - RowsQueryEvent
// - TableMapEvent
// - RowsEvent
// - RowsEvent
// - XIDEvent <- END of transaction
//
// *NOTE*
//
// First, RowsQueryEvent is only available with `binlog_rows_query_log_events`
// set to "ON".
//
// Second, there will be at least one (but potentially more) RowsEvents
// depending on the number of rows updated in the transaction.
//
// Lastly, GTIDEvents will only be available if they are enabled.
//
// As a result, the following case will set the last resumable position for
// interruption to EITHER the start (if using GTIDs) or the end of the
// last transaction.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Succinct explanation. Thank you!

s.lastResumableBinlogPosition.Pos = uint32(ev.Header.LogPos)
Comment thread
hkdsun marked this conversation as resolved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's document in code comments what got us to this? 🙏

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can do 👍

s.updateLastStreamedPosAndTime(ev)
default:
s.updateLastStreamedPosAndTime(ev)
}
Expand Down Expand Up @@ -261,7 +292,7 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error {
return nil
}

dmlEvs, err := NewBinlogDMLEvents(table, ev, pos)
dmlEvs, err := NewBinlogDMLEvents(table, ev, pos, s.lastResumableBinlogPosition)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ghostferry

import (
"fmt"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
}

if b.StateTracker != nil {
b.StateTracker.UpdateLastWrittenBinlogPosition(events[len(events)-1].BinlogPosition())
b.StateTracker.UpdateLastResumableBinlogPosition(events[len(events)-1].ResumableBinlogPosition())
}

return nil
Expand Down
30 changes: 18 additions & 12 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ type DMLEvent interface {
NewValues() RowData
PaginationKey() (uint64, error)
BinlogPosition() mysql.Position
ResumableBinlogPosition() mysql.Position
}

// The base of DMLEvent to provide the necessary methods.
type DMLEventBase struct {
table *TableSchema
pos mysql.Position
table *TableSchema
pos mysql.Position
resumablePos mysql.Position
}

func (e *DMLEventBase) Database() string {
Expand All @@ -68,18 +70,22 @@ func (e *DMLEventBase) BinlogPosition() mysql.Position {
return e.pos
}

// ResumableBinlogPosition returns the resumable binlog position stored on
// this event (the resumablePos field), as opposed to BinlogPosition, which
// returns the pos field.
// NOTE(review): presumably this is the last position from which streaming
// can be safely resumed after an interruption; confirm against the code
// that populates resumablePos.
func (e *DMLEventBase) ResumableBinlogPosition() mysql.Position {
return e.resumablePos
}

type BinlogInsertEvent struct {
newValues RowData
*DMLEventBase
}

func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) {
insertEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
insertEvents[i] = &BinlogInsertEvent{
newValues: row,
DMLEventBase: &DMLEventBase{table: table, pos: pos},
DMLEventBase: &DMLEventBase{table, pos, resumablePos},
}
}

Expand Down Expand Up @@ -117,7 +123,7 @@ type BinlogUpdateEvent struct {
*DMLEventBase
}

func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) {
// UPDATE events have two rows in the RowsEvent. The first row is the
// entries of the old record (for WHERE) and the second row is the
// entries of the new record (for SET).
Expand All @@ -133,7 +139,7 @@ func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent,
updateEvents[i/2] = &BinlogUpdateEvent{
oldValues: row,
newValues: rowsEvent.Rows[i+1],
DMLEventBase: &DMLEventBase{table: table, pos: pos},
DMLEventBase: &DMLEventBase{table, pos, resumablePos},
}
}

Expand Down Expand Up @@ -177,13 +183,13 @@ func (e *BinlogDeleteEvent) NewValues() RowData {
return nil
}

func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) {
deleteEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
deleteEvents[i] = &BinlogDeleteEvent{
oldValues: row,
DMLEventBase: &DMLEventBase{table: table, pos: pos},
DMLEventBase: &DMLEventBase{table, pos, resumablePos},
}
}

Expand All @@ -205,7 +211,7 @@ func (e *BinlogDeleteEvent) PaginationKey() (uint64, error) {
return paginationKeyFromEventData(e.table, e.oldValues)
}

func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mysql.Position) ([]DMLEvent, error) {
func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) {
rowsEvent := ev.Event.(*replication.RowsEvent)

for _, row := range rowsEvent.Rows {
Expand Down Expand Up @@ -238,11 +244,11 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mys

switch ev.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
return NewBinlogInsertEvents(table, rowsEvent, pos)
return NewBinlogInsertEvents(table, rowsEvent, pos, resumablePos)
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
return NewBinlogDeleteEvents(table, rowsEvent, pos)
return NewBinlogDeleteEvents(table, rowsEvent, pos, resumablePos)
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
return NewBinlogUpdateEvents(table, rowsEvent, pos)
return NewBinlogUpdateEvents(table, rowsEvent, pos, resumablePos)
default:
return nil, fmt.Errorf("unrecognized rows event: %s", ev.Header.EventType.String())
}
Expand Down
7 changes: 4 additions & 3 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"math"
"net/http"
"os"
Expand All @@ -15,6 +14,8 @@ import (
"syscall"
"time"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/go-sql-driver/mysql"
siddontanglog "github.com/siddontang/go-log/log"
siddontangmysql "github.com/siddontang/go-mysql/mysql"
Expand Down Expand Up @@ -507,9 +508,9 @@ func (f *Ferry) Start() error {
// If we don't set this now, there is a race condition where Ghostferry
// is terminated with some rows copied but no binlog events are written.
// This guarantees that we are able to restart from a valid location.
f.StateTracker.UpdateLastWrittenBinlogPosition(pos)
f.StateTracker.UpdateLastResumableBinlogPosition(pos)
if f.inlineVerifier != nil {
f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos)
f.StateTracker.UpdateLastResumableBinlogPositionForInlineVerifier(pos)
}

return nil
Expand Down
5 changes: 3 additions & 2 deletions inline_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"bytes"
"context"
"fmt"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"strconv"
"strings"
"sync"
"time"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/golang/snappy"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -569,7 +570,7 @@ func (v *InlineVerifier) binlogEventListener(evs []DMLEvent) error {

if v.StateTracker != nil {
ev := evs[len(evs)-1]
v.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(ev.BinlogPosition())
v.StateTracker.UpdateLastResumableBinlogPositionForInlineVerifier(ev.ResumableBinlogPosition())
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions sharding/test/copy_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,15 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() {
}

for _, tenantId := range tenantIds {
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{})
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{}, mysql.Position{})
applicable, err := t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Nil(err)
t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId))
}
}

func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() {
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{})
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{}, mysql.Position{})
_, err = t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error())
}
Expand Down
4 changes: 2 additions & 2 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri
return s
}

func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos mysql.Position) {
func (s *StateTracker) UpdateLastResumableBinlogPosition(pos mysql.Position) {
s.BinlogRWMutex.Lock()
defer s.BinlogRWMutex.Unlock()

s.lastWrittenBinlogPosition = pos
}

func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos mysql.Position) {
func (s *StateTracker) UpdateLastResumableBinlogPositionForInlineVerifier(pos mysql.Position) {
s.BinlogRWMutex.Lock()
defer s.BinlogRWMutex.Unlock()

Expand Down
Loading