Skip to content

Commit ac4e886

Browse files
authored
feat: Add Granuile CNM Ingester TF (#365)
* feat: add mock lambda + tf * feat: allow some settings configurable + add tags * fix: prep for s3 part (blocked by IAM) * chore: remove comment * fix: update sns policy for s3 connection * fix: remove s3 bucket code
1 parent 344dc57 commit ac4e886

File tree

8 files changed

+182
-2
lines changed

8 files changed

+182
-2
lines changed

cumulus_lambda_functions/granules_cnm_ingester/__init__.py

Whitespace-only changes.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import json
2+
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
3+
4+
5+
def lambda_handler(event, context):
6+
"""
7+
:param event:
8+
:param context:
9+
:return:
10+
"""
11+
LambdaLoggerGenerator.remove_default_handlers()
12+
print(f'event: {event}')
13+
raise NotImplementedError('Require implementation later')
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
resource "aws_lambda_function" "granules_cnm_ingester" {
2+
filename = local.lambda_file_name
3+
source_code_hash = filebase64sha256(local.lambda_file_name)
4+
function_name = "${var.prefix}-granules_cnm_ingester"
5+
role = var.lambda_processing_role_arn
6+
handler = "cumulus_lambda_functions.granules_cnm_ingester.lambda_function.lambda_handler"
7+
runtime = "python3.9"
8+
timeout = 300
9+
reserved_concurrent_executions = var.granules_cnm_ingester__lambda_concurrency # TODO
10+
environment {
11+
variables = {
12+
LOG_LEVEL = var.log_level
13+
SNS_TOPIC_ARN = var.cnm_sns_topic_arn
14+
}
15+
}
16+
17+
vpc_config {
18+
subnet_ids = var.cumulus_lambda_subnet_ids
19+
security_group_ids = local.security_group_ids_set ? var.security_group_ids : [aws_security_group.unity_cumulus_lambda_sg[0].id]
20+
}
21+
tags = var.tags
22+
}
23+
24+
resource "aws_sns_topic" "granules_cnm_ingester" {
25+
name = "${var.prefix}-granules_cnm_ingester"
26+
tags = var.tags
27+
}
28+
29+
resource "aws_sns_topic_policy" "granules_cnm_ingester_policy" {
30+
arn = aws_sns_topic.granules_cnm_ingester.arn
31+
policy = templatefile("${path.module}/sns_policy.json", {
32+
region: var.aws_region,
33+
accountId: local.account_id,
34+
snsName: "${var.prefix}-granules_cnm_ingester",
35+
s3Glob: var.granules_cnm_ingester__s3_glob
36+
})
37+
}
38+
39+
resource "aws_sqs_queue" "dead_letter_granules_cnm_ingester" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sqs_queue
40+
// TODO how to notify admin for failed ingestion?
41+
tags = var.tags
42+
name = "${var.prefix}-dead_letter_granules_cnm_ingester"
43+
delay_seconds = 0
44+
max_message_size = 262144
45+
message_retention_seconds = 345600
46+
visibility_timeout_seconds = 300
47+
receive_wait_time_seconds = 0
48+
policy = templatefile("${path.module}/sqs_policy.json", {
49+
region: var.aws_region,
50+
roleArn: var.lambda_processing_role_arn,
51+
accountId: local.account_id,
52+
sqsName: "${var.prefix}-dead_letter_granules_cnm_ingester",
53+
})
54+
// redrive_policy = jsonencode({
55+
// deadLetterTargetArn = aws_sqs_queue.terraform_queue_deadletter.arn
56+
// maxReceiveCount = 4
57+
// })
58+
// tags = {
59+
// Environment = "production"
60+
// }
61+
}
62+
63+
resource "aws_sqs_queue" "granules_cnm_ingester" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sqs_queue
64+
name = "${var.prefix}-granules_cnm_ingester"
65+
delay_seconds = 0
66+
max_message_size = 262144
67+
message_retention_seconds = 345600
68+
visibility_timeout_seconds = var.granules_cnm_ingester__sqs_visibility_timeout_seconds // Used as cool off time in seconds. It will wait for 5 min if it fails
69+
receive_wait_time_seconds = 0
70+
policy = templatefile("${path.module}/sqs_policy.json", {
71+
region: var.aws_region,
72+
roleArn: var.lambda_processing_role_arn,
73+
accountId: local.account_id,
74+
sqsName: "${var.prefix}-granules_cnm_ingester",
75+
})
76+
redrive_policy = jsonencode({
77+
deadLetterTargetArn = aws_sqs_queue.dead_letter_granules_cnm_ingester.arn
78+
maxReceiveCount = var.granules_cnm_ingester__sqs_retried_count // How many times it will be retried.
79+
})
80+
tags = var.tags
81+
}
82+
83+
resource "aws_sns_topic_subscription" "granules_cnm_ingester_topic_subscription" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic_subscription
84+
topic_arn = aws_sns_topic.granules_cnm_ingester.arn
85+
protocol = "sqs"
86+
endpoint = aws_sqs_queue.granules_cnm_ingester.arn
87+
# filter_policy_scope = "MessageBody" // MessageAttributes. not using attributes
88+
# filter_policy = templatefile("${path.module}/ideas_api_job_results_filter_policy.json", {})
89+
}
90+
91+
resource "aws_lambda_event_source_mapping" "granules_cnm_ingester_queue_lambda_trigger" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_event_source_mapping#sqs
92+
event_source_arn = aws_sqs_queue.granules_cnm_ingester.arn
93+
function_name = aws_lambda_function.granules_cnm_ingester.arn
94+
batch_size = 1
95+
enabled = true
96+
}

tf-module/unity-cumulus/main.tf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ resource "aws_ssm_parameter" "uds_api_1" {
170170
name = "/unity/unity-ds/api-gateway/integrations/${var.prefix}-uds_api_1-function-name"
171171
type = "String"
172172
value = aws_lambda_function.uds_api_1.function_name
173+
tags = var.tags
173174
}
174175

175176

@@ -178,4 +179,5 @@ resource "aws_ssm_parameter" "uds_api_1" {
178179
name = "${var.health_check_base_path}/${var.health_check_marketplace_item}/${var.health_check_component_name}/url"
179180
type = "String"
180181
value = "${var.uds_base_url}/${var.dapa_api_prefix}/collections"
182+
tags = var.tags
181183
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"Version": "2008-10-17",
3+
"Id": "__default_policy_ID",
4+
"Statement": [
5+
{
6+
"Effect": "Allow",
7+
"Principal": {
8+
"Service": ["s3.amazonaws.com"]
9+
},
10+
"Action": [
11+
"SNS:GetTopicAttributes",
12+
"SNS:SetTopicAttributes",
13+
"SNS:AddPermission",
14+
"SNS:RemovePermission",
15+
"SNS:DeleteTopic",
16+
"SNS:Subscribe",
17+
"SNS:ListSubscriptionsByTopic",
18+
"SNS:Publish"
19+
],
20+
"Resource": "arn:aws:sns:${region}:${accountId}:${snsName}",
21+
"Condition": {
22+
"ArnLike": {
23+
"aws:SourceArn": "arn:aws:s3:*:*:${s3Glob}"
24+
},
25+
"StringEquals": {
26+
"AWS:SourceAccount": "${accountId}"
27+
}
28+
}
29+
}
30+
]
31+
}

tf-module/unity-cumulus/sqs-sns.tf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ resource "aws_sqs_queue" "granules_to_es_queue" { // https://registry.terraform
2222
// tags = {
2323
// Environment = "production"
2424
// }
25+
tags = var.tags
2526
}
2627

2728
resource "aws_sns_topic_subscription" "report_granules_topic_subscription" { // https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/sns_topic_subscription

tf-module/unity-cumulus/sqs_policy.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"sqs:SendMessage*"
1313
],
1414
"Principal": {
15-
"Service": "sns.amazonaws.com"
15+
"Service": ["sns.amazonaws.com", "sqs.amazonaws.com"]
1616
},
1717
"Resource": "arn:aws:sqs:${region}:${accountId}:${sqsName}"
1818
},

tf-module/unity-cumulus/variables.tf

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,41 @@ variable "health_check_base_path" {
190190
type = string
191191
default = "/unity/healthCheck"
192192
description = "base path for healthcheck which should start with, but not end with `/`"
193-
}
193+
}
194+
195+
// << Variables for granules_cnm_ingester >>
196+
variable "granules_cnm_ingester__sqs_visibility_timeout_seconds" {
197+
type = number
198+
default = 300
199+
description = "when a lambda ends in error, how much sqs should wait till it is retried again. (in seconds). defaulted to 5 min"
200+
}
201+
202+
variable "granules_cnm_ingester__sqs_retried_count" {
203+
type = number
204+
default = 3
205+
description = "How many times it is retried before pushing it to DLQ. defaulted to 3 times"
206+
}
207+
208+
variable "granules_cnm_ingester__lambda_concurrency" {
209+
type = number
210+
default = 20
211+
description = "How many Lambdas can be executed for CNM ingester concurrently"
212+
}
213+
214+
variable "granules_cnm_ingester__bucket_notification_prefix" {
215+
type = string
216+
default = "stage_out"
217+
description = "path to the directory where catalogs.json will be written"
218+
}
219+
220+
variable "granules_cnm_ingester__s3_glob" {
221+
type = string
222+
default = "*unity*"
223+
description = "GLOB expression that has all s3 buckets connecting to SNS topic"
224+
}
225+
#variable "granules_cnm_ingester__is_deploying_bucket" {
226+
# type = bool
227+
# default = false
228+
# description = "flag to specify if deploying example bucket"
229+
#}
230+
// << Variables for granules_cnm_ingester END >>

0 commit comments

Comments
 (0)