Merge pull request #334 from tilezen/zerebubuth/use-boto3-for-store
Use boto3 for tile store
zerebubuth authored May 4, 2018
2 parents be6e4b1 + baeb857 commit 5703b81
Showing 3 changed files with 89 additions and 55 deletions.
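This commit replaces the legacy boto Bucket/Key API with boto3's client API throughout the tile store. As orientation, a minimal sketch of the new write call, not part of the commit; the bucket name is a placeholder and the key layout follows the tests below:

import boto3

# boto (old): key = bucket.new_key(name); key.set_contents_from_string(data, ...)
# boto3 (new): one explicit put_object call on a low-level client.
s3 = boto3.client('s3')
s3.put_object(
    Bucket='example-tile-bucket',                 # placeholder bucket
    Key='20160121/b707d/osm/all/8/72/105.json',   # key layout from the tests below
    Body=b'{}',
    ContentType='application/json',
    ACL='public-read',
    StorageClass='STANDARD',
)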
4 changes: 2 additions & 2 deletions tests/test_store.py
@@ -69,7 +69,7 @@ def test_example_coord(self):
layer = 'all'
tile_key = s3_tile_key(date_str, path, layer, coord,
json_format.extension)
self.assertEqual(tile_key, '/20160121/b707d/osm/all/8/72/105.json')
self.assertEqual(tile_key, '20160121/b707d/osm/all/8/72/105.json')

def test_no_path(self):
from tilequeue.store import s3_tile_key
@@ -81,7 +81,7 @@ def test_no_path(self):
layer = 'all'
tile_key = s3_tile_key(date_str, path, layer, coord,
json_format.extension)
self.assertEqual(tile_key, '/20160121/cfc61/all/8/72/105.json')
self.assertEqual(tile_key, '20160121/cfc61/all/8/72/105.json')


class WriteTileIfChangedTest(unittest.TestCase):
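The only behavioural change in these tests is the dropped leading slash: s3_tile_key now returns keys relative to the bucket root. The helper below is a hedged reconstruction for illustration only; calc_hash and the exact format string are outside this hunk, so the five-character md5 prefix and the empty-path handling are assumptions:

import hashlib

def example_tile_key(date, path, layer, z, x, y, ext):
    # hash the path portion and prefix a short digest so keys spread across
    # S3 partitions; assumed to mirror calc_hash/s3_tile_key in store.py
    path_to_hash = ('/%s/%s/%d/%d/%d.%s' % (path, layer, z, x, y, ext)
                    if path else '/%s/%d/%d/%d.%s' % (layer, z, x, y, ext))
    md5_prefix = hashlib.md5(path_to_hash.encode('utf-8')).hexdigest()[:5]
    return '%s/%s%s' % (date, md5_prefix, path_to_hash)   # no leading '/'

# example_tile_key('20160121', 'osm', 'all', 8, 72, 105, 'json')
# -> '20160121/<hash>/osm/all/8/72/105.json'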
3 changes: 2 additions & 1 deletion tilequeue/command.py
@@ -1979,7 +1979,8 @@ def tilequeue_batch_enqueue(cfg, args):
logger = make_logger(cfg, 'batch_enqueue')

import boto3
client = boto3.client('batch', region_name='us-east-1')
region_name = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
client = boto3.client('batch', region_name=region_name)

logger.info('Batch enqueue ...')

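This hunk stops hard-coding us-east-1 for the Batch client; the region now comes from AWS_DEFAULT_REGION when set. A minimal standalone equivalent (the surrounding Batch job submission is elided from the diff):

import os
import boto3

# default to us-east-1 only when the environment does not specify a region
region_name = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
batch = boto3.client('batch', region_name=region_name)
# e.g. batch.describe_job_queues() now talks to the configured region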
137 changes: 85 additions & 52 deletions tilequeue/store.py
@@ -1,7 +1,7 @@
# define locations to store the rendered data

from boto import connect_s3
from boto.s3.bucket import Bucket
import boto3
from botocore.exceptions import ClientError
from builtins import range
from future.utils import raise_from
import md5
@@ -12,6 +12,7 @@
import random
import threading
import time
from cStringIO import StringIO


def calc_hash(s):
@@ -32,7 +33,7 @@ def s3_tile_key(date, path, layer, coord, extension):
ext=extension,
)
md5_hash = calc_hash(path_to_hash)
s3_path = '/%(date)s/%(md5)s%(path_to_hash)s' % dict(
s3_path = '%(date)s/%(md5)s%(path_to_hash)s' % dict(
date=date,
md5=md5_hash,
path_to_hash=path_to_hash,
@@ -96,39 +97,60 @@ def func(*args, **kwargs):
class S3(object):

def __init__(
self, bucket, date_prefix, path, reduced_redundancy,
delete_retry_interval, logger):
self.bucket = bucket
self, s3_client, bucket_name, date_prefix, path,
reduced_redundancy, delete_retry_interval, logger,
object_acl):
self.s3_client = s3_client
self.bucket_name = bucket_name
self.date_prefix = date_prefix
self.path = path
self.reduced_redundancy = reduced_redundancy
self.delete_retry_interval = delete_retry_interval
self.logger = logger
self.object_acl = object_acl

def write_tile(self, tile_data, coord, format, layer):
key_name = s3_tile_key(
self.date_prefix, self.path, layer, coord, format.extension)
key = self.bucket.new_key(key_name)

storage_class = 'STANDARD'
if self.reduced_redundancy:
storage_class = 'REDUCED_REDUNDANCY'

@_backoff_and_retry(Exception, logger=self.logger)
def write_to_s3():
key.set_contents_from_string(
tile_data,
headers={'Content-Type': format.mimetype},
policy='public-read',
reduced_redundancy=self.reduced_redundancy,
)
try:
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=key_name,
Body=tile_data,
ContentType=format.mimetype,
ACL=self.object_acl,
StorageClass=storage_class,
)
except ClientError as e:
# it's really useful for debugging if we know exactly what
# request is failing.
raise RuntimeError(
"Error while trying to write %r to bucket %r: %s"
% (key_name, self.bucket_name, str(e)))

write_to_s3()

def read_tile(self, coord, format, layer):
key_name = s3_tile_key(
self.date_prefix, self.path, layer, coord, format.extension)
key = self.bucket.get_key(key_name)
if key is None:
return None
tile_data = key.get_contents_as_string()
return tile_data

try:
io = StringIO()
self.s3_client.download_fileobj(self.bucket_name, key_name, io)
return io.getvalue()  # cStringIO exposes getvalue(), not bytes()

except ClientError as e:
if e.response['Error']['Code'] != '404':
raise

return None
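The read path streams the object into an in-memory buffer and treats a 404 as "tile not present". A self-contained sketch of that pattern, with a placeholder bucket and the key from the tests above (io.BytesIO is used here for portability, where the commit uses cStringIO):

import boto3
from io import BytesIO
from botocore.exceptions import ClientError

s3 = boto3.client('s3')
buf = BytesIO()
try:
    # download_fileobj streams the object into the file-like buffer
    s3.download_fileobj('example-tile-bucket',
                        '20160121/b707d/osm/all/8/72/105.json', buf)
    tile_data = buf.getvalue()
except ClientError as e:
    if e.response['Error']['Code'] == '404':
        tile_data = None   # missing tile, not an error
    else:
        raise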

def delete_tiles(self, coords, format, layer):
key_names = [
@@ -138,22 +160,31 @@ def delete_tiles(self, coords, format, layer):
]

num_deleted = 0
while key_names:
del_result = self.bucket.delete_keys(key_names)
num_deleted += len(del_result.deleted)

key_names = []
for error in del_result.errors:
# retry on internal error. documentation implies that the only
# possible two errors are AccessDenied and InternalError.
# retrying when access denied seems unlikely to work, but an
# internal error might be transient.
if error.code == 'InternalError':
key_names.append(error.key)

# pause a bit to give transient errors a chance to clear.
if key_names:
time.sleep(self.delete_retry_interval)
chunk_size = 1000
for idx in xrange(0, len(key_names), chunk_size):
chunk = key_names[idx:idx+chunk_size]

while chunk:
objects = [dict(Key=k) for k in chunk]
del_result = self.s3_client.delete_objects(
Bucket=self.bucket_name,
Delete=dict(Objects=objects),
)
num_deleted += len(del_result.get('Deleted', []))

chunk = []
for error in del_result.get('Errors', []):
# retry on internal error. documentation implies that the
# only possible two errors are AccessDenied and
# InternalError. retrying when access denied seems
# unlikely to work, but an internal error might be
# transient.
if error['Code'] == 'InternalError':
chunk.append(error['Key'])

# pause a bit to give transient errors a chance to clear.
if chunk:
time.sleep(self.delete_retry_interval)

# make sure that we deleted all the tiles - this seems like the
# expected behaviour from the calling code.
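delete_objects accepts at most 1,000 keys per request, which is why the rewrite chunks the key list and retries only the keys that failed with InternalError. A hedged standalone version of that shape (bucket name is a placeholder; note the .get() calls, since the response omits Deleted/Errors when they are empty):

import time
import boto3

s3 = boto3.client('s3')
CHUNK_SIZE = 1000  # delete_objects accepts at most 1,000 keys per call

def delete_keys(bucket, keys, retry_interval=60):
    deleted = 0
    for idx in range(0, len(keys), CHUNK_SIZE):
        chunk = keys[idx:idx + CHUNK_SIZE]
        while chunk:
            result = s3.delete_objects(
                Bucket=bucket,
                Delete={'Objects': [{'Key': k} for k in chunk]},
            )
            deleted += len(result.get('Deleted', []))
            # AccessDenied will not clear on retry; InternalError might
            chunk = [err['Key'] for err in result.get('Errors', [])
                     if err['Code'] == 'InternalError']
            if chunk:
                time.sleep(retry_interval)
    return deleted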
@@ -164,11 +195,17 @@ def delete_tiles(self, coords, format, layer):

def list_tiles(self, format, layer):
ext = '.' + format.extension
for key_obj in self.bucket.list(prefix=self.date_prefix):
key = key_obj.key
coord = parse_coordinate_from_path(key, ext, layer)
if coord:
yield coord
paginator = self.s3_client.get_paginator('list_objects_v2')
page_iter = paginator.paginate(
Bucket=self.bucket_name,
Prefix=self.date_prefix
)
for page in page_iter:
for key_obj in page.get('Contents', []):
key = key_obj['Key']
coord = parse_coordinate_from_path(key, ext, layer)
if coord:
yield coord
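Listing switches from boto's lazy bucket.list() to the list_objects_v2 paginator, since each response carries at most 1,000 keys. A minimal sketch (bucket and prefix are placeholders); pages with no matching keys omit the 'Contents' entry, hence the .get():

import boto3

s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='example-tile-bucket', Prefix='20180504'):
    for obj in page.get('Contents', []):
        print(obj['Key'])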


def make_dir_path(base_path, coord, layer):
@@ -352,13 +389,12 @@ def list_tiles(self, format, layer):


def make_s3_store(bucket_name,
aws_access_key_id=None, aws_secret_access_key=None,
path='osm', reduced_redundancy=False, date_prefix='',
delete_retry_interval=60, logger=None):
conn = connect_s3(aws_access_key_id, aws_secret_access_key)
bucket = Bucket(conn, bucket_name)
s3_store = S3(bucket, date_prefix, path, reduced_redundancy,
delete_retry_interval, logger)
delete_retry_interval=60, logger=None,
object_acl='public-read'):
s3 = boto3.client('s3')
s3_store = S3(s3, bucket_name, date_prefix, path, reduced_redundancy,
delete_retry_interval, logger, object_acl)
return s3_store
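make_s3_store no longer takes explicit AWS keys: boto3.client('s3') resolves credentials through its default chain (environment variables, shared config files, instance profile). A hedged usage example with a placeholder bucket:

from tilequeue.store import make_s3_store

# credentials come from boto3's default resolution chain, not from arguments
store = make_s3_store('example-tile-bucket', path='osm',
                      date_prefix='20180504', object_acl='private')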


@@ -408,16 +444,13 @@ def make_store(yml, credentials={}, logger=None):
reduced_redundancy = yml.get('reduced-redundancy')
date_prefix = yml.get('date-prefix')
delete_retry_interval = yml.get('delete-retry-interval')

assert credentials, 'S3 store configured, but no AWS credentials ' \
'provided. AWS credentials are required to use S3.'
aws_access_key_id = credentials.get('aws_access_key_id')
aws_secret_access_key = credentials.get('aws_secret_access_key')
object_acl = yml.get('object-acl', 'public-read')

return make_s3_store(
bucket, aws_access_key_id, aws_secret_access_key, path=path,
bucket, path=path,
reduced_redundancy=reduced_redundancy, date_prefix=date_prefix,
delete_retry_interval=delete_retry_interval, logger=logger)
delete_retry_interval=delete_retry_interval, logger=logger,
object_acl=object_acl)

else:
raise ValueError('Unrecognized store type: `{}`'.format(store_type))
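On the configuration side, the assertion requiring explicit AWS credentials is gone and a new optional object-acl key is read. A hypothetical fragment of the store configuration after this change; only the keys visible in this hunk are shown, and the rest of an s3 store block (bucket name, path, store type) is elided in the diff and left out here:

# hypothetical fragment of the yml store configuration after this change
yml_fragment = {
    'reduced-redundancy': False,
    'date-prefix': '20180504',
    'delete-retry-interval': 60,
    'object-acl': 'private',  # new key, defaults to 'public-read' when omitted
}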
