From 34f285ceaf2d4bf0a1a4073ba94834b8548e22ec Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Thu, 12 Sep 2019 15:02:21 +0200 Subject: [PATCH 1/3] Default throttle_time_ms to 0 if missing in ApiVersions response Fixes #491. --- .../v1_response_missing_throttle_time.json | 1 + .../requests/apiVersions/v1/response.js | 7 ++- .../requests/apiVersions/v1/response.spec.js | 47 +++++++++++++++++++ 3 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 src/protocol/requests/apiVersions/fixtures/v1_response_missing_throttle_time.json diff --git a/src/protocol/requests/apiVersions/fixtures/v1_response_missing_throttle_time.json b/src/protocol/requests/apiVersions/fixtures/v1_response_missing_throttle_time.json new file mode 100644 index 000000000..bdd41257b --- /dev/null +++ b/src/protocol/requests/apiVersions/fixtures/v1_response_missing_throttle_time.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,0,0,0,0,34,0,0,0,0,0,3,0,1,0,0,0,5,0,2,0,0,0,2,0,3,0,0,0,4,0,4,0,0,0,0,0,5,0,0,0,0,0,6,0,0,0,3,0,7,0,1,0,1,0,8,0,0,0,3,0,9,0,0,0,3,0,10,0,0,0,1,0,11,0,0,0,2,0,12,0,0,0,1,0,13,0,0,0,1,0,14,0,0,0,1,0,15,0,0,0,1,0,16,0,0,0,1,0,17,0,0,0,0,0,18,0,0,0,1,0,19,0,0,0,2,0,20,0,0,0,1,0,21,0,0,0,0,0,22,0,0,0,0,0,23,0,0,0,0,0,24,0,0,0,0,0,25,0,0,0,0,0,26,0,0,0,0,0,27,0,0,0,0,0,28,0,0,0,0,0,29,0,0,0,0,0,30,0,0,0,0,0,31,0,0,0,0,0,32,0,0,0,0,0,33,0,0,0,0]} \ No newline at end of file diff --git a/src/protocol/requests/apiVersions/v1/response.js b/src/protocol/requests/apiVersions/v1/response.js index c7b27d521..9e261a340 100644 --- a/src/protocol/requests/apiVersions/v1/response.js +++ b/src/protocol/requests/apiVersions/v1/response.js @@ -24,10 +24,13 @@ const decode = async rawData => { failIfVersionNotSupported(errorCode) + const apiVersions = decoder.readArray(apiVersion) + const throttleTime = decoder.canReadInt32() ? decoder.readInt32() : 0 + return { errorCode, - apiVersions: decoder.readArray(apiVersion), - throttleTime: decoder.readInt32(), + apiVersions, + throttleTime, } } diff --git a/src/protocol/requests/apiVersions/v1/response.spec.js b/src/protocol/requests/apiVersions/v1/response.spec.js index 4f16241f5..9017a9ba7 100644 --- a/src/protocol/requests/apiVersions/v1/response.spec.js +++ b/src/protocol/requests/apiVersions/v1/response.spec.js @@ -48,6 +48,53 @@ describe('Protocol > Requests > ApiVersions > v1', () => { await expect(parse(data)).resolves.toBeTruthy() }) + // https://github.com/tulios/kafkajs/issues/491 + test.only('defaults throttle_time_ms if it is not provided in the response', async () => { + const data = await decode( + Buffer.from(require('../fixtures/v1_response_missing_throttle_time.json')) + ) + expect(data).toEqual({ + apiVersions: [ + { apiKey: 0, maxVersion: 3, minVersion: 0 }, + { apiKey: 1, maxVersion: 5, minVersion: 0 }, + { apiKey: 2, maxVersion: 2, minVersion: 0 }, + { apiKey: 3, maxVersion: 4, minVersion: 0 }, + { apiKey: 4, maxVersion: 0, minVersion: 0 }, + { apiKey: 5, maxVersion: 0, minVersion: 0 }, + { apiKey: 6, maxVersion: 3, minVersion: 0 }, + { apiKey: 7, maxVersion: 1, minVersion: 1 }, + { apiKey: 8, maxVersion: 3, minVersion: 0 }, + { apiKey: 9, maxVersion: 3, minVersion: 0 }, + { apiKey: 10, maxVersion: 1, minVersion: 0 }, + { apiKey: 11, maxVersion: 2, minVersion: 0 }, + { apiKey: 12, maxVersion: 1, minVersion: 0 }, + { apiKey: 13, maxVersion: 1, minVersion: 0 }, + { apiKey: 14, maxVersion: 1, minVersion: 0 }, + { apiKey: 15, maxVersion: 1, minVersion: 0 }, + { apiKey: 16, maxVersion: 1, minVersion: 0 }, + { apiKey: 17, maxVersion: 0, minVersion: 0 }, + { apiKey: 18, maxVersion: 1, minVersion: 0 }, + { apiKey: 19, maxVersion: 2, minVersion: 0 }, + { apiKey: 20, maxVersion: 1, minVersion: 0 }, + { apiKey: 21, maxVersion: 0, minVersion: 0 }, + { apiKey: 22, maxVersion: 0, minVersion: 0 }, + { apiKey: 23, maxVersion: 0, minVersion: 0 }, + { apiKey: 24, maxVersion: 0, minVersion: 0 }, + { apiKey: 25, maxVersion: 0, minVersion: 0 }, + { apiKey: 26, maxVersion: 0, minVersion: 0 }, + { apiKey: 27, maxVersion: 0, minVersion: 0 }, + { apiKey: 28, maxVersion: 0, minVersion: 0 }, + { apiKey: 29, maxVersion: 0, minVersion: 0 }, + { apiKey: 30, maxVersion: 0, minVersion: 0 }, + { apiKey: 31, maxVersion: 0, minVersion: 0 }, + { apiKey: 32, maxVersion: 0, minVersion: 0 }, + { apiKey: 33, maxVersion: 0, minVersion: 0 }, + ], + errorCode: 0, + throttleTime: 0, + }) + }) + test('throws KafkaJSProtocolError if the api is not supported', async () => { await expect(decode(unsupportedVersionResponse())).rejects.toThrow( /The version of API is not supported/ From 621611139f41aebfa89fe6d271a5f4ef7a636585 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Thu, 12 Sep 2019 16:50:23 +0200 Subject: [PATCH 2/3] Remove only from test --- src/protocol/requests/apiVersions/v1/response.spec.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/requests/apiVersions/v1/response.spec.js b/src/protocol/requests/apiVersions/v1/response.spec.js index 9017a9ba7..0b7c307e6 100644 --- a/src/protocol/requests/apiVersions/v1/response.spec.js +++ b/src/protocol/requests/apiVersions/v1/response.spec.js @@ -49,7 +49,7 @@ describe('Protocol > Requests > ApiVersions > v1', () => { }) // https://github.com/tulios/kafkajs/issues/491 - test.only('defaults throttle_time_ms if it is not provided in the response', async () => { + test('defaults throttle_time_ms if it is not provided in the response', async () => { const data = await decode( Buffer.from(require('../fixtures/v1_response_missing_throttle_time.json')) ) From 4b79b54e41d5cf458452d1ea2b5f11cc6d8829d2 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Thu, 12 Sep 2019 16:50:28 +0200 Subject: [PATCH 3/3] Document defaulting throttle_time_ms in ApiVersions response --- src/protocol/requests/apiVersions/v1/response.js | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/protocol/requests/apiVersions/v1/response.js b/src/protocol/requests/apiVersions/v1/response.js index 9e261a340..149c2403d 100644 --- a/src/protocol/requests/apiVersions/v1/response.js +++ b/src/protocol/requests/apiVersions/v1/response.js @@ -25,6 +25,15 @@ const decode = async rawData => { failIfVersionNotSupported(errorCode) const apiVersions = decoder.readArray(apiVersion) + + /** + * The Java client defaults this value to 0 if not present, + * even though it is required in the protocol. This is to + * work around https://github.com/tulios/kafkajs/issues/491 + * + * See: + * https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java#L23-L25 + */ const throttleTime = decoder.canReadInt32() ? decoder.readInt32() : 0 return {