Skip to content

Commit

Permalink
Merge pull request #495 from tulios/default-throttle-time-on-apiversi…
Browse files Browse the repository at this point in the history
…ons-491

Default throttle_time_ms to 0 if missing in ApiVersions response
  • Loading branch information
tulios authored Sep 12, 2019
2 parents e7d2ce4 + 4b79b54 commit c3a38ea
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,0,0,34,0,0,0,0,0,3,0,1,0,0,0,5,0,2,0,0,0,2,0,3,0,0,0,4,0,4,0,0,0,0,0,5,0,0,0,0,0,6,0,0,0,3,0,7,0,1,0,1,0,8,0,0,0,3,0,9,0,0,0,3,0,10,0,0,0,1,0,11,0,0,0,2,0,12,0,0,0,1,0,13,0,0,0,1,0,14,0,0,0,1,0,15,0,0,0,1,0,16,0,0,0,1,0,17,0,0,0,0,0,18,0,0,0,1,0,19,0,0,0,2,0,20,0,0,0,1,0,21,0,0,0,0,0,22,0,0,0,0,0,23,0,0,0,0,0,24,0,0,0,0,0,25,0,0,0,0,0,26,0,0,0,0,0,27,0,0,0,0,0,28,0,0,0,0,0,29,0,0,0,0,0,30,0,0,0,0,0,31,0,0,0,0,0,32,0,0,0,0,0,33,0,0,0,0]}
16 changes: 14 additions & 2 deletions src/protocol/requests/apiVersions/v1/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,22 @@ const decode = async rawData => {

failIfVersionNotSupported(errorCode)

const apiVersions = decoder.readArray(apiVersion)

/**
* The Java client defaults this value to 0 if not present,
* even though it is required in the protocol. This is to
* work around https://github.com/tulios/kafkajs/issues/491
*
* See:
* https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java#L23-L25
*/
const throttleTime = decoder.canReadInt32() ? decoder.readInt32() : 0

return {
errorCode,
apiVersions: decoder.readArray(apiVersion),
throttleTime: decoder.readInt32(),
apiVersions,
throttleTime,
}
}

Expand Down
47 changes: 47 additions & 0 deletions src/protocol/requests/apiVersions/v1/response.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,53 @@ describe('Protocol > Requests > ApiVersions > v1', () => {
await expect(parse(data)).resolves.toBeTruthy()
})

// https://github.com/tulios/kafkajs/issues/491
test('defaults throttle_time_ms if it is not provided in the response', async () => {
const data = await decode(
Buffer.from(require('../fixtures/v1_response_missing_throttle_time.json'))
)
expect(data).toEqual({
apiVersions: [
{ apiKey: 0, maxVersion: 3, minVersion: 0 },
{ apiKey: 1, maxVersion: 5, minVersion: 0 },
{ apiKey: 2, maxVersion: 2, minVersion: 0 },
{ apiKey: 3, maxVersion: 4, minVersion: 0 },
{ apiKey: 4, maxVersion: 0, minVersion: 0 },
{ apiKey: 5, maxVersion: 0, minVersion: 0 },
{ apiKey: 6, maxVersion: 3, minVersion: 0 },
{ apiKey: 7, maxVersion: 1, minVersion: 1 },
{ apiKey: 8, maxVersion: 3, minVersion: 0 },
{ apiKey: 9, maxVersion: 3, minVersion: 0 },
{ apiKey: 10, maxVersion: 1, minVersion: 0 },
{ apiKey: 11, maxVersion: 2, minVersion: 0 },
{ apiKey: 12, maxVersion: 1, minVersion: 0 },
{ apiKey: 13, maxVersion: 1, minVersion: 0 },
{ apiKey: 14, maxVersion: 1, minVersion: 0 },
{ apiKey: 15, maxVersion: 1, minVersion: 0 },
{ apiKey: 16, maxVersion: 1, minVersion: 0 },
{ apiKey: 17, maxVersion: 0, minVersion: 0 },
{ apiKey: 18, maxVersion: 1, minVersion: 0 },
{ apiKey: 19, maxVersion: 2, minVersion: 0 },
{ apiKey: 20, maxVersion: 1, minVersion: 0 },
{ apiKey: 21, maxVersion: 0, minVersion: 0 },
{ apiKey: 22, maxVersion: 0, minVersion: 0 },
{ apiKey: 23, maxVersion: 0, minVersion: 0 },
{ apiKey: 24, maxVersion: 0, minVersion: 0 },
{ apiKey: 25, maxVersion: 0, minVersion: 0 },
{ apiKey: 26, maxVersion: 0, minVersion: 0 },
{ apiKey: 27, maxVersion: 0, minVersion: 0 },
{ apiKey: 28, maxVersion: 0, minVersion: 0 },
{ apiKey: 29, maxVersion: 0, minVersion: 0 },
{ apiKey: 30, maxVersion: 0, minVersion: 0 },
{ apiKey: 31, maxVersion: 0, minVersion: 0 },
{ apiKey: 32, maxVersion: 0, minVersion: 0 },
{ apiKey: 33, maxVersion: 0, minVersion: 0 },
],
errorCode: 0,
throttleTime: 0,
})
})

test('throws KafkaJSProtocolError if the api is not supported', async () => {
await expect(decode(unsupportedVersionResponse())).rejects.toThrow(
/The version of API is not supported/
Expand Down

0 comments on commit c3a38ea

Please sign in to comment.