diff --git a/bin/cml/runner.js b/bin/cml/runner.js index 6cfbcdab8..874f01186 100755 --- a/bin/cml/runner.js +++ b/bin/cml/runner.js @@ -11,7 +11,6 @@ const tf = require('../../src/terraform'); let cml; let RUNNER; -let RUNNER_ID; let RUNNER_JOBS_RUNNING = []; let RUNNER_SHUTTING_DOWN = false; let RUNNER_TIMER = 0; @@ -48,11 +47,6 @@ const shutdown = async (opts) => { const retryWorkflows = async () => { try { if (!noRetry) { - if (cml.driver === 'github') { - const job = await cml.runnerJob({ runnerId: RUNNER_ID }); - if (job) RUNNER_JOBS_RUNNING = [job]; - } - if (RUNNER_JOBS_RUNNING.length > 0) { await Promise.all( RUNNER_JOBS_RUNNING.map( @@ -80,7 +74,7 @@ const shutdown = async (opts) => { }; if (error) { - winston.error(error, { reason, status: 'terminated' }); + winston.error(error, { status: 'terminated' }); } else { winston.info('runner status', { reason, status: 'terminated' }); } @@ -128,6 +122,8 @@ const runCloud = async (opts) => { workdir } = opts; + await tf.checkMinVersion(); + if (gpu === 'tesla') winston.warn( 'GPU model "tesla" has been deprecated; please use "v100" instead.' @@ -163,6 +159,7 @@ const runCloud = async (opts) => { }); await fs.writeFile(tfMainPath, tpl); + await tf.init({ dir: tfPath }); await tf.apply({ dir: tfPath }); @@ -211,24 +208,54 @@ const runCloud = async (opts) => { const runLocal = async (opts) => { winston.info(`Launching ${cml.driver} runner`); - const { workdir, name, labels, single, idleTimeout, noRetry, dockerVolumes } = - opts; + const { + workdir, + name, + labels, + single, + idleTimeout, + noRetry, + dockerVolumes, + tfResource, + tpiVersion + } = opts; + + if (tfResource) { + await tf.checkMinVersion(); + + const tfPath = workdir; + await fs.mkdir(tfPath, { recursive: true }); + const tfMainPath = join(tfPath, 'main.tf'); + const tpl = tf.iterativeProviderTpl({ tpiVersion }); + await fs.writeFile(tfMainPath, tpl); + + await tf.init({ dir: tfPath }); + await tf.apply({ dir: tfPath }); + + const path = join(tfPath, 'terraform.tfstate'); + const tfstate = await tf.loadTfState({ path }); + tfstate.resources = [ + JSON.parse(Buffer.from(tfResource, 'base64').toString('utf-8')) + ]; + await tf.saveTfState({ tfstate, path }); + } const dataHandler = async (data) => { - const log = cml.parseRunnerLog({ data }); - log && winston.info('runner status', log); - - if (log && log.status === 'job_started') { - RUNNER_JOBS_RUNNING.push({ id: log.job, date: log.date }); - } else if (log && log.status === 'job_ended') { - const { job: jobId } = log; - RUNNER_JOBS_RUNNING = RUNNER_JOBS_RUNNING.filter( - (job) => job.id !== jobId - ); - RUNNER_TIMER = 0; + const logs = await cml.parseRunnerLog({ data }); + for (const log of logs) { + winston.info('runner status', log); + + if (log.status === 'job_started') { + RUNNER_JOBS_RUNNING.push({ id: log.job, date: log.date }); + } - if (single && cml.driver === 'bitbucket') { - await shutdown({ ...opts, reason: 'single job' }); + if (log.status === 'job_ended') { + const { job: jobId } = log; + RUNNER_JOBS_RUNNING = RUNNER_JOBS_RUNNING.filter( + (job) => job.id !== jobId + ); + + if (single) await shutdown({ ...opts, reason: 'single job' }); } } }; @@ -245,63 +272,25 @@ const runLocal = async (opts) => { proc.stderr.on('data', dataHandler); proc.stdout.on('data', dataHandler); proc.on('disconnect', () => - shutdown({ ...opts, reason: `runner disconnected` }) - ); - proc.on('close', (exit) => - shutdown({ ...opts, reason: `runner closed with exit code ${exit}` }) + shutdown({ ...opts, error: new Error('runner proccess lost') }) ); + proc.on('close', (exit) => { + const reason = `runner closed with exit code ${exit}`; + if (exit === 0) shutdown({ ...opts, reason }); + else shutdown({ ...opts, error: new Error(reason) }); + }); RUNNER = proc; - ({ id: RUNNER_ID } = await cml.runnerByName({ name })); - if (idleTimeout > 0) { const watcher = setInterval(async () => { - let idle = RUNNER_JOBS_RUNNING.length === 0; + const idle = RUNNER_JOBS_RUNNING.length === 0; if (RUNNER_TIMER >= idleTimeout) { - try { - if (cml.driver === 'github') { - const job = await cml.runnerJob({ runnerId: RUNNER_ID }); - - if (!job && !idle) { - winston.error( - `Runner is idle as per the GitHub API but busy as per CML internal state. Resetting jobs. Retrying in ${idleTimeout} seconds...` - ); - winston.warn( - `CML GitHub driver response: ${JSON.stringify(job)}` - ); - winston.warn( - `CML internal state: ${JSON.stringify(RUNNER_JOBS_RUNNING)}` - ); - - RUNNER_JOBS_RUNNING = []; - } - - if (job && idle) { - winston.error( - `Runner is busy as per the GitHub API but idle as per CML internal state. Retrying in ${idleTimeout} seconds...` - ); - - idle = false; - } - } - } catch (err) { - winston.error( - `Error connecting the SCM: ${err.message}. Will try again in ${idleTimeout} secs` - ); - - idle = false; - } - - if (idle) { - shutdown({ ...opts, reason: `timeout:${idleTimeout}` }); - clearInterval(watcher); - } else { - RUNNER_TIMER = 0; - } + shutdown({ ...opts, reason: `timeout:${idleTimeout}` }); + clearInterval(watcher); } - RUNNER_TIMER++; + RUNNER_TIMER = idle ? RUNNER_TIMER + 1 : 0; }, 1000); } @@ -333,23 +322,26 @@ const runLocal = async (opts) => { }; const run = async (opts) => { + process.on('unhandledRejection', (reason) => + shutdown({ ...opts, error: new Error(reason) }) + ); + process.on('uncaughtException', (error) => shutdown({ ...opts, error })); + ['SIGTERM', 'SIGINT', 'SIGQUIT'].forEach((signal) => { process.on(signal, () => shutdown({ ...opts, reason: signal })); }); opts.workdir = opts.workdir || `${homedir()}/.cml/${opts.name}`; const { - tpiVersion, driver, repo, token, + workdir, cloud, labels, name, reuse, - dockerVolumes, - tfResource, - workdir + dockerVolumes } = opts; cml = new CML({ driver, repo, token }); @@ -359,27 +351,8 @@ const run = async (opts) => { if (dockerVolumes.length && cml.driver !== 'gitlab') winston.warn('Parameters --docker-volumes is only supported in gitlab'); - if (cloud || tfResource) await tf.checkMinVersion(); - - // prepare tf - if (tfResource) { - const tfPath = workdir; - - await fs.mkdir(tfPath, { recursive: true }); - const tfMainPath = join(tfPath, 'main.tf'); - const tpl = tf.iterativeProviderTpl({ tpiVersion }); - await fs.writeFile(tfMainPath, tpl); - await tf.init({ dir: tfPath }); - await tf.apply({ dir: tfPath }); - const path = join(tfPath, 'terraform.tfstate'); - const tfstate = await tf.loadTfState({ path }); - tfstate.resources = [ - JSON.parse(Buffer.from(tfResource, 'base64').toString('utf-8')) - ]; - await tf.saveTfState({ tfstate, path }); - } - const runners = await cml.runners(); + const runner = await cml.runnerByName({ name, runners }); if (runner) { if (!reuse) @@ -402,13 +375,9 @@ const run = async (opts) => { process.exit(0); } - try { - winston.info(`Preparing workdir ${workdir}...`); - await fs.mkdir(workdir, { recursive: true }); - await fs.chmod(workdir, '766'); - } catch (err) { - winston.warn(err.message); - } + winston.info(`Preparing workdir ${workdir}...`); + await fs.mkdir(workdir, { recursive: true }); + await fs.chmod(workdir, '766'); if (cloud) await runCloud(opts); else await runLocal(opts); @@ -433,17 +402,21 @@ exports.handler = async (opts) => { exports.builder = (yargs) => yargs.env('CML_RUNNER').options( kebabcaseKeys({ - tpiVersion: { + driver: { type: 'string', - default: '>= 0.9.10', + choices: ['github', 'gitlab', 'bitbucket'], description: - 'Pin the iterative/iterative terraform provider to a specific version. i.e. "= 0.10.4" See: https://www.terraform.io/language/expressions/version-constraints', - hidden: true + 'Platform where the repository is hosted. If not specified, it will be inferred from the environment' }, - dockerVolumes: { - type: 'array', - default: [], - description: 'Docker volumes. This feature is only supported in GitLab' + repo: { + type: 'string', + description: + 'Repository to be used for registering the runner. If not specified, it will be inferred from the environment' + }, + token: { + type: 'string', + description: + 'Personal access token to register a self-hosted runner on the repository. If not specified, it will be inferred from the environment' }, labels: { type: 'string', @@ -479,21 +452,16 @@ exports.builder = (yargs) => description: "Don't launch a new runner if an existing one has the same name or overlapping labels" }, - driver: { - type: 'string', - choices: ['github', 'gitlab', 'bitbucket'], - description: - 'Platform where the repository is hosted. If not specified, it will be inferred from the environment' - }, - repo: { + workdir: { type: 'string', - description: - 'Repository to be used for registering the runner. If not specified, it will be inferred from the environment' + hidden: true, + alias: 'path', + description: 'Runner working directory' }, - token: { - type: 'string', - description: - 'Personal access token to register a self-hosted runner on the repository. If not specified, it will be inferred from the environment' + dockerVolumes: { + type: 'array', + default: [], + description: 'Docker volumes. This feature is only supported in GitLab' }, cloud: { type: 'string', @@ -573,6 +541,13 @@ exports.builder = (yargs) => description: 'Specifies the subnet to use within AWS', alias: 'cloud-aws-subnet-id' }, + tpiVersion: { + type: 'string', + default: '>= 0.9.10', + description: + 'Pin the iterative/iterative terraform provider to a specific version. i.e. "= 0.10.4" See: https://www.terraform.io/language/expressions/version-constraints', + hidden: true + }, cmlVersion: { type: 'string', default: require('../../package.json').version, @@ -589,12 +564,6 @@ exports.builder = (yargs) => hidden: true, description: 'Seconds to wait for collecting logs on failure (https://github.com/iterative/cml/issues/413)' - }, - workdir: { - type: 'string', - hidden: true, - alias: 'path', - description: 'Runner working directory' } }) ); diff --git a/bin/cml/runner.test.js b/bin/cml/runner.test.js index bc2876723..b09d10d6c 100644 --- a/bin/cml/runner.test.js +++ b/bin/cml/runner.test.js @@ -63,9 +63,19 @@ describe('CML e2e', () => { --version Show version number [boolean] --log Maximum log level [string] [choices: \\"error\\", \\"warn\\", \\"info\\", \\"debug\\"] [default: \\"info\\"] - --docker-volumes Docker volumes. This feature is only - supported in GitLab - [array] [default: []] + --driver Platform where the repository is + hosted. If not specified, it will be + inferred from the environment + [string] [choices: \\"github\\", \\"gitlab\\", \\"bitbucket\\"] + --repo Repository to be used for + registering the runner. If not + specified, it will be inferred from + the environment [string] + --token Personal access token to register a + self-hosted runner on the + repository. If not specified, it + will be inferred from the + environment [string] --labels One or more user-defined labels for this runner (delimited with commas) [string] [default: \\"cml\\"] @@ -84,19 +94,9 @@ describe('CML e2e', () => { --reuse Don't launch a new runner if an existing one has the same name or overlapping labels [boolean] - --driver Platform where the repository is - hosted. If not specified, it will be - inferred from the environment - [string] [choices: \\"github\\", \\"gitlab\\", \\"bitbucket\\"] - --repo Repository to be used for - registering the runner. If not - specified, it will be inferred from - the environment [string] - --token Personal access token to register a - self-hosted runner on the - repository. If not specified, it - will be inferred from the - environment [string] + --docker-volumes Docker volumes. This feature is only + supported in GitLab + [array] [default: []] --cloud Cloud to deploy the runner [string] [choices: \\"aws\\", \\"azure\\", \\"gcp\\", \\"kubernetes\\"] --cloud-region Region where the instance is diff --git a/src/cml.js b/src/cml.js index b42fea5f1..1bd96bd39 100755 --- a/src/cml.js +++ b/src/cml.js @@ -257,77 +257,38 @@ class CML { return await getDriver(this).runnerToken(); } - parseRunnerLog(opts = {}) { + async parseRunnerLog(opts = {}) { let { data } = opts; - if (!data) return; + if (!data) return []; - const date = new Date(); + data = data.toString('utf8'); - try { - data = data.toString('utf8'); - - let log = { - level: 'info', - date: date.toISOString(), - repo: this.repo - }; - - if (this.driver === GITHUB) { - const id = 'gh'; - if (data.includes('Running job')) { - log.job = id; - log.status = 'job_started'; - } else if (data.includes('completed with result')) { - log.job = id; - log.status = 'job_ended'; - log.success = data.includes('Succeeded'); - } else if (data.includes('Listening for Jobs')) { - log.status = 'ready'; - } + const logs = []; + const patterns = await getDriver(this).runnerLogStatusPatterns(); + for (const [status, pattern] of Object.entries(patterns)) { + const regex = new RegExp(pattern); + if (regex.test(data)) { + const date = new Date(); + const log = { + job: 'dummy', + status, + date: date.toISOString(), + repo: this.repo + }; - const [, message] = data.split(/[A-Z]:\s/); - return { ...log, message: (message || data).replace(/\n/g, '') }; - } + if (status === 'job_ended') log.success = false; - if (this.driver === GITLAB) { - const { msg, job, duration_s: duration } = JSON.parse(data); - log = { ...log, job }; - - if (msg.endsWith('received')) { - log.status = 'job_started'; - } else if (duration) { - log.status = 'job_ended'; - log.success = msg.includes('Job succeeded'); - } else if (msg.includes('Starting runner for')) { - log.status = 'ready'; - } - return log; - } + log.level = log.success ? 'info' : 'error'; - if (this.driver === BB) { - const id = 'bb'; - if (data.includes('Getting step StepId{accountUuid={')) { - log.job = id; - log.status = 'job_started'; - } else if ( - data.includes('Completing step with result Result{status=') - ) { - log.job = id; - log.status = 'job_ended'; - log.success = data.includes('status=PASSED'); - } else if (data.includes('Updating runner status to "ONLINE"')) { - log.status = 'ready'; + if (status === 'job_ended_succeded') { + logs[logs.length - 1].success = true; + } else { + logs.push(log); } - - log.level = log.success ? 'info' : 'error'; - return log.status ? log : null; } - } catch (err) { - winston.warn(`Failed parsing log: ${err.message}`); - winston.warn( - `Original log bytes, as Base64: ${Buffer.from(data).toString('base64')}` - ); } + + return logs; } async startRunner(opts = {}) { diff --git a/src/cml.test.js b/src/cml.test.js index 6a2bc7fd5..ce6285f31 100644 --- a/src/cml.test.js +++ b/src/cml.test.js @@ -79,23 +79,24 @@ describe('Github tests', () => { test('Runner logs', async () => { const cml = new CML(); cml.driver = 'github'; - let log = cml.parseRunnerLog({ data: 'Listening for Jobs' }); - expect(log.status).toBe('ready'); - - log = cml.parseRunnerLog({ data: 'Running job' }); - expect(log.status).toBe('job_started'); - - log = cml.parseRunnerLog({ data: 'completed with result: Succeeded' }); - expect(log.status).toBe('job_ended'); - expect(log.success).toBe(true); - - log = cml.parseRunnerLog({ data: 'completed with result: Failed' }); - expect(log.status).toBe('job_ended'); - expect(log.success).toBe(false); - - log = cml.parseRunnerLog({ data: 'completed with result: Canceled' }); - expect(log.status).toBe('job_ended'); - expect(log.success).toBe(false); + let logs = await cml.parseRunnerLog({ + data: ` +2022-06-05 16:25:56Z: Listening for Jobs +2022-06-05 16:26:35Z: Running job: train +2022-06-05 16:28:03Z: Job train completed with result: Failed +` + }); + expect(logs.length).toBe(3); + expect(logs[0].status).toBe('ready'); + expect(logs[1].status).toBe('job_started'); + expect(logs[2].status).toBe('job_ended'); + expect(logs[2].success).toBe(false); + + logs = await cml.parseRunnerLog({ + data: '2022-06-05 16:28:03Z: Job train completed with result: Succeeded' + }); + expect(logs[0].status).toBe('job_ended'); + expect(logs[0].success).toBe(true); }); }); @@ -222,26 +223,63 @@ describe('Gitlab tests', () => { test('Runner logs', async () => { const cml = new CML(); cml.driver = 'gitlab'; - let log = await cml.parseRunnerLog({ - data: '{"level":"info","msg":"Starting runner for https://gitlab.com with token 2SGFrnGt ...","time":"2021-07-02T16:45:05Z"}' - }); - expect(log.status).toBe('ready'); - - log = cml.parseRunnerLog({ - data: '{"job":1396213069,"level":"info","msg":"Checking for jobs... received","repo_url":"https://gitlab.com/iterative.ai/fashion_mnist.git","runner":"2SGFrnGt","time":"2021-07-02T16:45:47Z"}' + let logs = await cml.parseRunnerLog({ + data: ` +{"level":"info","msg":"Starting runner for https://gitlab.com with token 2SGFrnGt ...","time":"2021-07-02T16:45:05Z"} +{"job":1396213069,"level":"info","msg":"Checking for jobs... received","repo_url":"https://gitlab.com/iterative.ai/fashion_mnist.git","runner":"2SGFrnGt","time":"2021-07-02T16:45:47Z"} +{"duration_s":120.0120526,"job":1396213069,"level":"warning","msg":"Job failed: execution took longer than 2m0s seconds","project":27856642,"runner":"2SGFrnGt","time":"2021-07-02T16:47:47Z"} +` }); - expect(log.status).toBe('job_started'); + expect(logs.length).toBe(3); + expect(logs[0].status).toBe('ready'); + expect(logs[1].status).toBe('job_started'); + expect(logs[2].status).toBe('job_ended'); + expect(logs[2].success).toBe(false); - log = cml.parseRunnerLog({ + logs = await cml.parseRunnerLog({ data: '{"duration_s":7.706165838,"job":2177867438,"level":"info","msg":"Job succeeded","project":27939020,"runner":"fe36krFK","time":"2022-03-08T18:12:57+01:00"}' }); - expect(log.status).toBe('job_ended'); - expect(log.success).toBe(true); + expect(logs[0].status).toBe('job_ended'); + expect(logs[0].success).toBe(true); + }); +}); - log = cml.parseRunnerLog({ - data: '{"duration_s":120.0120526,"job":1396213069,"level":"warning","msg":"Job failed: execution took longer than 2m0s seconds","project":27856642,"runner":"2SGFrnGt","time":"2021-07-02T16:47:47Z"}' +describe('Bitbucket tests', () => { + const OLD_ENV = process.env; + + const { TEST_BITBUCKET_TOKEN: TOKEN = 'DUMMY' } = process.env; + + beforeEach(() => { + jest.resetModules(); + + process.env = {}; + process.env.REPO_TOKEN = TOKEN; + }); + + afterAll(() => { + process.env = OLD_ENV; + }); + + test('Runner logs', async () => { + const cml = new CML(); + cml.driver = 'bitbucket'; + let logs = await cml.parseRunnerLog({ + data: ` +[2022-06-05 17:23:41,945] Updating runner status to "ONLINE" and checking for new steps assigned to the runner after 0 seconds and then every 30 seconds. +[2022-06-05 17:24:12,246] Getting step StepId{accountUuid={XXXXX-XXX-XXX-XXXXXXXX}, repositoryUuid={XXXXX-XXX-XXX-XXXXXXXX}, pipelineUuid={XXXXX-XXX-XXX-XXXXXXXX}, stepUuid={XXXXX-XXX-XXX-XXXXXXXX}}. +[2022-06-05 17:24:53,466] Completing step with result Result{status=FAILED, error=None}. +` + }); + expect(logs.length).toBe(3); + expect(logs[0].status).toBe('ready'); + expect(logs[1].status).toBe('job_started'); + expect(logs[2].status).toBe('job_ended'); + expect(logs[2].success).toBe(false); + + logs = await cml.parseRunnerLog({ + data: '[2022-06-05 17:24:53,466] Completing step with result Result{status=PASSED, error=None}.' }); - expect(log.status).toBe('job_ended'); - expect(log.success).toBe(false); + expect(logs[0].status).toBe('job_ended'); + expect(logs[0].success).toBe(true); }); }); diff --git a/src/drivers/bitbucket_cloud.js b/src/drivers/bitbucket_cloud.js index a56b436fd..68f30d01f 100644 --- a/src/drivers/bitbucket_cloud.js +++ b/src/drivers/bitbucket_cloud.js @@ -266,6 +266,15 @@ class BitbucketCloud { return href; } + runnerLogStatusPatterns() { + return { + ready: /Updating runner status to "ONLINE"/, + job_started: /Getting step StepId/, + job_ended: /Completing step with result/, + job_ended_succeded: /Completing step with result Result{status=PASSED/ + }; + } + async prAutoMerge({ pullRequestId, mergeMode, mergeMessage }) { winston.warn( 'Auto-merge is unsupported by Bitbucket Cloud; see https://jira.atlassian.com/browse/BCLOUD-14286. Trying to merge immediately...' diff --git a/src/drivers/github.js b/src/drivers/github.js index 0cf8376d8..7f4fb2dc9 100644 --- a/src/drivers/github.js +++ b/src/drivers/github.js @@ -329,6 +329,15 @@ class Github { return this.parseRunner(runner); } + runnerLogStatusPatterns() { + return { + ready: /Listening for Jobs/, + job_started: /Running job/, + job_ended: /completed with result/, + job_ended_succeded: /completed with result: Succeeded/ + }; + } + parseRunner(runner) { const { id, name, busy, status, labels } = runner; return { diff --git a/src/drivers/gitlab.js b/src/drivers/gitlab.js index 3d522f3a3..e816fcbeb 100644 --- a/src/drivers/gitlab.js +++ b/src/drivers/gitlab.js @@ -252,6 +252,15 @@ class Gitlab { }; } + runnerLogStatusPatterns() { + return { + ready: /Starting runner for/, + job_started: /"job":.+received/, + job_ended: /"duration_s":/, + job_ended_succeded: /"duration_s":.+Job succeeded/ + }; + } + async prCreate(opts = {}) { const projectPath = await this.projectPath(); const { source, target, title, description, autoMerge } = opts;