Skip to content

Commit

Permalink
Add protocol definitions for Fetch API v10
Browse files Browse the repository at this point in the history
This version is identical to v9, but allows for the compression type to be 'zstd', and allows
a new error code to be returned when fetching. We already had earlier code to inject a zstd
codec through the regular codec extension mechanism, this commit now simply makes it possible
to actually trigger that code.
  • Loading branch information
ankon committed Jun 26, 2020
1 parent 13d65d6 commit b9effb0
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/protocol/requests/fetch/fixtures/v10_request.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[255,255,255,255,0,0,0,100,0,0,0,1,0,160,0,0,1,0,0,0,0,255,255,255,255,0,0,0,1,0,73,116,101,115,116,45,116,111,112,105,99,45,50,48,55,55,98,57,100,50,98,51,54,99,52,48,56,50,101,53,57,52,45,52,48,50,48,45,98,53,97,53,50,98,50,55,45,53,54,100,102,45,52,98,56,55,45,56,48,48,100,45,56,50,99,49,99,102,50,54,51,49,55,100,0,0,0,1,0,0,0,0,255,255,255,255,0,0,0,0,0,0,0,0,255,255,255,255,255,255,255,255,0,16,0,0,0,0,0,0]}
1 change: 1 addition & 0 deletions src/protocol/requests/fetch/fixtures/v10_response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,73,116,101,115,116,45,116,111,112,105,99,45,50,48,55,55,98,57,100,50,98,51,54,99,52,48,56,50,101,53,57,52,45,52,48,50,48,45,98,53,97,53,50,98,50,55,45,53,54,100,102,45,52,98,56,55,45,56,48,48,100,45,56,50,99,49,99,102,50,54,51,49,55,100,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,217,0,0,0,0,0,0,0,0,0,0,0,205,0,0,0,0,2,214,84,85,104,0,0,0,0,0,2,0,0,1,95,136,193,114,169,0,0,1,95,136,193,114,169,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,3,102,0,0,0,10,107,101,121,45,48,24,115,111,109,101,45,118,97,108,117,101,45,48,2,24,104,101,97,100,101,114,45,107,101,121,45,48,28,104,101,97,100,101,114,45,118,97,108,117,101,45,48,102,0,0,2,10,107,101,121,45,49,24,115,111,109,101,45,118,97,108,117,101,45,49,2,24,104,101,97,100,101,114,45,107,101,121,45,49,28,104,101,97,100,101,114,45,118,97,108,117,101,45,49,102,0,0,4,10,107,101,121,45,50,24,115,111,109,101,45,118,97,108,117,101,45,50,2,24,104,101,97,100,101,114,45,107,101,121,45,50,28,104,101,97,100,101,114,45,118,97,108,117,101,45,50]}
29 changes: 29 additions & 0 deletions src/protocol/requests/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,35 @@ const versions = {
requestTimeout: requestTimeout(maxWaitTime),
}
},
10: ({
replicaId = REPLICA_ID,
isolationLevel = ISOLATION_LEVEL.READ_COMMITTED,
sessionId = 0,
sessionEpoch = -1,
forgottenTopics = [],
maxWaitTime,
minBytes,
maxBytes,
topics,
}) => {
const request = require('./v10/request')
const response = require('./v10/response')
return {
request: request({
replicaId,
isolationLevel,
sessionId,
sessionEpoch,
forgottenTopics,
maxWaitTime,
minBytes,
maxBytes,
topics,
}),
response,
requestTimeout: requestTimeout(maxWaitTime),
}
},
}

module.exports = {
Expand Down
55 changes: 55 additions & 0 deletions src/protocol/requests/fetch/v10/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
const ISOLATION_LEVEL = require('../../../isolationLevel')
const requestV9 = require('../v9/request')

/**
* ZStd Compression
* @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression
*/

/**
* Fetch Request (Version: 10) => replica_id max_wait_time min_bytes max_bytes isolation_level session_id session_epoch [topics] [forgotten_topics_data]
* replica_id => INT32
* max_wait_time => INT32
* min_bytes => INT32
* max_bytes => INT32
* isolation_level => INT8
* session_id => INT32
* session_epoch => INT32
* topics => topic [partitions]
* topic => STRING
* partitions => partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes
* partition => INT32
* current_leader_epoch => INT32
* fetch_offset => INT64
* log_start_offset => INT64
* partition_max_bytes => INT32
* forgotten_topics_data => topic [partitions]
* topic => STRING
* partitions => INT32
*/

module.exports = ({
replicaId,
maxWaitTime,
minBytes,
maxBytes,
topics,
isolationLevel = ISOLATION_LEVEL.READ_COMMITTED,
sessionId = 0,
sessionEpoch = -1,
forgottenTopics = [], // Topics to remove from the fetch session
}) =>
Object.assign(
requestV9({
replicaId,
maxWaitTime,
minBytes,
maxBytes,
topics,
isolationLevel,
sessionId,
sessionEpoch,
forgottenTopics,
}),
{ apiVersion: 10 }
)
28 changes: 28 additions & 0 deletions src/protocol/requests/fetch/v10/request.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
const RequestV10Protocol = require('./request')

describe('Protocol > Requests > Fetch > v10', () => {
test('request', async () => {
const minBytes = 1
const maxBytes = 10485760 // 10MB
const maxWaitTime = 100
const maxBytesPerPartition = 1048576 // 1MB
const topics = [
{
topic: 'test-topic-2077b9d2b36c4082e594-4020-b5a52b27-56df-4b87-800d-82c1cf26317d',
partitions: [
{ partition: 0, currentLeaderEpoch: -1, fetchOffset: 0, maxBytes: maxBytesPerPartition },
],
},
]

const { buffer } = await RequestV10Protocol({
replicaId: -1,
maxWaitTime,
minBytes,
maxBytes,
topics,
}).encode()

expect(buffer).toEqual(Buffer.from(require('../fixtures/v10_request.json')))
})
})
26 changes: 26 additions & 0 deletions src/protocol/requests/fetch/v10/response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
const { decode, parse } = require('../v9/response')

/**
* Fetch Response (Version: 10) => throttle_time_ms error_code session_id [responses]
* throttle_time_ms => INT32
* error_code => INT16
* session_id => INT32
* responses => topic [partition_responses]
* topic => STRING
* partition_responses => partition_header record_set
* partition_header => partition error_code high_watermark last_stable_offset log_start_offset [aborted_transactions]
* partition => INT32
* error_code => INT16
* high_watermark => INT64
* last_stable_offset => INT64
* log_start_offset => INT64
* aborted_transactions => producer_id first_offset
* producer_id => INT64
* first_offset => INT64
* record_set => RECORDS
*/

module.exports = {
decode,
parse,
}
79 changes: 79 additions & 0 deletions src/protocol/requests/fetch/v10/response.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
const { decode, parse } = require('./response')

describe('Protocol > Requests > Fetch > v10', () => {
const batchContext = {
firstOffset: expect.any(String),
firstSequence: expect.any(Number),
firstTimestamp: expect.any(String),
inTransaction: expect.any(Boolean),
isControlBatch: expect.any(Boolean),
lastOffsetDelta: expect.any(Number),
magicByte: expect.any(Number),
maxTimestamp: expect.any(String),
partitionLeaderEpoch: expect.any(Number),
producerEpoch: expect.any(Number),
producerId: expect.any(String),
}

test('response', async () => {
const data = await decode(Buffer.from(require('../fixtures/v10_response.json')))
expect(data).toEqual({
throttleTime: 0,
clientSideThrottleTime: 0,
errorCode: 0,
sessionId: 0,
responses: [
{
topicName: 'test-topic-2077b9d2b36c4082e594-4020-b5a52b27-56df-4b87-800d-82c1cf26317d',
partitions: [
{
partition: 0,
errorCode: 0,
highWatermark: '3',
abortedTransactions: [],
lastStableOffset: '3',
lastStartOffset: '0',
messages: [
{
offset: '0',
magicByte: 2,
attributes: 0,
batchContext,
timestamp: '1509827900073',
headers: { 'header-key-0': Buffer.from('header-value-0') },
key: Buffer.from('key-0'),
value: Buffer.from('some-value-0'),
isControlRecord: false,
},
{
offset: '1',
magicByte: 2,
attributes: 0,
batchContext,
timestamp: '1509827900073',
headers: { 'header-key-1': Buffer.from('header-value-1') },
key: Buffer.from('key-1'),
value: Buffer.from('some-value-1'),
isControlRecord: false,
},
{
offset: '2',
magicByte: 2,
attributes: 0,
batchContext,
timestamp: '1509827900073',
headers: { 'header-key-2': Buffer.from('header-value-2') },
key: Buffer.from('key-2'),
value: Buffer.from('some-value-2'),
isControlRecord: false,
},
],
},
],
},
],
})

await expect(parse(data)).resolves.toBeTruthy()
})
})

0 comments on commit b9effb0

Please sign in to comment.