Skip to content

Commit 2eeff66

Browse files
authored
Merge pull request pywren#88 from pywren/sharding-config
Move runtime sharding into config variable
2 parents 7bde69a + 4dc9450 commit 2eeff66

File tree

8 files changed

+51
-51
lines changed

8 files changed

+51
-51
lines changed

pywren/executor.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import pickle
99
from multiprocessing.pool import ThreadPool
1010
import time
11+
import random
1112
import logging
1213
import botocore
1314
import glob2
@@ -31,7 +32,7 @@ class Executor(object):
3132
"""
3233

3334
def __init__(self, aws_region, s3_bucket, s3_prefix,
34-
invoker, config, job_max_runtime, shard_runtime=False):
35+
invoker, config, job_max_runtime):
3536
self.aws_region = aws_region
3637
self.s3_bucket = s3_bucket
3738
self.s3_prefix = s3_prefix
@@ -41,11 +42,11 @@ def __init__(self, aws_region, s3_bucket, s3_prefix,
4142
self.invoker = invoker
4243
self.s3client = self.session.create_client('s3', region_name = aws_region)
4344
self.job_max_runtime = job_max_runtime
44-
self.shard_runtime = shard_runtime
4545

4646
runtime_bucket = config['runtime']['s3_bucket']
4747
runtime_key = config['runtime']['s3_key']
48-
if not runtime.runtime_key_valid(runtime_bucket, runtime_key):
48+
self.runtime_meta_info = runtime.get_runtime_info(runtime_bucket, runtime_key)
49+
if not runtime.runtime_key_valid(self.runtime_meta_info):
4950
raise Exception("The indicated runtime: s3://{}/{} is not approprite for this python version".format(runtime_bucket, runtime_key))
5051

5152
def create_mod_data(self, mod_paths):
@@ -85,6 +86,18 @@ def invoke_with_keys(self, s3_func_key, s3_data_key, s3_output_key,
8586
host_job_meta, job_max_runtime,
8687
overwrite_invoke_args = None):
8788

89+
# Pick a runtime url if we have shards. If not the handler will construct it
90+
# using s3_bucket and s3_key
91+
runtime_url = ""
92+
if ('urls' in self.runtime_meta_info and
93+
isinstance(self.runtime_meta_info['urls'], list) and
94+
len(self.runtime_meta_info['urls']) > 1):
95+
num_shards = len(self.runtime_meta_info['urls'])
96+
logger.debug("Runtime is sharded, choosing from {} copies.".format(num_shards))
97+
random.seed()
98+
runtime_url = random.choice(self.runtime_meta_info['urls'])
99+
100+
88101
arg_dict = {'func_key' : s3_func_key,
89102
'data_key' : s3_data_key,
90103
'output_key' : s3_output_key,
@@ -97,7 +110,7 @@ def invoke_with_keys(self, s3_func_key, s3_data_key, s3_output_key,
97110
'runtime_s3_bucket' : self.config['runtime']['s3_bucket'],
98111
'runtime_s3_key' : self.config['runtime']['s3_key'],
99112
'pywren_version' : version.__version__,
100-
'shard_runtime_key' : self.shard_runtime}
113+
'runtime_url' : runtime_url }
101114

102115
if extra_env is not None:
103116
logger.debug("Extra environment vars {}".format(extra_env))

pywren/runtime.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ def get_runtime_info(bucket, key):
1818
def version_str(version_info):
1919
return "{}.{}".format(version_info[0], version_info[1])
2020

21-
def runtime_key_valid(bucket, key):
22-
runtime_meta = get_runtime_info(bucket, key)
21+
def runtime_key_valid(runtime_meta):
2322
return runtime_valid(runtime_meta)
2423

2524
def runtime_valid(runtime_meta):

pywren/wren.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
import pickle
77
from tblib import pickling_support
88
import logging
9+
import botocore
10+
import glob2
11+
import random
912
import os
1013
pickling_support.install()
1114

@@ -32,7 +35,7 @@ def default_executor(**kwargs):
3235
return lambda_executor(**kwargs)
3336

3437

35-
def lambda_executor(config= None, job_max_runtime=280, shard_runtime=False):
38+
def lambda_executor(config=None, job_max_runtime=280):
3639

3740
if config is None:
3841
config = wrenconfig.default()
@@ -43,22 +46,21 @@ def lambda_executor(config= None, job_max_runtime=280, shard_runtime=False):
4346
S3_PREFIX = config['s3']['pywren_prefix']
4447

4548
invoker = invokers.LambdaInvoker(AWS_REGION, FUNCTION_NAME)
46-
return Executor(AWS_REGION, S3_BUCKET, S3_PREFIX, invoker, config,
47-
job_max_runtime, shard_runtime=shard_runtime)
49+
return Executor(AWS_REGION, S3_BUCKET, S3_PREFIX, invoker, config,
50+
job_max_runtime)
4851

4952

50-
def dummy_executor(shard_runtime=False):
53+
def dummy_executor():
5154
config = wrenconfig.default()
5255
AWS_REGION = config['account']['aws_region']
5356
S3_BUCKET = config['s3']['bucket']
5457
S3_PREFIX = config['s3']['pywren_prefix']
5558
invoker = invokers.DummyInvoker()
5659
return Executor(AWS_REGION, S3_BUCKET, S3_PREFIX, invoker, config,
57-
100, shard_runtime=shard_runtime)
60+
100)
5861

5962

60-
def remote_executor(config= None, job_max_runtime=3600,
61-
shard_runtime=False):
63+
def remote_executor(config= None, job_max_runtime=3600):
6264
if config is None:
6365
config = wrenconfig.default()
6466

@@ -68,7 +70,9 @@ def remote_executor(config= None, job_max_runtime=3600,
6870
S3_PREFIX = config['s3']['pywren_prefix']
6971
invoker = invokers.SQSInvoker(AWS_REGION, SQS_QUEUE)
7072
return Executor(AWS_REGION, S3_BUCKET, S3_PREFIX, invoker, config,
71-
job_max_runtime, shard_runtime=shard_runtime)
73+
job_max_runtime)
74+
75+
standalone_executor = remote_executor
7276

7377

7478
def get_all_results(fs):

pywren/wrenconfig.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
MAX_AGG_DATA_SIZE = 4e6
2323

24-
MAX_S3_RUNTIME_SHARDS=50
2524
default_runtime = {'2.7' : "pywren.runtime/pywren_runtime-2.7-default.tar.gz",
2625
'3.5' : "pywren.runtime/pywren_runtime-3.5-default.tar.gz",
2726
'3.6' : "pywren.runtime/pywren_runtime-3.6-default.tar.gz"}

pywren/wrenhandler.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,17 @@
1313
import traceback
1414
from threading import Thread
1515
import signal
16-
import random
1716

1817
if (sys.version_info > (3, 0)):
1918
from . import wrenutil
2019
from . import s3util
2120
from . import version
22-
from . import wrenconfig
23-
from . import wrenlogging
2421
from queue import Queue, Empty
2522

2623
else:
2724
import wrenutil
2825
import s3util
2926
import version
30-
import wrenconfig
31-
import wrenlogging
3227
from Queue import Queue, Empty
3328

3429

@@ -145,12 +140,11 @@ def generic_handler(event, context_dict):
145140

146141
runtime_s3_bucket = event['runtime_s3_bucket']
147142
runtime_s3_key = event['runtime_s3_key']
148-
if event.get('shard_runtime_key', False):
149-
random.seed()
150-
shard = random.randrange(wrenconfig.MAX_S3_RUNTIME_SHARDS)
151-
key_shard = wrenutil.get_s3_shard(runtime_s3_key, shard)
152-
runtime_s3_key_used = wrenutil.hash_s3_key(key_shard)
143+
if event.get('runtime_url'):
144+
# NOTE(shivaram): Right now we only support S3 urls.
145+
runtime_s3_bucket_used, runtime_s3_key_used = wrenutil.split_s3_url(event['runtime_url'])
153146
else:
147+
runtime_s3_bucket_used = runtime_s3_bucket
154148
runtime_s3_key_used = runtime_s3_key
155149

156150
job_max_runtime = event.get("job_max_runtime", 290) # default for lambda
@@ -224,8 +218,9 @@ def generic_handler(event, context_dict):
224218
logger.debug(subprocess.check_output("find {}".format(os.getcwd()), shell=True))
225219

226220
response_status['runtime_s3_key_used'] = runtime_s3_key_used
221+
response_status['runtime_s3_bucket_used'] = runtime_s3_bucket_used
227222

228-
runtime_cached = download_runtime_if_necessary(s3, runtime_s3_bucket,
223+
runtime_cached = download_runtime_if_necessary(s3, runtime_s3_bucket_used,
229224
runtime_s3_key_used)
230225
logger.info("Runtime ready, cached={}".format(runtime_cached))
231226
response_status['runtime_cached'] = runtime_cached

pywren/wrenutil.py

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -117,17 +117,3 @@ def split_s3_url(s3_url):
117117
bucket_name = splits[0]
118118
key = "/".join(splits[1:])
119119
return bucket_name, key
120-
121-
def hash_s3_key(s):
122-
"""
123-
MD5-hash the contents of an S3 key to enable good partitioning.
124-
used for sharding the runtimes
125-
"""
126-
DIGEST_LEN = 6
127-
m = hashlib.md5()
128-
m.update(s.encode('ascii'))
129-
digest = m.hexdigest()
130-
return "{}-{}".format(digest[:DIGEST_LEN], s)
131-
132-
def get_s3_shard(key, shard_num):
133-
return "{}.{:04d}".format(key, shard_num)

tests/test_simple.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import uuid
55
import numpy as np
66
import time
7+
import os
78
import pywren
89
import pywren.runtime
910
import subprocess
@@ -283,19 +284,29 @@ class RuntimeSharding(unittest.TestCase):
283284
download the real key
284285
"""
285286
def test_no_shard(self):
286-
wrenexec = pywren.default_executor(shard_runtime=False)
287+
config = pywren.wrenconfig.default()
288+
old_key = config['runtime']['s3_key']
289+
prefix, tar_gz = os.path.split(old_key)
290+
# Use the staging key to test as it doesn't have shards
291+
config['runtime']['s3_key'] = os.path.join("pywren.runtime.staging", tar_gz)
292+
wrenexec = pywren.default_executor(config=config)
287293

288294
def test_func(x):
289295
return x + 1
290296

291297
future = wrenexec.call_async(test_func, 7)
292298
result = future.result()
293299
base_runtime_key = wrenexec.config['runtime']['s3_key']
294-
self.assertEqual(future.run_status['runtime_s3_key_used'],
300+
self.assertEqual(future.run_status['runtime_s3_key_used'],
295301
base_runtime_key)
296302

297303
def test_shard(self):
298-
wrenexec = pywren.default_executor(shard_runtime=True)
304+
config = pywren.wrenconfig.default()
305+
old_key = config['runtime']['s3_key']
306+
prefix, tar_gz = os.path.split(old_key)
307+
# Use a runtime that has shards
308+
config['runtime']['s3_key'] = os.path.join("pywren.runtime", tar_gz)
309+
wrenexec = pywren.default_executor(config=config)
299310

300311
def test_func(x):
301312
return x + 1
@@ -304,5 +315,6 @@ def test_func(x):
304315

305316
future = wrenexec.call_async(test_func, 7)
306317
result = future.result()
318+
# NOTE: There is some probability we will hit the base key ?
307319
self.assertNotEqual(future.run_status['runtime_s3_key_used'],
308320
base_runtime_key)

tests/test_util.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,3 @@ def test_s3_split(self):
1616
with pytest.raises(ValueError) as excinfo:
1717
bad_s3_url = "notS3://foo/bar"
1818
bucket, key = pywren.wrenutil.split_s3_url(bad_s3_url)
19-
20-
def test_hash(self):
21-
22-
key = 'testkey'
23-
hashed_key = pywren.wrenutil.hash_s3_key(key)
24-
print(hashed_key)
25-
self.assertEqual(hashed_key[-len(key):], key)
26-
self.assertNotEqual(hashed_key, key)

0 commit comments

Comments
 (0)