2323import static com .google .common .base .Preconditions .checkState ;
2424
2525import com .google .api .client .googleapis .json .GoogleJsonResponseException ;
26+ import com .google .api .services .clouddebugger .v2 .Clouddebugger ;
27+ import com .google .api .services .clouddebugger .v2 .model .Debuggee ;
28+ import com .google .api .services .clouddebugger .v2 .model .RegisterDebuggeeRequest ;
29+ import com .google .api .services .clouddebugger .v2 .model .RegisterDebuggeeResponse ;
2630import com .google .api .services .dataflow .Dataflow ;
2731import com .google .api .services .dataflow .model .DataflowPackage ;
2832import com .google .api .services .dataflow .model .Job ;
@@ -420,6 +424,43 @@ private <T> PCollection<T> applyWindow(
420424 return super .apply (new AssignWindows <>(transform ), input );
421425 }
422426
427+ private void maybeRegisterDebuggee (DataflowPipelineOptions options , String uniquifier ) {
428+ if (!options .getEnableCloudDebugger ()) {
429+ return ;
430+ }
431+
432+ if (options .getDebuggee () != null ) {
433+ throw new RuntimeException ("Should not specify the debuggee" );
434+ }
435+
436+ Clouddebugger debuggerClient = Transport .newClouddebuggerClient (options ).build ();
437+ Debuggee debuggee = registerDebuggee (debuggerClient , uniquifier );
438+ options .setDebuggee (debuggee );
439+ }
440+
441+ private Debuggee registerDebuggee (Clouddebugger debuggerClient , String uniquifier ) {
442+ RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest ();
443+ registerReq .setDebuggee (new Debuggee ()
444+ .setProject (options .getProject ())
445+ .setUniquifier (uniquifier )
446+ .setDescription (uniquifier )
447+ .setAgentVersion ("google.com/cloud-dataflow-java/v1" ));
448+
449+ try {
450+ RegisterDebuggeeResponse registerResponse =
451+ debuggerClient .controller ().debuggees ().register (registerReq ).execute ();
452+ Debuggee debuggee = registerResponse .getDebuggee ();
453+ if (debuggee .getStatus () != null && debuggee .getStatus ().getIsError ()) {
454+ throw new RuntimeException ("Unable to register with the debugger: " +
455+ debuggee .getStatus ().getDescription ().getFormat ());
456+ }
457+
458+ return debuggee ;
459+ } catch (IOException e ) {
460+ throw new RuntimeException ("Unable to register with the debugger: " , e );
461+ }
462+ }
463+
423464 @ Override
424465 public DataflowPipelineJob run (Pipeline pipeline ) {
425466 logWarningIfPCollectionViewHasNonDeterministicKeyCoder (pipeline );
@@ -428,9 +469,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
428469 + "related to Google Compute Engine usage and other Google Cloud Services." );
429470
430471 List <DataflowPackage > packages = options .getStager ().stageFiles ();
431- JobSpecification jobSpecification =
432- translator .translate (pipeline , this , packages );
433- Job newJob = jobSpecification .getJob ();
472+
434473
435474 // Set a unique client_request_id in the CreateJob request.
436475 // This is used to ensure idempotence of job creation across retried
@@ -442,6 +481,15 @@ public DataflowPipelineJob run(Pipeline pipeline) {
442481 int randomNum = new Random ().nextInt (9000 ) + 1000 ;
443482 String requestId = DateTimeFormat .forPattern ("YYYYMMddHHmmssmmm" ).withZone (DateTimeZone .UTC )
444483 .print (DateTimeUtils .currentTimeMillis ()) + "_" + randomNum ;
484+
485+ // Try to create a debuggee ID. This must happen before the job is translated since it may
486+ // update the options.
487+ DataflowPipelineOptions dataflowOptions = options .as (DataflowPipelineOptions .class );
488+ maybeRegisterDebuggee (dataflowOptions , requestId );
489+
490+ JobSpecification jobSpecification =
491+ translator .translate (pipeline , this , packages );
492+ Job newJob = jobSpecification .getJob ();
445493 newJob .setClientRequestId (requestId );
446494
447495 String version = DataflowReleaseInfo .getReleaseInfo ().getVersion ();
@@ -450,7 +498,6 @@ public DataflowPipelineJob run(Pipeline pipeline) {
450498 newJob .getEnvironment ().setUserAgent (DataflowReleaseInfo .getReleaseInfo ());
451499 // The Dataflow Service may write to the temporary directory directly, so
452500 // must be verified.
453- DataflowPipelineOptions dataflowOptions = options .as (DataflowPipelineOptions .class );
454501 if (!Strings .isNullOrEmpty (options .getTempLocation ())) {
455502 newJob .getEnvironment ().setTempStoragePrefix (
456503 dataflowOptions .getPathValidator ().verifyPath (options .getTempLocation ()));
0 commit comments