Skip to content

Commit

Permalink
Add support to protocol fetch v4
Browse files Browse the repository at this point in the history
  • Loading branch information
tulios committed Jun 23, 2018
1 parent 5b36598 commit d102f24
Show file tree
Hide file tree
Showing 12 changed files with 382 additions and 1 deletion.
15 changes: 15 additions & 0 deletions src/protocol/decoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@ module.exports = class Decoder {
return array
}

readVarIntArray(reader) {
const length = this.readVarInt()

if (length === -1) {
return []
}

const array = []
for (let i = 0; i < length; i++) {
array.push(reader(this))
}

return array
}

async readArrayAsync(reader) {
const length = this.readInt32()

Expand Down
4 changes: 3 additions & 1 deletion src/protocol/message/decoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ const decodeMessage = (decoder, magicByte) => {
case 1:
return V1Decoder(decoder)
default:
throw new KafkaJSNonRetriableError(`Unsupported message version, magic byte: ${magicByte}`)
throw new KafkaJSNonRetriableError(
`Unsupported MessageSet message version, magic byte: ${magicByte}`
)
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/protocol/recordBatch/header/v0/decoder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module.exports = decoder => ({
key: decoder.readVarIntString(),
value: decoder.readVarIntBytes(),
})
46 changes: 46 additions & 0 deletions src/protocol/recordBatch/record/v0/decoder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const Long = require('long')
const HeaderDecoder = require('../../header/v0/decoder')

/**
* v0
* Record =>
* Length => Varint
* Attributes => Int8
* TimestampDelta => Varlong
* OffsetDelta => Varint
* Key => varInt|Bytes
* Value => varInt|Bytes
* Headers => [HeaderKey HeaderValue]
* HeaderKey => VarInt|String
* HeaderValue => VarInt|Bytes
*/

module.exports = (decoder, { firstOffset, firstTimestamp, magicByte }) => {
const attributes = decoder.readInt8()

const timestampDelta = decoder.readVarLong()
const timestamp = Long.fromValue(firstTimestamp)
.add(timestampDelta)
.toString()

const offsetDelta = decoder.readVarInt()
const offset = Long.fromValue(firstOffset)
.add(offsetDelta)
.toString()

const key = decoder.readVarIntBytes()
const value = decoder.readVarIntBytes()
const headers = decoder
.readVarIntArray(HeaderDecoder)
.reduce((obj, { key, value }) => ({ ...obj, [key]: value }), {})

return {
magicByte,
attributes, // Record level attributes are presently unused
timestamp,
offset,
key,
value,
headers,
}
}
91 changes: 91 additions & 0 deletions src/protocol/recordBatch/v0/decoder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
const Decoder = require('../../decoder')
const { lookupCodecByRecordBatchAttributes } = require('../../message/compression')
const { KafkaJSPartialMessageError } = require('../../../errors')
const RecordDecoder = require('../record/v0/decoder')

const TRANSACTIONAL_FLAG_MASK = 0x10
const CONTROL_FLAG_MASK = 0x20

/**
* v0
* RecordBatch =>
* FirstOffset => int64
* Length => int32
* PartitionLeaderEpoch => int32
* Magic => int8
* CRC => int32
* Attributes => int16
* LastOffsetDelta => int32
* FirstTimestamp => int64
* MaxTimestamp => int64
* ProducerId => int64
* ProducerEpoch => int16
* FirstSequence => int32
* Records => [Record]
*/

module.exports = async decoder => {
const firstOffset = decoder.readInt64().toString()
const length = decoder.readInt32()

const remainingBytes = Buffer.byteLength(decoder.slice(length).buffer)

if (remainingBytes < length) {
throw new KafkaJSPartialMessageError(
`Tried to decode a partial record batch: remainingBytes(${remainingBytes}) < recordBatchLength(${length})`
)
}

const partitionLeaderEpoch = decoder.readInt32()

// The magic byte was read by the Fetch protocol to distinguish between
// the record batch and the legacy message set. It's not used here but
// it has to be read.
const magicByte = decoder.readInt8() // eslint-disable-line no-unused-vars

// The library is currently not performing CRC validations
const crc = decoder.readInt32() // eslint-disable-line no-unused-vars

const attributes = decoder.readInt16()
const lastOffsetDelta = decoder.readInt32()
const firstTimestamp = decoder.readInt64().toString()
const maxTimestamp = decoder.readInt64().toString()
const producerId = decoder.readInt64().toString()
const producerEpoch = decoder.readInt16()
const firstSequence = decoder.readInt32()

const inTransaction = (attributes & TRANSACTIONAL_FLAG_MASK) > 0
const isControlBatch = (attributes & CONTROL_FLAG_MASK) > 0
const codec = lookupCodecByRecordBatchAttributes(attributes)

const recordsSize = Buffer.byteLength(decoder.buffer)
let recordsDecoder = decoder.slice(recordsSize)
if (codec) {
// TODO: support compression, something like:
// const decompressedBuffer = await codec.decompress(recordsDecoder.buffer)
// recordsDecoder = new Decoder(decompressedBuffer)
}

const records = recordsDecoder.readArray(decoder => {
const recordBuffer = decoder.readVarIntBytes()
return RecordDecoder(new Decoder(recordBuffer), {
firstOffset,
firstTimestamp,
magicByte,
})
})

return {
firstOffset,
firstTimestamp,
partitionLeaderEpoch,
inTransaction,
isControlBatch,
lastOffsetDelta,
producerId,
producerEpoch,
firstSequence,
maxTimestamp,
records,
}
}
1 change: 1 addition & 0 deletions src/protocol/requests/fetch/fixtures/v4_request.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,0,0,0,0,5,0,0,0,1,0,160,0,0,1,0,0,0,1,0,10,116,101,115,116,45,116,111,112,105,99,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,16,0,0]}
1 change: 1 addition & 0 deletions src/protocol/requests/fetch/fixtures/v4_response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,0,0,0,0,1,0,31,116,101,115,116,45,116,111,112,105,99,45,97,98,52,100,53,52,55,55,52,100,99,97,100,99,51,57,53,97,55,102,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,217,0,0,0,0,0,0,0,0,0,0,0,205,0,0,0,0,2,214,84,85,104,0,0,0,0,0,2,0,0,1,95,136,193,114,169,0,0,1,95,136,193,114,169,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,3,102,0,0,0,10,107,101,121,45,48,24,115,111,109,101,45,118,97,108,117,101,45,48,2,24,104,101,97,100,101,114,45,107,101,121,45,48,28,104,101,97,100,101,114,45,118,97,108,117,101,45,48,102,0,0,2,10,107,101,121,45,49,24,115,111,109,101,45,118,97,108,117,101,45,49,2,24,104,101,97,100,101,114,45,107,101,121,45,49,28,104,101,97,100,101,114,45,118,97,108,117,101,45,49,102,0,0,4,10,107,101,121,45,50,24,115,111,109,101,45,118,97,108,117,101,45,50,2,24,104,101,97,100,101,114,45,107,101,121,45,50,28,104,101,97,100,101,114,45,118,97,108,117,101,45,50]}
17 changes: 17 additions & 0 deletions src/protocol/requests/fetch/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const ISOLATION_LEVEL = require('../../isolationLevel')

// For normal consumers, use -1
const REPLICA_ID = -1

Expand All @@ -22,6 +24,21 @@ const versions = {
const response = require('./v3/response')
return { request: request({ replicaId, maxWaitTime, minBytes, maxBytes, topics }), response }
},
4: ({
replicaId = REPLICA_ID,
isolationLevel = ISOLATION_LEVEL.READ_COMMITTED,
maxWaitTime,
minBytes,
maxBytes,
topics,
}) => {
const request = require('./v4/request')
const response = require('./v4/response')
return {
request: request({ replicaId, isolationLevel, maxWaitTime, minBytes, maxBytes, topics }),
response,
}
},
}

module.exports = {
Expand Down
51 changes: 51 additions & 0 deletions src/protocol/requests/fetch/v4/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
const Encoder = require('../../../encoder')
const { Fetch: apiKey } = require('../../apiKeys')
const ISOLATION_LEVEL = require('../../../isolationLevel')

/**
* Fetch Request (Version: 4) => replica_id max_wait_time min_bytes max_bytes isolation_level [topics]
* replica_id => INT32
* max_wait_time => INT32
* min_bytes => INT32
* max_bytes => INT32
* isolation_level => INT8
* topics => topic [partitions]
* topic => STRING
* partitions => partition fetch_offset max_bytes
* partition => INT32
* fetch_offset => INT64
* max_bytes => INT32
*/

module.exports = ({
replicaId,
maxWaitTime,
minBytes,
maxBytes,
topics,
isolationLevel = ISOLATION_LEVEL.READ_COMMITTED,
}) => ({
apiKey,
apiVersion: 4,
apiName: 'Fetch',
encode: async () => {
return new Encoder()
.writeInt32(replicaId)
.writeInt32(maxWaitTime)
.writeInt32(minBytes)
.writeInt32(maxBytes)
.writeInt8(isolationLevel)
.writeArray(topics.map(encodeTopic))
},
})

const encodeTopic = ({ topic, partitions }) => {
return new Encoder().writeString(topic).writeArray(partitions.map(encodePartition))
}

const encodePartition = ({ partition, fetchOffset, maxBytes }) => {
return new Encoder()
.writeInt32(partition)
.writeInt64(fetchOffset)
.writeInt32(maxBytes)
}
25 changes: 25 additions & 0 deletions src/protocol/requests/fetch/v4/request.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
const RequestV4Protocol = require('./request')

describe('Protocol > Requests > Fetch > v4', () => {
  test('request', async () => {
    const protocol = RequestV4Protocol({
      maxWaitTime: 5,
      minBytes: 1,
      maxBytes: 10485760, // 10MB
      topics: [
        {
          topic: 'test-topic',
          partitions: [
            {
              partition: 0,
              fetchOffset: 0,
              maxBytes: 1048576, // 1MB per partition
            },
          ],
        },
      ],
    })

    const { buffer } = await protocol.encode()
    expect(buffer).toEqual(Buffer.from(require('../fixtures/v4_request.json')))
  })
})
73 changes: 73 additions & 0 deletions src/protocol/requests/fetch/v4/response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
const Decoder = require('../../../decoder')
const { parse: parseV1 } = require('../v1/response')
const MessageSetDecoder = require('../../../messageSet/decoder')
const RecordBatchDecoder = require('../../../recordBatch/v0/decoder')
const { MAGIC_BYTE } = require('../../../recordBatch/v0')

// the magic offset is at the same offset for all current message formats, but the 4 bytes
// between the size and the magic is dependent on the version.
const MAGIC_OFFSET = 16

/**
* Fetch Response (Version: 4) => throttle_time_ms [responses]
* throttle_time_ms => INT32
* responses => topic [partition_responses]
* topic => STRING
* partition_responses => partition_header record_set
* partition_header => partition error_code high_watermark last_stable_offset [aborted_transactions]
* partition => INT32
* error_code => INT16
* high_watermark => INT64
* last_stable_offset => INT64
* aborted_transactions => producer_id first_offset
* producer_id => INT64
* first_offset => INT64
* record_set => RECORDS
*/

const decodeMessages = async decoder => {
const messagesBuffer = decoder.readBytes()
const magicByte = messagesBuffer.slice(MAGIC_OFFSET).readInt8()

if (magicByte === MAGIC_BYTE) {
const recordBatch = await RecordBatchDecoder(new Decoder(messagesBuffer))
return recordBatch.records
}

return MessageSetDecoder(decoder)
}

const decodeAbortedTransactions = decoder => ({
producerId: decoder.readInt64().toString(),
firstOffset: decoder.readInt64().toString(),
})

const decodePartition = async decoder => ({
partition: decoder.readInt32(),
errorCode: decoder.readInt16(),
highWatermark: decoder.readInt64().toString(),
lastStableOffset: decoder.readInt64().toString(),
abortedTransactions: decoder.readArray(decodeAbortedTransactions),
messages: await decodeMessages(decoder),
})

const decodeResponse = async decoder => ({
topicName: decoder.readString(),
partitions: await decoder.readArrayAsync(decodePartition),
})

const decode = async rawData => {
const decoder = new Decoder(rawData)
const throttleTime = decoder.readInt32()
const responses = await decoder.readArrayAsync(decodeResponse)

return {
throttleTime,
responses,
}
}

module.exports = {
decode,
parse: parseV1,
}
Loading

0 comments on commit d102f24

Please sign in to comment.