Skip to content

Commit

Permalink
Merge branch 'v2.2.3' into v2.3.4
Browse files Browse the repository at this point in the history
* v2.2.3:
  修复MySQL源,指定同步表配置后,但可能没生效的BUG BUG复现过程: 1. 配置了一个T1表同步 2. 重启进程,并且T1表数据不再做任何更新 3. 配置T2表同步 4. 更新T1表的数据
  修复mysql 带有触发器的时候binlog解析失败的bug #263
  修复plugin clickhouse decimal 精度丢失的bug #204
  v2.2.2

# Conflicts:
#	README.EN.MD
#	README.MD
#	changelog.txt
#	config/version.go
  • Loading branch information
jc3wish committed Dec 3, 2023
2 parents e00f683 + d33b950 commit 00d2bb8
Show file tree
Hide file tree
Showing 8 changed files with 531 additions and 76 deletions.
7 changes: 2 additions & 5 deletions Bristol/mysql/event_row.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ func (parser *eventParser) parseRowsEvent(buf *bytes.Buffer) (event *RowsEvent,
tableIdSize = 6
}

_, err = readFixedLengthInteger(buf, tableIdSize)
// 这里采用 lastMapEvent.tableId ,则不采用 readFixedLengthInteger 解析出来的 tableId
// 实际在运行中,发现存在 readFixedLengthInteger 解析出来的tableId 并不能在 parser.tableSchemaMap 找到的情况,row event 紧随 map event 之后,所以这里采用 parser.lastMapEventTableId 应该不会有问题
event.tableId = parser.lastMapEvent.tableId
event.tableId, err = readFixedLengthInteger(buf, tableIdSize)
err = binary.Read(buf, binary.LittleEndian, &event.flags)
switch event.header.EventType {
case UPDATE_ROWS_EVENTv2, WRITE_ROWS_EVENTv2, DELETE_ROWS_EVENTv2:
Expand All @@ -62,7 +59,7 @@ func (parser *eventParser) parseRowsEvent(buf *bytes.Buffer) (event *RowsEvent,
}
for buf.Len() > 0 {
var row map[string]interface{}
row, err = parser.parseEventRow(buf, parser.lastMapEvent, parser.tableSchemaMap[event.tableId].ColumnSchemaTypeList)
row, err = parser.parseEventRow(buf, parser.tableMap[event.tableId], parser.tableSchemaMap[event.tableId].ColumnSchemaTypeList)
if err != nil {
log.Println("event row parser err:", err)
return
Expand Down
2 changes: 2 additions & 0 deletions Bristol/mysql/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
)

type tableStruct struct {
SchemaName string
TableName string
Pri []string
ColumnSchemaTypeList []*ColumnInfo
needReload bool
Expand Down
131 changes: 66 additions & 65 deletions Bristol/mysql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,44 @@ package mysql

import (
"bytes"
"strings"
"fmt"
"database/sql/driver"
"fmt"
"log"
"runtime/debug"
"strings"
)

type eventParser struct {
format *FormatDescriptionEvent
tableMap map[uint64]*TableMapEvent // tableId 对应的最后一个 TableMapEvent 事件
tableNameMap map[string]uint64 // schame.table 做为key 对应的tableId
tableSchemaMap map[uint64]*tableStruct // tableId 对应的表结构
dataSource *string
connStatus StatusFlag
conn MysqlConnection
dumpBinLogStatus StatusFlag
binlogFileName string
format *FormatDescriptionEvent
tableMap map[uint64]*TableMapEvent // tableId 对应的最后一个 TableMapEvent 事件
tableNameMap map[string]uint64 // schame.table 做为key 对应的tableId
tableSchemaMap map[uint64]*tableStruct // tableId 对应的表结构
dataSource *string
connStatus StatusFlag
conn MysqlConnection
dumpBinLogStatus StatusFlag
binlogFileName string
currentBinlogFileName string
binlogPosition uint32
binlogTimestamp uint32
lastEventID uint64
maxBinlogFileName string
maxBinlogPosition uint32
eventDo []bool
ServerId uint32
connectionId string
binlog_checksum bool
filterNextRowEvent bool
binlogDump *BinlogDump
lastMapEvent *TableMapEvent // 保存最近一次 map event 解析出来的 tableId,用于接下来的 row event 解析使用,因为实际运行中发现,row event 解析出来 tableId 可能对不上。row event 紧跟在 map event 之后,row event 的时候,直接采用最后一次map event
callbackErrChan chan error
isGTID bool
nextEventID uint64 // 下一个事件ID, 不能修改
lastPrevtiousGTIDSMap map[string]Intervals // 当前解析的 binlog 文件的 PrevtiousGTIDS 对应关系
gtidSetInfo GTIDSet
dbType DBType
binlogPosition uint32
binlogTimestamp uint32
lastEventID uint64
maxBinlogFileName string
maxBinlogPosition uint32
eventDo []bool
ServerId uint32
connectionId string
binlog_checksum bool
filterNextRowEvent bool
binlogDump *BinlogDump
lastMapEvent *TableMapEvent // 保存最近一次 map event 解析出来的 tableId,用于接下来的 row event 解析使用,因为实际运行中发现,row event 解析出来 tableId 可能对不上。row event 紧跟在 map event 之后,row event 的时候,直接采用最后一次map event
callbackErrChan chan error
isGTID bool
nextEventID uint64 // 下一个事件ID, 不能修改
lastPrevtiousGTIDSMap map[string]Intervals // 当前解析的 binlog 文件的 PrevtiousGTIDS 对应关系
gtidSetInfo GTIDSet
dbType DBType
}


func newEventParser(binlogDump *BinlogDump) (parser *eventParser) {
parser = new(eventParser)
parser.tableMap = make(map[uint64]*TableMapEvent)
Expand All @@ -63,15 +62,15 @@ func (parser *eventParser) getNextEventID() uint64 {
}

func (parser *eventParser) getGTIDSIDStart(sid string) int64 {
if _,ok := parser.lastPrevtiousGTIDSMap[sid];ok {
if _, ok := parser.lastPrevtiousGTIDSMap[sid]; ok {
return parser.lastPrevtiousGTIDSMap[sid].Start
}
return 1
}

func (parser *eventParser) saveBinlog(event *EventReslut) {
switch event.Header.EventType {
case QUERY_EVENT,XID_EVENT:
case QUERY_EVENT, XID_EVENT:
if event.BinlogFileName == "" {
return
}
Expand All @@ -87,7 +86,7 @@ func (parser *eventParser) saveBinlog(event *EventReslut) {
parser.currentBinlogFileName = event.BinlogFileName
parser.lastEventID = event.EventID
parser.binlogDump.Unlock()
case GTID_EVENT,ANONYMOUS_GTID_EVENT,MARIADB_GTID_EVENT:
case GTID_EVENT, ANONYMOUS_GTID_EVENT, MARIADB_GTID_EVENT:
parser.binlogDump.Lock()
parser.binlogTimestamp = event.Header.Timestamp
parser.lastEventID = event.EventID
Expand Down Expand Up @@ -118,28 +117,28 @@ func (parser *eventParser) parseEvent(data []byte) (event *EventReslut, filename
return
case PREVIOUS_GTIDS_EVENT:
var PreviousGTIDSEvent *PreviousGTIDSEvent
PreviousGTIDSEvent,err = parser.parsePrevtiousGTIDSEvent(buf)
PreviousGTIDSEvent, err = parser.parsePrevtiousGTIDSEvent(buf)
event = &EventReslut{
Header: PreviousGTIDSEvent.header,
BinlogFileName: parser.currentBinlogFileName,
BinlogPosition: PreviousGTIDSEvent.header.LogPos,
}
return
case GTID_EVENT,ANONYMOUS_GTID_EVENT:
case GTID_EVENT, ANONYMOUS_GTID_EVENT:
var GtidEvent *GTIDEvent
GtidEvent, err = parser.parseGTIDEvent(buf)
gtid := fmt.Sprintf("%s:%d-%d",GtidEvent.SID36,parser.getGTIDSIDStart(GtidEvent.SID36),GtidEvent.GNO)
gtid := fmt.Sprintf("%s:%d-%d", GtidEvent.SID36, parser.getGTIDSIDStart(GtidEvent.SID36), GtidEvent.GNO)
parser.gtidSetInfo.Update(gtid)
event = &EventReslut{
Header: GtidEvent.header,
BinlogFileName: parser.currentBinlogFileName,
BinlogPosition: GtidEvent.header.LogPos,
Gtid: parser.gtidSetInfo.String(),
Gtid: parser.gtidSetInfo.String(),
}
break
case MARIADB_GTID_LIST_EVENT:
var MariaDBGTIDSEvent *MariadbGTIDListEvent
MariaDBGTIDSEvent,err = parser.MariadbGTIDListEvent(buf)
MariaDBGTIDSEvent, err = parser.MariadbGTIDListEvent(buf)
event = &EventReslut{
Header: MariaDBGTIDSEvent.header,
BinlogFileName: parser.currentBinlogFileName,
Expand All @@ -149,25 +148,25 @@ func (parser *eventParser) parseEvent(data []byte) (event *EventReslut, filename
case MARIADB_GTID_EVENT:
var GtidEvent *MariadbGTIDEvent
GtidEvent, err = parser.MariadbGTIDEvent(buf)
gtid := fmt.Sprintf("%d-%d-%d",GtidEvent.GTID.DomainID,GtidEvent.GTID.ServerID,GtidEvent.GTID.SequenceNumber)
gtid := fmt.Sprintf("%d-%d-%d", GtidEvent.GTID.DomainID, GtidEvent.GTID.ServerID, GtidEvent.GTID.SequenceNumber)
parser.gtidSetInfo.Update(gtid)
event = &EventReslut{
Header: GtidEvent.header,
BinlogFileName: parser.currentBinlogFileName,
BinlogPosition: GtidEvent.header.LogPos,
Gtid: parser.gtidSetInfo.String(),
Gtid: parser.gtidSetInfo.String(),
}
return
case FORMAT_DESCRIPTION_EVENT:
parser.format, err = parser.parseFormatDescriptionEvent(buf)
if strings.Contains(parser.format.mysqlServerVersion,"MariaDB") {
if strings.Contains(parser.format.mysqlServerVersion, "MariaDB") {
parser.dbType = DB_TYPE_MARIADB
}
// 这要地方要对 gtidSetInfo 初始化,在假如非 GTID 解析的情况下,但是 数据库本身又有 GTID 事件,是存在可能解析出错的情况的
if parser.gtidSetInfo == nil {
if parser.dbType == DB_TYPE_MARIADB {
parser.gtidSetInfo = NewMariaDBGtidSet("")
}else{
} else {
parser.gtidSetInfo = NewMySQLGtidSet("")
}
}
Expand Down Expand Up @@ -221,11 +220,11 @@ func (parser *eventParser) parseEvent(data []byte) (event *EventReslut, filename
BinlogFileName: rotateEvent.filename,
BinlogPosition: rotateEvent.header.LogPos,
}
for _,v := range parser.tableSchemaMap {
for _, v := range parser.tableSchemaMap {
v.needReload = true
}
parser.saveBinlog(event)
log.Println(*parser.dataSource," ROTATE_EVENT ",event.BinlogFileName)
log.Println(*parser.dataSource, " ROTATE_EVENT ", event.BinlogFileName)
break
case TABLE_MAP_EVENT:
var table_map_event *TableMapEvent
Expand All @@ -239,7 +238,7 @@ func (parser *eventParser) parseEvent(data []byte) (event *EventReslut, filename
} else {
parser.filterNextRowEvent = false
_, ok := parser.tableSchemaMap[table_map_event.tableId]
if !ok || ( parser.tableSchemaMap[table_map_event.tableId].needReload == true ) {
if !ok || (parser.tableSchemaMap[table_map_event.tableId].needReload == true) {
parser.GetTableSchema(table_map_event.tableId, table_map_event.schemaName, table_map_event.tableName)
}
}
Expand All @@ -263,10 +262,10 @@ func (parser *eventParser) parseEvent(data []byte) (event *EventReslut, filename
Header: rowsEvent.header,
BinlogFileName: parser.currentBinlogFileName,
BinlogPosition: rowsEvent.header.LogPos,
SchemaName: parser.lastMapEvent.schemaName,
TableName: parser.lastMapEvent.tableName,
SchemaName: tableInfo.SchemaName,
TableName: tableInfo.TableName,
Rows: rowsEvent.rows,
Pri: tableInfo.Pri,
Pri: tableInfo.Pri,
ColumnMapping: tableInfo.ColumnMapping,
}
} else {
Expand All @@ -282,7 +281,7 @@ func (parser *eventParser) parseEvent(data []byte) (event *EventReslut, filename
break
case XID_EVENT:
var xidEvent *XIdEvent
xidEvent,err = parser.parseXidEvent(buf)
xidEvent, err = parser.parseXidEvent(buf)
if err != nil {
log.Println("xid event err:", err)
}
Expand All @@ -293,7 +292,7 @@ func (parser *eventParser) parseEvent(data []byte) (event *EventReslut, filename
SchemaName: "",
TableName: "",
Rows: nil,
Gtid: parser.gtidSetInfo.String(),
Gtid: parser.gtidSetInfo.String(),
}
break
default:
Expand Down Expand Up @@ -328,9 +327,9 @@ func (parser *eventParser) ParserConnClose(lock bool) {
}
parser.connStatus = STATUS_CLOSED
if parser.conn != nil {
func(){
func(){
if err := recover();err != nil {
func() {
func() {
if err := recover(); err != nil {
return
}
}()
Expand Down Expand Up @@ -389,10 +388,12 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string,
defer rows.Close()
//columeArr := make([]*tableStruct column_schema_type,0)
tableInfo := &tableStruct{
SchemaName: database,
TableName: tablename,
Pri: make([]string, 0),
ColumnSchemaTypeList: make([]*ColumnInfo, 0),
}
ColumnMapping := make(map[string]string,0)
ColumnMapping := make(map[string]string, 0)
for {
dest := make([]driver.Value, 11, 11)
err := rows.Next(dest)
Expand Down Expand Up @@ -490,7 +491,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string,
}
if dest[10] == nil {
IS_NULLABLE = "YES"
}else{
} else {
IS_NULLABLE = dest[10].(string)
}

Expand Down Expand Up @@ -524,7 +525,7 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string,
} else {
if COLUMN_TYPE == "tinyint(1)" {
columnMappingType = "bool"
}else{
} else {
columnMappingType = "int8"
}
}
Expand Down Expand Up @@ -552,16 +553,16 @@ func (parser *eventParser) GetTableSchemaByName(tableId uint64, database string,
} else {
columnMappingType = "int64"
}
case "numeric" :
columnMappingType = strings.Replace(COLUMN_TYPE,"numeric","decimal",1)
case "real" :
columnMappingType = strings.Replace(COLUMN_TYPE,"real","double",1)
case "numeric":
columnMappingType = strings.Replace(COLUMN_TYPE, "numeric", "decimal", 1)
case "real":
columnMappingType = strings.Replace(COLUMN_TYPE, "real", "double", 1)
default:
columnMappingType = COLUMN_TYPE
break
}
if IS_NULLABLE == "YES" {
columnMappingType = "Nullable("+columnMappingType+")"
columnMappingType = "Nullable(" + columnMappingType + ")"
}
ColumnMapping[COLUMN_NAME] = columnMappingType
}
Expand Down Expand Up @@ -641,12 +642,12 @@ func (parser *eventParser) KillConnect(connectionId string) (b bool) {
return true
}

func (parser *eventParser) GetTableId(database string, tablename string) (uint64,error) {
func (parser *eventParser) GetTableId(database string, tablename string) (uint64, error) {
key := database + "." + tablename
if _, ok := parser.tableNameMap[key]; !ok {
return 0,fmt.Errorf("not found key:%s",key)
return 0, fmt.Errorf("not found key:%s", key)
}
return parser.tableNameMap[key],nil
return parser.tableNameMap[key], nil
}

func (parser *eventParser) delTableId(database string, tablename string) {
Expand All @@ -656,4 +657,4 @@ func (parser *eventParser) delTableId(database string, tablename string) {
}
delete(parser.tableNameMap, key)
return
}
}
5 changes: 5 additions & 0 deletions input/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
mysqlDriver "github.com/brokercap/Bifrost/Bristol/mysql"
inputDriver "github.com/brokercap/Bifrost/input/driver"
"log"
"sync"
)

var MySQLBinlogDump string

type MysqlInput struct {
sync.RWMutex
inputDriver.PluginDriverInterface
inputInfo inputDriver.InputInfo
binlogDump *mysqlDriver.BinlogDump
Expand All @@ -18,6 +20,8 @@ type MysqlInput struct {
PluginStatusChan chan *inputDriver.PluginStatus
eventID uint64
callback inputDriver.Callback

replicateDoDb map[string]map[string]bool
}

func NewInputPlugin() inputDriver.Driver {
Expand Down Expand Up @@ -63,6 +67,7 @@ func (c *MysqlInput) Start0() error {
},
nil, nil)
c.binlogDump.SetNextEventID(c.eventID)
c.InitBinlogDumpReplicateDoDb()
if !c.inputInfo.IsGTID || c.inputInfo.GTID == "" {
go c.binlogDump.StartDumpBinlog(c.inputInfo.BinlogFileName, c.inputInfo.BinlogPostion, c.inputInfo.ServerId, c.reslut, c.inputInfo.MaxFileName, c.inputInfo.MaxPosition)
} else {
Expand Down
Loading

0 comments on commit 00d2bb8

Please sign in to comment.