
Commit 4e84e76

Default batch size now takes into account lambda concurrency

1 parent d3b14d2

4 files changed: +11 −8 lines

src/python/driver.py

Lines changed: 1 addition & 1 deletion

@@ -103,7 +103,7 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
 for obj in s3.Bucket(bucket).objects.filter(Prefix=config["prefix"]).all():
     all_keys.append(obj)
 
-bsize = lambdautils.compute_batch_size(all_keys, lambda_memory)
+bsize = lambdautils.compute_batch_size(all_keys, lambda_memory, concurrent_lambdas)
 batches = lambdautils.batch_creator(all_keys, bsize)
 n_mappers = len(batches)
 document = xray_recorder.current_subsegment()
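For context, the driver now has to supply a concurrency limit alongside the Lambda memory size. A hypothetical sketch of how the call site might be wired up, assuming the limit lives in the driver's JSON config under a `concurrentLambdas` key (the file name and the `lambdaMemory`, `concurrentLambdas`, and `bucket` keys are assumptions for illustration, not shown in this diff):

```python
# Hypothetical wiring for the new third argument; file and key names
# below are assumptions for illustration, not taken from this diff.
import json

import boto3

import lambdautils

with open("driverconfig.json") as f:              # assumed config file name
    config = json.load(f)

lambda_memory = config["lambdaMemory"]            # assumed key
concurrent_lambdas = config["concurrentLambdas"]  # assumed key

# List the input objects, as in the diff above.
s3 = boto3.resource("s3")
all_keys = list(s3.Bucket(config["bucket"]).objects.filter(Prefix=config["prefix"]))

bsize = lambdautils.compute_batch_size(all_keys, lambda_memory, concurrent_lambdas)
batches = lambdautils.batch_creator(all_keys, bsize)
```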

src/python/jobinfo.json

Lines changed: 2 additions & 2 deletions

@@ -1,6 +1,6 @@
 {
-    "jobBucket": "smallya-useast-1",
-    "mapCount": 29,
+    "jobBucket": "aws-bigdata-mapreduce-sr18",
+    "mapCount": 202,
     "reducerFunction": "BL-reducer-bl-release",
     "reducerHandler": "reducer.lambda_handler",
     "jobId": "bl-release"

src/python/lambdautils.py

Lines changed: 7 additions & 4 deletions

@@ -45,7 +45,7 @@ def create_lambda_function(self):
             Description = self.function_name,
             MemorySize = self.memory,
             Timeout = self.timeout,
-            TracingConfig={'Mode':'Active'}
+            TracingConfig={'Mode':'PassThrough'}
             )
         self.function_arn = response['FunctionArn']
         print response
@@ -127,7 +127,7 @@ def cleanup_logs(cls, func_name):
         response = log_client.delete_log_group(logGroupName='/aws/lambda/' + func_name)
         return response
 
-def compute_batch_size(keys, lambda_memory, gzip=False):
+def compute_batch_size(keys, lambda_memory, concurrent_lambdas):
     max_mem_for_data = 0.6 * lambda_memory * 1000 * 1000;
     size = 0.0
     for key in keys:
@@ -137,8 +137,11 @@ def compute_batch_size(keys, lambda_memory, gzip=False):
         size += key.size
     avg_object_size = size/len(keys)
     print "Dataset size: %s, nKeys: %s, avg: %s" %(size, len(keys), avg_object_size)
-    b_size = int(round(max_mem_for_data/avg_object_size))
-    return b_size
+    if avg_object_size < max_mem_for_data and len(keys) < concurrent_lambdas:
+        b_size = 1
+    else:
+        b_size = int(round(max_mem_for_data/avg_object_size))
+    return b_size
 
 def batch_creator(all_keys, batch_size):
     '''
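Before this commit, the batch size was always `round(0.6 * memory / avg_object_size)`, which for small objects packs many keys into few mappers even when the fleet could fan out further. The new branch says: if the average object fits in a single Lambda and there are fewer objects than the concurrency limit, give each mapper exactly one object so every available Lambda runs in parallel. A self-contained sketch of the new rule (Python 3 syntax; `FakeKey` is a stand-in for the boto3 S3 object summaries the real code iterates over):

```python
# Runnable illustration of the new batching rule; no AWS access needed.
from collections import namedtuple

FakeKey = namedtuple("FakeKey", ["size"])  # stand-in for an S3 ObjectSummary

def compute_batch_size(keys, lambda_memory, concurrent_lambdas):
    max_mem_for_data = 0.6 * lambda_memory * 1000 * 1000  # 60% of memory, in bytes
    avg_object_size = sum(k.size for k in keys) / float(len(keys))
    # Few, small objects: one object per mapper maximizes parallelism.
    if avg_object_size < max_mem_for_data and len(keys) < concurrent_lambdas:
        return 1
    # Otherwise pack as many objects per mapper as 60% of memory allows.
    return int(round(max_mem_for_data / avg_object_size))

ten_mb = FakeKey(10 * 1000 * 1000)

# 50 x 10 MB objects, 1536 MB Lambdas, concurrency 100:
# 50 < 100 and each object fits, so one object per mapper.
print(compute_batch_size([ten_mb] * 50, 1536, 100))    # -> 1

# 5000 such objects exceed the concurrency limit, so objects get packed:
# 0.6 * 1536 MB / 10 MB ~= 92 objects per mapper.
print(compute_batch_size([ten_mb] * 5000, 1536, 100))  # -> 92
```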

src/python/reducerCoordinator.py

Lines changed: 1 addition & 1 deletion

@@ -60,7 +60,7 @@ def get_mapper_files(files):
 
 def get_reducer_batch_size(keys):
     #TODO: Paramertize memory size
-    batch_size = lambdautils.compute_batch_size(keys, 1536)
+    batch_size = lambdautils.compute_batch_size(keys, 1536, 1000)
     return max(batch_size, 2) # At least 2 in a batch - Condition for termination
 
 def check_job_done(files):
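The reducer coordinator passes a hard-coded 1536 MB memory size and a concurrency of 1000 (presumably matching the default account-level concurrent-execution limit). Independent of that, the `max(batch_size, 2)` floor is the termination condition for the multi-round reduce: each round must merge at least two intermediate files so the file count strictly shrinks. A small illustration (the `rounds_to_finish` helper is hypothetical, not from the repo):

```python
# Why a reducer batch must hold at least 2 files: ceil(n / 1) == n, so a
# batch size of 1 would never shrink the file count and never terminate.
import math

def rounds_to_finish(n_files, batch_size):
    rounds = 0
    while n_files > 1:
        n_files = math.ceil(n_files / batch_size)
        rounds += 1
    return rounds

print(rounds_to_finish(202, 2))  # the mapCount from jobinfo.json above -> 8 rounds
```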
