Skip to content

Commit

Permalink
Merge branch 'v2.x-input-mysql-replicate-db-may-error-by-restart-bug'…
Browse files Browse the repository at this point in the history
… into v2.2.3

* v2.x-input-mysql-replicate-db-may-error-by-restart-bug:
  修复MySQL源,指定同步表配置后,但可能没生效的BUG BUG复现过程: 1. 配置了一个T1表同步 2. 重启进程,并且T1表数据不再做任何更新 3. 配置T2表同步 4. 更新T1表的数据
  • Loading branch information
jc3wish committed Dec 3, 2023
2 parents 2b92425 + 44fad31 commit d33b950
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 5 deletions.
5 changes: 5 additions & 0 deletions input/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
mysqlDriver "github.com/brokercap/Bifrost/Bristol/mysql"
inputDriver "github.com/brokercap/Bifrost/input/driver"
"log"
"sync"
)

var MySQLBinlogDump string

type MysqlInput struct {
sync.RWMutex
inputDriver.PluginDriverInterface
inputInfo inputDriver.InputInfo
binlogDump *mysqlDriver.BinlogDump
Expand All @@ -18,6 +20,8 @@ type MysqlInput struct {
PluginStatusChan chan *inputDriver.PluginStatus
eventID uint64
callback inputDriver.Callback

replicateDoDb map[string]map[string]bool
}

func NewInputPlugin() inputDriver.Driver {
Expand Down Expand Up @@ -63,6 +67,7 @@ func (c *MysqlInput) Start0() error {
},
nil, nil)
c.binlogDump.SetNextEventID(c.eventID)
c.InitBinlogDumpReplicateDoDb()
if !c.inputInfo.IsGTID || c.inputInfo.GTID == "" {
go c.binlogDump.StartDumpBinlog(c.inputInfo.BinlogFileName, c.inputInfo.BinlogPostion, c.inputInfo.ServerId, c.reslut, c.inputInfo.MaxFileName, c.inputInfo.MaxPosition)
} else {
Expand Down
55 changes: 50 additions & 5 deletions input/mysql/replicate.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,62 @@
package mysql

func (c *MysqlInput) AddReplicateDoDb(SchemaName,TableName string) (err error) {
func (c *MysqlInput) AddReplicateDoDb(SchemaName, TableName string) (err error) {
c.AddReplicateDoDb0(SchemaName, TableName)
if c.binlogDump == nil {
return
}
c.binlogDump.AddReplicateDoDb(SchemaName,TableName)
c.binlogDump.AddReplicateDoDb(SchemaName, TableName)
return nil
}

func (c *MysqlInput) DelReplicateDoDb(SchemaName,TableName string) (err error) {
func (c *MysqlInput) AddReplicateDoDb0(SchemaName, TableName string) {
c.Lock()
defer c.Unlock()
if c.replicateDoDb == nil {
c.replicateDoDb = make(map[string]map[string]bool, 0)
}
if _, ok := c.replicateDoDb[SchemaName]; !ok {
c.replicateDoDb[SchemaName] = make(map[string]bool, 0)
}
c.replicateDoDb[SchemaName][TableName] = true
return
}

func (c *MysqlInput) DelReplicateDoDb(SchemaName, TableName string) (err error) {
c.DelReplicateDoDb0(SchemaName, TableName)
if c.binlogDump == nil {
return
}
c.binlogDump.DelReplicateDoDb(SchemaName,TableName)
c.binlogDump.DelReplicateDoDb(SchemaName, TableName)
return nil
}
}

func (c *MysqlInput) DelReplicateDoDb0(SchemaName, TableName string) {
c.Lock()
defer c.Unlock()
if c.replicateDoDb == nil {
return
}
if _, ok := c.replicateDoDb[SchemaName]; !ok {
return
}
delete(c.replicateDoDb[SchemaName], TableName)
return
}

func (c *MysqlInput) InitBinlogDumpReplicateDoDb() {
c.Lock()
defer c.Unlock()
if c.replicateDoDb == nil {
return
}
if c.binlogDump == nil {
return
}
for schemaName, replicateDoDbTablesMap := range c.replicateDoDb {
for tableName := range replicateDoDbTablesMap {
c.binlogDump.AddReplicateDoDb(schemaName, tableName)
}
}
return
}

0 comments on commit d33b950

Please sign in to comment.