Skip to content

Commit b8d22f2

Browse files
committed
major refactoring of the serverless objects; use aws sam for testing with the lambda local option
1 parent 609e966 commit b8d22f2

9 files changed

+165
-217
lines changed

doubleml_serverless/double_ml_aws_lambda.py

Lines changed: 81 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
import pandas as pd
22
import asyncio
33
import aiobotocore
4+
from botocore import UNSIGNED
5+
from botocore.config import Config
46
import json
57

6-
from .lambda_functions.cv_predict import lambda_cv_predict
8+
from abc import ABC, abstractmethod
9+
10+
#from .lambda_functions.cv_predict import lambda_cv_predict
711
from ._helper import _extract_preds, _extract_lambda_metrics
812

913

10-
class DoubleMLLambda:
14+
class DoubleMLLambda(ABC):
15+
1116
def __init__(self,
1217
lambda_function_name,
1318
aws_region):
@@ -41,24 +46,54 @@ def aws_lambda_metrics(self):
4146
metrics['Avg Max Memory Used (MB)'] = df['Max Memory Used'].mean()
4247
return metrics
4348

49+
@abstractmethod
50+
def _ml_nuisance_aws_lambda(self, cv_params):
51+
pass
52+
53+
@abstractmethod
54+
def _est_causal_pars_and_se(self):
55+
pass
56+
57+
@abstractmethod
58+
def _clean_scores(self):
59+
pass
60+
61+
def fit_aws_lambda(self, n_lambdas_cv='n_folds * n_rep', seed=None, keep_scores=True):
    """
    Run the nuisance step, then estimate the causal parameters.

    Parameters
    ----------
    n_lambdas_cv : str
        Either ``'n_folds * n_rep'`` or ``'n_rep'``. Forwarded unchanged to
        ``_ml_nuisance_aws_lambda`` under the key ``'n_lambdas_cv'``.
        Presumably it selects how the cross-fitting work is split over
        lambda invocations -- confirm in the subclass implementations.

    seed : int or None
        Forwarded unchanged to ``_ml_nuisance_aws_lambda`` under the key
        ``'seed'``.

    keep_scores : bool
        If ``False``, ``_clean_scores`` is called after the estimation.

    Returns
    -------
    self

    Raises
    ------
    ValueError
        If ``n_lambdas_cv`` is not one of the two accepted strings.
    """
    accepted = ['n_folds * n_rep', 'n_rep']
    # Logical `or` short-circuits: a non-string never reaches the membership
    # test, so objects with unusual `==` semantics (e.g. arrays) are rejected
    # with the intended message instead of failing inside the comparison.
    if not isinstance(n_lambdas_cv, str) or n_lambdas_cv not in accepted:
        raise ValueError('n_lambdas_cv must be "n_folds * n_rep" or "n_rep"'
                         f' got {str(n_lambdas_cv)}')

    # ml estimation of nuisance models and computation of score elements
    cv_params = {'n_lambdas_cv': n_lambdas_cv,
                 'seed': seed}
    self._ml_nuisance_aws_lambda(cv_params)

    self._est_causal_pars_and_se()

    if not keep_scores:
        self._clean_scores()

    return self
86+
4487
def invoke_lambdas(self, payloads, smpls, params_names, n_obs, n_rep, n_jobs_cv):
45-
if self.lambda_function_name == 'local':
46-
assert self.aws_region == 'local'
47-
# this callable option is just for local testing
48-
context = dict()
49-
results = []
50-
for this_payload in payloads:
51-
xx = json.dumps(this_payload)
52-
yy = json.loads(xx)
53-
this_res = dict()
54-
this_res['payload'] = json.dumps(lambda_cv_predict(yy, context))
55-
results.append(this_res)
88+
if self.aws_region == 'local':
89+
loop = asyncio.get_event_loop()
90+
results = loop.run_until_complete(self.__invoke_aws_lambdas_locally(payloads))
5691
else:
5792
loop = asyncio.get_event_loop()
5893
results = loop.run_until_complete(self.__invoke_aws_lambdas(payloads))
5994
preds, requests = _extract_preds(results, smpls, params_names,
6095
n_obs, n_rep, n_jobs_cv)
61-
if self.lambda_function_name != 'local':
96+
if self.aws_region != 'local':
6297
df_lambda_metrics = _extract_lambda_metrics(results)
6398
self.aws_lambda_detailed_metrics = self.aws_lambda_detailed_metrics.append(
6499
pd.concat((requests, df_lambda_metrics), axis=1))
@@ -90,3 +125,35 @@ async def __invoke_single_aws_lambda(self, session, payload):
90125

91126
return res
92127

128+
async def __invoke_aws_lambdas_locally(self, payloads):
    """
    Invoke the lambda function once per payload against the local endpoint.

    One aiobotocore session is created and shared by all invocations; the
    invocations are awaited together via ``asyncio.gather``.

    Parameters
    ----------
    payloads : iterable
        Payloads, each handed to ``__invoke_single_aws_lambda_locally``.

    Returns
    -------
    list
        One result per payload, in the order of ``payloads``.
    """
    local_session = aiobotocore.get_session()
    pending = [self.__invoke_single_aws_lambda_locally(local_session, one_payload)
               for one_payload in payloads]
    return await asyncio.gather(*pending)
135+
136+
async def __invoke_single_aws_lambda_locally(self, session, payload,
                                             endpoint_url='http://127.0.0.1:3001'):
    """
    Invoke the lambda function once through a locally served endpoint.

    Parameters
    ----------
    session :
        aiobotocore session used to create the lambda client.

    payload :
        JSON-serializable request; sent as ``json.dumps(payload)``.

    endpoint_url : str
        Address of the local lambda endpoint. The default is the address
        that was previously hard-coded here (presumably the one served by
        ``sam local start-lambda`` -- confirm).

    Returns
    -------
    dict
        ``{'payload': <raw bytes read from the response stream>}``. No log
        entry is collected (``LogType='None'``).
    """
    # Unsigned, non-TLS requests with retries switched off.
    # NOTE(review): read_timeout=0 is presumably meant to disable the read
    # timeout for slow local containers -- confirm against aiobotocore.
    client_config = Config(signature_version=UNSIGNED,
                           read_timeout=0,
                           retries={'max_attempts': 0})
    async with session.create_client('lambda',
                                     endpoint_url=endpoint_url,
                                     use_ssl=False,
                                     verify=False,
                                     config=client_config) as lambda_client:
        response = await lambda_client.invoke(
            FunctionName=self.lambda_function_name,
            InvocationType='RequestResponse',
            LogType='None',
            Payload=json.dumps(payload),
        )
        # The streamed body must be read while the client is still open.
        async with response['Payload'] as stream:
            body = await stream.read()

    return {'payload': body}

doubleml_serverless/double_ml_iivm_aws_lambda.py

Lines changed: 18 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -41,43 +41,15 @@ def __init__(self,
4141
lambda_function_name,
4242
aws_region)
4343

44-
# this method overwrites DoubleML.fit() to implement the fit via aws lambda
45-
def fit(self, n_jobs_cv='n_folds * n_rep', seed=None, keep_scores=True):
46-
"""
47-
Parameters
48-
----------
49-
n_jobs_cv : str
50-
51-
seed : int or None
52-
53-
keep_scores : bool
54-
"""
55-
56-
if (not isinstance(n_jobs_cv, str)) | (n_jobs_cv not in ['n_folds * n_rep', 'n_rep']):
57-
raise ValueError('n_jobs_cv must be "n_folds * n_rep" or "n_rep"'
58-
f' got {str(n_jobs_cv)}')
59-
44+
def _ml_nuisance_aws_lambda(self, cv_params):
6045
assert self._dml_data.n_treat == 1
6146
self._i_treat = 0
6247

63-
# ml estimation of nuisance models and computation of score elements
64-
psi_a, psi_b = self._ml_nuisance_and_score_elements(self.smpls, n_jobs_cv, seed)
65-
self._psi_a[:, :, self._i_treat] = psi_a
66-
self._psi_b[:, :, self._i_treat] = psi_b
67-
68-
self._est_causal_pars_and_se()
69-
70-
if not keep_scores:
71-
self._clean_scores()
72-
73-
return self
74-
75-
def _ml_nuisance_and_score_elements(self, smpls, n_jobs_cv, seed):
7648
x, y = check_X_y(self._dml_data.x, self._dml_data.y)
7749
x, z = check_X_y(x, np.ravel(self._dml_data.z))
7850
x, d = check_X_y(x, self._dml_data.d)
7951
# get train indices for z == 0 and z == 1
80-
smpls_z0, smpls_z1 = _get_cond_smpls(smpls, z)
52+
smpls_z0, smpls_z1 = _get_cond_smpls(self.smpls, z)
8153

8254
payload = self._dml_data.get_payload()
8355

@@ -111,32 +83,31 @@ def _ml_nuisance_and_score_elements(self, smpls, n_jobs_cv, seed):
11183
method='predict_proba')
11284

11385
all_payloads = [payload_ml_g0, payload_ml_g1, payload_ml_m, payload_ml_r0, payload_ml_r1]
114-
all_smpls = [smpls_z0, smpls_z1, smpls, smpls_z0, smpls_z1]
86+
all_smpls = [smpls_z0, smpls_z1, self.smpls, smpls_z0, smpls_z1]
11587

11688
payloads = _attach_smpls(all_payloads,
11789
all_smpls,
11890
self.n_folds,
11991
self.n_rep,
12092
self._dml_data.n_obs,
121-
n_jobs_cv,
93+
cv_params['n_lambdas_cv'],
12294
[True, True, False, True, True],
123-
seed)
95+
cv_params['seed'])
12496

125-
preds = self.invoke_lambdas(payloads, smpls, self.params_names,
97+
preds = self.invoke_lambdas(payloads, self.smpls, self.params_names,
12698
self._dml_data.n_obs, self.n_rep,
127-
n_jobs_cv)
128-
129-
psi_a = np.full((self._dml_data.n_obs, self.n_rep), np.nan)
130-
psi_b = np.full((self._dml_data.n_obs, self.n_rep), np.nan)
99+
cv_params['n_lambdas_cv'])
131100

132101
for i_rep in range(self.n_rep):
133102
# compute score elements
134-
psi_a[:, i_rep], psi_b[:, i_rep] = self._score_elements(y, z, d,
135-
preds['ml_g0'][:, i_rep],
136-
preds['ml_g1'][:, i_rep],
137-
preds['ml_m'][:, i_rep],
138-
preds['ml_r0'][:, i_rep],
139-
preds['ml_r1'][:, i_rep],
140-
smpls[i_rep])
141-
142-
return psi_a, psi_b
103+
104+
self._psi_a[:, i_rep, self._i_treat], self._psi_b[:, i_rep, self._i_treat] = \
105+
self._score_elements(y, z, d,
106+
preds['ml_g0'][:, i_rep],
107+
preds['ml_g1'][:, i_rep],
108+
preds['ml_m'][:, i_rep],
109+
preds['ml_r0'][:, i_rep],
110+
preds['ml_r1'][:, i_rep],
111+
self.smpls[i_rep])
112+
113+
return

doubleml_serverless/double_ml_irm_aws_lambda.py

Lines changed: 16 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -39,42 +39,14 @@ def __init__(self,
3939
lambda_function_name,
4040
aws_region)
4141

42-
# this method overwrites DoubleML.fit() to implement the fit via aws lambda
43-
def fit(self, n_jobs_cv='n_folds * n_rep', seed=None, keep_scores=True):
44-
"""
45-
Parameters
46-
----------
47-
n_jobs_cv : str
48-
49-
seed : int or None
50-
51-
keep_scores : bool
52-
"""
53-
54-
if (not isinstance(n_jobs_cv, str)) | (n_jobs_cv not in ['n_folds * n_rep', 'n_rep']):
55-
raise ValueError('n_jobs_cv must be "n_folds * n_rep" or "n_rep"'
56-
f' got {str(n_jobs_cv)}')
57-
42+
def _ml_nuisance_aws_lambda(self, cv_params):
5843
assert self._dml_data.n_treat == 1
5944
self._i_treat = 0
6045

61-
# ml estimation of nuisance models and computation of score elements
62-
psi_a, psi_b = self._ml_nuisance_and_score_elements(self.smpls, n_jobs_cv, seed)
63-
self._psi_a[:, :, self._i_treat] = psi_a
64-
self._psi_b[:, :, self._i_treat] = psi_b
65-
66-
self._est_causal_pars_and_se()
67-
68-
if not keep_scores:
69-
self._clean_scores()
70-
71-
return self
72-
73-
def _ml_nuisance_and_score_elements(self, smpls, n_jobs_cv, seed):
7446
x, y = check_X_y(self._dml_data.x, self._dml_data.y)
7547
x, d = check_X_y(x, self._dml_data.d)
7648
# get train indices for d == 0 and d == 1
77-
smpls_d0, smpls_d1 = _get_cond_smpls(smpls, d)
49+
smpls_d0, smpls_d1 = _get_cond_smpls(self.smpls, d)
7850

7951
payload = self._dml_data.get_payload()
8052

@@ -97,33 +69,31 @@ def _ml_nuisance_and_score_elements(self, smpls, n_jobs_cv, seed):
9769
method='predict_proba')
9870
if (self.score == 'ATE') | callable(self.score):
9971
all_payloads = [payload_ml_g0, payload_ml_g1, payload_ml_m]
100-
all_smpls = [smpls_d0, smpls_d1, smpls]
72+
all_smpls = [smpls_d0, smpls_d1, self.smpls]
10173
else:
10274
all_payloads = [payload_ml_g0, payload_ml_m]
103-
all_smpls = [smpls_d0, smpls]
75+
all_smpls = [smpls_d0, self.smpls]
10476

10577
payloads = _attach_smpls(all_payloads,
10678
all_smpls,
10779
self.n_folds,
10880
self.n_rep,
10981
self._dml_data.n_obs,
110-
n_jobs_cv,
82+
cv_params['n_lambdas_cv'],
11183
[True, True, False],
112-
seed)
84+
cv_params['seed'])
11385

114-
preds = self.invoke_lambdas(payloads, smpls, self.params_names,
86+
preds = self.invoke_lambdas(payloads, self.smpls, self.params_names,
11587
self._dml_data.n_obs, self.n_rep,
116-
n_jobs_cv)
117-
118-
psi_a = np.full((self._dml_data.n_obs, self.n_rep), np.nan)
119-
psi_b = np.full((self._dml_data.n_obs, self.n_rep), np.nan)
88+
cv_params['n_lambdas_cv'])
12089

12190
for i_rep in range(self.n_rep):
12291
# compute score elements
123-
psi_a[:, i_rep], psi_b[:, i_rep] = self._score_elements(y, d,
124-
preds['ml_g0'][:, i_rep],
125-
preds['ml_g1'][:, i_rep],
126-
preds['ml_m'][:, i_rep],
127-
smpls[i_rep])
128-
129-
return psi_a, psi_b
92+
self._psi_a[:, i_rep, self._i_treat], self._psi_b[:, i_rep, self._i_treat] = \
93+
self._score_elements(y, d,
94+
preds['ml_g0'][:, i_rep],
95+
preds['ml_g1'][:, i_rep],
96+
preds['ml_m'][:, i_rep],
97+
self.smpls[i_rep])
98+
99+
return

doubleml_serverless/double_ml_pliv_aws_lambda.py

Lines changed: 14 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -35,38 +35,10 @@ def __init__(self,
3535
lambda_function_name,
3636
aws_region)
3737

38-
# this method overwrites DoubleML.fit() to implement the fit via aws lambda
39-
def fit(self, n_jobs_cv='n_folds * n_rep', seed=None, keep_scores=True):
40-
"""
41-
Parameters
42-
----------
43-
n_jobs_cv : str
44-
45-
seed : int or None
46-
47-
keep_scores : bool
48-
"""
49-
50-
if (not isinstance(n_jobs_cv, str)) | (n_jobs_cv not in ['n_folds * n_rep', 'n_rep']):
51-
raise ValueError('n_jobs_cv must be "n_folds * n_rep" or "n_rep"'
52-
f' got {str(n_jobs_cv)}')
53-
38+
def _ml_nuisance_aws_lambda(self, cv_params):
5439
assert self._dml_data.n_treat == 1
5540
self._i_treat = 0
5641

57-
# ml estimation of nuisance models and computation of score elements
58-
psi_a, psi_b = self._ml_nuisance_and_score_elements(self.smpls, n_jobs_cv, seed)
59-
self._psi_a[:, :, self._i_treat] = psi_a
60-
self._psi_b[:, :, self._i_treat] = psi_b
61-
62-
self._est_causal_pars_and_se()
63-
64-
if not keep_scores:
65-
self._clean_scores()
66-
67-
return self
68-
69-
def _ml_nuisance_and_score_elements(self, smpls, n_jobs_cv, seed):
7042
x, y = check_X_y(self._dml_data.x, self._dml_data.y)
7143
x, d = check_X_y(x, self._dml_data.d)
7244
assert self._dml_data.n_instr == 1
@@ -92,27 +64,25 @@ def _ml_nuisance_and_score_elements(self, smpls, n_jobs_cv, seed):
9264
self._dml_data.d_cols[0], self._dml_data.x_cols)
9365

9466
payloads = _attach_smpls([payload_ml_g, payload_ml_m, payload_ml_r],
95-
[smpls, smpls, smpls],
67+
[self.smpls, self.smpls, self.smpls],
9668
self.n_folds,
9769
self.n_rep,
9870
self._dml_data.n_obs,
99-
n_jobs_cv,
71+
cv_params['n_lambdas_cv'],
10072
[False, False, False],
101-
seed)
73+
cv_params['seed'])
10274

103-
preds = self.invoke_lambdas(payloads, smpls, self.params_names,
75+
preds = self.invoke_lambdas(payloads, self.smpls, self.params_names,
10476
self._dml_data.n_obs, self.n_rep,
105-
n_jobs_cv)
106-
107-
psi_a = np.full((self._dml_data.n_obs, self.n_rep), np.nan)
108-
psi_b = np.full((self._dml_data.n_obs, self.n_rep), np.nan)
77+
cv_params['n_lambdas_cv'])
10978

11079
for i_rep in range(self.n_rep):
11180
# compute score elements
112-
psi_a[:, i_rep], psi_b[:, i_rep] = self._score_elements(y, z, d,
113-
preds['ml_g'][:, i_rep],
114-
preds['ml_m'][:, i_rep],
115-
preds['ml_r'][:, i_rep],
116-
smpls[i_rep])
117-
118-
return psi_a, psi_b
81+
self._psi_a[:, i_rep, self._i_treat], self._psi_b[:, i_rep, self._i_treat] = \
82+
self._score_elements(y, z, d,
83+
preds['ml_g'][:, i_rep],
84+
preds['ml_m'][:, i_rep],
85+
preds['ml_r'][:, i_rep],
86+
self.smpls[i_rep])
87+
88+
return

0 commit comments

Comments
 (0)