Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add AWS_IAM SASL protocol #402

Merged
merged 3 commits into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ KafkaJS is battle-tested and ready for production.
- Snappy and LZ4 compression through plugins
- Plain, SSL and SASL_SSL implementations
- Support for SCRAM-SHA-256 and SCRAM-SHA-512
- Support for AWS IAM authentication
- Admin client

## <a name="getting-started"></a> Getting Started
Expand Down
45 changes: 42 additions & 3 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ Refer to [TLS create secure context](https://nodejs.org/dist/latest-v8.x/docs/ap

## <a name="sasl"></a> SASL

Kafka has support for using SASL to authenticate clients. The `sasl` option can be used to configure the authentication mechanism. Currently, KafkaJS supports `PLAIN`, `SCRAM-SHA-256`, and `SCRAM-SHA-512` mechanisms.
Kafka has support for using SASL to authenticate clients. The `sasl` option can be used to configure the authentication mechanism. Currently, KafkaJS supports `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, and `AWS-IAM` mechanisms.

### PLAIN/SCRAM Example
```javascript
new Kafka({
clientId: 'my-app',
Expand All @@ -53,7 +54,45 @@ new Kafka({
})
```

It is __highly recommended__ that you use SSL for encryption when using `PLAIN`.
### AWS IAM Example

```javascript
new Kafka({
clientId: 'my-app',
brokers: ['kafka1:9092', 'kafka2:9092'],
// authenticationTimeout: 1000,
sasl: {
mechanism: 'aws-iam',
authorizationIdentity: 'AIDAIOSFODNN7EXAMPLE', // UserId or RoleId
accessKeyId: 'AKIAIOSFODNN7EXAMPLE',
secretAccessKey: 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
sessionToken: 'WHArYt8i5vfQUrIU5ZbMLCbjcAiv/Eww6eL9tgQMJp6QFNEXAMPLETOKEN' // Optional
},
})
```

For more information on the basics of IAM credentials and authentication, see the
[AWS Security Credentials - Access Keys](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys) page.

Use of this functionality requires
[STACK's Kafka AWS IAM LoginModule](https://github.com/STACK-Fintech/kafka-auth-aws-iam), or a
compatible alternative to be installed on all of the target brokers.

In the above example, the `authorizationIdentity` must be the `aws:userid` of the AWS IAM
identity. Typically, you can retrieve this value using the `aws iam get-user` or `aws iam get-role`
commands of the [AWS CLI toolkit](https://aws.amazon.com/cli/). The `aws:userid` is usually listed
as the `UserId` or `RoleId` property of the response.

You can also programmatically retrieve the `aws:userid` for currently available credentials with the
[AWS SDK's Security Token Service](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/STS.html).

A complete breakdown can be found in the IAM User Guide's
[Reference on Policy Variables](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_variables.html#policy-vars-infotouse).

### Use Encrypted Protocols

It is __highly recommended__ that you use SSL for encryption when using `PLAIN` or `AWS`,
otherwise credentials will be transmitted in cleartext!

## Connection Timeout

Expand Down Expand Up @@ -193,4 +232,4 @@ const kafka = new Kafka({
brokers: ['kafka1:9092', 'kafka2:9092'],
socketFactory: myCustomSocketFactory,
})
```
```
43 changes: 43 additions & 0 deletions src/broker/saslAuthenticator/awsIam.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const awsIam = require('../../protocol/sasl/awsIam')
const { KafkaJSSASLAuthenticationError } = require('../../errors')

module.exports = class AWSIAMAuthenticator {
constructor(connection, logger, saslAuthenticate) {
this.connection = connection
this.logger = logger.namespace('SASLAWSIAMAuthenticator')
this.saslAuthenticate = saslAuthenticate
}

async authenticate() {
const { sasl } = this.connection
if (!sasl.authorizationIdentity) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity')
}
if (!sasl.accessKeyId) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId')
}
if (!sasl.secretAccessKey) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey')
}
if (!sasl.sessionToken) {
sasl.sessionToken = ''
}

const request = awsIam.request(sasl)
const response = awsIam.response
const { host, port } = this.connection
const broker = `${host}:${port}`

try {
this.logger.debug('Authenticate with SASL AWS-IAM', { broker })
await this.saslAuthenticate({ request, response })
this.logger.debug('SASL AWS-IAM authentication successful', { broker })
} catch (e) {
const error = new KafkaJSSASLAuthenticationError(
`SASL AWS-IAM authentication failed: ${e.message}`
)
this.logger.error(error.message, { broker })
throw error
}
}
}
32 changes: 32 additions & 0 deletions src/broker/saslAuthenticator/awsIam.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
const { newLogger } = require('testHelpers')
const AWSIAM = require('./awsIam')

describe('Broker > SASL Authenticator > AWS-IAM', () => {
it('throws KafkaJSSASLAuthenticationError for missing authorizationIdentity', async () => {
const awsIam = new AWSIAM({ sasl: {} }, newLogger())
await expect(awsIam.authenticate()).rejects.toThrow(
'SASL AWS-IAM: Missing authorizationIdentity'
)
})

it('throws KafkaJSSASLAuthenticationError for invalid accessKeyId', async () => {
const awsIam = new AWSIAM(
{
sasl: {
authorizationIdentity: '<authorizationIdentity>',
secretAccessKey: '<secretAccessKey>',
},
},
newLogger()
)
await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing accessKeyId')
})

it('throws KafkaJSSASLAuthenticationError for invalid secretAccessKey', async () => {
const awsIam = new AWSIAM(
{ sasl: { authorizationIdentity: '<authorizationIdentity>', accessKeyId: '<accessKeyId>' } },
newLogger()
)
await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing secretAccessKey')
})
})
2 changes: 2 additions & 0 deletions src/broker/saslAuthenticator/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ const apiKeys = require('../../protocol/requests/apiKeys')
const PlainAuthenticator = require('./plain')
const SCRAM256Authenticator = require('./scram256')
const SCRAM512Authenticator = require('./scram512')
const AWSIAMAuthenticator = require('./awsIam')
const { KafkaJSSASLAuthenticationError } = require('../../errors')

const AUTHENTICATORS = {
PLAIN: PlainAuthenticator,
'SCRAM-SHA-256': SCRAM256Authenticator,
'SCRAM-SHA-512': SCRAM512Authenticator,
'AWS-IAM': AWSIAMAuthenticator,
}

const SUPPORTED_MECHANISMS = Object.keys(AUTHENTICATORS)
Expand Down
4 changes: 4 additions & 0 deletions src/protocol/sasl/awsIam/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module.exports = {
request: require('./request'),
response: require('./response'),
}
11 changes: 11 additions & 0 deletions src/protocol/sasl/awsIam/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
const Encoder = require('../../encoder')

const US_ASCII_NULL_CHAR = '\u0000'

module.exports = ({ authorizationIdentity, accessKeyId, secretAccessKey, sessionToken = '' }) => ({
encode: async () => {
return new Encoder().writeBytes(
[authorizationIdentity, accessKeyId, secretAccessKey, sessionToken].join(US_ASCII_NULL_CHAR)
)
},
})
4 changes: 4 additions & 0 deletions src/protocol/sasl/awsIam/response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module.exports = {
decode: async () => true,
parse: async () => true,
}