import sdk_security
import sdk_tasks

+from tests import hdfs_auth
from tests import utils


log = logging.getLogger(__name__)
DEFAULT_HDFS_TASK_COUNT = 10
GENERIC_HDFS_USER_PRINCIPAL = "hdfs@{realm}".format(realm=sdk_auth.REALM)
+ALICE_PRINCIPAL = "alice@{realm}".format(realm=sdk_auth.REALM)
KEYTAB_SECRET_PATH = os.getenv("KEYTAB_SECRET_PATH", "__dcos_base64___keytab")
# To do: change when no longer using HDFS stub universe
HDFS_PACKAGE_NAME = 'beta-hdfs'
HDFS_SERVICE_NAME = 'hdfs'
-KERBEROS_ARGS = ["--kerberos-principal", GENERIC_HDFS_USER_PRINCIPAL,
-                 "--keytab-secret-path", "/{}".format(KEYTAB_SECRET_PATH)]
+KERBEROS_ARGS = ["--kerberos-principal", ALICE_PRINCIPAL,
+                 "--keytab-secret-path", "/{}".format(KEYTAB_SECRET_PATH),
+                 "--conf", "spark.mesos.driverEnv.SPARK_USER={}".format(utils.SPARK_USER)]
HDFS_CLIENT_ID = "hdfsclient"
SPARK_HISTORY_USER = "nobody"

@@ -64,21 +67,27 @@ def hdfs_with_kerberos(configure_security_hdfs):
                )
            )
        principals.append(GENERIC_HDFS_USER_PRINCIPAL)
+        principals.append(ALICE_PRINCIPAL)

        kerberos_env = sdk_auth.KerberosEnvironment()
        kerberos_env.add_principals(principals)
        kerberos_env.finalize()
        service_kerberos_options = {
            "service": {
-                "kerberos": {
-                    "enabled": True,
-                    "kdc_host_name": kerberos_env.get_host(),
-                    "kdc_host_port": kerberos_env.get_port(),
-                    "keytab_secret": kerberos_env.get_keytab_path(),
-                    "primary": primaries[0],
-                    "primary_http": primaries[1],
-                    "realm": sdk_auth.REALM
+                "security": {
+                    "kerberos": {
+                        "enabled": True,
+                        "kdc": {
+                            "hostname": kerberos_env.get_host(),
+                            "port": int(kerberos_env.get_port())
+                        },
+                        "keytab_secret": kerberos_env.get_keytab_path(),
+                        "realm": kerberos_env.get_realm()
+                    }
                }
+            },
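+            # auth_to_local mapping from Kerberos principals to HDFS user names.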
+ "hdfs" : {
90
+ "security_auth_to_local" : hdfs_auth .get_principal_to_user_mapping ()
82
91
}
83
92
}
84
93
@@ -116,16 +125,24 @@ def setup_hdfs_client(hdfs_with_kerberos):
116
125
sdk_marathon .install_app (hdfsclient_app_def )
117
126
118
127
sdk_auth .kinit (HDFS_CLIENT_ID , keytab = "hdfs.keytab" , principal = GENERIC_HDFS_USER_PRINCIPAL )
128
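+        # Create a home directory for alice so test jobs can read and write under /users/alice.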
+ hdfs_cmd ("mkdir -p /users/alice" )
129
+ hdfs_cmd ("chown alice:users /users/alice" )
119
130
yield
120
131
121
132
finally :
122
133
sdk_marathon .destroy_app (HDFS_CLIENT_ID )
123
134
124
135
136
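+# Helper: run "bin/hdfs dfs -<cmd>" inside the HDFS client task.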
+def hdfs_cmd(cmd):
+    sdk_tasks.task_exec(HDFS_CLIENT_ID, "bin/hdfs dfs -{}".format(cmd))
+
+
@pytest.fixture(scope='module')
def setup_history_server(hdfs_with_kerberos, setup_hdfs_client, configure_universe):
    try:
-        sdk_tasks.task_exec(HDFS_CLIENT_ID, "bin/hdfs dfs -mkdir /history")
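+        # Authenticate as the generic HDFS principal and create a world-writable /history directory for the history server.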
+        sdk_auth.kinit(HDFS_CLIENT_ID, keytab="hdfs.keytab", principal=GENERIC_HDFS_USER_PRINCIPAL)
+        hdfs_cmd("mkdir /history")
+        hdfs_cmd("chmod 1777 /history")

        shakedown.install_package(
            package_name=utils.HISTORY_PACKAGE_NAME,
@@ -161,28 +178,34 @@ def setup_spark(hdfs_with_kerberos, setup_history_server, configure_security_spa
    utils.teardown_spark()


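+# Submit a single spark-terasort job with the Kerberos submit args and verify its expected output.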
+def _run_terasort_job(terasort_class, app_args, expected_output):
+    jar_url = 'https://downloads.mesosphere.io/spark/examples/spark-terasort-1.1-jar-with-dependencies_2.11.jar'
+    submit_args = ["--class", terasort_class] + KERBEROS_ARGS
+    utils.run_tests(app_url=jar_url,
+                    app_args=" ".join(app_args),
+                    expected_output=expected_output,
+                    args=submit_args)
+
+
@pytest.mark.skipif(not utils.hdfs_enabled(), reason='HDFS_ENABLED is false')
@pytest.mark.sanity
def test_terasort_suite():
-    jar_url = 'https://downloads.mesosphere.io/spark/examples/spark-terasort-1.1-jar-with-dependencies_2.11.jar'
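+    # Keep all terasort data under alice's home directory rather than the HDFS root.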
+    data_dir = "hdfs:///users/alice"
+    terasort_in = "{}/{}".format(data_dir, "terasort_in")
+    terasort_out = "{}/{}".format(data_dir, "terasort_out")
+    terasort_validate = "{}/{}".format(data_dir, "terasort_validate")

-    teragen_args = ["--class", "com.github.ehiggs.spark.terasort.TeraGen"] + KERBEROS_ARGS
-    utils.run_tests(app_url=jar_url,
-                    app_args="1g hdfs:///terasort_in",
-                    expected_output="Number of records written",
-                    args=teragen_args)
+    _run_terasort_job(terasort_class="com.github.ehiggs.spark.terasort.TeraGen",
+                      app_args=["1g", terasort_in],
+                      expected_output="Number of records written")

-    terasort_args = ["--class", "com.github.ehiggs.spark.terasort.TeraSort"] + KERBEROS_ARGS
-    utils.run_tests(app_url=jar_url,
-                    app_args="hdfs:///terasort_in hdfs:///terasort_out",
-                    expected_output="",
-                    args=terasort_args)
+    _run_terasort_job(terasort_class="com.github.ehiggs.spark.terasort.TeraSort",
+                      app_args=[terasort_in, terasort_out],
+                      expected_output="")

-    teravalidate_args = ["--class", "com.github.ehiggs.spark.terasort.TeraValidate"] + KERBEROS_ARGS
-    utils.run_tests(app_url=jar_url,
-                    app_args="hdfs:///terasort_out hdfs:///terasort_validate",
-                    expected_output="partitions are properly sorted",
-                    args=teravalidate_args)
+    _run_terasort_job(terasort_class="com.github.ehiggs.spark.terasort.TeraValidate",
+                      app_args=[terasort_out, terasort_validate],
+                      expected_output="partitions are properly sorted")


@pytest.mark.skipif(not utils.hdfs_enabled(), reason='HDFS_ENABLED is false')
@@ -208,8 +231,9 @@ def has_running_executors():
                "--conf", "spark.cores.max=8",
                "--conf", "spark.executors.cores=4"]

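+    # Write the job's HDFS output under alice's home directory instead of the HDFS root.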
+    data_dir = "hdfs:///users/alice"
    driver_id = utils.submit_job(app_url=utils.SPARK_EXAMPLES,
-                                 app_args="10.0.0.1 9090 hdfs:///netcheck hdfs:///outfile",
+                                 app_args="10.0.0.1 9090 {dir}/netcheck {dir}/outfile".format(dir=data_dir),
                                 app_name=utils.SPARK_APP_NAME,
                                 args=(KERBEROS_ARGS + job_args))
    log.info("Started supervised driver {}".format(driver_id))