diff --git a/.travis.yml b/.travis.yml index 2b973ecf8..5acbbe779 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,7 @@ services: - docker env: - - DOCKER_COMPOSE_VERSION=1.21.2 RELEASE_BRANCH=master + - DOCKER_COMPOSE_VERSION=1.21.2 RELEASE_BRANCH=master CI=true install: # Installing Docker Compose @@ -22,6 +22,9 @@ install: # Installing and Starting MySQL - .travisci/start-mysql.sh +before_script: + - ulimit -n 16384 + script: - make test diff --git a/binlog_streamer.go b/binlog_streamer.go index 611188d79..4c6317fd1 100644 --- a/binlog_streamer.go +++ b/binlog_streamer.go @@ -5,9 +5,10 @@ import ( "crypto/tls" sqlorig "database/sql" "fmt" - sql "github.com/Shopify/ghostferry/sqlwrapper" "time" + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "github.com/sirupsen/logrus" @@ -24,12 +25,13 @@ type BinlogStreamer struct { TableSchema TableSchemaCache - binlogSyncer *replication.BinlogSyncer - binlogStreamer *replication.BinlogStreamer - lastStreamedBinlogPosition mysql.Position - targetBinlogPosition mysql.Position - lastProcessedEventTime time.Time - lastLagMetricEmittedTime time.Time + binlogSyncer *replication.BinlogSyncer + binlogStreamer *replication.BinlogStreamer + lastStreamedBinlogPosition mysql.Position + lastResumableBinlogPosition mysql.Position + targetBinlogPosition mysql.Position + lastProcessedEventTime time.Time + lastLagMetricEmittedTime time.Time stopRequested bool @@ -98,6 +100,7 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPositio } s.lastStreamedBinlogPosition = startFromBinlogPosition + s.lastResumableBinlogPosition = startFromBinlogPosition s.logger.WithFields(logrus.Fields{ "file": s.lastStreamedBinlogPosition.Name, @@ -122,7 +125,6 @@ func (s *BinlogStreamer) Run() { }() s.logger.Info("starting binlog streamer") - for !s.stopRequested || (s.stopRequested && s.lastStreamedBinlogPosition.Compare(s.targetBinlogPosition) < 0) { var ev *replication.BinlogEvent var timedOut bool @@ -153,6 +155,9 @@ func (s *BinlogStreamer) Run() { case *replication.RotateEvent: // This event is needed because we need to update the last successful // binlog position. + s.lastResumableBinlogPosition.Pos = uint32(e.Position) + s.lastResumableBinlogPosition.Name = string(e.NextLogName) + s.lastStreamedBinlogPosition.Pos = uint32(e.Position) s.lastStreamedBinlogPosition.Name = string(e.NextLogName) s.logger.WithFields(logrus.Fields{ @@ -165,7 +170,6 @@ func (s *BinlogStreamer) Run() { s.logger.WithError(err).Error("failed to handle rows event") s.ErrorHandler.Fatal("binlog_streamer", err) } - s.updateLastStreamedPosAndTime(ev) case *replication.FormatDescriptionEvent: // This event has a LogPos = 0, presumably because this is the first @@ -183,6 +187,33 @@ func (s *BinlogStreamer) Run() { // with empty GenericEvent structs. // so there's no way to handle this for us. continue + case *replication.XIDEvent, *replication.GTIDEvent: + // With regards to DMLs, we see (at least) the following sequence + // of events in the binlog stream: + // + // - GTIDEvent <- START of transaction + // - QueryEvent + // - RowsQueryEvent + // - TableMapEvent + // - RowsEvent + // - RowsEvent + // - XIDEvent <- END of transaction + // + // *NOTE* + // + // First, RowsQueryEvent is only available with `binlog_rows_query_log_events` + // set to "ON". + // + // Second, there will be at least one (but potentially more) RowsEvents + // depending on the number of rows updated in the transaction. + // + // Lastly, GTIDEvents will only be available if they are enabled. + // + // As a result, the following case will set the last resumable position for + // interruption to EITHER the start (if using GTIDs) or the end of the + // last transaction + s.lastResumableBinlogPosition.Pos = uint32(ev.Header.LogPos) + s.updateLastStreamedPosAndTime(ev) default: s.updateLastStreamedPosAndTime(ev) } @@ -261,7 +292,7 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error { return nil } - dmlEvs, err := NewBinlogDMLEvents(table, ev, pos) + dmlEvs, err := NewBinlogDMLEvents(table, ev, pos, s.lastResumableBinlogPosition) if err != nil { return err } diff --git a/binlog_writer.go b/binlog_writer.go index f005c9878..eb51810b2 100644 --- a/binlog_writer.go +++ b/binlog_writer.go @@ -2,6 +2,7 @@ package ghostferry import ( "fmt" + sql "github.com/Shopify/ghostferry/sqlwrapper" "github.com/sirupsen/logrus" @@ -110,7 +111,7 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error { } if b.StateTracker != nil { - b.StateTracker.UpdateLastWrittenBinlogPosition(events[len(events)-1].BinlogPosition()) + b.StateTracker.UpdateLastResumableBinlogPosition(events[len(events)-1].ResumableBinlogPosition()) } return nil diff --git a/dml_events.go b/dml_events.go index a260b3788..82199d391 100644 --- a/dml_events.go +++ b/dml_events.go @@ -44,12 +44,14 @@ type DMLEvent interface { NewValues() RowData PaginationKey() (uint64, error) BinlogPosition() mysql.Position + ResumableBinlogPosition() mysql.Position } // The base of DMLEvent to provide the necessary methods. type DMLEventBase struct { - table *TableSchema - pos mysql.Position + table *TableSchema + pos mysql.Position + resumablePos mysql.Position } func (e *DMLEventBase) Database() string { @@ -68,18 +70,22 @@ func (e *DMLEventBase) BinlogPosition() mysql.Position { return e.pos } +func (e *DMLEventBase) ResumableBinlogPosition() mysql.Position { + return e.resumablePos +} + type BinlogInsertEvent struct { newValues RowData *DMLEventBase } -func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) { insertEvents := make([]DMLEvent, len(rowsEvent.Rows)) for i, row := range rowsEvent.Rows { insertEvents[i] = &BinlogInsertEvent{ newValues: row, - DMLEventBase: &DMLEventBase{table: table, pos: pos}, + DMLEventBase: &DMLEventBase{table, pos, resumablePos}, } } @@ -117,7 +123,7 @@ type BinlogUpdateEvent struct { *DMLEventBase } -func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) { // UPDATE events have two rows in the RowsEvent. The first row is the // entries of the old record (for WHERE) and the second row is the // entries of the new record (for SET). @@ -133,7 +139,7 @@ func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, updateEvents[i/2] = &BinlogUpdateEvent{ oldValues: row, newValues: rowsEvent.Rows[i+1], - DMLEventBase: &DMLEventBase{table: table, pos: pos}, + DMLEventBase: &DMLEventBase{table, pos, resumablePos}, } } @@ -177,13 +183,13 @@ func (e *BinlogDeleteEvent) NewValues() RowData { return nil } -func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) { deleteEvents := make([]DMLEvent, len(rowsEvent.Rows)) for i, row := range rowsEvent.Rows { deleteEvents[i] = &BinlogDeleteEvent{ oldValues: row, - DMLEventBase: &DMLEventBase{table: table, pos: pos}, + DMLEventBase: &DMLEventBase{table, pos, resumablePos}, } } @@ -205,7 +211,7 @@ func (e *BinlogDeleteEvent) PaginationKey() (uint64, error) { return paginationKeyFromEventData(e.table, e.oldValues) } -func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mysql.Position) ([]DMLEvent, error) { +func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position) ([]DMLEvent, error) { rowsEvent := ev.Event.(*replication.RowsEvent) for _, row := range rowsEvent.Rows { @@ -238,11 +244,11 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos mys switch ev.Header.EventType { case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: - return NewBinlogInsertEvents(table, rowsEvent, pos) + return NewBinlogInsertEvents(table, rowsEvent, pos, resumablePos) case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: - return NewBinlogDeleteEvents(table, rowsEvent, pos) + return NewBinlogDeleteEvents(table, rowsEvent, pos, resumablePos) case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: - return NewBinlogUpdateEvents(table, rowsEvent, pos) + return NewBinlogUpdateEvents(table, rowsEvent, pos, resumablePos) default: return nil, fmt.Errorf("unrecognized rows event: %s", ev.Header.EventType.String()) } diff --git a/ferry.go b/ferry.go index 65750a74e..2d2b82135 100644 --- a/ferry.go +++ b/ferry.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - sql "github.com/Shopify/ghostferry/sqlwrapper" "math" "net/http" "os" @@ -15,6 +14,8 @@ import ( "syscall" "time" + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/go-sql-driver/mysql" siddontanglog "github.com/siddontang/go-log/log" siddontangmysql "github.com/siddontang/go-mysql/mysql" @@ -507,9 +508,9 @@ func (f *Ferry) Start() error { // If we don't set this now, there is a race condition where Ghostferry // is terminated with some rows copied but no binlog events are written. // This guarentees that we are able to restart from a valid location. - f.StateTracker.UpdateLastWrittenBinlogPosition(pos) + f.StateTracker.UpdateLastResumableBinlogPosition(pos) if f.inlineVerifier != nil { - f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos) + f.StateTracker.UpdateLastResumableBinlogPositionForInlineVerifier(pos) } return nil diff --git a/inline_verifier.go b/inline_verifier.go index 957f6ddbb..cc0d886ea 100644 --- a/inline_verifier.go +++ b/inline_verifier.go @@ -4,12 +4,13 @@ import ( "bytes" "context" "fmt" - sql "github.com/Shopify/ghostferry/sqlwrapper" "strconv" "strings" "sync" "time" + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/golang/snappy" "github.com/sirupsen/logrus" ) @@ -569,7 +570,7 @@ func (v *InlineVerifier) binlogEventListener(evs []DMLEvent) error { if v.StateTracker != nil { ev := evs[len(evs)-1] - v.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(ev.BinlogPosition()) + v.StateTracker.UpdateLastResumableBinlogPositionForInlineVerifier(ev.ResumableBinlogPosition()) } return nil diff --git a/sharding/test/copy_filter_test.go b/sharding/test/copy_filter_test.go index d1b93e9c9..11cdbc0a4 100644 --- a/sharding/test/copy_filter_test.go +++ b/sharding/test/copy_filter_test.go @@ -142,7 +142,7 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() { } for _, tenantId := range tenantIds { - dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{}) + dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{}, mysql.Position{}) applicable, err := t.filter.ApplicableEvent(dmlEvents[0]) t.Require().Nil(err) t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId)) @@ -150,7 +150,7 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() { } func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() { - dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{}, mysql.Position{}) _, err = t.filter.ApplicableEvent(dmlEvents[0]) t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error()) } diff --git a/state_tracker.go b/state_tracker.go index c2eab7404..87fcc1076 100644 --- a/state_tracker.go +++ b/state_tracker.go @@ -113,14 +113,14 @@ func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *Seri return s } -func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos mysql.Position) { +func (s *StateTracker) UpdateLastResumableBinlogPosition(pos mysql.Position) { s.BinlogRWMutex.Lock() defer s.BinlogRWMutex.Unlock() s.lastWrittenBinlogPosition = pos } -func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos mysql.Position) { +func (s *StateTracker) UpdateLastResumableBinlogPositionForInlineVerifier(pos mysql.Position) { s.BinlogRWMutex.Lock() defer s.BinlogRWMutex.Unlock() diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go index b3d4e4bc6..8a98dc135 100644 --- a/test/go/dml_events_test.go +++ b/test/go/dml_events_test.go @@ -61,7 +61,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -80,7 +80,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -95,7 +95,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventMetadata() { Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) @@ -115,7 +115,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventGeneratesUpdateQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -134,7 +134,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}, {1000}}, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -152,7 +152,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithNull() { }, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -167,7 +167,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventMetadata() { Rows: [][]interface{}{{1000}, {1001}}, } - dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) @@ -185,7 +185,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventGeneratesDeleteQuery() { }, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(2, len(dmlEvents)) @@ -206,7 +206,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithNull() { }, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -221,7 +221,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithWrongColumnsReturnsErro Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) @@ -236,7 +236,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventMetadata() { Rows: [][]interface{}{{1000}}, } - dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}) + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{}, mysql.Position{}) this.Require().Nil(err) this.Require().Equal(1, len(dmlEvents)) this.Require().Equal("test_schema", dmlEvents[0].Database()) diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 6122d3cca..d57f8945e 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -1,5 +1,4 @@ require "test_helper" - require "json" class InterruptResumeTest < GhostferryTestCase @@ -304,4 +303,61 @@ def test_interrupt_resume_inline_verifier_will_verify_additional_rows_changed_on error_line = ghostferry.error_lines.last assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: #{chosen_id} ] ", error_line["msg"] end + + # originally taken from @kolbitsch-lastline in https://github.com/Shopify/ghostferry/pull/160 + def test_interrupt_resume_between_consecutive_rows_events + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) + + start_binlog_status = source_db.query('SHOW MASTER STATUS').first + + # create a series of rows-events that do not have interleaved table-map + # events. This is the case when multiple rows are affected in a single + # DML event. + # Since we are racing between applying rows and sending the shutdown event, + # we emit a whole bunch of them + num_batches = 20 + num_values_per_batch = 1000 + row_id = 0 + ghostferry.on_status(Ghostferry::Status::BINLOG_STREAMING_STARTED) do + for _batch_id in 0..num_batches do + insert_sql = "INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES " + for value_in_batch in 0..num_values_per_batch do + row_id += 1 + insert_sql += ", " if value_in_batch > 0 + insert_sql += "('data#{row_id}')" + end + source_db.query(insert_sql) + end + end + + ghostferry.on_status(Ghostferry::Status::AFTER_BINLOG_APPLY) do + # while we are emitting events in the loop above, try to inject a shutdown + # signal, hoping to interrupt between applying an INSERT and receiving the + # next table-map event + if row_id > 20 + ghostferry.term_and_wait_for_exit + end + end + + dumped_state = ghostferry.run_expecting_interrupt + + refute_nil dumped_state['LastWrittenBinlogPosition']['Name'] + refute_nil dumped_state['LastWrittenBinlogPosition']['Pos'] + refute_nil dumped_state['LastStoredBinlogPositionForInlineVerifier']['Name'] + refute_nil dumped_state['LastStoredBinlogPositionForInlineVerifier']['Pos'] + + # assert the resumable position is not the start position + if dumped_state['LastWrittenBinlogPosition']['Name'] == start_binlog_status['File'] + refute_equal dumped_state['LastWrittenBinlogPosition']['Pos'], start_binlog_status['Position'] + refute_equal dumped_state['LastStoredBinlogPositionForInlineVerifier']['Pos'], start_binlog_status['Position'] + end + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + # if we did not resume at a proper state, this invocation of ghostferry + # will crash, complaining that a rows-event is referring to an unknown + # table + ghostferry.run(dumped_state) + + assert_test_table_is_identical + end end diff --git a/test/integration/trivial_test.rb b/test/integration/trivial_test.rb index 6c6a778b6..246d31baa 100644 --- a/test/integration/trivial_test.rb +++ b/test/integration/trivial_test.rb @@ -26,14 +26,16 @@ def test_copy_data_with_writes_to_source def test_logged_query_omits_columns seed_simple_database_with_single_table - ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) - ghostferry.run + with_env('CI', nil) do + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + ghostferry.run - assert ghostferry.logrus_lines["cursor"].length > 0 + assert ghostferry.logrus_lines["cursor"].length > 0 - ghostferry.logrus_lines["cursor"].each do |line| - if line["msg"].start_with?("found ") - assert line["sql"].start_with?("SELECT [omitted] FROM") + ghostferry.logrus_lines["cursor"].each do |line| + if line["msg"].start_with?("found ") + assert line["sql"].start_with?("SELECT [omitted] FROM") + end end end end diff --git a/test/lib/go/integrationferry.go b/test/lib/go/integrationferry.go index 96e796d87..771b98c38 100644 --- a/test/lib/go/integrationferry.go +++ b/test/lib/go/integrationferry.go @@ -248,6 +248,9 @@ func NewStandardConfig() (*ghostferry.Config, error) { func main() { logrus.SetFormatter(&logrus.JSONFormatter{}) logrus.SetLevel(logrus.DebugLevel) + if os.Getenv("CI") == "true" { + logrus.SetLevel(logrus.ErrorLevel) + } config, err := NewStandardConfig() if err != nil { diff --git a/test/test_helper.rb b/test/test_helper.rb index 97cc70d63..f7ba427b6 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -113,4 +113,12 @@ def assert_basic_fields_exist_in_dumped_state(dumped_state) refute dumped_state["CompletedTables"].nil? refute dumped_state["LastWrittenBinlogPosition"].nil? end + + def with_env(key, value) + previous_value = ENV.delete(key) + ENV[key] = value + yield + ensure + ENV[key] = previous_value + end end diff --git a/testhelpers/unit_test_suite.go b/testhelpers/unit_test_suite.go index c627b3392..6f63a7a14 100644 --- a/testhelpers/unit_test_suite.go +++ b/testhelpers/unit_test_suite.go @@ -17,6 +17,9 @@ func SetupTest() { var err error logrus.SetLevel(logrus.DebugLevel) + if os.Getenv("CI") == "true" { + logrus.SetLevel(logrus.ErrorLevel) + } seed := time.Now().UnixNano() envseed := os.Getenv("SEED")