Skip to content

Commit 2da4180

Browse files
authored
Merge pull request #636 from ackleymi/repeating-grps
Maintain repeating group field order when parsing messages
2 parents fbe0cd7 + 3517c8b commit 2da4180

File tree

5 files changed

+471
-59
lines changed

5 files changed

+471
-59
lines changed

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ linters-install:
2525
lint: linters-install
2626
golangci-lint run
2727

28+
# An easy way to run the linter without going through the install process -
29+
# docker run -t --rm -v $(pwd):/app -w /app golangci/golangci-lint:v1.57.2 golangci-lint run -v
30+
# See https://golangci-lint.run/welcome/install/ for more details.
31+
2832
# ---------------------------------------------------------------
2933
# Targets related to running acceptance tests -
3034

in_session.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,11 @@ func (state inSession) resendMessages(session *session, beginSeqNo, endSeqNo int
232232
nextSeqNum := seqNum
233233
msg := NewMessage()
234234
for _, msgBytes := range msgs {
235-
_ = ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
235+
err = ParseMessageWithDataDictionary(msg, bytes.NewBuffer(msgBytes), session.transportDataDictionary, session.appDataDictionary)
236+
if err != nil {
237+
session.log.OnEventf("Resend Msg Parse Error: %v, %v", err.Error(), bytes.NewBuffer(msgBytes).String())
238+
return // We cant continue with a message that cant be parsed correctly.
239+
}
236240
msgType, _ := msg.Header.GetBytes(tagMsgType)
237241
sentMessageSeqNum, _ := msg.Header.GetInt(tagMsgSeqNum)
238242

message.go

Lines changed: 224 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,19 @@ import (
2727
// Header is first section of a FIX Message.
2828
type Header struct{ FieldMap }
2929

30+
// msgparser contains message parsing vars needed to parse a string into a message.
31+
type msgParser struct {
32+
msg *Message
33+
transportDataDictionary *datadictionary.DataDictionary
34+
appDataDictionary *datadictionary.DataDictionary
35+
rawBytes []byte
36+
fieldIndex int
37+
parsedFieldBytes *TagValue
38+
trailerBytes []byte
39+
foundBody bool
40+
foundTrailer bool
41+
}
42+
3043
// in the message header, the first 3 tags in the message header must be 8,9,35.
3144
func headerFieldOrdering(i, j Tag) bool {
3245
var ordering = func(t Tag) uint32 {
@@ -152,124 +165,134 @@ func ParseMessageWithDataDictionary(
152165
msg *Message,
153166
rawMessage *bytes.Buffer,
154167
transportDataDictionary *datadictionary.DataDictionary,
155-
_ *datadictionary.DataDictionary,
168+
appDataDictionary *datadictionary.DataDictionary,
156169
) (err error) {
157-
msg.Header.Clear()
158-
msg.Body.Clear()
159-
msg.Trailer.Clear()
160-
msg.rawMessage = rawMessage
170+
// Create msgparser before we go any further.
171+
mp := &msgParser{
172+
msg: msg,
173+
transportDataDictionary: transportDataDictionary,
174+
appDataDictionary: appDataDictionary,
175+
}
176+
mp.msg.rawMessage = rawMessage
177+
mp.rawBytes = rawMessage.Bytes()
161178

162-
rawBytes := rawMessage.Bytes()
179+
return doParsing(mp)
180+
}
163181

164-
// Allocate fields in one chunk.
182+
// doParsing executes the message parsing process.
183+
func doParsing(mp *msgParser) (err error) {
184+
// Initialize for parsing.
185+
mp.msg.Header.Clear()
186+
mp.msg.Body.Clear()
187+
mp.msg.Trailer.Clear()
188+
189+
// Allocate expected message fields in one chunk.
165190
fieldCount := 0
166-
for _, b := range rawBytes {
191+
for _, b := range mp.rawBytes {
167192
if b == '\001' {
168193
fieldCount++
169194
}
170195
}
171-
172196
if fieldCount == 0 {
173-
return parseError{OrigError: fmt.Sprintf("No Fields detected in %s", string(rawBytes))}
197+
return parseError{OrigError: fmt.Sprintf("No Fields detected in %s", string(mp.rawBytes))}
174198
}
175-
176-
if cap(msg.fields) < fieldCount {
177-
msg.fields = make([]TagValue, fieldCount)
199+
if cap(mp.msg.fields) < fieldCount {
200+
mp.msg.fields = make([]TagValue, fieldCount)
178201
} else {
179-
msg.fields = msg.fields[0:fieldCount]
202+
mp.msg.fields = mp.msg.fields[0:fieldCount]
180203
}
181204

182-
fieldIndex := 0
183-
184205
// Message must start with begin string, body length, msg type.
185-
if rawBytes, err = extractSpecificField(&msg.fields[fieldIndex], tagBeginString, rawBytes); err != nil {
206+
// Get begin string.
207+
if mp.rawBytes, err = extractSpecificField(&mp.msg.fields[mp.fieldIndex], tagBeginString, mp.rawBytes); err != nil {
186208
return
187209
}
210+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
188211

189-
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
190-
fieldIndex++
191-
192-
parsedFieldBytes := &msg.fields[fieldIndex]
193-
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagBodyLength, rawBytes); err != nil {
212+
// Get body length.
213+
mp.fieldIndex++
214+
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
215+
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagBodyLength, mp.rawBytes); err != nil {
194216
return
195217
}
218+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
196219

197-
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
198-
fieldIndex++
199-
200-
parsedFieldBytes = &msg.fields[fieldIndex]
201-
if rawBytes, err = extractSpecificField(parsedFieldBytes, tagMsgType, rawBytes); err != nil {
220+
// Get msg type.
221+
mp.fieldIndex++
222+
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
223+
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagMsgType, mp.rawBytes); err != nil {
202224
return
203225
}
226+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
204227

228+
// Start parsing.
229+
mp.fieldIndex++
205230
xmlDataLen := 0
206231
xmlDataMsg := false
207-
208-
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
209-
fieldIndex++
210-
211-
trailerBytes := []byte{}
212-
foundBody := false
213-
foundTrailer := false
232+
mp.trailerBytes = []byte{}
233+
mp.foundBody = false
234+
mp.foundTrailer = false
214235
for {
215-
parsedFieldBytes = &msg.fields[fieldIndex]
236+
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
216237
if xmlDataLen > 0 {
217-
rawBytes, err = extractXMLDataField(parsedFieldBytes, rawBytes, xmlDataLen)
238+
mp.rawBytes, err = extractXMLDataField(mp.parsedFieldBytes, mp.rawBytes, xmlDataLen)
218239
xmlDataLen = 0
219240
xmlDataMsg = true
220241
} else {
221-
rawBytes, err = extractField(parsedFieldBytes, rawBytes)
242+
mp.rawBytes, err = extractField(mp.parsedFieldBytes, mp.rawBytes)
222243
}
223244
if err != nil {
224245
return
225246
}
226247

227248
switch {
228-
case isHeaderField(parsedFieldBytes.tag, transportDataDictionary):
229-
msg.Header.add(msg.fields[fieldIndex : fieldIndex+1])
230-
case isTrailerField(parsedFieldBytes.tag, transportDataDictionary):
231-
msg.Trailer.add(msg.fields[fieldIndex : fieldIndex+1])
232-
foundTrailer = true
249+
case isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
250+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
251+
case isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
252+
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
253+
mp.foundTrailer = true
254+
case isNumInGroupField(mp.msg, []Tag{mp.parsedFieldBytes.tag}, mp.appDataDictionary):
255+
parseGroup(mp, []Tag{mp.parsedFieldBytes.tag})
233256
default:
234-
foundBody = true
235-
trailerBytes = rawBytes
236-
msg.Body.add(msg.fields[fieldIndex : fieldIndex+1])
257+
mp.foundBody = true
258+
mp.trailerBytes = mp.rawBytes
259+
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
237260
}
238-
if parsedFieldBytes.tag == tagCheckSum {
261+
if mp.parsedFieldBytes.tag == tagCheckSum {
239262
break
240263
}
241264

242-
if !foundBody {
243-
msg.bodyBytes = rawBytes
265+
if !mp.foundBody {
266+
mp.msg.bodyBytes = mp.rawBytes
244267
}
245268

246-
if parsedFieldBytes.tag == tagXMLDataLen {
247-
xmlDataLen, _ = msg.Header.GetInt(tagXMLDataLen)
269+
if mp.parsedFieldBytes.tag == tagXMLDataLen {
270+
xmlDataLen, _ = mp.msg.Header.GetInt(tagXMLDataLen)
248271
}
249-
fieldIndex++
272+
mp.fieldIndex++
250273
}
251274

252275
// This will happen if there are no fields in the body
253-
if foundTrailer && !foundBody {
254-
trailerBytes = rawBytes
255-
msg.bodyBytes = nil
276+
if mp.foundTrailer && !mp.foundBody {
277+
mp.trailerBytes = mp.rawBytes
278+
mp.msg.bodyBytes = nil
256279
}
257280

258281
// Body length would only be larger than trailer if fields out of order.
259-
if len(msg.bodyBytes) > len(trailerBytes) {
260-
msg.bodyBytes = msg.bodyBytes[:len(msg.bodyBytes)-len(trailerBytes)]
282+
if len(mp.msg.bodyBytes) > len(mp.trailerBytes) {
283+
mp.msg.bodyBytes = mp.msg.bodyBytes[:len(mp.msg.bodyBytes)-len(mp.trailerBytes)]
261284
}
262285

263286
length := 0
264-
for _, field := range msg.fields {
287+
for _, field := range mp.msg.fields {
265288
switch field.tag {
266289
case tagBeginString, tagBodyLength, tagCheckSum: // Tags do not contribute to length.
267290
default:
268291
length += field.length()
269292
}
270293
}
271294

272-
bodyLength, err := msg.Header.GetInt(tagBodyLength)
295+
bodyLength, err := mp.msg.Header.GetInt(tagBodyLength)
273296
if err != nil {
274297
err = parseError{OrigError: err.Error()}
275298
} else if length != bodyLength && !xmlDataMsg {
@@ -279,6 +302,149 @@ func ParseMessageWithDataDictionary(
279302
return
280303
}
281304

305+
// parseGroup iterates through a repeating group to maintain correct order of those fields.
306+
func parseGroup(mp *msgParser, tags []Tag) {
307+
mp.foundBody = true
308+
dm := mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
309+
fields := getGroupFields(mp.msg, tags, mp.appDataDictionary)
310+
311+
for {
312+
mp.fieldIndex++
313+
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
314+
mp.rawBytes, _ = extractField(mp.parsedFieldBytes, mp.rawBytes)
315+
mp.trailerBytes = mp.rawBytes
316+
317+
// Is this field a member for the group.
318+
if isGroupMember(mp.parsedFieldBytes.tag, fields) {
319+
// Is this field a nested repeating group.
320+
if isNumInGroupField(mp.msg, append(tags, mp.parsedFieldBytes.tag), mp.appDataDictionary) {
321+
dm = append(dm, *mp.parsedFieldBytes)
322+
tags = append(tags, mp.parsedFieldBytes.tag)
323+
fields = getGroupFields(mp.msg, tags, mp.appDataDictionary)
324+
continue
325+
}
326+
// Add the field member to the group.
327+
dm = append(dm, *mp.parsedFieldBytes)
328+
} else if isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
329+
// Found a header tag for some reason..
330+
mp.msg.Body.add(dm)
331+
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
332+
break
333+
} else if isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
334+
// Found the trailer at the end of the message.
335+
mp.msg.Body.add(dm)
336+
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
337+
mp.foundTrailer = true
338+
break
339+
} else {
340+
// Found a body field outside the group.
341+
searchTags := []Tag{mp.parsedFieldBytes.tag}
342+
// Is this a new group not inside the existing group.
343+
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
344+
// Add the current repeating group.
345+
mp.msg.Body.add(dm)
346+
// Cycle again with the new group.
347+
dm = mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
348+
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
349+
continue
350+
}
351+
if len(tags) > 1 {
352+
searchTags = tags[:len(tags)-1]
353+
}
354+
// Did this tag occur after a nested group and belongs to the parent group.
355+
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
356+
// Add the field member to the group.
357+
dm = append(dm, *mp.parsedFieldBytes)
358+
// Continue parsing the parent group.
359+
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
360+
continue
361+
}
362+
// Add the repeating group.
363+
mp.msg.Body.add(dm)
364+
// Add the next body field.
365+
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
366+
367+
break
368+
}
369+
}
370+
}
371+
372+
// isNumInGroupField evaluates if this tag is the start of a repeating group.
373+
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
374+
func isNumInGroupField(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) bool {
375+
if appDataDictionary != nil {
376+
msgt, err := msg.MsgType()
377+
if err != nil {
378+
return false
379+
}
380+
mm, ok := appDataDictionary.Messages[msgt]
381+
if ok {
382+
fields := mm.Fields
383+
for idx, tag := range tags {
384+
fd, ok := fields[int(tag)]
385+
if ok {
386+
if idx == len(tags)-1 {
387+
if len(fd.Fields) > 0 {
388+
return true
389+
}
390+
} else {
391+
// Map nested fields.
392+
newFields := make(map[int]*datadictionary.FieldDef)
393+
for _, ff := range fd.Fields {
394+
newFields[ff.Tag()] = ff
395+
}
396+
fields = newFields
397+
}
398+
}
399+
}
400+
}
401+
}
402+
return false
403+
}
404+
405+
// getGroupFields gets the relevant fields for parsing a repeating group if this tag is the start of a repeating group.
406+
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
407+
func getGroupFields(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) (fields []*datadictionary.FieldDef) {
408+
if appDataDictionary != nil {
409+
msgt, err := msg.MsgType()
410+
if err != nil {
411+
return
412+
}
413+
mm, ok := appDataDictionary.Messages[msgt]
414+
if ok {
415+
fields := mm.Fields
416+
for idx, tag := range tags {
417+
fd, ok := fields[int(tag)]
418+
if ok {
419+
if idx == len(tags)-1 {
420+
if len(fd.Fields) > 0 {
421+
return fd.Fields
422+
}
423+
} else {
424+
// Map nested fields.
425+
newFields := make(map[int]*datadictionary.FieldDef)
426+
for _, ff := range fd.Fields {
427+
newFields[ff.Tag()] = ff
428+
}
429+
fields = newFields
430+
}
431+
}
432+
}
433+
}
434+
}
435+
return
436+
}
437+
438+
// isGroupMember evaluates if this tag belongs to a repeating group.
439+
func isGroupMember(tag Tag, fields []*datadictionary.FieldDef) bool {
440+
for _, f := range fields {
441+
if f.Tag() == int(tag) {
442+
return true
443+
}
444+
}
445+
return false
446+
}
447+
282448
func isHeaderField(tag Tag, dataDict *datadictionary.DataDictionary) bool {
283449
if tag.IsHeader() {
284450
return true

0 commit comments

Comments
 (0)