Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Contributors:
* Alex Gaynor
* Branch Vincent
* Jacob Williams
* Nicolas Paris

Creator:
--------
Expand Down
4 changes: 4 additions & 0 deletions athenacli/athenaclirc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ s3_staging_dir = ''
# Name of athena workgroup that you want to use
work_group = '' # e.g. primary

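# Query result reuse settings (requires Athena engine version 3).
# result_reuse_minutes is the maximum age, in minutes, of a previous
# query result that Athena may reuse.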
result_reuse_enable = False
result_reuse_minutes = 60

[main]
# log_file location.
log_file = ~/.athenacli/app.log
Expand Down
25 changes: 23 additions & 2 deletions athenacli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

class AWSConfig(object):
def __init__(self, aws_access_key_id, aws_secret_access_key,
region, s3_staging_dir, work_group, profile, config):
region, s3_staging_dir, work_group, profile, config,
result_reuse_enable=None, result_reuse_minutes=None):
key = 'aws_profile %s' % profile
try:
_cfg = config[key]
except:
except Exception as e:
# The profile section could not be read from the config: assume the profile is defined only in the regular
# AWS config, so the boto library will resolve it from there. This matters especially when temporary session
# keys are in use, because login fails if aws_access_key_id/aws_secret_access_key are set explicitly here.
_cfg = defaultdict(lambda: None)
# For result reuse settings, provide explicit defaults when profile section is missing
_cfg['result_reuse_enable'] = 'False'
_cfg['result_reuse_minutes'] = '60'

self.aws_access_key_id = self.get_val(aws_access_key_id, _cfg['aws_access_key_id'])
self.aws_secret_access_key = self.get_val(aws_secret_access_key, _cfg['aws_secret_access_key'])
Expand All @@ -36,6 +40,23 @@ def __init__(self, aws_access_key_id, aws_secret_access_key,
self.work_group = self.get_val(work_group, _cfg['work_group'])
# enable connection to assume role
self.role_arn = self.get_val(_cfg.get('role_arn'))
# query result reuse settings
config_reuse_enable = _cfg.get('result_reuse_enable')
if config_reuse_enable and isinstance(config_reuse_enable, str):
config_reuse_enable = config_reuse_enable.lower() in ('true', '1', 'yes', 'on')
elif config_reuse_enable is None:
config_reuse_enable = False
self.result_reuse_enable = result_reuse_enable if result_reuse_enable is not None else config_reuse_enable

config_reuse_minutes = _cfg.get('result_reuse_minutes')
if config_reuse_minutes and isinstance(config_reuse_minutes, str):
try:
config_reuse_minutes = int(config_reuse_minutes)
except ValueError:
config_reuse_minutes = 60
elif config_reuse_minutes is None:
config_reuse_minutes = 60
self.result_reuse_minutes = self.get_val(result_reuse_minutes, config_reuse_minutes, 60)

def get_val(self, *vals):
"""Return the first truthy value in `vals`, otherwise return None."""
Expand Down
17 changes: 13 additions & 4 deletions athenacli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class AthenaCli(object):
MAX_LEN_PROMPT = 45

def __init__(self, region, aws_access_key_id, aws_secret_access_key,
s3_staging_dir, work_group, athenaclirc, profile, database):
s3_staging_dir, work_group, athenaclirc, profile, database,
result_reuse_enable=None, result_reuse_minutes=None):

config_files = [DEFAULT_CONFIG_FILE]
if os.path.exists(os.path.expanduser(athenaclirc)):
Expand All @@ -71,7 +72,8 @@ def __init__(self, region, aws_access_key_id, aws_secret_access_key,
self.init_logging(_cfg['main']['log_file'], _cfg['main']['log_level'])

aws_config = AWSConfig(
aws_access_key_id, aws_secret_access_key, region, s3_staging_dir, work_group, profile, _cfg
aws_access_key_id, aws_secret_access_key, region, s3_staging_dir, work_group, profile, _cfg,
result_reuse_enable, result_reuse_minutes
)

try:
Expand Down Expand Up @@ -200,7 +202,9 @@ def connect(self, aws_config, database):
s3_staging_dir = aws_config.s3_staging_dir,
work_group = aws_config.work_group,
role_arn = aws_config.role_arn,
database = database
database = database,
result_reuse_enable = aws_config.result_reuse_enable,
result_reuse_minutes = aws_config.result_reuse_minutes
)

def handle_editor_command(self, text):
Expand Down Expand Up @@ -616,10 +620,13 @@ def is_mutating(status):
@click.option('--work_group', type=str, help="Amazon Athena workgroup in which query is run, default is primary")
@click.option('--athenaclirc', default=ATHENACLIRC, type=click.Path(dir_okay=False), help="Location of athenaclirc file.")
@click.option('--profile', type=str, default='default', help='AWS profile')
@click.option('--result-reuse-enable', default=None, type=bool, help='Enable query result reuse (requires Athena engine version 3)')
@click.option('--result-reuse-minutes', type=int, help='TTL for query result reuse in minutes (default: 60)')
@click.option('--table-format', type=str, default='csv', help='Table format used with -e option.')
@click.argument('database', default='default', nargs=1)
def cli(execute, region, aws_access_key_id, aws_secret_access_key,
s3_staging_dir, work_group, athenaclirc, profile, table_format, database):
s3_staging_dir, work_group, athenaclirc, profile, result_reuse_enable,
result_reuse_minutes, table_format, database):
'''A Athena terminal client with auto-completion and syntax highlighting.

\b
Expand Down Expand Up @@ -651,6 +658,8 @@ def cli(execute, region, aws_access_key_id, aws_secret_access_key,
work_group=work_group,
athenaclirc=athenaclirc,
profile=profile,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
database=database
)

Expand Down
37 changes: 25 additions & 12 deletions athenacli/sqlexecute.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def __init__(
s3_staging_dir,
work_group,
role_arn,
database
database,
result_reuse_enable=False,
result_reuse_minutes=60
):
# Handle database parameter that may contain catalog.database format
if database and '.' in database:
Expand All @@ -42,6 +44,8 @@ def __init__(
self.role_arn = role_arn
self.database = database
self.catalog_name = catalog_name or 'AwsDataCatalog'
self.result_reuse_enable = result_reuse_enable
self.result_reuse_minutes = result_reuse_minutes
self.connect()

def connect(self, database=None):
Expand All @@ -50,17 +54,26 @@ def connect(self, database=None):
catalog_name, database = database.split('.', 1)
else:
catalog_name = None
conn = pyathena.connect(
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
region_name=self.region_name,
s3_staging_dir=self.s3_staging_dir,
work_group=self.work_group,
schema_name=database or self.database,
role_arn=self.role_arn,
poll_interval=0.2, # 200ms
catalog_name=catalog_name or self.catalog_name
)

# Prepare connection parameters
conn_params = {
'aws_access_key_id': self.aws_access_key_id,
'aws_secret_access_key': self.aws_secret_access_key,
'region_name': self.region_name,
's3_staging_dir': self.s3_staging_dir,
'work_group': self.work_group,
'schema_name': database or self.database,
'role_arn': self.role_arn,
'poll_interval': 0.2, # 200ms
'catalog_name': catalog_name or self.catalog_name
}

# Add result reuse parameters if enabled
if self.result_reuse_enable:
conn_params['result_reuse_enable'] = True
conn_params['result_reuse_minutes'] = self.result_reuse_minutes

conn = pyathena.connect(**conn_params)
self.database = database or self.database

if hasattr(self, 'conn'):
Expand Down
13 changes: 10 additions & 3 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
TBD
=====
1.7.0 (TBD)
============

Features:
---------
* Add support for Amazon Athena query result reuse with configurable TTL
- Add `--result-reuse-enable` CLI option to enable/disable query result reuse
- Add `--result-reuse-minutes` CLI option to configure TTL in minutes
- Add `result_reuse_enable` and `result_reuse_minutes` configuration options in athenaclirc
- Requires Athena engine version 3
- Can drastically improve query performance for repeated queries
* Allow catalog to be specified as part of the database argument. ([<catalog>.]<database>)

1.6.8 (2022/05/15)
Expand All @@ -25,7 +33,6 @@ Bugfix:
==================

* Update the default branch to 'main'

1.6.4 (2022/04/24)
==================

Expand Down