Skip to content

Commit ab5fd8a

Browse files
committed
Fixed parsing of extended timestamp in chunk message header type 3
1 parent 553a888 commit ab5fd8a

File tree

3 files changed

+56
-13
lines changed

3 files changed

+56
-13
lines changed

chunk_header.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,15 @@ func encodeChunkBasicHeader(w io.Writer, mh *chunkBasicHeader) error {
7979
}
8080

8181
type chunkMessageHeader struct {
82-
timestamp uint32 // fmt = 0
83-
timestampDelta uint32 // fmt = 1 | 2
84-
messageLength uint32 // fmt = 0 | 1
85-
messageTypeID byte // fmt = 0 | 1
86-
messageStreamID uint32 // fmt = 0
82+
timestamp uint32 // fmt = 0
83+
timestampDelta uint32 // fmt = 1 | 2
84+
messageLength uint32 // fmt = 0 | 1
85+
messageTypeID byte // fmt = 0 | 1
86+
messageStreamID uint32 // fmt = 0
87+
extendedTimestampMode ExtendedTimestampMode // fmt = 0 | 1 | 2
8788
}
8889

89-
func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessageHeader) error {
90+
func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessageHeader, extendedTimestampMode ExtendedTimestampMode) error {
9091
if buf == nil || len(buf) < 11 {
9192
buf = make([]byte, 11)
9293
}
@@ -104,8 +105,10 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessag
104105
mh.messageLength = binary.BigEndian.Uint32(cache32bits)
105106
mh.messageTypeID = buf[6] // 8bits
106107
mh.messageStreamID = binary.LittleEndian.Uint32(buf[7:11]) // 32bits
108+
mh.extendedTimestampMode = ExtendedTimestampUnused
107109

108110
if mh.timestamp == 0xffffff {
111+
mh.extendedTimestampMode = ExtendedTimestampUsed
109112
_, err := io.ReadAtLeast(r, cache32bits, 4)
110113
if err != nil {
111114
return err
@@ -123,8 +126,10 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessag
123126
copy(cache32bits[1:], buf[3:6]) // 24bits BE
124127
mh.messageLength = binary.BigEndian.Uint32(cache32bits)
125128
mh.messageTypeID = buf[6] // 8bits
129+
mh.extendedTimestampMode = ExtendedTimestampUnused
126130

127131
if mh.timestampDelta == 0xffffff {
132+
mh.extendedTimestampMode = ExtendedTimestampDeltaUsed
128133
_, err := io.ReadAtLeast(r, cache32bits, 4)
129134
if err != nil {
130135
return err
@@ -139,8 +144,10 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessag
139144

140145
copy(cache32bits[1:], buf[0:3]) // 24bits BE
141146
mh.timestampDelta = binary.BigEndian.Uint32(cache32bits)
147+
mh.extendedTimestampMode = ExtendedTimestampUnused
142148

143149
if mh.timestampDelta == 0xffffff {
150+
mh.extendedTimestampMode = ExtendedTimestampDeltaUsed
144151
_, err := io.ReadAtLeast(r, cache32bits, 4)
145152
if err != nil {
146153
return err
@@ -149,7 +156,22 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessag
149156
}
150157

151158
case 3:
152-
// DO NOTHING
159+
// DO NOTHING unless an extended timestamp was used in preceding messages
160+
switch extendedTimestampMode {
161+
case ExtendedTimestampUsed:
162+
_, err := io.ReadAtLeast(r, cache32bits, 4)
163+
if err != nil {
164+
return err
165+
}
166+
mh.timestamp = binary.BigEndian.Uint32(cache32bits)
167+
168+
case ExtendedTimestampDeltaUsed:
169+
_, err := io.ReadAtLeast(r, cache32bits, 4)
170+
if err != nil {
171+
return err
172+
}
173+
mh.timestampDelta = binary.BigEndian.Uint32(cache32bits)
174+
}
153175

154176
default:
155177
panic("Unexpected fmt")

chunk_stream_reader.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ import (
1111
"bytes"
1212
)
1313

14+
type ExtendedTimestampMode byte
15+
16+
const (
17+
ExtendedTimestampUnused ExtendedTimestampMode = 0
18+
ExtendedTimestampUsed ExtendedTimestampMode = 1
19+
ExtendedTimestampDeltaUsed ExtendedTimestampMode = 2
20+
)
21+
1422
type ChunkStreamReader struct {
1523
basicHeader chunkBasicHeader
1624
messageHeader chunkMessageHeader
@@ -21,6 +29,8 @@ type ChunkStreamReader struct {
2129
messageTypeID byte
2230
messageStreamID uint32
2331

32+
extendedTimestampMode ExtendedTimestampMode
33+
2434
buf bytes.Buffer
2535
completed bool
2636
}

chunk_streamer.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -200,16 +200,17 @@ func (cs *ChunkStreamer) readChunk() (*ChunkStreamReader, error) {
200200
}
201201
//cs.logger.Debugf("(READ) BasicHeader = %+v", bh)
202202

203+
reader, err := cs.prepareChunkReader(bh.chunkStreamID)
204+
if err != nil {
205+
return nil, errors.Wrapf(err, "Failed to prepare chunk reader")
206+
}
207+
203208
var mh chunkMessageHeader
204-
if err := decodeChunkMessageHeader(cs.r, bh.fmt, cs.cacheBuffer, &mh); err != nil {
209+
if err := decodeChunkMessageHeader(cs.r, bh.fmt, cs.cacheBuffer, &mh, reader.extendedTimestampMode); err != nil {
205210
return nil, err
206211
}
207212
//cs.logger.Debugf("(READ) MessageHeader = %+v", mh)
208213

209-
reader, err := cs.prepareChunkReader(bh.chunkStreamID)
210-
if err != nil {
211-
return nil, errors.Wrapf(err, "Failed to prepare chunk reader")
212-
}
213214
if reader.completed {
214215
reader.buf.Reset()
215216
reader.completed = false
@@ -225,17 +226,27 @@ func (cs *ChunkStreamer) readChunk() (*ChunkStreamReader, error) {
225226
reader.messageLength = mh.messageLength
226227
reader.messageTypeID = mh.messageTypeID
227228
reader.messageStreamID = mh.messageStreamID
229+
reader.extendedTimestampMode = mh.extendedTimestampMode
228230

229231
case 1:
230232
reader.timestampDelta = mh.timestampDelta
231233
reader.messageLength = mh.messageLength
232234
reader.messageTypeID = mh.messageTypeID
235+
reader.extendedTimestampMode = mh.extendedTimestampMode
233236

234237
case 2:
235238
reader.timestampDelta = mh.timestampDelta
239+
reader.extendedTimestampMode = mh.extendedTimestampMode
236240

237241
case 3:
238-
// DO NOTHING
242+
// DO NOTHING unless an extended timestamp was used in preceding messages
243+
switch reader.extendedTimestampMode {
244+
case ExtendedTimestampUsed:
245+
reader.timestamp = mh.timestamp
246+
247+
case ExtendedTimestampDeltaUsed:
248+
reader.timestampDelta = mh.timestampDelta
249+
}
239250

240251
default:
241252
panic("unsupported chunk") // TODO: fix

0 commit comments

Comments
 (0)