-
Notifications
You must be signed in to change notification settings - Fork 1k
replication: Add mysql::serialization based Gtid Log Event #990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 25 commits
3ea3d67
a3d5dee
10158f4
6888c31
09838f5
b6ceaf3
3bd47aa
0b445a1
b55f86e
3028527
76b98bb
990de1d
0237595
209cfad
ad8acac
f412b2b
a7e4280
b7b8f38
28d3be8
4a8e497
c74b18e
aa11671
86bf3fc
8d78a72
ba1dcc9
1558b91
da20b43
ab00980
80d1d62
89b293f
a45d879
085c631
ac2d26f
3a1789b
14de33c
173111f
f129364
b7e35fe
78157e3
9b61d61
dfc7475
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ import ( | |
"unicode" | ||
|
||
"github.com/go-mysql-org/go-mysql/mysql" | ||
"github.com/go-mysql-org/go-mysql/serialization" | ||
"github.com/google/uuid" | ||
"github.com/pingcap/errors" | ||
) | ||
|
@@ -420,6 +421,7 @@ func (e *QueryEvent) Dump(w io.Writer) { | |
type GTIDEvent struct { | ||
CommitFlag uint8 | ||
SID []byte | ||
Tag string | ||
GNO int64 | ||
LastCommitted int64 | ||
SequenceNumber int64 | ||
|
@@ -512,7 +514,11 @@ func (e *GTIDEvent) Dump(w io.Writer) { | |
|
||
fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag) | ||
u, _ := uuid.FromBytes(e.SID) | ||
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) | ||
if e.Tag != "" { | ||
fmt.Fprintf(w, "GTID_NEXT: %s:%s:%d\n", u.String(), e.Tag, e.GNO) | ||
} else { | ||
fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) | ||
} | ||
fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted) | ||
fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber) | ||
fmt.Fprintf(w, "Immediate commmit timestamp: %d (%s)\n", e.ImmediateCommitTimestamp, fmtTime(e.ImmediateCommitTime())) | ||
|
@@ -543,6 +549,195 @@ func (e *GTIDEvent) OriginalCommitTime() time.Time { | |
return microSecTimestampToTime(e.OriginalCommitTimestamp) | ||
} | ||
|
||
// GtidTaggedLogEvent is for a GTID event with a tag. | ||
// This is similar to GTIDEvent, but it has a tag and uses a different serialization format. | ||
type GtidTaggedLogEvent struct { | ||
GTIDEvent | ||
msg serialization.Message | ||
} | ||
|
||
func (e *GtidTaggedLogEvent) Decode(data []byte) error { | ||
e.msg = serialization.Message{ | ||
Format: serialization.Format{ | ||
Fields: []serialization.Field{ | ||
{ | ||
Name: "gtid_flags", | ||
Type: serialization.FieldIntFixed{ | ||
Length: 1, | ||
}, | ||
}, | ||
{ | ||
Name: "uuid", | ||
Type: serialization.FieldIntFixed{ | ||
Length: 16, | ||
}, | ||
}, | ||
{ | ||
Name: "gno", | ||
Type: serialization.FieldIntVar{}, | ||
}, | ||
{ | ||
Name: "tag", | ||
Type: serialization.FieldString{}, | ||
}, | ||
{ | ||
Name: "last_committed", | ||
Type: serialization.FieldIntVar{}, | ||
}, | ||
{ | ||
Name: "sequence_number", | ||
Type: serialization.FieldIntVar{}, | ||
}, | ||
{ | ||
Name: "immediate_commit_timestamp", | ||
Type: serialization.FieldUintVar{}, | ||
}, | ||
{ | ||
Name: "original_commit_timestamp", | ||
Type: serialization.FieldUintVar{}, | ||
Optional: true, | ||
}, | ||
{ | ||
Name: "transaction_length", | ||
Type: serialization.FieldUintVar{}, | ||
}, | ||
{ | ||
Name: "immediate_server_version", | ||
Type: serialization.FieldUintVar{}, | ||
}, | ||
{ | ||
Name: "original_server_version", | ||
Type: serialization.FieldUintVar{}, | ||
Optional: true, | ||
}, | ||
{ | ||
Name: "commit_group_ticket", | ||
Optional: true, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
err := serialization.Unmarshal(data, &e.msg) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
f, err := e.msg.GetFieldByName("gtid_flags") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldIntFixed); ok { | ||
e.CommitFlag = v.Value[0] | ||
} else { | ||
return errors.New("failed to get gtid_flags field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("uuid") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldIntFixed); ok { | ||
e.SID = v.Value | ||
} else { | ||
return errors.New("failed to get uuid field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("gno") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldIntVar); ok { | ||
e.GNO = v.Value | ||
} else { | ||
return errors.New("failed to get gno field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("tag") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldString); ok { | ||
e.Tag = v.Value | ||
} else { | ||
return errors.New("failed to get tag field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("last_committed") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldIntVar); ok { | ||
e.LastCommitted = v.Value | ||
} else { | ||
return errors.New("failed to get last_committed field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("sequence_number") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldIntVar); ok { | ||
e.SequenceNumber = v.Value | ||
} else { | ||
return errors.New("failed to get sequence_number field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("immediate_commit_timestamp") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldUintVar); ok { | ||
e.ImmediateCommitTimestamp = v.Value | ||
} else { | ||
return errors.New("failed to get immediate_commit_timestamp field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("original_commit_timestamp") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldUintVar); ok { | ||
e.OriginalCommitTimestamp = v.Value | ||
} else { | ||
return errors.New("failed to get original_commit_timestamp field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("immediate_server_version") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldUintVar); ok { | ||
e.ImmediateServerVersion = uint32(v.Value) | ||
} else { | ||
return errors.New("failed to get immediate_server_version field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("original_server_version") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldUintVar); ok { | ||
e.OriginalServerVersion = uint32(v.Value) | ||
} else { | ||
return errors.New("failed to get original_server_version field") | ||
} | ||
|
||
f, err = e.msg.GetFieldByName("transaction_length") | ||
if err != nil { | ||
return err | ||
} | ||
if v, ok := f.Type.(serialization.FieldUintVar); ok { | ||
e.TransactionLength = v.Value | ||
} else { | ||
return errors.New("failed to get transaction_length field") | ||
} | ||
|
||
// TODO: add and test commit_group_ticket | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't yet been able to generate any events with an actual I first tried with Then looking at the code I noticed that this has to do with Group Replication (part of InnoDB Cluster), So I setup a cluster with 3 sandbox instances... but that also didn't give me any events for testing. Looks like this might also be known as BGC (Binlog Group Commit) tickets. |
||
|
||
return nil | ||
} | ||
|
||
type BeginLoadQueryEvent struct { | ||
FileID uint32 | ||
BlockData []byte | ||
|
Uh oh!
There was an error while loading. Please reload this page.