diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 79f258e3a..d42ba1f30 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -78,6 +78,10 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate // StreamEvents func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { + if this.currentCoordinates.IsLogPosOverflowBeyond4Bytes(&this.LastAppliedRowsEventHint) { + return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates) + } + if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) { this.migrationContext.Log.Debugf("Skipping handled query at %+v", this.currentCoordinates) return nil @@ -141,6 +145,7 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha this.currentCoordinatesMutex.Lock() defer this.currentCoordinatesMutex.Unlock() this.currentCoordinates.LogPos = int64(ev.Header.LogPos) + this.currentCoordinates.EventSize = int64(ev.Header.EventSize) }() switch binlogEvent := ev.Event.(type) { diff --git a/go/mysql/binlog.go b/go/mysql/binlog.go index ad5e56f12..92d832f00 100644 --- a/go/mysql/binlog.go +++ b/go/mysql/binlog.go @@ -14,8 +14,9 @@ import ( // BinlogCoordinates described binary log coordinates in the form of log file & log position. type BinlogCoordinates struct { - LogFile string - LogPos int64 + LogFile string + LogPos int64 + EventSize int64 } // ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306 @@ -74,3 +75,27 @@ func (this *BinlogCoordinates) SmallerThanOrEquals(other *BinlogCoordinates) boo } return this.LogFile == other.LogFile && this.LogPos == other.LogPos } + +// IsLogPosOverflowBeyond4Bytes returns true if the coordinate endpos is overflow beyond 4 bytes. +// The binlog event end_log_pos field type is defined as uint32, 4 bytes. +// https://github.com/go-mysql-org/go-mysql/blob/master/replication/event.go +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header +// Issue: https://github.com/github/gh-ost/issues/1366 +func (this *BinlogCoordinates) IsLogPosOverflowBeyond4Bytes(preCoordinate *BinlogCoordinates) bool { + if preCoordinate == nil { + return false + } + if preCoordinate.IsEmpty() { + return false + } + + if this.LogFile != preCoordinate.LogFile { + return false + } + + if preCoordinate.LogPos+this.EventSize >= 1<<32 { + // Unexpected rows event, the previous binlog log_pos + current binlog event_size is overflow 4 bytes + return true + } + return false +} diff --git a/go/mysql/binlog_test.go b/go/mysql/binlog_test.go index 1bb7c0598..6d7a567af 100644 --- a/go/mysql/binlog_test.go +++ b/go/mysql/binlog_test.go @@ -6,6 +6,7 @@ package mysql import ( + "math" "testing" "github.com/openark/golib/log" @@ -52,3 +53,46 @@ func TestBinlogCoordinatesAsKey(t *testing.T) { test.S(t).ExpectEquals(len(m), 3) } + +func TestIsLogPosOverflowBeyond4Bytes(t *testing.T) { + { + var preCoordinates *BinlogCoordinates + curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 10321, EventSize: 1100} + test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 1100, EventSize: 1100} + curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1100)), EventSize: 1100} + test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00016", LogPos: 1100, EventSize: 1100} + curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1100)), EventSize: 1100} + test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 1001, EventSize: 1000} + curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 1000, EventSize: 1000} + curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + test.S(t).ExpectFalse(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32 - 999, EventSize: 1000} + curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + test.S(t).ExpectTrue(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(math.MaxUint32 - 500)), EventSize: 1000} + curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + test.S(t).ExpectTrue(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } + { + preCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: math.MaxUint32, EventSize: 1000} + curCoordinates := &BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: int64(uint32(preCoordinates.LogPos + 1000)), EventSize: 1000} + test.S(t).ExpectTrue(curCoordinates.IsLogPosOverflowBeyond4Bytes(preCoordinates)) + } +}