Adding support for Athena #27

Merged
merged 7 commits into from
Jul 26, 2017
32 changes: 27 additions & 5 deletions README.md
@@ -27,7 +27,7 @@ LambdaCron offers 4 different types of tasks:
 * **Batch task**: submit AWS Batch job.
 * **HTTP task**: send HTTP requests (GET & POST).

-Currently LambdaCron intergrates with HTTP requests and 3 AWS services.
+Currently LambdaCron integrates with HTTP requests and 3 AWS services.
 It is ready to be extended for other services and, in general, it is
 ready to reach any service available by an API.

@@ -251,7 +251,7 @@ from a file or a set of tasks in a directory.
 Parameters:

 * **--task-file (-t)**: File that contains a task definition.
-* **--task-directory (-d)**: Directory with a set of files with taqsks definitions.
+* **--task-directory (-d)**: Directory with a set of files with tasks definitions.


 ## Tasks
@@ -330,7 +330,7 @@ The task definition must contain the following keys:
 name: 'Enrich new stats every hour'
 expression: '0 * * * *'
 task:
-  type: 'bath'
+  type: 'batch'
   jobName: 'enrich-stats'
   jobDefinition: 'enrich-stats-definition:1'
   jobQueue: 'jobs_high_priority'
@@ -350,20 +350,42 @@ The task definition must contain the following keys:
 * **request**: YAML with parameters to send for the selected method using [Requests](http://docs.python-requests.org/en/master/)

 ``` yaml
-name: 'helth check every hour'
+name: 'health check every hour'
 expression: '0 * * * *'
 task:
   type: 'http'
   method: 'get'
   request:
-    url: 'http://helthcheck.my-domain.com'
+    url: 'http://healthcheck.my-domain.com'
     params:
       service: 'lambda'
 ```

 It is a wrapper over [Requests](http://docs.python-requests.org/en/master/).
 All HTTP methods will be supported soon.

+### Athena task
+
+It executes an SQL query in Amazon Athena.
+The task definition must contain the following keys:
+
+* **type**: *athena*
+* **QueryString**: The SQL query statements to be executed (string)
+* **ResultConfiguration**: (map)
+  * **OutputLocation**: the location in S3 where query results are stored (string)
+
+``` yaml
+name: 'get high scores every fifteen minutes'
+expression: '*/15 * * * *'
+task:
+  type: 'athena'
+  QueryString: 'SELECT Username, HighScore FROM Database.UserTable WHERE HighScore > 1000'
+  ResultConfiguration:
+    OutputLocation: 's3://scores.my-app/'
+```
+
+It is a wrapper for [boto3 Athena.Client.start_query_execution](https://boto3.readthedocs.io/en/latest/reference/services/athena.html#Athena.Client.start_query_execution). All parameters for the method can be set in the task definition.
+
 ## Frequency

 #### Execution time
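The Athena task section of the README states that every parameter of `start_query_execution` can be set in the task definition. The sketch below illustrates that pass-through under stated assumptions: `athena_kwargs`, the task name, and the `Database` value are hypothetical and exist only for this example. Unlike the runner in this PR, which pops `type` from the task in place, the helper works on a copy.

```python
def athena_kwargs(definition):
    """Return the keyword arguments an Athena task would forward to boto3.

    Hypothetical helper, not part of LambdaCron: it copies the task mapping
    and drops the LambdaCron-specific 'type' key, leaving only parameters
    that start_query_execution accepts.
    """
    task = dict(definition['task'])  # shallow copy; the input stays intact
    task.pop('type')
    return task


definition = {
    'name': 'count rows every hour',
    'expression': '0 * * * *',
    'task': {
        'type': 'athena',
        'QueryString': 'SELECT COUNT(*) FROM UserTable',
        # Optional boto3 parameter, passed through unchanged.
        'QueryExecutionContext': {'Database': 'scores'},
        'ResultConfiguration': {'OutputLocation': 's3://example-bucket/results/'},
    },
}

kwargs = athena_kwargs(definition)
print(sorted(kwargs))
```

Everything except `type` reaches boto3 as written, so a misspelled key in the task file would surface as a boto3 parameter validation error at run time unless the schema rejects it first.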
Binary file modified lambda-cron-diagram.png
3 changes: 1 addition & 2 deletions lambda-cron-diagram.xml
@@ -1,2 +1 @@
<?xml version="1.0" encoding="UTF-8"?>
<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36" version="6.2.4" editor="www.draw.io" type="device"><diagram name="Page-1">7VrLcqM6EP0aL02BeHqZOJOZxdxbqcpi5q4oGWRbFUCMkGM7X38lQLZlwJZjcKZmkiwMjV59TqulbmlkT9PNVwrz5T8kRskImPFmZD+MALCsicl/hGRbSTynFiwojutCe8EzfkO1UBZb4RgVSkFGSMJwrgojkmUoYooMUkrWarE5SdRec7hADcFzBJOm9AeO2bKSBq65l39DeLGUPVtm/WUGo5cFJaus7m8E7Hn5V31OoWyrLl8sYUzWByL7y8ieUkJY9ZRupigR2ErYqnqPHV9346YoYzoVQFXhFSYrJEdcjottJRYxLJZIFDdH9v2SpQl/tPgjH3ouiqSbhTACA64LYBSMUA5OCLM45OQwPoyQGwd+RXRrFHY4W0UviInajJIXNCUJobyNjGS8qfvm6GuFeHWGNgeiWpuviKSI0S0vIr8GflWltjxbcrbe8+g6lWh5QKHr1dZTW85i1/IePf5QA9gOpt0zmBFJ8xWrwMwQWxP6grOFkcB0FsNBMQwUDIHZguGkiaEPrsfQ+UMwdExbwdCSBnaIod/E0HOux9DrGUOY52GB6CuOUGEUv4rw1wqJlm8G3sTRws7uYQ77PWOHuQ+k3PBCMg/ZkpteYSwZy8OcEkYiktwQRVs2cQgjaIFxcj2MwXkYURbfiaWav0UJLAocqXB2IoBiZfFu6n+on9ni6msZRQlkfGVSdw8tStc9PBFO5h5eYE5UeOUeRzZRkBWNUF3rcA0+0xAIjhpikC4QazRUcrBTW4uWyUfSItW1OtTVpO0Chqy+GOoacv8MyR3wx1IEbkYR6IuiriEPQJHGpvkcRZwZuv1ZryHly3/ixXDl6xOimI8JUVlmg9lPWZk/HxTnb8elOy2gglPZrlbAKLuv38TD7sLImlPnmFNt43COGrKHMw6NIOBGxgGsnq3Du9CLeCrqNhjMi7hqT+CYX21D6RryAIaiEencyFD8Sc+G4l9oKP4R6vZghnLc0+S9htI15P4NxdUwFBEU4Agm3+EMJU+kwAyTjH+aEcZIqhqNLHuX4IUow0jeCEJGPJYq//iXOU6SA/l9+d8W7rytKDKKiMdGq0SYUC+pCFdd1i2/GcPIsO/QNBzQbRraoaDGVkzk+nJ9PXcZSziTLZin9ZfKSc/mBQ39vZapEdjX6++9X/8ddFcDMG6mTew2V9CDvq5G0NoyjzrnXCI+3O8SwgdzqE4Jn56LMvEMLpmdc1f8V7OzSiwLP65kLI7nLSaFb2BOS2HEJFqlJYktc9fr4rSTO7dBnTQCJd1wwotrU6cR2Eq9cVom/8+7xD4ILju7K/LqkELAD+XLHG8EJ/f1eB5EWohDcCeUBo9RnFklK3OcxYiKPCaXxpBB/lOyxX+XcF0/2QIr3zRNH4zF17EdjLlpoPE2TcYWCIw8W1xF6W5npTUd++DUs85zOrj7cdTVp7kvGcoZeRpB5t/sjC7eSdzOGXkaIeCnM7qG0g9wRhqb8MGdkaV6o5ZwejBv5H56o1PeyOmg+HfwRhqngZ/e6BpKP8AbaZxSDu6NpCJb9fUW3ugzUDvpjfwOiruD7GZSZTB39Bmpvcsd6XN6e3fka0Rqw59Zmqo/GvDM8qinKw7+O4bcfxJZXs36o6YdODvtYArf+BSD62JcMJRFOBHS8rbm47S+GRb+u7sWFkYk34Z3P57DB0z5IMJptSTu5mV3NrxzVW2YdPeCGhzlekEz9xm0XLuTsqumcB/HlhcdUe/PrgwrAJrnV60u4V0nVOC0M3HVW6TWpJl27+voUu0JOEeX2HSdi
WPZ7UO+2pnw1/2F5Kr4/ta3/eV/</diagram></mxfile>
<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36" version="6.9.5" editor="www.draw.io" type="device"><diagram name="Page-1" id="9c1861c3-233a-52e1-f412-60270fc0d40d">7VvLcuI4FP0alrhs+ckyIZ2excxUqrLonpVL2AJUsS23LALk60fCFiA/QASbpLoDC+xrWZbOufdYVxIje5puvlOYL/8hMUpGwIw3I/thBAAw/YD/CMu2tFiWY5aWBcVxZTsYnvEbqoyy2ArHqFAKMkIShnPVGJEsQxFTbJBSslaLzUmiPjWHC9QwPEcwaVp/4JgtS2vgmgf7XwgvlvLJllldmcHoZUHJKqueNwL2fPcpL6dQ1lWVL5YwJusjk/1tZE8pIaw8SjdTlAhwJWzlfY8dV/ftpihjOjeA8oZXmKyQbPGuXWwrsYhhsUSiuDmy75csTfihxQ9503NRJN0shBcYcF0Ao2CEcnBCmMUhJ4fxZoTcO/ArolujsMPZKnpBTNzNKHlBU5IQyuvISMarum+2vuoQv52hzZGp6s13RFLE6JYXkVcDv7ylcj1bcrY+8Og6pWl5RKHrVd5Tec5iX/MBPX5QAdgOpt0zmBFJ8xUrwcwQWxP6grOFkcB0FsNBMQwUDIHZguGkiaEPrsfQ+U0wdMyJgqEF3CaGfhNDz7keQ69nDGGehwWirzhChVH8KsJfKyRqHg48Cyjg+S3+14Kd3UMM+z1jh7kGUu54IZmHbMldrzCWjOVhTgkjEUkGRdFVpdDzmzCCFhgn18MYnIcRZfGdeFXzsyiBRYEjFc5OBFCsvLyb/T/un9ki9ZWNogQy/mZSRw8tna6e8EQ4mQd4QS3CbTnGkVUUZEUjVN11/A4+UxEIahUxSBeINSracbDvthYtk4+kRXbX6uiuJm0XMGT1xVBXk/tnSI6AP5YicDOKQF8UdTV5AIo0Bs3nKOLM0O3P6h2yO/lPnBiuPH1CFPM2ISrLbDD7KW/mx0fF+Vm9dKcHlHAqw9USGGX09UkU1pqYCqdOnVNt53BqFdnDOYdGEnAj5wBWz97hXaginoq6DQZTEVd9Eqjzq+0oXU0ewFE0Mp0bOYo/6dlR/Asdxa+hbg/mKPUnTd7rKF1N7t9RXA1HEUkBjmDyN5yh5IkUmGGS8UszwhhJVaeRZe8SvBBlGMkbSciIJwu7D78yx0lyZL/ffdvSnbcVRUYR8dxolQgX6mUqwq2l0b7VcA2Z9h27hgO6XUM7FdQYiom5vly/n/sZSziTNZin+y87J5XNCxr991pCI7Cv77/3/v7vobsagLHX5LtNCnror6uRtLbEUWfMJeLC/X5C+CiGqinh07EoJ57BJdE5d8W3jM5yYlnouDJjUY9bTArfwJyWwohJtEp3JLbErtfFaSd3zRkv6QTKdMMJFdemTiOxlf3G6W7y/7wk9kHw7mF3RV4uUgj4oTyZ443g5L5qz4OYFuIQ3IlOg8cozqwdK3OcxYiKeUxujSGD/GfHFv9dwnV1ZAusfNM0fTAWV8d2MOaugcbbNBlbIDDybHEVpfuRlVY49sGpZ53ndHD5cdS3T3NcMpQYeRpJ5p8sRhePJG4nRp5GCvglRtdQ+gFipDEIH1yMLFWNWtLpwdTI/VKjU2rkdFD8GdRIYzXwS42uofQD1EhjlXJwNZId2aqnt1Cjr0TtpBr5HRR3J9nNSZXB5OgrU3uXHOlzens58jUyteHXLE1VjwZcs6w96YqF/44m9z+JLLdm/VZhB86GHUzhGw8xuC7GBUNZhBNh3e3WfJxWO8PCf/fbwsKI5Nvw7sdz+IApb0Q4LV+J+7jsng3v9F99FbYntuIOVjBpeLA0KXO9PezX8ftYtrxoif
qwdmVYAdBcv+pE+dyCFLhQTFx1F6k1aU6797V0qT4JOLVNbLpi4lh2e5MHEJN+N2naBmRLlMFmdD26gWs73M7Lxphz3rpJrmUo2/CQE0FX2zfSssDke4bb5PpgvSrwNDLrT7GZpBY674pC53TQBR3v1eGDTg6vLg46oO6UdvyaFr876Pjp4V8AZfHDfy3sb/8D</diagram></mxfile>
13 changes: 13 additions & 0 deletions lambda_cron/aws/lib/task_runner.py
@@ -45,6 +45,9 @@ def get_http_task_runner(self, task):
     def get_batch_task_runner(self, task):
         return BatchJobTask(task)

+    def get_athena_task_runner(self, task):
+        return AthenaQueryTask(task)
+

 class Task:

@@ -97,3 +100,13 @@ def get_batch_client(self):
     def run(self):
         self.task.pop('type')
         self.get_batch_client().submit_job(**self.task)
+
+
+class AthenaQueryTask(Task):
+
+    def get_athena_client(self):
+        return boto3.client('athena')
+
+    def run(self):
+        self.task.pop('type')
+        self.get_athena_client().start_query_execution(**self.task)
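`start_query_execution` responds with a dictionary that contains a `QueryExecutionId`, which `AthenaQueryTask.run` discards. The following is a minimal sketch, assuming one wanted to keep that id for logging or later polling. `StubAthenaClient` and `run_athena_task` are hypothetical names; the stub stands in for `boto3.client('athena')` so the example runs without AWS credentials.

```python
class StubAthenaClient:
    """Hypothetical stand-in for boto3.client('athena'); records each call."""

    def __init__(self):
        self.calls = []

    def start_query_execution(self, **kwargs):
        self.calls.append(kwargs)
        # The real API responds with a dict holding 'QueryExecutionId'.
        return {'QueryExecutionId': 'stub-execution-id'}


def run_athena_task(client, task):
    """Forward every key except 'type' and return the execution id."""
    params = {key: value for key, value in task.items() if key != 'type'}
    response = client.start_query_execution(**params)
    return response['QueryExecutionId']


client = StubAthenaClient()
execution_id = run_athena_task(client, {
    'type': 'athena',
    'QueryString': 'SELECT 1',
    'ResultConfiguration': {'OutputLocation': 's3://example-bucket/results/'},
})
print(execution_id)
```

Building `params` with a comprehension leaves the caller's task dictionary untouched, whereas `self.task.pop('type')` mutates it; whether that matters depends on whether the task object is reused after `run`.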
17 changes: 17 additions & 0 deletions lambda_cron/schema.json
@@ -40,6 +40,23 @@
           },
           "required": ["type", "jobName", "jobQueue", "jobDefinition"]
         },
+        {
+          "properties": {
+            "type": { "type": "string", "enum": ["athena"] },
+            "QueryString": { "type": "string" },
+            "ClientRequestToken": { "type": "string" },
+            "QueryExecutionContext": { "type": "object" },
+            "ResultConfiguration": {
+              "type": "object",
+              "properties": {
+                "OutputLocation": { "type": "string" },
+                "EncryptionConfiguration": { "type": "object" }
+              },
+              "required": ["OutputLocation"]
+            }
+          },
+          "required": ["type", "QueryString", "ResultConfiguration"]
+        },
         {
           "properties": {
             "type": { "type": "string", "enum": ["http"] },
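The two `required` arrays in the new schema entry can be read as the checks below. This is a hand-written sketch of what the fragment demands, not the validator LambdaCron actually uses; `missing_athena_keys` is a hypothetical name, and the `string`/`object` type constraints are deliberately left out.

```python
def missing_athena_keys(task):
    """List required keys that are absent from an Athena task mapping.

    Mirrors only the 'required' arrays of the schema fragment; type checks
    are omitted to keep the sketch short.
    """
    missing = [key for key in ('type', 'QueryString', 'ResultConfiguration')
               if key not in task]
    result_configuration = task.get('ResultConfiguration')
    if isinstance(result_configuration, dict) and 'OutputLocation' not in result_configuration:
        missing.append('ResultConfiguration.OutputLocation')
    return missing


valid = {
    'type': 'athena',
    'QueryString': 'SELECT 1',
    'ResultConfiguration': {'OutputLocation': 's3://example-bucket/results/'},
}
incomplete = {
    'type': 'athena',
    'QueryString': 'SELECT 1',
    'ResultConfiguration': {},
}

print(missing_athena_keys(valid))
print(missing_athena_keys(incomplete))
```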
14 changes: 8 additions & 6 deletions lambda_cron/template.cfn.yml
@@ -72,12 +72,10 @@ Resources:
                 Resource: arn:aws:logs:*:*:*
               - Effect: Allow
                 Action:
-                  - s3:ListBucket
-                Resource: !Sub arn:aws:s3:::${Bucket}
-              - Effect: Allow
-                Action:
-                  - s3:GetObject
-                Resource: !Sub arn:aws:s3:::${Bucket}/tasks/*
+                  - s3:Get*
+                  - s3:List*
+                  - s3:PutObject
+                Resource: ['*']
               - Effect: Allow
                 Action:
                   - sqs:SendMessage
@@ -91,6 +89,10 @@ Resources:
                 Action:
                   - batch:SubmitJob
                 Resource: ['*']
+              - Effect: Allow
+                Action:
+                  - athena:StartQueryExecution
+                Resource: ['*']

   LambdaCronHourlyEvent:
     Type: AWS::Events::Rule
54 changes: 53 additions & 1 deletion tests/aws/test_task_runner.py
@@ -15,7 +15,8 @@
 import pytest
 import json
 from mock import patch
-from lambda_cron.aws.lib.task_runner import TaskRunner, QueueTask, InvokeLambdaTask, HttpTask, BatchJobTask
+from lambda_cron.aws.lib.task_runner import TaskRunner, QueueTask, InvokeLambdaTask, HttpTask, BatchJobTask, \
+    AthenaQueryTask
 from lambda_cron.aws.lib.cron_checker import CronChecker


@@ -377,3 +378,54 @@ def test_batch_should_run_basic(get_batch_client_mock, batch_client_spy, cron_ch
     assert batch_client_spy.parameters['jobQueue'] == 'testing-batch-job-queue'
     assert 'parameters' in batch_client_spy.parameters
     assert batch_client_spy.parameters['parameters'] == BATCH_TASK_BODY['parameters']
+
+
+class AthenaClientSpy:
+    def __init__(self):
+        self.parameters = None
+        self.calls = 0
+
+    def start_query_execution(self, **kwargs):
+        self.parameters = kwargs
+        self.calls += 1
+
+
+@pytest.fixture(scope="function")
+def athena_client_spy():
+    return AthenaClientSpy()
+
+
+ATHENA_TASK_BODY =\
+    {
+        'type': 'athena',
+        'QueryString': 'SELECT * FROM testing',
+        'ResultConfiguration':
+            {
+                'OutputLocation': 'bucketname.s3.aws.com/foo/bar/',
+            }
+    }
+
+
+@pytest.fixture(scope="function")
+def athena_task_definition():
+    return {
+        'name': 'Test task',
+        'expression': '0 11 * * *',
+        'task': dict(ATHENA_TASK_BODY)
+    }
+
+
+@patch.object(AthenaQueryTask, 'get_athena_client')
+def test_athena_should_run_basic(get_athena_client_mock, athena_client_spy, cron_checker, athena_task_definition):
+    get_athena_client_mock.return_value = athena_client_spy
+
+    task_runner = TaskRunner(cron_checker)
+    task_runner.run(athena_task_definition)
+
+    assert athena_client_spy.calls == 1
+    assert 'QueryString' in athena_client_spy.parameters
+    assert athena_client_spy.parameters['QueryString'] == 'SELECT * FROM testing'
+    assert 'ResultConfiguration' in athena_client_spy.parameters
+    assert type(athena_client_spy.parameters['ResultConfiguration']) == dict
+    assert sorted(athena_client_spy.parameters['ResultConfiguration'].keys()) == ['OutputLocation']
+    assert athena_client_spy.parameters['ResultConfiguration']['OutputLocation'] == 'bucketname.s3.aws.com/foo/bar/'
8 changes: 8 additions & 0 deletions tests/cli/command/test_validate.py
@@ -52,6 +52,14 @@ def test_validate_command_lambda_task():
     validate_command.run()


+def test_validate_command_athena_task():
+    cli_arguments = Namespace()
+    cli_arguments.task_file = get_test_task_path('valid/athena_task.yml')
+    cli_arguments.task_directory = None
+    validate_command = ValidateCommand(CliConfig('test'), cli_arguments)
+    validate_command.run()
+
+
 def test_validate_command_with_error():
     cli_arguments = Namespace()
     cli_arguments.task_file = get_test_task_path('invalid/invalid_task.yml')
8 changes: 8 additions & 0 deletions tests/resources/tasks/valid/athena_task.yml
@@ -0,0 +1,8 @@
+name: 'test athena task'
+expression: '0 11 * * *'
+task:
+  type: 'athena'
+  QueryString: 'SELECT * FROM TestTable'
+  ResultConfiguration:
+    OutputLocation: 'test-bucket'
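This fixture passes validation because the schema only requires `OutputLocation` to be a string. Athena itself expects an S3 URI such as `s3://bucket/prefix/`, so a stricter check could look like the sketch below. `is_s3_uri` is a hypothetical helper and is not part of this PR.

```python
from urllib.parse import urlparse


def is_s3_uri(location):
    """Return True when location has the form s3://bucket[/prefix]."""
    parsed = urlparse(location)
    # An S3 URI needs the 's3' scheme and a non-empty bucket name.
    return parsed.scheme == 's3' and bool(parsed.netloc)


print(is_s3_uri('s3://test-bucket/results/'))
print(is_s3_uri('test-bucket'))
```

A check like this would reject the bare bucket name used in the fixture and the HTTP-style locations used elsewhere in the tests, so adopting it would mean updating those values as well.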