From 7f77e8bc7f6b9ddde8fc270824fcc0dc88c5c8ec Mon Sep 17 00:00:00 2001 From: haibingzhao Date: Thu, 6 Aug 2020 16:06:55 +0800 Subject: [PATCH] feat(frontend): support tensorboard viewer and other visualize Results using volume mount path. Part of #4208 (#4236) * support local file storage type for local volume mount path, see: https://github.com/kubeflow/pipelines/issues/4208 * add TODO comment about supporting directories and filePath wildcards '*', for details see: https://github.com/kubeflow/pipelines/issues/4208 * revert old code indent * run 'npm run format' to format code * support tensorboard viewer and other visualization results using volume mount path, change the 'file' scheme to 'volume': 1. source scheme: volume://volume-name/relative/path/from/volume/xxx.csv 2. for tensorboard (also supports Series1:volume://volume-name/path_to_model_dir_1,Series2:volume://volume-name/path_to_model_dir_2): * check volume-name is specified in podTemplateSpec (which is injected via the VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH env) * check /relative/path/from/volume/xxx file path is prefix-mounted in podTemplateSpec 3. for others: * check volume-name is specified in the ml-pipeline-ui pod * check /relative/path/from/volume/xxx.csv file path exists * fix test and add more tests * change error message from 'not found' to 'not exist'. 
* fix tensorboard create test * combining volume mount path and key as artifacts path * extra complex code to a function and add more test * use ml-pipeline-ui container name to find server container instead of use containers[0] * fix review suggestion: https://github.com/kubeflow/pipelines/pull/4236 * format code * extract how to find file path on a pod volume to a common function, and optimize error message * fix k8s-helper.test error * add more documentation and fix mistake: volumeMountPath to filePathInVolume * fix test error * Update k8s-helper.test.ts * format error message Co-authored-by: Yuan (Bob) Gong --- frontend/server/app.test.ts | 554 ++++++++++++++++++++++++ frontend/server/handlers/artifacts.ts | 65 ++- frontend/server/helpers/server-info.ts | 55 +++ frontend/server/k8s-helper.test.ts | 167 +++++++ frontend/server/k8s-helper.ts | 50 ++- frontend/server/utils.test.ts | 171 +++++++- frontend/server/utils.ts | 109 +++++ frontend/src/lib/WorkflowParser.test.ts | 16 + frontend/src/lib/WorkflowParser.ts | 8 + 9 files changed, 1183 insertions(+), 12 deletions(-) create mode 100644 frontend/server/helpers/server-info.ts create mode 100644 frontend/server/k8s-helper.test.ts diff --git a/frontend/server/app.test.ts b/frontend/server/app.test.ts index 1c9997d19cf..0a48a8e2ce0 100644 --- a/frontend/server/app.test.ts +++ b/frontend/server/app.test.ts @@ -23,8 +23,12 @@ import { UIServer } from './app'; import { loadConfigs } from './configs'; import * as minioHelper from './minio-helper'; import { TEST_ONLY as K8S_TEST_EXPORT } from './k8s-helper'; +import * as serverInfo from './helpers/server-info'; import { Server } from 'http'; import { commonSetup } from './integration-tests/test-helper'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as os from 'os'; jest.mock('minio'); jest.mock('node-fetch'); @@ -414,6 +418,230 @@ describe('UIServer apis', () => { .get('/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5') 
.expect(200, artifactContent.slice(0, 5), done); }); + + it('responds with a volume artifact if source=volume', done => { + const artifactContent = 'hello world'; + const tempPath = path.join(fs.mkdtempSync(os.tmpdir()), 'content'); + fs.writeFileSync(tempPath, artifactContent); + + jest.spyOn(serverInfo, 'getHostPod').mockImplementation(() => + Promise.resolve([ + { + spec: { + containers: [ + { + volumeMounts: [ + { + name: 'artifact', + mountPath: path.dirname(tempPath), + subPath: 'subartifact', + }, + ], + name: 'ml-pipeline-ui', + }, + ], + volumes: [ + { + name: 'artifact', + persistentVolumeClaim: { + claimName: 'artifact_pvc', + }, + }, + ], + }, + } as any, + undefined, + ]), + ); + + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + + const request = requests(app.start()); + request + .get('/artifacts/get?source=volume&bucket=artifact&key=subartifact/content') + .expect(200, artifactContent, done); + }); + + it('responds with a partial volume artifact if peek=5 is set', done => { + const artifactContent = 'hello world'; + const tempPath = path.join(fs.mkdtempSync(os.tmpdir()), 'content'); + fs.writeFileSync(tempPath, artifactContent); + + jest.spyOn(serverInfo, 'getHostPod').mockImplementation(() => + Promise.resolve([ + { + spec: { + containers: [ + { + volumeMounts: [ + { + name: 'artifact', + mountPath: path.dirname(tempPath), + }, + ], + name: 'ml-pipeline-ui', + }, + ], + volumes: [ + { + name: 'artifact', + persistentVolumeClaim: { + claimName: 'artifact_pvc', + }, + }, + ], + }, + } as any, + undefined, + ]), + ); + + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + + const request = requests(app.start()); + request + .get(`/artifacts/get?source=volume&bucket=artifact&key=content&peek=5`) + .expect(200, artifactContent.slice(0, 5), done); + }); + + it('responds error with a not exist volume', done => { + jest.spyOn(serverInfo, 'getHostPod').mockImplementation(() => + Promise.resolve([ + { + metadata: { 
+ name: 'ml-pipeline-ui', + }, + spec: { + containers: [ + { + volumeMounts: [ + { + name: 'artifact', + mountPath: '/foo/bar/path', + }, + ], + name: 'ml-pipeline-ui', + }, + ], + volumes: [ + { + name: 'artifact', + persistentVolumeClaim: { + claimName: 'artifact_pvc', + }, + }, + ], + }, + } as any, + undefined, + ]), + ); + + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + + const request = requests(app.start()); + request + .get(`/artifacts/get?source=volume&bucket=notexist&key=content`) + .expect( + 404, + 'Failed to open volume://notexist/content, Cannot find file "volume://notexist/content" in pod "ml-pipeline-ui": volume "notexist" not configured', + done, + ); + }); + + it('responds error with a not exist volume mount path if source=volume', done => { + jest.spyOn(serverInfo, 'getHostPod').mockImplementation(() => + Promise.resolve([ + { + metadata: { + name: 'ml-pipeline-ui', + }, + spec: { + containers: [ + { + volumeMounts: [ + { + name: 'artifact', + mountPath: '/foo/bar/path', + subPath: 'subartifact', + }, + ], + name: 'ml-pipeline-ui', + }, + ], + volumes: [ + { + name: 'artifact', + persistentVolumeClaim: { + claimName: 'artifact_pvc', + }, + }, + ], + }, + } as any, + undefined, + ]), + ); + + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + + const request = requests(app.start()); + request + .get(`/artifacts/get?source=volume&bucket=artifact&key=notexist/config`) + .expect( + 404, + 'Failed to open volume://artifact/notexist/config, Cannot find file "volume://artifact/notexist/config" in pod "ml-pipeline-ui": volume "artifact" not mounted or volume "artifact" with subPath (which is prefix of notexist/config) not mounted', + done, + ); + }); + + it('responds error with a not exist volume mount artifact if source=volume', done => { + jest.spyOn(serverInfo, 'getHostPod').mockImplementation(() => + Promise.resolve([ + { + spec: { + containers: [ + { + volumeMounts: [ + { + name: 'artifact', + 
mountPath: '/foo/bar', + subPath: 'subartifact', + }, + ], + name: 'ml-pipeline-ui', + }, + ], + volumes: [ + { + name: 'artifact', + persistentVolumeClaim: { + claimName: 'artifact_pvc', + }, + }, + ], + }, + } as any, + undefined, + ]), + ); + + const configs = loadConfigs(argv, {}); + app = new UIServer(configs); + + const request = requests(app.start()); + request + .get(`/artifacts/get?source=volume&bucket=artifact&key=subartifact/notxist.csv`) + .expect( + 500, + "Failed to open volume://artifact/subartifact/notxist.csv: Error: ENOENT: no such file or directory, stat '/foo/bar/notxist.csv'", + done, + ); + }); }); describe('/system', () => { @@ -601,6 +829,40 @@ describe('UIServer apis', () => { }); describe('/apps/tensorboard', () => { + const POD_TEMPLATE_SPEC = { + spec: { + containers: [ + { + volumeMounts: [ + { + name: 'tensorboard', + mountPath: '/logs', + }, + { + name: 'data', + subPath: 'tensorboard', + mountPath: '/data', + }, + ], + }, + ], + volumes: [ + { + name: 'tensorboard', + persistentVolumeClaim: { + claimName: 'logs', + }, + }, + { + name: 'data', + persistentVolumeClaim: { + claimName: 'data', + }, + }, + ], + }, + }; + let k8sGetCustomObjectSpy: jest.SpyInstance; let k8sDeleteCustomObjectSpy: jest.SpyInstance; let k8sCreateCustomObjectSpy: jest.SpyInstance; @@ -933,6 +1195,298 @@ describe('UIServer apis', () => { ); }); + it('creates tensorboard viewer with exist volume', done => { + let getRequestCount = 0; + k8sGetCustomObjectSpy.mockImplementation(() => { + ++getRequestCount; + switch (getRequestCount) { + case 1: + return Promise.reject('Not found'); + case 2: + return Promise.resolve( + newGetTensorboardResponse({ + name: 'viewer-abcdefg', + logDir: 'Series1:/logs/log-dir-1,Series2:/logs/log-dir-2', + tensorflowImage: 'tensorflow:2.0.0', + }), + ); + default: + throw new Error('only expected to be called twice in this test'); + } + }); + k8sCreateCustomObjectSpy.mockImplementation(() => Promise.resolve()); + + const tempPath = 
path.join(fs.mkdtempSync(os.tmpdir()), 'config.json'); + fs.writeFileSync(tempPath, JSON.stringify(POD_TEMPLATE_SPEC)); + app = new UIServer( + loadConfigs(argv, { VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH: tempPath }), + ); + + requests(app.start()) + .post( + `/apps/tensorboard?logdir=${encodeURIComponent( + 'Series1:volume://tensorboard/log-dir-1,Series2:volume://tensorboard/log-dir-2', + )}&namespace=test-ns&tfversion=2.0.0`, + ) + .expect( + 200, + 'http://viewer-abcdefg-service.test-ns.svc.cluster.local:80/tensorboard/viewer-abcdefg/', + err => { + expect(k8sGetCustomObjectSpy.mock.calls[0]).toMatchInlineSnapshot(` + Array [ + "kubeflow.org", + "v1beta1", + "test-ns", + "viewers", + "viewer-a800f945f0934d978f9cce9959b82ff44dac8493", + ] + `); + expect(k8sCreateCustomObjectSpy.mock.calls[0]).toMatchInlineSnapshot(` + Array [ + "kubeflow.org", + "v1beta1", + "test-ns", + "viewers", + Object { + "apiVersion": "kubeflow.org/v1beta1", + "kind": "Viewer", + "metadata": Object { + "name": "viewer-a800f945f0934d978f9cce9959b82ff44dac8493", + "namespace": "test-ns", + }, + "spec": Object { + "podTemplateSpec": Object { + "spec": Object { + "containers": Array [ + Object { + "volumeMounts": Array [ + Object { + "mountPath": "/logs", + "name": "tensorboard", + }, + Object { + "mountPath": "/data", + "name": "data", + "subPath": "tensorboard", + }, + ], + }, + ], + "volumes": Array [ + Object { + "name": "tensorboard", + "persistentVolumeClaim": Object { + "claimName": "logs", + }, + }, + Object { + "name": "data", + "persistentVolumeClaim": Object { + "claimName": "data", + }, + }, + ], + }, + }, + "tensorboardSpec": Object { + "logDir": "Series1:/logs/log-dir-1,Series2:/logs/log-dir-2", + "tensorflowImage": "tensorflow/tensorflow:2.0.0", + }, + "type": "tensorboard", + }, + }, + ] + `); + expect(k8sGetCustomObjectSpy.mock.calls[1]).toMatchInlineSnapshot(` + Array [ + "kubeflow.org", + "v1beta1", + "test-ns", + "viewers", + 
"viewer-a800f945f0934d978f9cce9959b82ff44dac8493", + ] + `); + done(err); + }, + ); + }); + + it('creates tensorboard viewer with exist subPath volume', done => { + let getRequestCount = 0; + k8sGetCustomObjectSpy.mockImplementation(() => { + ++getRequestCount; + switch (getRequestCount) { + case 1: + return Promise.reject('Not found'); + case 2: + return Promise.resolve( + newGetTensorboardResponse({ + name: 'viewer-abcdefg', + logDir: 'Series1:/data/log-dir-1,Series2:/data/log-dir-2', + tensorflowImage: 'tensorflow:2.0.0', + }), + ); + default: + throw new Error('only expected to be called twice in this test'); + } + }); + k8sCreateCustomObjectSpy.mockImplementation(() => Promise.resolve()); + + const tempPath = path.join(fs.mkdtempSync(os.tmpdir()), 'config.json'); + fs.writeFileSync(tempPath, JSON.stringify(POD_TEMPLATE_SPEC)); + app = new UIServer( + loadConfigs(argv, { VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH: tempPath }), + ); + + requests(app.start()) + .post( + `/apps/tensorboard?logdir=${encodeURIComponent( + 'Series1:volume://data/tensorboard/log-dir-1,Series2:volume://data/tensorboard/log-dir-2', + )}&namespace=test-ns&tfversion=2.0.0`, + ) + .expect( + 200, + 'http://viewer-abcdefg-service.test-ns.svc.cluster.local:80/tensorboard/viewer-abcdefg/', + err => { + expect(k8sGetCustomObjectSpy.mock.calls[0]).toMatchInlineSnapshot(` + Array [ + "kubeflow.org", + "v1beta1", + "test-ns", + "viewers", + "viewer-82d7d06a6ecb1e4dcba66d06b884d6445a88e4ca", + ] + `); + expect(k8sCreateCustomObjectSpy.mock.calls[0]).toMatchInlineSnapshot(` + Array [ + "kubeflow.org", + "v1beta1", + "test-ns", + "viewers", + Object { + "apiVersion": "kubeflow.org/v1beta1", + "kind": "Viewer", + "metadata": Object { + "name": "viewer-82d7d06a6ecb1e4dcba66d06b884d6445a88e4ca", + "namespace": "test-ns", + }, + "spec": Object { + "podTemplateSpec": Object { + "spec": Object { + "containers": Array [ + Object { + "volumeMounts": Array [ + Object { + "mountPath": "/logs", + "name": 
"tensorboard", + }, + Object { + "mountPath": "/data", + "name": "data", + "subPath": "tensorboard", + }, + ], + }, + ], + "volumes": Array [ + Object { + "name": "tensorboard", + "persistentVolumeClaim": Object { + "claimName": "logs", + }, + }, + Object { + "name": "data", + "persistentVolumeClaim": Object { + "claimName": "data", + }, + }, + ], + }, + }, + "tensorboardSpec": Object { + "logDir": "Series1:/data/log-dir-1,Series2:/data/log-dir-2", + "tensorflowImage": "tensorflow/tensorflow:2.0.0", + }, + "type": "tensorboard", + }, + }, + ] + `); + expect(k8sGetCustomObjectSpy.mock.calls[1]).toMatchInlineSnapshot(` + Array [ + "kubeflow.org", + "v1beta1", + "test-ns", + "viewers", + "viewer-82d7d06a6ecb1e4dcba66d06b884d6445a88e4ca", + ] + `); + done(err); + }, + ); + }); + + it('creates tensorboard viewer with not exist volume and return error', done => { + const errorSpy = jest.spyOn(console, 'error'); + errorSpy.mockImplementation(); + + k8sGetCustomObjectSpy.mockImplementation(() => { + return Promise.reject('Not found'); + }); + + const tempPath = path.join(fs.mkdtempSync(os.tmpdir()), 'config.json'); + fs.writeFileSync(tempPath, JSON.stringify(POD_TEMPLATE_SPEC)); + app = new UIServer( + loadConfigs(argv, { VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH: tempPath }), + ); + + requests(app.start()) + .post( + `/apps/tensorboard?logdir=${encodeURIComponent( + 'volume://notexistvolume/logs/log-dir-1', + )}&namespace=test-ns&tfversion=2.0.0`, + ) + .expect( + 500, + `Failed to start Tensorboard app: Cannot find file "volume://notexistvolume/logs/log-dir-1" in pod "unknown": volume "notexistvolume" not configured`, + err => { + expect(errorSpy).toHaveBeenCalledTimes(1); + done(err); + }, + ); + }); + + it('creates tensorboard viewer with not exist subPath volume mount and return error', done => { + const errorSpy = jest.spyOn(console, 'error'); + errorSpy.mockImplementation(); + + k8sGetCustomObjectSpy.mockImplementation(() => { + return Promise.reject('Not found'); + 
}); + + const tempPath = path.join(fs.mkdtempSync(os.tmpdir()), 'config.json'); + fs.writeFileSync(tempPath, JSON.stringify(POD_TEMPLATE_SPEC)); + app = new UIServer( + loadConfigs(argv, { VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH: tempPath }), + ); + + requests(app.start()) + .post( + `/apps/tensorboard?logdir=${encodeURIComponent( + 'volume://data/notexit/mountnotexist/log-dir-1', + )}&namespace=test-ns&tfversion=2.0.0`, + ) + .expect( + 500, + `Failed to start Tensorboard app: Cannot find file "volume://data/notexit/mountnotexist/log-dir-1" in pod "unknown": volume "data" not mounted or volume "data" with subPath (which is prefix of notexit/mountnotexist/log-dir-1) not mounted`, + err => { + expect(errorSpy).toHaveBeenCalledTimes(1); + done(err); + }, + ); + }); + it('returns error when there is an existing tensorboard with different version', done => { const errorSpy = jest.spyOn(console, 'error'); errorSpy.mockImplementation(); diff --git a/frontend/server/handlers/artifacts.ts b/frontend/server/handlers/artifacts.ts index cc647f0c3c7..a8baef76d76 100644 --- a/frontend/server/handlers/artifacts.ts +++ b/frontend/server/handlers/artifacts.ts @@ -14,20 +14,24 @@ import fetch from 'node-fetch'; import { AWSConfigs, HttpConfigs, MinioConfigs, ProcessEnv } from '../configs'; import { Client as MinioClient } from 'minio'; -import { PreviewStream } from '../utils'; +import { PreviewStream, findFileOnPodVolume } from '../utils'; import { createMinioClient, getObjectStream } from '../minio-helper'; +import * as serverInfo from '../helpers/server-info'; import { Handler, Request, Response } from 'express'; import { Storage } from '@google-cloud/storage'; import proxy from 'http-proxy-middleware'; import { HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS } from '../consts'; +import * as fs from 'fs'; +import { V1Container } from '@kubernetes/client-node/dist/api'; + /** * ArtifactsQueryStrings describes the expected query strings key value pairs * in the artifact request object. 
*/ interface ArtifactsQueryStrings { /** artifact source. */ - source: 'minio' | 's3' | 'gcs' | 'http' | 'https'; + source: 'minio' | 's3' | 'gcs' | 'http' | 'https' | 'volume'; /** bucket name. */ bucket: string; /** artifact key/path that is uri encoded. */ @@ -101,6 +105,16 @@ export function getArtifactsHandler(artifactsConfigs: { )(req, res); break; + case 'volume': + await getVolumeArtifactsHandler( + { + bucket, + key, + }, + peek, + )(req, res); + break; + default: res.status(500).send('Unknown storage source: ' + source); return; @@ -240,6 +254,53 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: n }; } +function getVolumeArtifactsHandler(options: { bucket: string; key: string }, peek: number = 0) { + const { key, bucket } = options; + return async (req: Request, res: Response) => { + try { + const [pod, err] = await serverInfo.getHostPod(); + if (err) { + res.status(500).send(err); + return; + } + + if (!pod) { + res.status(500).send('Could not get server pod'); + return; + } + + // ml-pipeline-ui server container name also be called 'ml-pipeline-ui-artifact' in KFP multi user mode. 
+ // https://github.com/kubeflow/manifests/blob/master/pipeline/installs/multi-user/pipelines-profile-controller/sync.py#L212 + const [filePath, parseError] = findFileOnPodVolume(pod, { + containerNames: ['ml-pipeline-ui', 'ml-pipeline-ui-artifact'], + volumeMountName: bucket, + filePathInVolume: key, + }); + if (parseError) { + res.status(404).send(`Failed to open volume://${bucket}/${key}, ${parseError}`); + return; + } + + // TODO: support directory and support filePath include wildcards '*' + const stat = await fs.promises.stat(filePath); + if (stat.isDirectory()) { + res + .status(400) + .send( + `Failed to open volume://${bucket}/${key}, file ${filePath} is directory, does not support now`, + ); + return; + } + + fs.createReadStream(filePath) + .pipe(new PreviewStream({ peek })) + .pipe(res); + } catch (err) { + res.status(500).send(`Failed to open volume://${bucket}/${key}: ${err}`); + } + }; +} + const ARTIFACTS_PROXY_DEFAULTS = { serviceName: 'ml-pipeline-ui-artifact', servicePort: '80', diff --git a/frontend/server/helpers/server-info.ts b/frontend/server/helpers/server-info.ts new file mode 100644 index 00000000000..66029e86d2b --- /dev/null +++ b/frontend/server/helpers/server-info.ts @@ -0,0 +1,55 @@ +// Copyright 2019-2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +import * as fs from 'fs'; +import { V1Pod } from '@kubernetes/client-node'; +import { getPod } from '../k8s-helper'; + +const namespaceFilePath = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'; +let serverNamespace: string | undefined; +let hostPod: V1Pod | undefined; + +// The file path contains pod namespace when in Kubernetes cluster. +if (fs.existsSync(namespaceFilePath)) { + serverNamespace = fs.readFileSync(namespaceFilePath, 'utf-8'); +} + +// get ml-pipeline-ui host pod +export async function getHostPod(): Promise<[V1Pod | undefined, undefined] | [undefined, string]> { + // use cached hostPod + if (hostPod) { + return [hostPod, undefined]; + } + + if (!serverNamespace) { + return [undefined, "server namespace can't be obtained"]; + } + + // get ml-pipeline-ui server pod name + const { HOSTNAME: POD_NAME } = process.env; + if (!POD_NAME) { + return [undefined, "server pod name can't be obtained"]; + } + + const [pod, err] = await getPod(POD_NAME, serverNamespace); + + if (err) { + const { message, additionalInfo } = err; + console.error(message, additionalInfo); + return [undefined, `Failed to get host pod: ${message}`]; + } + + hostPod = pod; + return [hostPod, undefined]; +} diff --git a/frontend/server/k8s-helper.test.ts b/frontend/server/k8s-helper.test.ts new file mode 100644 index 00000000000..935bf60aea6 --- /dev/null +++ b/frontend/server/k8s-helper.test.ts @@ -0,0 +1,167 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +import { TEST_ONLY as K8S_TEST_EXPORT } from './k8s-helper'; + +describe('k8s-helper', () => { + describe('parseTensorboardLogDir', () => { + const podTemplateSpec = { + spec: { + containers: [ + { + volumeMounts: [ + { + name: 'output', + mountPath: '/data', + }, + { + name: 'artifact', + subPath: 'pipeline1', + mountPath: '/data1', + }, + { + name: 'artifact', + subPath: 'pipeline2', + mountPath: '/data2', + }, + ], + }, + ], + volumes: [ + { + name: 'output', + hostPath: { + path: '/data/output', + type: 'Directory', + }, + }, + { + name: 'artifact', + persistentVolumeClaim: { + claimName: 'artifact_pvc', + }, + }, + ], + }, + }; + + it('handles not volume storage', () => { + const logdir = 'gs://testbucket/test/key/path'; + const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec); + expect(url).toEqual(logdir); + }); + + it('handles not volume storage with Series', () => { + const logdir = + 'Series1:gs://testbucket/test/key/path1,Series2:gs://testbucket/test/key/path2'; + const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec); + expect(url).toEqual(logdir); + }); + + it('handles volume storage without subPath', () => { + const logdir = 'volume://output'; + const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec); + expect(url).toEqual('/data'); + }); + + it('handles volume storage without subPath with Series', () => { + const logdir = 'Series1:volume://output/volume/path1,Series2:volume://output/volume/path2'; + const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec); + expect(url).toEqual('Series1:/data/volume/path1,Series2:/data/volume/path2'); + }); + + it('handles volume storage with subPath', () => { + const logdir = 'volume://artifact/pipeline1'; + const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec); + expect(url).toEqual('/data1'); + }); + + 
it('handles volume storage with subPath with Series', () => { + const logdir = + 'Series1:volume://artifact/pipeline1/path1,Series2:volume://artifact/pipeline2/path2'; + const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec); + expect(url).toEqual('Series1:/data1/path1,Series2:/data2/path2'); + }); + + it('handles volume storage without subPath throw volume not configured error', () => { + const logdir = 'volume://other/path'; + expect(() => K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec)).toThrowError( + 'Cannot find file "volume://other/path" in pod "unknown": volume "other" not configured', + ); + }); + + it('handles volume storage without subPath throw volume not configured error with Series', () => { + const logdir = 'Series1:volume://output/volume/path1,Series2:volume://other/volume/path2'; + expect(() => K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec)).toThrowError( + 'Cannot find file "volume://other/volume/path2" in pod "unknown": volume "other" not configured', + ); + }); + + it('handles volume storage without subPath throw volume not mounted', () => { + const noMountPodTemplateSpec = { + spec: { + volumes: [ + { + name: 'artifact', + persistentVolumeClaim: { + claimName: 'artifact_pvc', + }, + }, + ], + }, + }; + const logdir = 'volume://artifact/path1'; + expect(() => + K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, noMountPodTemplateSpec), + ).toThrowError( + 'Cannot find file "volume://artifact/path1" in pod "unknown": container "" not found', + ); + }); + + it('handles volume storage without volumeMounts throw volume not mounted', () => { + const noMountPodTemplateSpec = { + spec: { + containers: [ + { + volumeMounts: [ + { + name: 'other', + mountPath: '/data', + }, + ], + }, + ], + volumes: [ + { + name: 'artifact', + persistentVolumeClaim: { + claimName: 'artifact_pvc', + }, + }, + ], + }, + }; + const logdir = 'volume://artifact/path'; + expect(() => 
K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec)).toThrowError( + 'Cannot find file "volume://artifact/path" in pod "unknown": volume "artifact" not mounted', + ); + }); + + it('handles volume storage with subPath throw volume mount not found', () => { + const logdir = 'volume://artifact/other'; + expect(() => K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec)).toThrowError( + 'Cannot find file "volume://artifact/other" in pod "unknown": volume "artifact" not mounted or volume "artifact" with subPath (which is prefix of other) not mounted', + ); + }); + }); +}); diff --git a/frontend/server/k8s-helper.ts b/frontend/server/k8s-helper.ts index 156197a20f8..3fb7da20f69 100644 --- a/frontend/server/k8s-helper.ts +++ b/frontend/server/k8s-helper.ts @@ -23,7 +23,7 @@ import { import * as crypto from 'crypto-js'; import * as fs from 'fs'; import { PartialArgoWorkflow } from './workflow-helper'; -import { parseError } from './utils'; +import { parseError, findFileOnPodVolume } from './utils'; // If this is running inside a k8s Pod, its namespace should be written at this // path, this is also how we can tell whether we're running in the cluster. @@ -62,6 +62,37 @@ function getNameOfViewerResource(logdir: string): string { return 'viewer-' + crypto.SHA1(logdir); } +/** + * Parse logdir to Support volume:// url in logdir, + * otherwise, there's no need to parse logdir, we can just use them. 
+ */ +function parseTensorboardLogDir(logdir: string, podTemplateSpec: object): string { + const urls: string[] = []; + const seriesParts = logdir.split(','); + for (const seriesPart of seriesParts) { + const strPath = seriesPart.replace(/Series\d+:/g, ''); + if (!strPath.startsWith('volume://')) { + urls.push(strPath); + continue; + } + // volume storage: parse real local mount path + const pathParts = strPath.substr('volume://'.length).split('/'); + const bucket = pathParts[0]; + const key = pathParts.slice(1).join('/'); + + const [filePath, err] = findFileOnPodVolume(podTemplateSpec, { + volumeMountName: bucket, + filePathInVolume: key, + containerNames: undefined, + }); + if (err) { + throw new Error(err); + } + urls.push(filePath); + } + return urls.length === 1 ? urls[0] : urls.map((c, i) => `Series${i + 1}:` + c).join(','); +} + /** * Create Tensorboard instance via CRD with the given logdir if there is no * existing Tensorboard instance. @@ -83,7 +114,6 @@ export async function newTensorboardInstance( ); } } - const body = { apiVersion: viewerGroup + '/' + viewerVersion, kind: 'Viewer', @@ -94,7 +124,7 @@ export async function newTensorboardInstance( spec: { podTemplateSpec, tensorboardSpec: { - logDir: logdir, + logDir: parseTensorboardLogDir(logdir, podTemplateSpec), tensorflowImage: tfImageName + ':' + tfversion, }, type: 'tensorboard', @@ -129,13 +159,14 @@ export async function getTensorboardInstance( // Viewer CRD pod has tensorboard instance running at port 6006 while // viewer CRD service has tensorboard instance running at port 80. Since // we return service address here (instead of pod address), so use 80. + + // remove to check viewer.body.spec.tensorboardSpec.logDir===logdir + // actually getNameOfViewerResource(logdir) may have hash collision + // but if there is a hash collision, not check logdir will return error tensorboard link + // if check logdir and then create Viewer CRD with same name will break anyway. 
+ // TODO fix hash collision (viewer: any) => { - if ( - viewer && - viewer.body && - viewer.body.spec.tensorboardSpec.logDir === logdir && - viewer.body.spec.type === 'tensorboard' - ) { + if (viewer && viewer.body && viewer.body.spec.type === 'tensorboard') { const address = `http://${viewer.body.metadata.name}-service.${namespace}.svc.cluster.local:80/tensorboard/${viewer.body.metadata.name}/`; const tfImageParts = viewer.body.spec.tensorboardSpec.tensorflowImage.split(':', 2); const tfVersion = tfImageParts.length == 2 ? tfImageParts[1] : ''; @@ -311,4 +342,5 @@ export async function getK8sSecret(name: string, key: string) { export const TEST_ONLY = { k8sV1Client, k8sV1CustomObjectClient, + parseTensorboardLogDir, }; diff --git a/frontend/server/utils.test.ts b/frontend/server/utils.test.ts index b76eca1fe0b..999eee0a707 100644 --- a/frontend/server/utils.test.ts +++ b/frontend/server/utils.test.ts @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
describe('findFileOnPodVolume', () => {
  // Two containers mount the same volumes at different paths so the tests can
  // tell which container was picked: "main" is containers[0], "ml-pipeline-ui"
  // is only selected by name.
  const mountsAt = (base: string) => [
    { name: 'output', mountPath: `/${base}` },
    { name: 'artifact', subPath: 'pipeline1', mountPath: `/${base}1` },
    { name: 'artifact', subPath: 'pipeline2', mountPath: `/${base}2` },
  ];
  const podTemplateSpec = {
    spec: {
      containers: [
        { volumeMounts: mountsAt('main'), name: 'main' },
        { volumeMounts: mountsAt('data'), name: 'ml-pipeline-ui' },
      ],
      volumes: [
        { name: 'output', hostPath: { path: '/data/output', type: 'Directory' } },
        { name: 'artifact', persistentVolumeClaim: { claimName: 'artifact_pvc' } },
      ],
    },
  };
  const uiContainerNames = ['ml-pipeline-ui', 'ml-pipeline-ui-artifact'];

  it('parse file path with containerNames', () => {
    const result = findFileOnPodVolume(podTemplateSpec, {
      containerNames: uiContainerNames,
      volumeMountName: 'output',
      filePathInVolume: 'a/b/c',
    });
    expect(result).toEqual(['/data/a/b/c', undefined]);
  });

  it('parse file path with containerNames and subPath', () => {
    const result = findFileOnPodVolume(podTemplateSpec, {
      containerNames: uiContainerNames,
      volumeMountName: 'artifact',
      filePathInVolume: 'pipeline1/a/b/c',
    });
    expect(result).toEqual(['/data1/a/b/c', undefined]);
  });

  it('parse file path without containerNames', () => {
    const result = findFileOnPodVolume(podTemplateSpec, {
      containerNames: undefined,
      volumeMountName: 'output',
      filePathInVolume: 'a/b/c',
    });
    expect(result).toEqual(['/main/a/b/c', undefined]);
  });

  it('parse file path error with not exist volume', () => {
    const result = findFileOnPodVolume(podTemplateSpec, {
      containerNames: undefined,
      volumeMountName: 'other',
      filePathInVolume: 'a/b/c',
    });
    expect(result).toEqual([
      '',
      'Cannot find file "volume://other/a/b/c" in pod "unknown": volume "other" not configured',
    ]);
  });

  it('parse file path error with not exist container', () => {
    const result = findFileOnPodVolume(podTemplateSpec, {
      containerNames: ['other1', 'other2'],
      volumeMountName: 'output',
      filePathInVolume: 'a/b/c',
    });
    expect(result).toEqual([
      '',
      'Cannot find file "volume://output/a/b/c" in pod "unknown": container "other1" or "other2" not found',
    ]);
  });

  it('parse file path error with volume not mount error', () => {
    // "artifact" is only mounted with subPath pipeline1/pipeline2, neither of
    // which is a prefix of a/b/c.
    const result = findFileOnPodVolume(podTemplateSpec, {
      containerNames: undefined,
      volumeMountName: 'artifact',
      filePathInVolume: 'a/b/c',
    });
    expect(result).toEqual([
      '',
      'Cannot find file "volume://artifact/a/b/c" in pod "unknown": volume "artifact" not mounted or volume "artifact" with subPath (which is prefix of a/b/c) not mounted',
    ]);
  });
});

describe('resolveFilePathOnVolume', () => {
  // Every case resolves the same file on the same mount; only the subPath varies.
  const resolveWithSubPath = (volumeMountSubPath: string | undefined) =>
    resolveFilePathOnVolume({
      filePathInVolume: 'a/b/c',
      volumeMountPath: '/data',
      volumeMountSubPath,
    });

  it('undefined volumeMountSubPath', () => {
    expect(resolveWithSubPath(undefined)).toEqual(['/data/a/b/c', undefined]);
  });

  it('with volumeMountSubPath', () => {
    expect(resolveWithSubPath('a')).toEqual(['/data/b/c', undefined]);
  });

  it('with multiple layer volumeMountSubPath', () => {
    expect(resolveWithSubPath('a/b')).toEqual(['/data/c', undefined]);
  });

  it('with not exist volumeMountSubPath', () => {
    expect(resolveWithSubPath('other')).toEqual([
      '',
      'File a/b/c not mounted, expecting the file to be inside volume mount subpath other',
    ]);
  });
});
/** Removes trailing slashes so "a/" and "a" describe the same subPath. */
function stripTrailingSlashes(subPath: string): string {
  return subPath.replace(/\/+$/, '');
}

/**
 * Tells whether filePath equals subPath or lies beneath it. The comparison is
 * done on whole path segments, so subPath "a" matches "a/b" but not "ab/c".
 */
function isInsideSubPath(filePath: string, subPath: string): boolean {
  const prefix = stripTrailingSlashes(subPath);
  return filePath === prefix || filePath.startsWith(`${prefix}/`);
}

/**
 * Finds the path, inside a pod's container, of a file stored on one of the
 * pod's volumes:
 * 1. check that the volume is declared in the pod and mounted in the container
 * 2. if the volume mount is configured with subPath, check that
 *    filePathInVolume lies inside subPath and prune it from filePathInVolume
 * 3. join the volume mount path with the pruned filePathInVolume
 *
 * @param pod contains volumes and volume mounts info
 * @param options
 *  - containerNames optional; the first container of the pod whose name is
 *    listed is used, when undefined containers[0] of the pod is used
 *  - volumeMountName container volume mount name
 *  - filePathInVolume file path in volume
 * @return [final file path, undefined] on success,
 *     ['', error message] if any check failed
 */
export function findFileOnPodVolume(
  pod: any,
  options: {
    containerNames: string[] | undefined;
    volumeMountName: string;
    filePathInVolume: string;
  },
): [string, string | undefined] {
  const { containerNames, volumeMountName, filePathInVolume } = options;

  const volumes = pod?.spec?.volumes;
  const prefixErrorMessage = `Cannot find file "volume://${volumeMountName}/${filePathInVolume}" in pod "${pod
    ?.metadata?.name || 'unknown'}":`;
  // volumes not specified or volume named ${volumeMountName} not specified
  if (!Array.isArray(volumes) || !volumes.find(v => v?.name === volumeMountName)) {
    return ['', `${prefixErrorMessage} volume "${volumeMountName}" not configured`];
  }

  // get pod main container
  let container;
  if (Array.isArray(pod.spec.containers)) {
    if (containerNames) {
      // find main container by container name match containerNames
      container = pod.spec.containers.find((c: { [name: string]: string }) =>
        containerNames.includes(c.name),
      );
    } else {
      // use containers[0] as pod main container
      container = pod.spec.containers[0];
    }
  }

  if (!container) {
    const containerNamesMessage = containerNames ? containerNames.join('" or "') : '';
    return ['', `${prefixErrorMessage} container "${containerNamesMessage}" not found`];
  }

  const volumeMounts = container.volumeMounts;
  if (!Array.isArray(volumeMounts)) {
    return ['', `${prefixErrorMessage} volume "${volumeMountName}" not mounted`];
  }

  // find the volume mount: the name must match and, when a subPath is set, the
  // file must live inside that subPath (compared on path segment boundaries,
  // so subPath "pipeline1" does not capture "pipeline10/...").
  const volumeMount = volumeMounts.find(v => {
    if (v?.name !== volumeMountName) {
      return false;
    }
    if (v?.subPath) {
      return isInsideSubPath(filePathInVolume, v.subPath);
    }
    return true;
  });

  if (!volumeMount) {
    return [
      '',
      `${prefixErrorMessage} volume "${volumeMountName}" not mounted or volume "${volumeMountName}" with subPath (which is prefix of ${filePathInVolume}) not mounted`,
    ];
  }

  // resolve file path
  const [filePath, err] = resolveFilePathOnVolume({
    filePathInVolume,
    volumeMountPath: volumeMount.mountPath,
    volumeMountSubPath: volumeMount.subPath,
  });

  if (err) {
    // surface the actual reason reported by resolveFilePathOnVolume
    return ['', `${prefixErrorMessage} ${err}`];
  }
  return [filePath, undefined];
}

/**
 * Maps a file path inside a volume to its path under a volume mount.
 *
 * @param volume
 *  - filePathInVolume file path relative to the volume root
 *  - volumeMountPath path at which the volume (or its subPath) is mounted
 *  - volumeMountSubPath optional sub path of the volume that is mounted; when
 *    set, filePathInVolume must lie inside it and the sub path is pruned
 * @return [resolved path, undefined] on success, ['', error message] when the
 *     file is not inside the mounted sub path or when the resolved path would
 *     leave volumeMountPath (e.g. through ".." segments)
 */
export function resolveFilePathOnVolume(volume: {
  filePathInVolume: string;
  volumeMountPath: string;
  volumeMountSubPath: string | undefined;
}): [string, string | undefined] {
  const { filePathInVolume, volumeMountPath, volumeMountSubPath } = volume;

  let pathUnderMount = filePathInVolume;
  if (volumeMountSubPath) {
    if (!isInsideSubPath(filePathInVolume, volumeMountSubPath)) {
      return [
        '',
        `File ${filePathInVolume} not mounted, expecting the file to be inside volume mount subpath ${volumeMountSubPath}`,
      ];
    }
    pathUnderMount = filePathInVolume.substring(stripTrailingSlashes(volumeMountSubPath).length);
  }

  const resolved = path.join(volumeMountPath, pathUnderMount);
  // path.join normalizes ".." segments, so a crafted file path could point
  // outside of the mount; refuse anything that does not stay below it.
  const fromMount = path.relative(volumeMountPath, resolved);
  if (fromMount === '..' || fromMount.startsWith(`..${path.sep}`) || path.isAbsolute(fromMount)) {
    return [
      '',
      `File ${filePathInVolume} resolves outside of volume mount path ${volumeMountPath}`,
    ];
  }
  return [resolved, undefined];
}
95ff3c58890..a77a78722e8 100644 --- a/frontend/src/lib/WorkflowParser.test.ts +++ b/frontend/src/lib/WorkflowParser.test.ts @@ -1108,6 +1108,22 @@ describe('WorkflowParser', () => { source: StorageService.HTTPS, }); }); + + it('handles volume file without path', () => { + expect(WorkflowParser.parseStoragePath('volume://output')).toEqual({ + bucket: 'output', + key: '', + source: StorageService.VOLUME, + }); + }); + + it('handles volume file with path', () => { + expect(WorkflowParser.parseStoragePath('volume://output/path/foo/bar')).toEqual({ + bucket: 'output', + key: 'path/foo/bar', + source: StorageService.VOLUME, + }); + }); }); describe('getOutboundNodes', () => { diff --git a/frontend/src/lib/WorkflowParser.ts b/frontend/src/lib/WorkflowParser.ts index 9a6f46fb251..3b5c21033ec 100644 --- a/frontend/src/lib/WorkflowParser.ts +++ b/frontend/src/lib/WorkflowParser.ts @@ -37,6 +37,7 @@ export enum StorageService { HTTPS = 'https', MINIO = 'minio', S3 = 's3', + VOLUME = 'volume', } export interface StoragePath { @@ -361,6 +362,13 @@ export default class WorkflowParser { key: pathParts.slice(1).join('/'), source: StorageService.HTTPS, }; + } else if (strPath.startsWith('volume://')) { + const pathParts = strPath.substr('volume://'.length).split('/'); + return { + bucket: pathParts[0], + key: pathParts.slice(1).join('/'), + source: StorageService.VOLUME, + }; } else { throw new Error('Unsupported storage path: ' + strPath); }