Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable IAM based auth to ES for AWS clients (#465) #1036

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ func TestConstructor(t *testing.T) {

// partitionConsumerWrapper wraps a Sarama partition consumer into a Sarama cluster partition consumer
type partitionConsumerWrapper struct {
topic string
partition int32
topic string
partition int32
initialOffset int64
offset int64

sarama.PartitionConsumer
}
Expand All @@ -68,11 +70,24 @@ func (s partitionConsumerWrapper) Topic() string {
return s.topic
}

func (s partitionConsumerWrapper) InitialOffset() int64 {
return s.offset
}

func (s partitionConsumerWrapper) MarkOffset(offset int64, metadata string) {
s.offset = offset
}

func (s partitionConsumerWrapper) ResetOffset(offset int64, metadata string) {
s.offset = s.initialOffset
}

func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer) *kmocks.Consumer {
pcha := make(chan cluster.PartitionConsumer, 1)
pcha <- &partitionConsumerWrapper{
topic: topic,
partition: partition,
offset: 0,
PartitionConsumer: saramaPartitionConsumer,
}
saramaClusterConsumer := &kmocks.Consumer{}
Expand Down Expand Up @@ -141,6 +156,7 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
partitionConsumer: &partitionConsumerWrapper{
topic: topic,
partition: partition,
offset: 0,
PartitionConsumer: &kmocks.PartitionConsumer{},
},
},
Expand Down
29 changes: 25 additions & 4 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 37 additions & 3 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/pkg/errors"
"github.com/sha1sum/aws_signing_client"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to consult our lawyers if embedding crypto capability creates an export issue for Jaeger binaries.

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"
Expand All @@ -42,6 +45,12 @@ type Configuration struct {
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
AwsIamConfig AwsIamConfiguration
}

// AwsIamConfiguration describes the AWS-specific configuration needed to connect to ElasticSearch with IAM authn
type AwsIamConfiguration struct {
Enabled bool
}

// ClientBuilder creates new es.Client
Expand All @@ -58,7 +67,13 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
rawClient, err := elastic.NewClient(c.GetConfigs()...)

clientOptionFuncs, err := c.GetConfigs()
if err != nil {
return nil, err
}

rawClient, err := elastic.NewClient(clientOptionFuncs...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,6 +158,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.BulkFlushInterval == 0 {
c.BulkFlushInterval = source.BulkFlushInterval
}
if !c.AwsIamConfig.Enabled {
c.AwsIamConfig.Enabled = source.AwsIamConfig.Enabled
}
}

// GetNumShards returns number of shards from Configuration
Expand All @@ -166,10 +184,26 @@ func (c *Configuration) GetIndexPrefix() string {
}

// GetConfigs wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc {
func (c *Configuration) GetConfigs() ([]elastic.ClientOptionFunc, error) {
options := make([]elastic.ClientOptionFunc, 3)
options[0] = elastic.SetURL(c.Servers...)
options[1] = elastic.SetBasicAuth(c.Username, c.Password)
options[2] = elastic.SetSniff(c.Sniffer)
return options

// if AWS IAM is enabled, instantiate the elastic client with a signing client created via the AWS SDK
if c.AwsIamConfig.Enabled {
sess, err := session.NewSession()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is a live session needed here? Looks like you're only accessing Config from it.

if err != nil {
return nil, err
}

signer := v4.NewSigner(sess.Config.Credentials)
awsSigningClient, err := aws_signing_client.New(signer, nil, "es", *sess.Config.Region)
if err != nil {
return nil, err
}
options = append(options, elastic.SetHttpClient(awsSigningClient))
}

return options, nil
}
9 changes: 9 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixIndexPrefix = ".index-prefix"
suffixAwsIamEnabled = ".aws.iam_enabled"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -75,6 +76,9 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
BulkWorkers: 1,
BulkActions: 1000,
BulkFlushInterval: time.Millisecond * 200,
AwsIamConfig: config.AwsIamConfiguration{
Enabled: false,
},
},
servers: "http://127.0.0.1:9200",
namespace: primaryNamespace,
Expand Down Expand Up @@ -146,6 +150,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
"Optional prefix of Jaeger indices. For example \"production\" creates \"production:jaeger-*\".")
flagSet.Bool(
nsConfig.namespace+suffixAwsIamEnabled,
nsConfig.AwsIamConfig.Enabled,
"Whether to connect to AWS ElasticSearch with IAM authentication. Requires the proper .aws/ files to be setup to connect.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be an option with the directory name of these aws files? Relying on the working directory seems brittle.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, agreed. I think I'm actually leaning back to preferring that clients explicitly pass in the credential values that we require, rather than implicitly loading them from a file. Furthermore, see discussion #465 (comment) as to why we can't actually directly use the session created from such file.

I can go back to accepting the appropriate credential params.

cc @jmwaniki

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before I push over theses changes, here's the commit that has the params explicitly passed in, that I'm now in favor in. Mind taking a peak at that? wesleyk@b3eb1a1

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wesleyk
Regarding why a new session was needed, the credentials are retrieved from AWS STS when creating a new session using the instance IAM role. It uses the endpoint provider credentials.
See this: https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/endpointcreds/

I have a solution I've been testing that forces the credentials to be refreshed before expiring and it's worked for 2 days without a problem. That solution involves using the ec2RoleCreds
see: https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/ec2rolecreds/
with an expiry window. We can discuss further and see if it's a good solution

Thanks!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heads up, I was out for a bit. Will take a closer look at this sometime this week!

Also, @jmwaniki feel free to push out another change if you have something locally.

}

// InitFromViper initializes Options with properties from viper
Expand All @@ -169,6 +177,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.AwsIamConfig.Enabled = v.GetBool(cfg.namespace + suffixAwsIamEnabled)
}

// GetPrimary returns primary configuration.
Expand Down
5 changes: 4 additions & 1 deletion plugin/storage/es/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ func TestOptions(t *testing.T) {
assert.Equal(t, int64(1), primary.NumReplicas)
assert.Equal(t, 72*time.Hour, primary.MaxSpanAge)
assert.False(t, primary.Sniffer)
assert.Equal(t, false, primary.AwsIamConfig.Enabled)

aux := opts.Get("archive")
assert.Equal(t, primary.Username, aux.Username)
assert.Equal(t, primary.Password, aux.Password)
assert.Equal(t, primary.Servers, aux.Servers)
assert.Equal(t, primary.AwsIamConfig, aux.AwsIamConfig)
}

func TestOptionsWithFlags(t *testing.T) {
Expand All @@ -50,7 +52,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--es.sniffer=true",
"--es.max-span-age=48h",
"--es.num-shards=20",
"--es.num-replicas=10",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this one removed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, reverting!

"--es.aws.iam_enabled=true",
// a couple overrides
"--es.aux.server-urls=3.3.3.3,4.4.4.4",
"--es.aux.max-span-age=24h",
Expand All @@ -62,6 +64,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "hello", primary.Username)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
assert.Equal(t, 48*time.Hour, primary.MaxSpanAge)
assert.Equal(t, true, primary.AwsIamConfig.Enabled)
assert.True(t, primary.Sniffer)

aux := opts.Get("es.aux")
Expand Down