fix issue 887 #1269

Open · wants to merge 5 commits into master
3 changes: 2 additions & 1 deletion go/base/context.go
@@ -232,7 +232,8 @@ type MigrationContext struct {

 	recentBinlogCoordinates mysql.BinlogCoordinates
 
-	BinlogSyncerMaxReconnectAttempts int
+	AllowSetupMetadataLockInstruments bool
+	BinlogSyncerMaxReconnectAttempts  int
 
 	Log Logger
 }
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
@@ -134,6 +134,7 @@ func main() {
 	flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook")
 
 	flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
+	flag.BoolVar(&migrationContext.AllowSetupMetadataLockInstruments, "allow-setup-metadata-lock-instruments", false, "before unlocking tables in the cut-over phase, validate that the rename session is the one waiting for the metadata lock on the original table")
 	flag.IntVar(&migrationContext.BinlogSyncerMaxReconnectAttempts, "binlogsyncer-max-reconnect-attempts", 0, "when master node fails, the maximum number of binlog synchronization attempts to reconnect. 0 is unlimited")
 
 	maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
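
With this flag in place, a migration opting into the new validation might be invoked as follows (illustrative sketch only; host, credentials, schema, table, and alter statement are placeholders):

    gh-ost --host=127.0.0.1 --user=gh-ost --password=... --database=mydb --table=mytable --alter='engine=innodb' --allow-setup-metadata-lock-instruments --execute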
82 changes: 80 additions & 2 deletions go/logic/applier.go
@@ -7,6 +7,7 @@ package logic
 
 import (
 	gosql "database/sql"
+	"errors"
 	"fmt"
 	"strings"
 	"sync/atomic"
@@ -416,6 +417,24 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
 	return this.WriteAndLogChangelog("state", value)
 }
 
+func (this *Applier) EnableMetadataLockInstrument() (err error) {
+	query := `select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`
+	var enabled, timed string
+	if err = this.db.QueryRow(query).Scan(&enabled, &timed); err != nil {
+		return this.migrationContext.Log.Errorf("error querying performance_schema.setup_instruments for wait/lock/metadata/sql/mdl: %s", err)
+	}
+	if strings.EqualFold(enabled, "YES") && strings.EqualFold(timed, "YES") {
+		this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl is already enabled")
+		return nil
+	}
+	this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl state: enabled %s, timed %s", enabled, timed)
+	if _, err = this.db.Exec(`UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME = 'wait/lock/metadata/sql/mdl'`); err != nil {
+		return this.migrationContext.Log.Errorf("error enabling instrument wait/lock/metadata/sql/mdl: %s", err)
+	}
+	this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl enabled")
+	return nil
+}
+
 // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
 // This is done asynchronously
 func (this *Applier) InitiateHeartbeat() {
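
As an aside, the wait/lock/metadata/sql/mdl instrument is enabled by default on MySQL 8.0 but disabled by default on 5.7, and its state can be checked out-of-band. A minimal standalone sketch, not part of this PR (the DSN is a placeholder; the driver is the one gh-ost already depends on):

    package main

    import (
    	gosql "database/sql"
    	"fmt"
    	"log"

    	_ "github.com/go-sql-driver/mysql"
    )

    func main() {
    	// Placeholder DSN; point it at the server being migrated.
    	db, err := gosql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	var enabled, timed string
    	query := `SELECT ENABLED, TIMED FROM performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`
    	if err := db.QueryRow(query).Scan(&enabled, &timed); err != nil {
    		log.Fatal(err)
    	}
    	fmt.Printf("wait/lock/metadata/sql/mdl: ENABLED=%s TIMED=%s\n", enabled, timed)
    }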
@@ -934,7 +953,7 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
 }
 
 // AtomicCutOverMagicLock
-func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
+func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, okToDropSentryTable <-chan bool, dropSentryTableDone chan<- bool) error {
 	tx, err := this.db.Begin()
 	if err != nil {
 		tableLocked <- err
@@ -1003,7 +1022,16 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
 
 	// The cut-over phase will proceed to apply remaining backlog onto ghost table,
 	// and issue RENAME. We wait here until told to proceed.
-	<-okToUnlockTable
+
+	// Make sure the total time the tables stay locked (including channel waits) does not exceed the rename session's timeout.
+	// If the timeout case fires instead of the signal, the rename session has already timed out and quit.
+	select {
+	case <-okToDropSentryTable:
+		this.migrationContext.Log.Infof("Received signal to drop the magic table; dropping table %s.%s now", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName()))
+	case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Since(this.migrationContext.LockTablesStartTime)):
+		this.migrationContext.Log.Warningf("Timed out waiting for the signal to drop the magic table; dropping table %s.%s anyway. This timeout is only a failsafe and should not normally be reached.", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName()))
+	}
+
 	this.migrationContext.Log.Infof("Will now proceed to drop magic table and unlock tables")
 
 	// The magic table is here because we locked it. And we are the only ones allowed to drop it.
@@ -1019,6 +1047,16 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
 		// We DO NOT return here because we must `UNLOCK TABLES`!
 	}
 
+	// In any case, signal that the sentry table drop is done.
+	dropSentryTableDone <- true
+
+	// If the timeout case fires instead of the signal, the rename session has already timed out and quit.
+	select {
+	case <-okToUnlockTable:
+		this.migrationContext.Log.Infof("Received signal to unlock tables; unlocking now")
+	case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Since(this.migrationContext.LockTablesStartTime)):
+		this.migrationContext.Log.Warningf("Timed out waiting for the signal to unlock tables; unlocking anyway. This timeout is only a failsafe and should not normally be reached.")
+	}
 	// Tables still locked
 	this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
 		sql.EscapeName(this.migrationContext.DatabaseName),
@@ -1079,6 +1117,46 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
 	return nil
 }
 
+// ValidateGhostTableLocked validates, via the wait/lock/metadata/sql/mdl instrument, that the rename session is waiting on the original table's metadata lock.
+// An error return means the rename session is not waiting on the expected table; the caller must then kill the rename session, unlock the tables, and retry later.
+func (this *Applier) ValidateGhostTableLocked(renameSessionId int64) (err error) {
+	var schema, object, state, info string
+	query := `select /*+ MAX_EXECUTION_TIME(300) */ ifnull(a.object_schema,''),a.object_name,b.PROCESSLIST_STATE,b.PROCESSLIST_INFO from performance_schema.metadata_locks a join performance_schema.threads b on a.owner_thread_id=b.thread_id where b.processlist_id=? and a.lock_status='PENDING';`
+	// The validation is deliberately lenient: a query error alone does not abort the cut-over.
+	if err := this.db.QueryRow(query, renameSessionId).Scan(&schema, &object, &state, &info); err != nil {
+		if errors.Is(err, gosql.ErrNoRows) {
+			this.migrationContext.Log.Warningf("query on metadata locks returned %s; perhaps the wait/lock/metadata/sql/mdl instrument is not enabled. Enable it via -allow-setup-metadata-lock-instruments", err)
+		}
+		this.migrationContext.Log.Warningf("error grabbing the metadata lock being acquired by the rename session: %s, query: %s", err, query)
+	} else {
+		this.migrationContext.Log.Infof("rename session is acquiring metadata lock on schema: %s, object: %s, state: %s, info: %s", schema, object, state, info)
+		if !strings.EqualFold(object, this.migrationContext.OriginalTableName) {
+			return this.migrationContext.Log.Errorf("Expected rename session %d to be acquiring the metadata lock on %s.%s, but got %s.%s",
+				renameSessionId,
+				sql.EscapeName(this.migrationContext.DatabaseName),
+				sql.EscapeName(this.migrationContext.OriginalTableName),
+				sql.EscapeName(schema),
+				sql.EscapeName(object))
+		}
+	}
+	return nil
+}
+
+// KillRenameSession kills the rename session when the expected table is not the one locked.
+// The session id is verified to belong to the expected process before the kill command is issued.
+func (this *Applier) KillRenameSession(renameSessionId int64) (err error) {
+	if err := this.ExpectProcess(renameSessionId, "metadata lock", "rename"); err != nil {
+		return err
+	}
+	this.migrationContext.Log.Infof("Starting to kill rename session %d", renameSessionId)
+	query := fmt.Sprintf(`kill /* gh-ost */ %d`, renameSessionId)
+	if _, err := this.db.Exec(query); err != nil {
+		return this.migrationContext.Log.Errorf("error killing rename session %d: %s; proceeding to release the original table anyway", renameSessionId, err)
+	}
+	this.migrationContext.Log.Infof("Kill of rename session %d done", renameSessionId)
+	return nil
+}
+
 func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
 	query := fmt.Sprintf(`show global status like '%s'`, variableName)
 	if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
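
For reference, when the cut-over is healthy the pending-lock query in ValidateGhostTableLocked typically reports the rename session waiting on the original table. The values below are illustrative only:

    object_schema:     mydb
    object_name:       mytable
    PROCESSLIST_STATE: Waiting for table metadata lock
    PROCESSLIST_INFO:  rename /* gh-ost */ table `mydb`.`mytable` to `mydb`.`_mytable_del`, `mydb`.`_mytable_gho` to `mydb`.`mytable`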
27 changes: 25 additions & 2 deletions go/logic/migrator.go
@@ -638,7 +638,10 @@ func (this *Migrator) atomicCutOver() (err error) {
 	defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
 
 	okToUnlockTable := make(chan bool, 4)
+	okToDropSentryTable := make(chan bool, 4)
+	dropSentryTableDone := make(chan bool, 2)
 	defer func() {
+		okToDropSentryTable <- true
 		okToUnlockTable <- true
 	}()
 
@@ -648,7 +651,7 @@
 	tableLocked := make(chan error, 2)
 	tableUnlocked := make(chan error, 2)
 	go func() {
-		if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
+		if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, okToDropSentryTable, dropSentryTableDone); err != nil {
 			this.migrationContext.Log.Errore(err)
 		}
 	}()
@@ -674,6 +677,7 @@
 		if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
 			// Abort! Release the lock
 			atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
+			okToDropSentryTable <- true
 			okToUnlockTable <- true
 		}
 	}()
@@ -691,6 +695,7 @@
 	// Wait for the RENAME to appear in PROCESSLIST
 	if err := this.retryOperation(waitForRename, true); err != nil {
 		// Abort! Release the lock
+		okToDropSentryTable <- true
 		okToUnlockTable <- true
 		return err
 	}
@@ -703,9 +708,20 @@
 	}
 	this.migrationContext.Log.Infof("Connection holding lock on original table still exists")
 
+	okToDropSentryTable <- true
+	<-dropSentryTableDone
+
+	if err := this.applier.ValidateGhostTableLocked(renameSessionId); err != nil {
+		// Abort! Kill the rename session and release the lock
+		if err := this.applier.KillRenameSession(renameSessionId); err != nil {
+			this.migrationContext.Log.Errore(err)
+		}
+		okToUnlockTable <- true
+		return err
+	}
+
 	// Now that we've found the RENAME blocking, AND the locking connection still alive,
 	// we know it is safe to proceed to release the lock
-
 	okToUnlockTable <- true
 	// BAM! magic table dropped, original table lock is released
 	// -> RENAME released -> queries on original are unblocked.
@@ -1146,6 +1162,13 @@ func (this *Migrator) initiateApplier() error {
 		}
 	}
 	this.applier.WriteChangelogState(string(GhostTableMigrated))
+
+	if this.migrationContext.AllowSetupMetadataLockInstruments {
+		if err := this.applier.EnableMetadataLockInstrument(); err != nil {
+			this.migrationContext.Log.Errorf("Unable to enable the metadata lock instrument; see error details above. Bailing out")
+			return err
+		}
+	}
 	go this.applier.InitiateHeartbeat()
 	return nil
 }
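
Taken together, the heart of this change is a two-step handshake between the migrator and the connection holding the table lock: first "drop the sentry table", acknowledged via dropSentryTableDone, then "unlock tables", with each wait bounded by a failsafe timeout so the lock never outlives the rename session's timeout. A minimal, self-contained sketch of that pattern (hypothetical names, not gh-ost code):

    package main

    import (
    	"fmt"
    	"time"
    )

    // lockHolder stands in for the session holding the table lock.
    func lockHolder(okToDrop <-chan bool, dropDone chan<- bool, okToUnlock <-chan bool, deadline time.Time) {
    	// Step 1: wait for permission to drop the sentry table, bounded by the deadline.
    	select {
    	case <-okToDrop:
    		fmt.Println("dropping sentry table (signaled)")
    	case <-time.After(time.Until(deadline)):
    		fmt.Println("dropping sentry table (failsafe timeout)")
    	}
    	dropDone <- true // acknowledge completion either way

    	// Step 2: wait for permission to unlock, again bounded by the deadline.
    	select {
    	case <-okToUnlock:
    		fmt.Println("unlocking tables (signaled)")
    	case <-time.After(time.Until(deadline)):
    		fmt.Println("unlocking tables (failsafe timeout)")
    	}
    }

    func main() {
    	okToDrop := make(chan bool, 1)
    	dropDone := make(chan bool, 1)
    	okToUnlock := make(chan bool, 1)
    	deadline := time.Now().Add(3 * time.Second) // stands in for the rename session timeout

    	go lockHolder(okToDrop, dropDone, okToUnlock, deadline)

    	okToDrop <- true // migrator: safe to drop the sentry table now
    	<-dropDone       // wait for the drop before validating the pending metadata lock
    	// ... ValidateGhostTableLocked would run here ...
    	okToUnlock <- true // validation passed: release the lock
    	time.Sleep(100 * time.Millisecond) // let the goroutine finish printing
    }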