-
Notifications
You must be signed in to change notification settings - Fork 1k
feat: Charset parameter for Mysql connection #1027
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 36 commits
ef55c4c
6855741
e6565cd
cb4f084
8d7d13c
f81397d
f77308e
824d650
61a148b
8cc642f
05b87bb
2025086
fa2e8fa
e95e5cd
fdd11a6
d947c9e
73477d3
eb2ea8e
007d419
c6c9248
d15a7ce
4298599
1f7d701
98108dd
411b75d
9bd5965
c349655
e7d51c3
c3593a7
fd2c582
3d213ae
b34a98d
7222b87
b4851e0
b939f18
88f0609
0799a57
a2053cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,6 +65,10 @@ func NewCanal(cfg *Config) (*Canal, error) { | |
|
||
c.ctx, c.cancel = context.WithCancel(context.Background()) | ||
|
||
if cfg.WaitTimeBetweenConnectionSeconds > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it should be |
||
cfg.WaitTimeBetweenConnectionSeconds = time.Duration(5) * time.Second | ||
} | ||
|
||
c.dumpDoneCh = make(chan struct{}) | ||
c.eventHandler = &DummyEventHandler{} | ||
c.parser = parser.New() | ||
|
@@ -192,6 +196,7 @@ func (c *Canal) RunFrom(pos mysql.Position) error { | |
return c.Run() | ||
} | ||
|
||
// Start from selected GTIDSet | ||
func (c *Canal) StartFromGTID(set mysql.GTIDSet) error { | ||
c.master.UpdateGTIDSet(set) | ||
|
||
|
@@ -238,15 +243,17 @@ func (c *Canal) run() error { | |
} | ||
|
||
func (c *Canal) Close() { | ||
log.Infof("closing canal") | ||
log.Debugf("closing canal") | ||
c.m.Lock() | ||
defer c.m.Unlock() | ||
|
||
c.cancel() | ||
c.syncer.Close() | ||
c.connLock.Lock() | ||
c.conn.Close() | ||
c.conn = nil | ||
if c.conn != nil { | ||
c.conn.Close() | ||
c.conn = nil | ||
} | ||
c.connLock.Unlock() | ||
|
||
_ = c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true) | ||
|
@@ -413,20 +420,21 @@ func (c *Canal) checkBinlogRowFormat() error { | |
|
||
func (c *Canal) prepareSyncer() error { | ||
cfg := replication.BinlogSyncerConfig{ | ||
ServerID: c.cfg.ServerID, | ||
Flavor: c.cfg.Flavor, | ||
User: c.cfg.User, | ||
Password: c.cfg.Password, | ||
Charset: c.cfg.Charset, | ||
HeartbeatPeriod: c.cfg.HeartbeatPeriod, | ||
ReadTimeout: c.cfg.ReadTimeout, | ||
UseDecimal: c.cfg.UseDecimal, | ||
ParseTime: c.cfg.ParseTime, | ||
SemiSyncEnabled: c.cfg.SemiSyncEnabled, | ||
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts, | ||
DisableRetrySync: c.cfg.DisableRetrySync, | ||
TimestampStringLocation: c.cfg.TimestampStringLocation, | ||
TLSConfig: c.cfg.TLSConfig, | ||
ServerID: c.cfg.ServerID, | ||
Flavor: c.cfg.Flavor, | ||
User: c.cfg.User, | ||
Password: c.cfg.Password, | ||
Charset: c.cfg.Charset, | ||
HeartbeatPeriod: c.cfg.HeartbeatPeriod, | ||
ReadTimeout: c.cfg.ReadTimeout, | ||
UseDecimal: c.cfg.UseDecimal, | ||
ParseTime: c.cfg.ParseTime, | ||
SemiSyncEnabled: c.cfg.SemiSyncEnabled, | ||
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts, | ||
DisableRetrySync: c.cfg.DisableRetrySync, | ||
TimestampStringLocation: c.cfg.TimestampStringLocation, | ||
TLSConfig: c.cfg.TLSConfig, | ||
WaitTimeBetweenConnectionSeconds: c.cfg.WaitTimeBetweenConnectionSeconds, | ||
} | ||
|
||
if strings.Contains(c.cfg.Addr, "/") { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package canal | |
|
||
import ( | ||
"fmt" | ||
"strings" | ||
"sync/atomic" | ||
"time" | ||
|
||
|
@@ -22,15 +23,15 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { | |
if err != nil { | ||
return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err) | ||
} | ||
log.Infof("start sync binlog at binlog file %v", pos) | ||
log.Debugf("start sync binlog at binlog file %v", pos) | ||
Comment on lines
-25
to
+26
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might think there is a need to change the log lever for these information messages. But it should be done in another PR, not this one. It brings confusion, especially because there are no links between these changes and the topic of the PR |
||
return s, nil | ||
} else { | ||
gsetClone := gset.Clone() | ||
s, err := c.syncer.StartSyncGTID(gset) | ||
if err != nil { | ||
return nil, errors.Errorf("start sync replication at GTID set %v error %v", gset, err) | ||
} | ||
log.Infof("start sync binlog at GTID set %v", gsetClone) | ||
log.Debugf("start sync binlog at GTID set %v", gsetClone) | ||
return s, nil | ||
} | ||
} | ||
|
@@ -65,7 +66,7 @@ func (c *Canal) runSyncBinlog() error { | |
switch e := ev.Event.(type) { | ||
case *replication.RotateEvent: | ||
fakeRotateLogName = string(e.NextLogName) | ||
log.Infof("received fake rotate event, next log name is %s", e.NextLogName) | ||
log.Debugf("received fake rotate event, next log name is %s", e.NextLogName) | ||
} | ||
|
||
continue | ||
|
@@ -92,7 +93,7 @@ func (c *Canal) runSyncBinlog() error { | |
case *replication.RotateEvent: | ||
pos.Name = string(e.NextLogName) | ||
pos.Pos = uint32(e.Position) | ||
log.Infof("rotate binlog to %s", pos) | ||
log.Debugf("rotate binlog to %s", pos) | ||
savePos = true | ||
force = true | ||
if err = c.eventHandler.OnRotate(e); err != nil { | ||
|
@@ -142,7 +143,15 @@ func (c *Canal) runSyncBinlog() error { | |
case *replication.QueryEvent: | ||
stmts, _, err := c.parser.Parse(string(e.Query), "", "") | ||
if err != nil { | ||
log.Errorf("parse query(%s) err %v, will skip this event", e.Query, err) | ||
msg := err.Error() | ||
if strings.Contains(strings.ToLower(msg), strings.ToLower("procedure")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Parsing error message like this is a bad practice. Don't you have an error code should could catch? |
||
// Cut the first row from the message since it contain the procedure call and not the entire message | ||
fl := strings.Split(msg, "\n") | ||
log.Debugf("parse SP Error: (%s)", fl[0]) | ||
} else { | ||
log.Debugf("parse query(%s) err %v", e.Query, err) | ||
} | ||
log.Debugln("will skip this event") | ||
continue | ||
} | ||
for _, stmt := range stmts { | ||
|
@@ -232,7 +241,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) { | |
|
||
func (c *Canal) updateTable(db, table string) (err error) { | ||
c.ClearTableCache([]byte(db), []byte(table)) | ||
log.Infof("table structure changed, clear table cache: %s.%s\n", db, table) | ||
log.Debugf("table structure changed, clear table cache: %s.%s\n", db, table) | ||
if err = c.eventHandler.OnTableChanged(db, table); err != nil && errors.Cause(err) != schema.ErrTableNotExist { | ||
return errors.Trace(err) | ||
} | ||
|
@@ -270,38 +279,10 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error { | |
return errors.Errorf("%s not supported now", e.Header.EventType) | ||
} | ||
events := newRowsEvent(t, action, ev.Rows, e.Header) | ||
events.Header.Gtid = c.SyncedGTIDSet() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain why this is changed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||
return c.eventHandler.OnRow(events) | ||
} | ||
|
||
func (c *Canal) FlushBinlog() error { | ||
_, err := c.Execute("FLUSH BINARY LOGS") | ||
return errors.Trace(err) | ||
} | ||
|
||
func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error { | ||
timer := time.NewTimer(timeout) | ||
for { | ||
select { | ||
case <-timer.C: | ||
return errors.Errorf("wait position %v too long > %s", pos, timeout) | ||
default: | ||
err := c.FlushBinlog() | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
curPos := c.master.Position() | ||
if curPos.Compare(pos) >= 0 { | ||
return nil | ||
} else { | ||
log.Debugf("master pos is %v, wait catching %v", curPos, pos) | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
Comment on lines
-276
to
-303
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing this break the API of the package I don't see why they were moved as functions accepting Canal |
||
|
||
func (c *Canal) GetMasterPos() (mysql.Position, error) { | ||
rr, err := c.Execute("SHOW MASTER STATUS") | ||
if err != nil { | ||
|
@@ -336,12 +317,3 @@ func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error) { | |
} | ||
return gset, nil | ||
} | ||
|
||
func (c *Canal) CatchMasterPos(timeout time.Duration) error { | ||
pos, err := c.GetMasterPos() | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
return c.WaitUntilPos(pos, timeout) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the log level should be kept, other developers may rely on this behaviour.
Also for other files.