Commit
docs: submit job sample + sample maintenance (#449)
* adding submit-job sample, updating quickstart sample, updating docstrings

* prettier lint fixes

* prettier lint fixes

Co-authored-by: Benjamin E. Coe <bencoe@google.com>
bradmiro and bcoe authored Nov 30, 2020
1 parent 77fc997 commit 0a1dda8
Showing 1 changed file with 17 additions and 53 deletions.
70 changes: 17 additions & 53 deletions packages/google-cloud-dataproc/samples/quickstart.js
@@ -17,7 +17,7 @@
// usage: node quickstart.js <PROJECT_ID> <REGION> <CLUSTER_NAME> <JOB_FILE_PATH>

// [START dataproc_quickstart]
// This quickstart sample walks a user through creating a Cloud Dataproc
// This quickstart sample walks a user through creating a Dataproc
// cluster, submitting a PySpark job from Google Cloud Storage to the
// cluster, reading the output of the job and deleting the cluster, all
// using the Node.js client library.
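For context, the parts of the sample outside this hunk are expected to create regional Dataproc clients roughly along the lines of the sketch below. The elided code is not shown here, so treat this as an assumption based on the public @google-cloud/dataproc and @google-cloud/storage APIs rather than the file's actual contents.

const dataproc = require('@google-cloud/dataproc');
const {Storage} = require('@google-cloud/storage');

// Assumed setup; the real sample's elided code may differ.
const endpoint = `${region}-dataproc.googleapis.com`;
const clusterClient = new dataproc.v1.ClusterControllerClient({apiEndpoint: endpoint});
const jobClient = new dataproc.v1.JobControllerClient({apiEndpoint: endpoint});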
@@ -80,64 +80,33 @@ function main(projectId, region, clusterName, jobFilePath) {
},
};

let [jobResp] = await jobClient.submitJob(job);
const jobId = jobResp.reference.jobId;
const [jobOperation] = await jobClient.submitJobAsOperation(job);
const [jobResponse] = await jobOperation.promise();

console.log(`Submitted job "${jobId}".`);
const matches = jobResponse.driverOutputResourceUri.match(
'gs://(.*?)/(.*)'
);
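Unlike the removed submitJob call, submitJobAsOperation returns a long-running operation, and awaiting jobOperation.promise() resolves only once the job reaches a terminal state; the resolved job carries driverOutputResourceUri, a gs:// URI pointing at the driver log. A minimal illustration of what the regex above extracts (the URI value is made up):

// Hypothetical URI, for illustration only.
const uri = 'gs://my-staging-bucket/google-cloud-dataproc-metainfo/abcd/jobs/job-1234/driveroutput';
const [, bucketName, objectPrefix] = uri.match('gs://(.*?)/(.*)');
// bucketName   -> 'my-staging-bucket'
// objectPrefix -> 'google-cloud-dataproc-metainfo/abcd/jobs/job-1234/driveroutput'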

// Terminal states for a job
const terminalStates = new Set(['DONE', 'ERROR', 'CANCELLED']);
const storage = new Storage();

// Create a timeout such that the job gets cancelled if not
// in a terminal state after a fixed period of time.
const timeout = 600000;
const start = new Date();
const output = await storage
.bucket(matches[1])
.file(`${matches[2]}.000000000`)
.download();
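The download above fetches only the first driver-output shard (the object ending in .000000000). Jobs that produce more output can write additional shards; a hedged sketch of gathering all of them with the same Storage client, reusing the sample's storage and matches variables:

// Sketch only: list every driver-output shard by prefix and concatenate them.
const [files] = await storage.bucket(matches[1]).getFiles({prefix: matches[2]});
const parts = await Promise.all(files.map(file => file.download()));
const fullOutput = parts.map(([contents]) => contents.toString()).join('');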

// Wait for the job to finish.
const jobReq = {
projectId: projectId,
region: region,
jobId: jobId,
};
// Output a success message.
console.log(`Job finished successfully: ${output}`);

while (!terminalStates.has(jobResp.status.state)) {
if (new Date() - timeout > start) {
await jobClient.cancelJob(jobReq);
console.log(
`Job ${jobId} timed out after threshold of ` +
`${timeout / 60000} minutes.`
);
break;
}
await sleep(1);
[jobResp] = await jobClient.getJob(jobReq);
}
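With submitJobAsOperation, the polling loop and sleep helper above become unnecessary: jobOperation.promise() only settles once the job has reached a terminal state. If a client-side timeout is still wanted, one option (not part of this sample, and assuming the returned gax operation exposes cancel()) is to race the operation against a timer:

// Illustrative sketch; `jobOperation` comes from submitJobAsOperation above.
const timeoutMs = 600000;
const timedOut = Symbol('timedOut');
const outcome = await Promise.race([
  jobOperation.promise(),
  new Promise(resolve => setTimeout(resolve, timeoutMs, timedOut)),
]);
if (outcome === timedOut) {
  await jobOperation.cancel();
  throw new Error(`Job timed out after ${timeoutMs / 60000} minutes.`);
}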

const clusterReq = {
// Delete the cluster once the job has terminated.
const deleteClusterReq = {
projectId: projectId,
region: region,
clusterName: clusterName,
};

const [clusterResp] = await clusterClient.getCluster(clusterReq);

const storage = new Storage();

const output = await storage
.bucket(clusterResp.config.configBucket)
.file(
`google-cloud-dataproc-metainfo/${clusterResp.clusterUuid}/` +
`jobs/${jobId}/driveroutput.000000000`
)
.download();

// Output a success message.
console.log(
`Job ${jobId} finished with state ${jobResp.status.state}:\n${output}`
const [deleteOperation] = await clusterClient.deleteCluster(
deleteClusterReq
);

// Delete the cluster once the job has terminated.
const [deleteOperation] = await clusterClient.deleteCluster(clusterReq);
await deleteOperation.promise();
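A possible refinement that is not part of this sample: wrapping the job in try/finally so the cluster is deleted even if submission or execution throws. A sketch, reusing the sample's clients and request objects:

try {
  const [jobOperation] = await jobClient.submitJobAsOperation(job);
  await jobOperation.promise();
} finally {
  // Clean up the cluster whether or not the job succeeded.
  const [deleteOperation] = await clusterClient.deleteCluster(deleteClusterReq);
  await deleteOperation.promise();
}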

// Output a success message
@@ -147,11 +116,6 @@ function main(projectId, region, clusterName, jobFilePath) {
quickstart();
}

// Helper function to sleep for the given number of seconds
function sleep(seconds) {
return new Promise(resolve => setTimeout(resolve, seconds * 1000));
}

const args = process.argv.slice(2);

if (args.length !== 4) {
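For reference, the usage line at the top of the file implies an invocation like the following; the project, region, cluster name, and Cloud Storage path are placeholders:

node quickstart.js my-project-id us-central1 my-test-cluster gs://my-bucket/hello-world.py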
