Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Adding ability for SQS source CRD to use annotations #1035

Merged
merged 5 commits into from
Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ required = [

[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.15.73"
version = "1.29.21"

[[constraint]]
name = "github.com/gorilla/websocket"
Expand Down
2 changes: 1 addition & 1 deletion awssqs/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"knative.dev/pkg/injection/sharedmain"

awssqssource "knative.dev/eventing-contrib/awssqs/pkg/client/injection/reconciler/sources/v1alpha1/awssqssource/stub"
awssqssource "knative.dev/eventing-contrib/awssqs/pkg/reconciler"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion awssqs/cmd/receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {
adapter := &awssqs.Adapter{
QueueURL: getRequiredEnv(envQueueURL),
SinkURI: getRequiredEnv(envSinkURI),
CredsFile: getRequiredEnv(envCredsFile),
CredsFile: os.Getenv(envCredsFile),
OnFailedPollWaitSecs: 2,
}

Expand Down
4 changes: 4 additions & 0 deletions awssqs/config/201-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,22 @@ rules:
- eventing.knative.dev
resources:
- channels
- eventtypes
AceHack marked this conversation as resolved.
Show resolved Hide resolved
verbs:
- get
- list
- watch
# Secrets read
# Services read
- apiGroups:
- ""
resources:
- secrets
- services
verbs:
- get
- list
- watch


---
Expand Down
2 changes: 2 additions & 0 deletions awssqs/config/300-awssqssource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ spec:
properties:
awsCredsSecret:
type: object
annotations:
type: object
queueUrl:
type: string
serviceAccountName:
Expand Down
25 changes: 17 additions & 8 deletions awssqs/pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,18 @@ func (a *Adapter) Start(ctx context.Context, stopCh <-chan struct{}) error {
return err
}

sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigDisable,
Config: aws.Config{Region: &region},
SharedConfigFiles: []string{a.CredsFile},
}))
var sess *session.Session
if a.CredsFile == "" {
sess = session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{Region: &region},
}))
} else {
sess = session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigDisable,
Config: aws.Config{Region: &region},
SharedConfigFiles: []string{a.CredsFile},
}))
}

q := sqs.New(sess)

Expand Down Expand Up @@ -170,17 +177,19 @@ func (a *Adapter) receiveMessage(ctx context.Context, m *sqs.Message, ack func()

func (a *Adapter) makeEvent(m *sqs.Message) (*cloudevents.Event, error) {

// TODO verify the timestamp conversion
timestamp, err := strconv.ParseInt(*m.Attributes["SentTimestamp"], 10, 64)
if err != nil {
if err == nil {
//Convert to nanoseconds as sqs SentTimestamp is millisecond
timestamp = timestamp * int64(1000000)
} else {
timestamp = time.Now().UnixNano()
}

event := cloudevents.NewEvent(cloudevents.VersionV1)
event.SetID(*m.MessageId)
event.SetType(sourcesv1alpha1.AwsSqsSourceEventType)
event.SetSource(types.ParseURIRef(a.QueueURL).String())
event.SetTime(time.Unix(timestamp, 0))
event.SetTime(time.Unix(0, timestamp))

if err := event.SetData(m); err != nil {
return nil, err
Expand Down
10 changes: 6 additions & 4 deletions awssqs/pkg/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,20 @@ import (
)

func TestPostMessage_ServeHTTP(t *testing.T) {
timestamp := "1542107977907705474"
//Millisecond unix timestamp
timestamp := "1542107977907"
testCases := map[string]struct {
sink func(http.ResponseWriter, *http.Request)
reqBody string
error bool
}{
"happy": {
sink: sinkAccepted,
reqBody: `{"Attributes":{"SentTimestamp":"1542107977907705474"},"Body":"The body","MD5OfBody":null,"MD5OfMessageAttributes":null,"MessageAttributes":null,"MessageId":"ABC01","ReceiptHandle":null}`,
reqBody: `{"Attributes":{"SentTimestamp":"1542107977907"},"Body":"The body","MD5OfBody":null,"MD5OfMessageAttributes":null,"MessageAttributes":null,"MessageId":"ABC01","ReceiptHandle":null}`,
},
"rejected": {
sink: sinkRejected,
reqBody: `{"Attributes":{"SentTimestamp":"1542107977907705474"},"Body":"The body","MD5OfBody":null,"MD5OfMessageAttributes":null,"MessageAttributes":null,"MessageId":"ABC01","ReceiptHandle":null}`,
reqBody: `{"Attributes":{"SentTimestamp":"1542107977907"},"Body":"The body","MD5OfBody":null,"MD5OfMessageAttributes":null,"MessageAttributes":null,"MessageId":"ABC01","ReceiptHandle":null}`,
error: true,
},
}
Expand Down Expand Up @@ -105,7 +106,8 @@ func TestReceiveMessage_ServeHTTP(t *testing.T) {

id := "ABC01"
body := "the body"
timestamp := "1542107977907705474"
//Millisecond unix timestamp
timestamp := "1542107977907"
m := &sqs.Message{
MessageId: &id,
Body: &body,
Expand Down
7 changes: 6 additions & 1 deletion awssqs/pkg/apis/sources/v1alpha1/aws_sqs_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ type AwsSqsSourceSpec struct {
QueueURL string `json:"queueUrl"`

// AwsCredsSecret is the credential to use to poll the AWS SQS
AwsCredsSecret corev1.SecretKeySelector `json:"awsCredsSecret,omitempty"`
// +optional
AwsCredsSecret *corev1.SecretKeySelector `json:"awsCredsSecret,omitempty"`

// Annotations to add to the pod, mostly used for Kube2IAM role
// +optional
Annotations map[string]string `json:"annotations,omitempty"`

// Sink is a reference to an object that will resolve to a domain name to
// use as the sink. This is where events will be received.
Expand Down
13 changes: 12 additions & 1 deletion awssqs/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 52 additions & 37 deletions awssqs/pkg/reconciler/resources/receive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,52 @@ const (
// MakeReceiveAdapter generates (but does not insert into K8s) the
// Receive Adapter Deployment for AWS SQS Sources.
func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
credsFile := fmt.Sprintf("%s/%s", credsMountPath, args.Source.Spec.AwsCredsSecret.Key)
credsFile := ""
if args.Source.Spec.AwsCredsSecret != nil {
credsFile = fmt.Sprintf("%s/%s", credsMountPath, args.Source.Spec.AwsCredsSecret.Key)
}

replicas := int32(1)
annotations := map[string]string{"sidecar.istio.io/inject": "true"}
for k, v := range args.Source.Spec.Annotations {
annotations[k] = v
}

envVars := []corev1.EnvVar{
{
Name: "AWS_SQS_URL",
Value: args.Source.Spec.QueueURL,
},
{
Name: "SINK_URI",
Value: args.SinkURI,
},
}

volMounts := []corev1.VolumeMount(nil)
vols := []corev1.Volume(nil)

if credsFile != "" {
envVars = append(envVars, corev1.EnvVar{Name: "AWS_APPLICATION_CREDENTIALS", Value: credsFile})
volMounts = []corev1.VolumeMount{
{
Name: credsVolume,
MountPath: credsMountPath,
},
}

vols = []corev1.Volume{
{
Name: credsVolume,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: args.Source.Spec.AwsCredsSecret.Name,
},
},
},
}
}

return &v1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: args.Source.Namespace,
Expand All @@ -62,49 +106,20 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
Replicas: &replicas,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"sidecar.istio.io/inject": "true",
},
Labels: args.Labels,
Annotations: annotations,
Labels: args.Labels,
},
Spec: corev1.PodSpec{
ServiceAccountName: args.Source.Spec.ServiceAccountName,
Containers: []corev1.Container{
{
Name: "receive-adapter",
Image: args.Image,
Env: []corev1.EnvVar{
{
Name: "AWS_APPLICATION_CREDENTIALS",
Value: credsFile,
},
{
Name: "AWS_SQS_URL",
Value: args.Source.Spec.QueueURL,
},
{
Name: "SINK_URI",
Value: args.SinkURI,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: credsVolume,
MountPath: credsMountPath,
},
},
},
},
Volumes: []corev1.Volume{
{
Name: credsVolume,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: args.Source.Spec.AwsCredsSecret.Name,
},
},
Name: "receive-adapter",
Image: args.Image,
Env: envVars,
VolumeMounts: volMounts,
},
},
Volumes: vols,
},
},
},
Expand Down
33 changes: 27 additions & 6 deletions awssqs/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ commands from the root.
1. Create an [AWS SQS queue](https://aws.amazon.com/sqs/).

1. Setup
[Knative Eventing](https://github.com/knative/docs/tree/master/eventing).
[Knative Eventing](https://github.com/knative/docs/tree/master/docs/eventing).

1. The
[in-memory channel CRD](https://github.com/knative/eventing/blob/master/config/channels/in-memory-channel/README.md)
Expand All @@ -39,6 +39,8 @@ queue.

### Set up credentials

NOTE: If using Kube2IAM skip this step

Acquire
[AWS Credentials](https://docs.aws.amazon.com/general/latest/gr/aws-security-credentials.html)
for the same account. Your credentials file should look like this:
Expand All @@ -55,6 +57,15 @@ Then create a secret for the downloaded key:
kubectl -n knative-sources create secret generic awssqs-source-credentials --from-file=credentials=PATH_TO_CREDENTIALS_FILE
```

### Set up [Kube2IAM](https://github.com/jtblin/kube2iam) credentials

NOTE: If not using Kube2IAM skip this step

Replace the `AWS_IAM_ROLE` place holders in
`samples/awssqs-source-kube2iam.yaml`.

`AWS_IAM_ROLE` should be replaced with your AWS IAM role

### Deployment

Deploy the `AwsSqsSource` controller as part of eventing-source's controller.
Expand All @@ -66,7 +77,8 @@ ko apply -f awssqs/config/
Note that if the `Source` Service Account secret is in a non-default location,
you will need to update the YAML first.

Replace the place holders in `samples/awssqs-source.yaml`.
Replace the place holders in `samples/awssqs-source.yaml` or
`samples/awssqs-source-kube2iam.yaml` depending on credential type used.

- `QUEUE_URL` should be replaced with your AWS SQS queue URL.

Expand All @@ -75,11 +87,20 @@ Replace the place holders in `samples/awssqs-source.yaml`.
sed -i "s|QUEUE_URL|$QUEUE_URL|" awssqs-source.yaml
```

Now deploy `awssqs-source.yaml`.
Now deploy `awssqs-source.yaml` or `awssqs-source-kube2iam.yaml` depending on
credential type used.

```shell
ko apply -f awssqs/samples/awssqs-source.yaml
```
- Credentials File

```shell
ko apply -f awssqs/samples/awssqs-source.yaml
```

- Kube2IAM credentials

```shell
ko apply -f awssqs/samples/awssqs-source-kube2iam.yaml
```

You can use [kail](https://github.com/boz/kail/) to tail the logs of the
subscriber.
Expand Down
Loading