From b9effb0186089b37370a8641aeb25e4775382072 Mon Sep 17 00:00:00 2001 From: Andreas Kohn Date: Thu, 25 Jun 2020 22:28:24 +0200 Subject: [PATCH] Add protocol definitions for Fetch API v10 This version is identical to v9, but allows for the compression type to be 'zstd', and allows a new error code to be returned when fetching. We already had earlier code to inject a zstd codec through the regular codec extension mechanism, this commit now simply makes it possible to actually trigger that code. --- .../requests/fetch/fixtures/v10_request.json | 1 + .../requests/fetch/fixtures/v10_response.json | 1 + src/protocol/requests/fetch/index.js | 29 +++++++ src/protocol/requests/fetch/v10/request.js | 55 +++++++++++++ .../requests/fetch/v10/request.spec.js | 28 +++++++ src/protocol/requests/fetch/v10/response.js | 26 ++++++ .../requests/fetch/v10/response.spec.js | 79 +++++++++++++++++++ 7 files changed, 219 insertions(+) create mode 100644 src/protocol/requests/fetch/fixtures/v10_request.json create mode 100644 src/protocol/requests/fetch/fixtures/v10_response.json create mode 100644 src/protocol/requests/fetch/v10/request.js create mode 100644 src/protocol/requests/fetch/v10/request.spec.js create mode 100644 src/protocol/requests/fetch/v10/response.js create mode 100644 src/protocol/requests/fetch/v10/response.spec.js diff --git a/src/protocol/requests/fetch/fixtures/v10_request.json b/src/protocol/requests/fetch/fixtures/v10_request.json new file mode 100644 index 000000000..5a59531ef --- /dev/null +++ b/src/protocol/requests/fetch/fixtures/v10_request.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[255,255,255,255,0,0,0,100,0,0,0,1,0,160,0,0,1,0,0,0,0,255,255,255,255,0,0,0,1,0,73,116,101,115,116,45,116,111,112,105,99,45,50,48,55,55,98,57,100,50,98,51,54,99,52,48,56,50,101,53,57,52,45,52,48,50,48,45,98,53,97,53,50,98,50,55,45,53,54,100,102,45,52,98,56,55,45,56,48,48,100,45,56,50,99,49,99,102,50,54,51,49,55,100,0,0,0,1,0,0,0,0,255,255,255,255,0,0,0,0,0,0,0,0,255,255,255,255,255,255,255,255,0,16,0,0,0,0,0,0]} \ No newline at end of file diff --git a/src/protocol/requests/fetch/fixtures/v10_response.json b/src/protocol/requests/fetch/fixtures/v10_response.json new file mode 100644 index 000000000..49b4a8ed9 --- /dev/null +++ b/src/protocol/requests/fetch/fixtures/v10_response.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,73,116,101,115,116,45,116,111,112,105,99,45,50,48,55,55,98,57,100,50,98,51,54,99,52,48,56,50,101,53,57,52,45,52,48,50,48,45,98,53,97,53,50,98,50,55,45,53,54,100,102,45,52,98,56,55,45,56,48,48,100,45,56,50,99,49,99,102,50,54,51,49,55,100,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,217,0,0,0,0,0,0,0,0,0,0,0,205,0,0,0,0,2,214,84,85,104,0,0,0,0,0,2,0,0,1,95,136,193,114,169,0,0,1,95,136,193,114,169,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,0,0,3,102,0,0,0,10,107,101,121,45,48,24,115,111,109,101,45,118,97,108,117,101,45,48,2,24,104,101,97,100,101,114,45,107,101,121,45,48,28,104,101,97,100,101,114,45,118,97,108,117,101,45,48,102,0,0,2,10,107,101,121,45,49,24,115,111,109,101,45,118,97,108,117,101,45,49,2,24,104,101,97,100,101,114,45,107,101,121,45,49,28,104,101,97,100,101,114,45,118,97,108,117,101,45,49,102,0,0,4,10,107,101,121,45,50,24,115,111,109,101,45,118,97,108,117,101,45,50,2,24,104,101,97,100,101,114,45,107,101,121,45,50,28,104,101,97,100,101,114,45,118,97,108,117,101,45,50]} diff --git a/src/protocol/requests/fetch/index.js b/src/protocol/requests/fetch/index.js index 912ba5b57..fefad3bbc 100644 --- a/src/protocol/requests/fetch/index.js +++ b/src/protocol/requests/fetch/index.js @@ -183,6 +183,35 @@ const versions = { requestTimeout: requestTimeout(maxWaitTime), } }, + 10: ({ + replicaId = REPLICA_ID, + isolationLevel = ISOLATION_LEVEL.READ_COMMITTED, + sessionId = 0, + sessionEpoch = -1, + forgottenTopics = [], + maxWaitTime, + minBytes, + maxBytes, + topics, + }) => { + const request = require('./v10/request') + const response = require('./v10/response') + return { + request: request({ + replicaId, + isolationLevel, + sessionId, + sessionEpoch, + forgottenTopics, + maxWaitTime, + minBytes, + maxBytes, + topics, + }), + response, + requestTimeout: requestTimeout(maxWaitTime), + } + }, } module.exports = { diff --git a/src/protocol/requests/fetch/v10/request.js b/src/protocol/requests/fetch/v10/request.js new file mode 100644 index 000000000..09c1f37b4 --- /dev/null +++ b/src/protocol/requests/fetch/v10/request.js @@ -0,0 +1,55 @@ +const ISOLATION_LEVEL = require('../../../isolationLevel') +const requestV9 = require('../v9/request') + +/** + * ZStd Compression + * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression + */ + +/** + * Fetch Request (Version: 10) => replica_id max_wait_time min_bytes max_bytes isolation_level session_id session_epoch [topics] [forgotten_topics_data] + * replica_id => INT32 + * max_wait_time => INT32 + * min_bytes => INT32 + * max_bytes => INT32 + * isolation_level => INT8 + * session_id => INT32 + * session_epoch => INT32 + * topics => topic [partitions] + * topic => STRING + * partitions => partition current_leader_epoch fetch_offset log_start_offset partition_max_bytes + * partition => INT32 + * current_leader_epoch => INT32 + * fetch_offset => INT64 + * log_start_offset => INT64 + * partition_max_bytes => INT32 + * forgotten_topics_data => topic [partitions] + * topic => STRING + * partitions => INT32 + */ + +module.exports = ({ + replicaId, + maxWaitTime, + minBytes, + maxBytes, + topics, + isolationLevel = ISOLATION_LEVEL.READ_COMMITTED, + sessionId = 0, + sessionEpoch = -1, + forgottenTopics = [], // Topics to remove from the fetch session +}) => + Object.assign( + requestV9({ + replicaId, + maxWaitTime, + minBytes, + maxBytes, + topics, + isolationLevel, + sessionId, + sessionEpoch, + forgottenTopics, + }), + { apiVersion: 10 } + ) diff --git a/src/protocol/requests/fetch/v10/request.spec.js b/src/protocol/requests/fetch/v10/request.spec.js new file mode 100644 index 000000000..00ee37e4c --- /dev/null +++ b/src/protocol/requests/fetch/v10/request.spec.js @@ -0,0 +1,28 @@ +const RequestV10Protocol = require('./request') + +describe('Protocol > Requests > Fetch > v10', () => { + test('request', async () => { + const minBytes = 1 + const maxBytes = 10485760 // 10MB + const maxWaitTime = 100 + const maxBytesPerPartition = 1048576 // 1MB + const topics = [ + { + topic: 'test-topic-2077b9d2b36c4082e594-4020-b5a52b27-56df-4b87-800d-82c1cf26317d', + partitions: [ + { partition: 0, currentLeaderEpoch: -1, fetchOffset: 0, maxBytes: maxBytesPerPartition }, + ], + }, + ] + + const { buffer } = await RequestV10Protocol({ + replicaId: -1, + maxWaitTime, + minBytes, + maxBytes, + topics, + }).encode() + + expect(buffer).toEqual(Buffer.from(require('../fixtures/v10_request.json'))) + }) +}) diff --git a/src/protocol/requests/fetch/v10/response.js b/src/protocol/requests/fetch/v10/response.js new file mode 100644 index 000000000..c09a298d5 --- /dev/null +++ b/src/protocol/requests/fetch/v10/response.js @@ -0,0 +1,26 @@ +const { decode, parse } = require('../v9/response') + +/** + * Fetch Response (Version: 10) => throttle_time_ms error_code session_id [responses] + * throttle_time_ms => INT32 + * error_code => INT16 + * session_id => INT32 + * responses => topic [partition_responses] + * topic => STRING + * partition_responses => partition_header record_set + * partition_header => partition error_code high_watermark last_stable_offset log_start_offset [aborted_transactions] + * partition => INT32 + * error_code => INT16 + * high_watermark => INT64 + * last_stable_offset => INT64 + * log_start_offset => INT64 + * aborted_transactions => producer_id first_offset + * producer_id => INT64 + * first_offset => INT64 + * record_set => RECORDS + */ + +module.exports = { + decode, + parse, +} diff --git a/src/protocol/requests/fetch/v10/response.spec.js b/src/protocol/requests/fetch/v10/response.spec.js new file mode 100644 index 000000000..6f03661fb --- /dev/null +++ b/src/protocol/requests/fetch/v10/response.spec.js @@ -0,0 +1,79 @@ +const { decode, parse } = require('./response') + +describe('Protocol > Requests > Fetch > v10', () => { + const batchContext = { + firstOffset: expect.any(String), + firstSequence: expect.any(Number), + firstTimestamp: expect.any(String), + inTransaction: expect.any(Boolean), + isControlBatch: expect.any(Boolean), + lastOffsetDelta: expect.any(Number), + magicByte: expect.any(Number), + maxTimestamp: expect.any(String), + partitionLeaderEpoch: expect.any(Number), + producerEpoch: expect.any(Number), + producerId: expect.any(String), + } + + test('response', async () => { + const data = await decode(Buffer.from(require('../fixtures/v10_response.json'))) + expect(data).toEqual({ + throttleTime: 0, + clientSideThrottleTime: 0, + errorCode: 0, + sessionId: 0, + responses: [ + { + topicName: 'test-topic-2077b9d2b36c4082e594-4020-b5a52b27-56df-4b87-800d-82c1cf26317d', + partitions: [ + { + partition: 0, + errorCode: 0, + highWatermark: '3', + abortedTransactions: [], + lastStableOffset: '3', + lastStartOffset: '0', + messages: [ + { + offset: '0', + magicByte: 2, + attributes: 0, + batchContext, + timestamp: '1509827900073', + headers: { 'header-key-0': Buffer.from('header-value-0') }, + key: Buffer.from('key-0'), + value: Buffer.from('some-value-0'), + isControlRecord: false, + }, + { + offset: '1', + magicByte: 2, + attributes: 0, + batchContext, + timestamp: '1509827900073', + headers: { 'header-key-1': Buffer.from('header-value-1') }, + key: Buffer.from('key-1'), + value: Buffer.from('some-value-1'), + isControlRecord: false, + }, + { + offset: '2', + magicByte: 2, + attributes: 0, + batchContext, + timestamp: '1509827900073', + headers: { 'header-key-2': Buffer.from('header-value-2') }, + key: Buffer.from('key-2'), + value: Buffer.from('some-value-2'), + isControlRecord: false, + }, + ], + }, + ], + }, + ], + }) + + await expect(parse(data)).resolves.toBeTruthy() + }) +})