/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// [START dataproc_submit_hadoop_fs_job]
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.HadoopJob;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

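/**
 * Submits a Hadoop FS job (an org.apache.hadoop.fs.FsShell command such as "-ls /") to an
 * existing Dataproc cluster, blocks until the job finishes, and prints the job's driver output.
 */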
public class SubmitHadoopFSJob {

  // Converts a space-separated string of arguments into the list form the job request expects.
  public static ArrayList<String> stringToList(String s) {
    return new ArrayList<>(Arrays.asList(s.split(" ")));
  }

  public static void submitHadoopFSQuery() throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String region = "your-project-region";
    String clusterName = "your-cluster-name";
    String hadoopFSQuery = "your-hadoop-fs-query";
    submitHadoopFSJob(projectId, region, clusterName, hadoopFSQuery);
  }

  public static void submitHadoopFSJob(
      String projectId, String region, String clusterName, String hadoopFSQuery)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create a job controller client with the configured settings. Using a try-with-resources
    // block closes the client automatically, but this can also be done manually with the
    // .close() method.
    try (JobControllerClient jobControllerClient =
        JobControllerClient.create(jobControllerSettings)) {

      // Configure cluster placement for the job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();

      // Configure Hadoop job settings. The Hadoop FS query is passed to FsShell as its argument
      // list.
      HadoopJob hadoopJob =
          HadoopJob.newBuilder()
              .setMainClass("org.apache.hadoop.fs.FsShell")
              .addAllArgs(stringToList(hadoopFSQuery))
              .build();

      Job job = Job.newBuilder().setPlacement(jobPlacement).setHadoopJob(hadoopJob).build();

      // Submit an asynchronous request to execute the job.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);

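      // Block until the job completes; get() throws ExecutionException if the job fails.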
      Job response = submitJobAsOperationAsyncRequest.get();

      // Print the job output, which Dataproc stores in Cloud Storage. The driver output URI has
      // the form gs://<bucket>/<object-prefix>, and the first output segment is suffixed with
      // ".000000000".
      Matcher matches =
          Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
      if (!matches.matches()) {
        throw new IllegalStateException(
            "Unexpected driver output URI: " + response.getDriverOutputResourceUri());
      }

      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));

      System.out.println(
          String.format(
              "Job \"%s\" finished: %s",
              response.getReference().getJobId(),
              new String(blob.getContent(), StandardCharsets.UTF_8)));

    } catch (ExecutionException e) {
      System.err.println(String.format("submitHadoopFSJob: %s", e.getMessage()));
    }
  }
}
// [END dataproc_submit_hadoop_fs_job]
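
// Example invocation (hypothetical values; assumes the Dataproc cluster already exists):
//   SubmitHadoopFSJob.submitHadoopFSJob("my-project-id", "us-central1", "my-cluster", "-ls /");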