Skip to content

Commit 155e335

Browse files
svart-ravnsiddontang
authored andcommitted
FormatDescriptionEvent should be readed by binlog file always (go-mysql-org#224)
1 parent 249187d commit 155e335

File tree

1 file changed

+63
-29
lines changed

1 file changed

+63
-29
lines changed

replication/parser.go

Lines changed: 63 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc)
5050

5151
if offset < 4 {
5252
offset = 4
53+
} else if offset > 4 {
54+
// FORMAT_DESCRIPTION event should be read by default always (despite that fact passed offset may be higher than 4)
55+
if _, err = f.Seek(4, os.SEEK_SET); err != nil {
56+
return errors.Errorf("seek %s to %d error %v", name, offset, err)
57+
}
58+
59+
p.getFormatDescriptionEvent(f, onEvent)
5360
}
5461

5562
if _, err = f.Seek(offset, os.SEEK_SET); err != nil {
@@ -59,55 +66,82 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc)
5966
return p.ParseReader(f, onEvent)
6067
}
6168

62-
func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error {
69+
70+
71+
func (p *BinlogParser) getFormatDescriptionEvent(r io.Reader, onEvent OnEventFunc) error {
72+
_, err := p.parseSingleEvent(&r, onEvent)
73+
return err
74+
}
75+
76+
77+
78+
func (p *BinlogParser) parseSingleEvent(r *io.Reader, onEvent OnEventFunc) (bool, error) {
6379
var err error
6480
var n int64
6581

66-
for {
67-
headBuf := make([]byte, EventHeaderSize)
82+
headBuf := make([]byte, EventHeaderSize)
6883

69-
if _, err = io.ReadFull(r, headBuf); err == io.EOF {
70-
return nil
71-
} else if err != nil {
72-
return errors.Trace(err)
73-
}
84+
if _, err = io.ReadFull(*r, headBuf); err == io.EOF {
85+
return true, nil
86+
} else if err != nil {
87+
return false, errors.Trace(err)
88+
}
7489

75-
var h *EventHeader
76-
h, err = p.parseHeader(headBuf)
77-
if err != nil {
78-
return errors.Trace(err)
79-
}
90+
var h *EventHeader
91+
h, err = p.parseHeader(headBuf)
92+
if err != nil {
93+
return false, errors.Trace(err)
94+
}
8095

81-
if h.EventSize <= uint32(EventHeaderSize) {
82-
return errors.Errorf("invalid event header, event size is %d, too small", h.EventSize)
96+
if h.EventSize <= uint32(EventHeaderSize) {
97+
return false, errors.Errorf("invalid event header, event size is %d, too small", h.EventSize)
98+
}
8399

84-
}
100+
var buf bytes.Buffer
101+
if n, err = io.CopyN(&buf, *r, int64(h.EventSize)-int64(EventHeaderSize)); err != nil {
102+
return false, errors.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, EventHeaderSize, n)
103+
}
85104

86-
var buf bytes.Buffer
87-
if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(EventHeaderSize)); err != nil {
88-
return errors.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, EventHeaderSize, n)
89-
}
105+
data := buf.Bytes()
106+
rawData := data
90107

91-
data := buf.Bytes()
92-
rawData := data
108+
eventLen := int(h.EventSize) - EventHeaderSize
93109

94-
eventLen := int(h.EventSize) - EventHeaderSize
110+
if len(data) != eventLen {
111+
return false, errors.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen)
112+
}
95113

96-
if len(data) != eventLen {
97-
return errors.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen)
114+
var e Event
115+
e, err = p.parseEvent(h, data)
116+
if err != nil {
117+
if _, ok := err.(errMissingTableMapEvent); ok {
118+
return false, nil
98119
}
120+
return false, errors.Trace(err)
121+
}
99122

100-
var e Event
101-
e, err = p.parseEvent(h, data)
123+
if err = onEvent(&BinlogEvent{rawData, h, e}); err != nil {
124+
return false, errors.Trace(err)
125+
}
126+
127+
128+
return false, nil
129+
}
130+
131+
132+
func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error {
133+
134+
for {
135+
done, err := p.parseSingleEvent(&r, onEvent);
102136
if err != nil {
103137
if _, ok := err.(errMissingTableMapEvent); ok {
104138
continue
105139
}
106140
return errors.Trace(err)
107141
}
108142

109-
if err = onEvent(&BinlogEvent{rawData, h, e}); err != nil {
110-
return errors.Trace(err)
143+
if done {
144+
break
111145
}
112146
}
113147

0 commit comments

Comments
 (0)