Add new custom resource: elasticsearch.IngestPipeline #46

Open · wants to merge 6 commits into base: master
2 changes: 1 addition & 1 deletion README.md
@@ -33,7 +33,7 @@ the custom resource. The code corresponding to a class `Resource` in the module
Building
--------

-The build script gathers all custom resources in a single (generated)
+The build script (`build.py`) gathers all custom resources in a single (generated)
CloudFormation template. Each resource inside `lambda_code` is zipped.
The following (relative) paths are treated specially:

65 changes: 56 additions & 9 deletions build.py
@@ -22,10 +22,10 @@
from pip._internal import main as pipmain # pip 10

import troposphere
-from troposphere import Template, awslambda, logs, Sub, Output, Export, GetAtt, constants
+from troposphere import Template, awslambda, logs, Sub, Output, Export, GetAtt, constants, Ref, Not, Equals, Join, ec2
from custom_resources.LambdaBackedCustomResource import LambdaBackedCustomResource

-parser = argparse.ArgumentParser(description='Build custom resources CloudForamtion template')
+parser = argparse.ArgumentParser(description='Build custom resources CloudFormation template')
parser.add_argument('--class-dir', help='Where to look for the CustomResource classes',
default='custom_resources')
parser.add_argument('--lambda-dir', help='Where to look for defined Lambda functions',
@@ -55,6 +55,18 @@
template.set_parameter_label(s3_path, "S3 path")
template.add_parameter_to_group(s3_path, lambda_code_location)

vpc_subnets = template.add_parameter(troposphere.Parameter(
"VpcSubnets",
    # Type cannot be a list of subnet IDs if we also want to allow an empty value
Type=constants.COMMA_DELIMITED_LIST,
Default="",
Description="(optional) VPC subnets for Custom Resources that run attached to a VPC"
))
template.set_parameter_label(vpc_subnets, "VPC Subnets")
vpc_settings = template.add_parameter_to_group(vpc_subnets, "VPC Settings")

has_vpc_subnets = template.add_condition("HasVpcSubnets", Not(Equals(Join("", Ref(vpc_subnets)), "")))


def rec_split_path(path: str) -> typing.List[str]:
"""
@@ -248,9 +260,38 @@ def create_zip_file(custom_resource: CustomResource, output_dir: str):

zip_filename = create_zip_file(custom_resource, args.output_dir)

function_settings = custom_resource.troposphere_class.function_settings()
needs_vpc = False
created_aws_objects: typing.List[troposphere.BaseAWSObject] = []
if "VpcConfig" in function_settings:
needs_vpc = True
security_group = template.add_resource(ec2.SecurityGroup(
"{custom_resource_name}SecurityGroup".format(custom_resource_name=custom_resource_name_cfn),
GroupDescription="Security Group for the {custom_resource_name} custom resource".format(
custom_resource_name='.'.join(custom_resource.name)
),
))
created_aws_objects.append(security_group)
created_aws_objects.append(template.add_output(Output(
"{custom_resource_name}SecurityGroup".format(custom_resource_name=custom_resource_name_cfn),
Value=Ref(security_group),
Description="Security Group used by the {custom_resource_name} custom resource".format(
custom_resource_name='.'.join(custom_resource.name)
),
Export=Export(Sub("${{AWS::StackName}}-{custom_resource_name}SecurityGroup".format(
custom_resource_name=custom_resource_name_cfn,
))),
)))

function_settings["VpcConfig"] = awslambda.VPCConfig(
SecurityGroupIds=[Ref(security_group)],
SubnetIds=Ref(vpc_subnets)
)

role = template.add_resource(custom_resource.troposphere_class.lambda_role(
"{custom_resource_name}Role".format(custom_resource_name=custom_resource_name_cfn),
))
created_aws_objects.append(role)
awslambdafunction = template.add_resource(awslambda.Function(
"{custom_resource_name}Function".format(custom_resource_name=custom_resource_name_cfn),
Code=awslambda.Code(
@@ -259,14 +300,15 @@ def create_zip_file(custom_resource: CustomResource, output_dir: str):
zip_filename]),
),
Role=GetAtt(role, 'Arn'),
-**custom_resource.troposphere_class.function_settings()
+**function_settings
))
-template.add_resource(logs.LogGroup(
+created_aws_objects.append(awslambdafunction)
+created_aws_objects.append(template.add_resource(logs.LogGroup(
"{custom_resource_name}Logs".format(custom_resource_name=custom_resource_name_cfn),
LogGroupName=troposphere.Join('', ["/aws/lambda/", troposphere.Ref(awslambdafunction)]),
RetentionInDays=90,
-))
-template.add_output(Output(
+)))
+created_aws_objects.append(template.add_output(Output(
"{custom_resource_name}ServiceToken".format(custom_resource_name=custom_resource_name_cfn),
Value=GetAtt(awslambdafunction, 'Arn'),
Description="ServiceToken for the {custom_resource_name} custom resource".format(
@@ -275,8 +317,8 @@ def create_zip_file(custom_resource: CustomResource, output_dir: str):
Export=Export(Sub("${{AWS::StackName}}-{custom_resource_name}ServiceToken".format(
custom_resource_name=custom_resource_name_cfn
)))
-))
-template.add_output(Output(
+)))
+created_aws_objects.append(template.add_output(Output(
"{custom_resource_name}Role".format(custom_resource_name=custom_resource_name_cfn),
Value=GetAtt(role, 'Arn'),
Description="Role used by the {custom_resource_name} custom resource".format(
@@ -285,7 +327,12 @@ def create_zip_file(custom_resource: CustomResource, output_dir: str):
Export=Export(Sub("${{AWS::StackName}}-{custom_resource_name}Role".format(
custom_resource_name=custom_resource_name_cfn,
))),
-))
+)))
if needs_vpc:
for aws_object in created_aws_objects:
if aws_object.resource.get('Condition'):
raise ValueError("Can't handle multiple conditions")
aws_object.Condition = has_vpc_subnets

with open(os.path.join(args.output_dir, 'cfn.json'), 'w') as f:
f.write(template.to_json())
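
For context on how this fits the build flow: after running `python build.py` (the `--class-dir` and `--lambda-dir` flags above control where classes and Lambda code are found), the generated `cfn.json` deploys like any other CloudFormation template. Below is a minimal sketch using boto3; the stack name, subnet IDs, and output directory are placeholders, and the template's S3 location parameters are elided. Leaving `VpcSubnets` empty keeps `HasVpcSubnets` false, so every resource belonging to a VPC-attached custom resource is skipped entirely.

```python
# Sketch: deploying the generated template with boto3 (names are placeholders).
import boto3

cfn = boto3.client("cloudformation")
with open("output/cfn.json") as f:  # assumes --output-dir output was used
    template_body = f.read()

cfn.create_stack(
    StackName="custom-resources",  # hypothetical stack name
    TemplateBody=template_body,
    Parameters=[
        # An empty value keeps HasVpcSubnets false and skips every
        # VPC-attached custom resource; a comma-separated subnet list
        # (CommaDelimitedList) enables them.
        {"ParameterKey": "VpcSubnets",
         "ParameterValue": "subnet-0123456789abcdef0,subnet-0fedcba9876543210"},
        # The template's S3 bucket/path parameters would go here as well.
    ],
    Capabilities=["CAPABILITY_IAM"],  # the template creates IAM roles
)
```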
36 changes: 36 additions & 0 deletions custom_resources/elasticsearch.py
@@ -0,0 +1,36 @@
"""Custom resources related to Elasticsearch."""
from six import string_types
from .LambdaBackedCustomResource import LambdaBackedCustomResource


class IngestPipelineViaVpc(LambdaBackedCustomResource):
props = {
'EsHost': (string_types, True),
'PipelineName': (string_types, True),
'IngestDocument': (dict, True),
}

@classmethod
def _lambda_policy(cls):
return {
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"es:ESHttpPut",
"es:ESHttpDelete",
],
"Resource": "*",
}],
}

@classmethod
def _update_lambda_settings(cls, settings):
"""
Update the default settings for the lambda function.

:param settings: The default settings that will be used
:return: updated settings
"""
settings['VpcConfig'] = {} # build.py adds the config if the key is present
return settings
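
To help review the intended developer experience, here is a hypothetical consumer-side sketch. The logical name, stack/export names, and endpoint are invented, and passing `ServiceToken` explicitly is an assumption based on the `...ServiceToken` output that build.py exports; the repo's base class may wire it differently.

```python
# Hypothetical usage sketch (not part of this PR). The export name and the
# explicit ServiceToken are assumptions; adjust to the repo's conventions.
from troposphere import ImportValue, Template
from custom_resources.elasticsearch import IngestPipelineViaVpc

template = Template()
template.add_resource(IngestPipelineViaVpc(
    "AccessLogsPipeline",  # hypothetical logical name
    ServiceToken=ImportValue(
        "custom-resources-ElasticsearchIngestPipelineViaVpcServiceToken"),
    EsHost="vpc-logs-abc123.eu-west-1.es.amazonaws.com",  # placeholder endpoint
    PipelineName="access-logs",
    IngestDocument={
        "description": "Tag documents ingested via CloudFormation",
        "processors": [{"set": {"field": "ingested_by", "value": "cfn"}}],
    },
))
```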
63 changes: 63 additions & 0 deletions lambda_code/elasticsearch/IngestPipelineViaVpc/index.py
@@ -0,0 +1,63 @@
"""
Custom resource to create an ingest pipeline in your AWS Elasticsearch Cluster.
"""

import json
import os

from cfn_custom_resource import CloudFormationCustomResource
from _metadata import CUSTOM_RESOURCE_NAME

from elasticsearch import Elasticsearch, RequestsHttpConnection, ElasticsearchException
from requests_aws4auth import AWS4Auth

REGION = os.environ['AWS_REGION']


class IngestPipelineViaVpc(CloudFormationCustomResource):
RESOURCE_TYPE_SPEC = CUSTOM_RESOURCE_NAME

def validate(self):
self.es_host = self.resource_properties['EsHost']
self.pipeline_name = self.resource_properties['PipelineName']
self.ingest_doc = self.resource_properties['IngestDocument']

def create(self):
service = 'es'
credentials = self.get_boto3_session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, REGION, service, session_token=credentials.token)

        es = Elasticsearch(
            hosts=[{'host': self.es_host, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
        )

es.ingest.put_pipeline(id=self.pipeline_name, body=json.dumps(self.ingest_doc))
return {}

def update(self):
return self.create()

def delete(self):
service = 'es'
credentials = self.get_boto3_session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, REGION, service, session_token=credentials.token)

        es = Elasticsearch(
            hosts=[{'host': self.es_host, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
        )

try:
es.ingest.delete_pipeline(id=self.pipeline_name)
except ElasticsearchException:
# Assume already deleted
pass


handler = IngestPipelineViaVpc.get_handler()
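
To make the `IngestDocument` property concrete: it is a standard Elasticsearch ingest pipeline body, passed straight through to `put_pipeline` above. An illustrative value follows; the field names and patterns are invented for the example.

```python
# Illustrative IngestDocument value: a regular Elasticsearch ingest
# pipeline definition (a description plus an ordered list of processors).
ingest_document = {
    "description": "Parse nginx access logs",
    "processors": [
        # grok extracts structured fields from the raw log line
        {"grok": {
            "field": "message",
            "patterns": ["%{COMBINEDAPACHELOG}"],
        }},
        # date turns the extracted timestamp into the event's @timestamp
        {"date": {
            "field": "timestamp",
            "formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
        }},
    ],
}
```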
3 changes: 3 additions & 0 deletions lambda_code/elasticsearch/IngestPipelineViaVpc/requirements.txt
@@ -0,0 +1,3 @@
git+https://github.com/iRobotCorporation/cfn-custom-resource#egg=cfn-custom-resource
elasticsearch
requests-aws4auth