Skip to content

Commit

Permalink
chore(v2): standardize MLMD data model. Fixes #5669 (#6054)
Browse files Browse the repository at this point in the history
* chore(v2): standardize MLMD data model

* change context type to system namespace

* update sdk snapshots

* fix go v2 tests

* update

* update v2 compat snapshots

* fix all samples

* fix must specify pipeline root

* add artifact display name

* add UI rendering of new fields

* fix sample tests

* let ui read artifact and execution names consistently

* fix samples

* fix frontend tests

* fix sample test

* fix last sample

* address feedback
  • Loading branch information
Bobgy authored Jul 20, 2021
1 parent 9f73f91 commit ee663d9
Show file tree
Hide file tree
Showing 29 changed files with 449 additions and 331 deletions.
5 changes: 3 additions & 2 deletions frontend/src/mlmd/LineageActionBar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import ReplayIcon from '@material-ui/icons/Replay';
import { classes, stylesheet } from 'typestyle';
import { color, commonCss, fonts, padding } from './Css';
import { CSSProperties } from 'typestyle/lib/types';
import { getArtifactName, getResourcePropertyViaFallBack } from './Utils';
import { getResourcePropertyViaFallBack } from './Utils';
import { Artifact } from 'src/third_party/mlmd';
import { ArtifactProperties, ArtifactCustomProperties } from './Api';
import { ArtifactHelpers } from './MlmdUtils';

const baseLinkButton: CSSProperties = {
backgroundColor: 'transparent',
Expand Down Expand Up @@ -159,7 +160,7 @@ export class LineageActionBar extends React.Component<
disabled={isActive}
onClick={onBreadcrumbClicked}
>
{getArtifactName(artifact)}
{ArtifactHelpers.getName(artifact)}
</button>,
);
if (!isActive) {
Expand Down
5 changes: 3 additions & 2 deletions frontend/src/mlmd/LineageView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ import {
} from 'src/third_party/mlmd';
import { RefObject } from 'react';
import { getArtifactTypes, getExecutionTypes } from './LineageApi';
import { getTypeName, getArtifactName } from './Utils';
import { getTypeName } from './Utils';
import { Api } from './Api';
import { LineageResource } from './LineageTypes';
import CircularProgress from '@material-ui/core/CircularProgress';
import { ArtifactHelpers } from './MlmdUtils';

const isInputEvent = (event: Event) =>
[Event.Type.INPUT.valueOf(), Event.Type.DECLARED_INPUT.valueOf()].includes(event.getType());
Expand Down Expand Up @@ -392,7 +393,7 @@ export class LineageView extends React.Component<LineageViewProps, LineageViewSt
},
error => {
console.error(
`Failed to load related data for artifact: ${getArtifactName(target)}. Details:`,
`Failed to load related data for artifact: ${ArtifactHelpers.getName(target)}. Details:`,
error,
);
this.setState({
Expand Down
26 changes: 15 additions & 11 deletions frontend/src/mlmd/MlmdUtils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { Workflow, WorkflowSpec, WorkflowStatus } from 'third_party/argo-ui/argo
testBestPractices();

const WORKFLOW_NAME = 'run-st448';
const RUN_ID = 'abcdefghijk';
const WORKFLOW_EMPTY: Workflow = {
metadata: {
name: WORKFLOW_NAME,
Expand All @@ -40,8 +41,8 @@ const WORKFLOW_EMPTY: Workflow = {
};

const V2_CONTEXT = new Context();
V2_CONTEXT.setName(WORKFLOW_NAME);
V2_CONTEXT.setType('kfp.PipelineRun');
V2_CONTEXT.setName(RUN_ID);
V2_CONTEXT.setType('system.PipelineRun');

const TFX_CONTEXT = new Context();
TFX_CONTEXT.setName('run.run-st448');
Expand All @@ -55,34 +56,37 @@ describe('MlmdUtils', () => {
describe('getRunContext', () => {
it('gets KFP v2 context', async () => {
mockGetContextByTypeAndName([V2_CONTEXT]);
const context = await getRunContext({
...WORKFLOW_EMPTY,
metadata: {
...WORKFLOW_EMPTY.metadata,
annotations: { 'pipelines.kubeflow.org/v2_pipeline': 'true' },
const context = await getRunContext(
{
...WORKFLOW_EMPTY,
metadata: {
...WORKFLOW_EMPTY.metadata,
annotations: { 'pipelines.kubeflow.org/v2_pipeline': 'true' },
},
},
});
RUN_ID,
);
expect(context).toEqual(V2_CONTEXT);
});

it('gets TFX context', async () => {
mockGetContextByTypeAndName([TFX_CONTEXT, V1_CONTEXT]);
const context = await getRunContext(WORKFLOW_EMPTY);
const context = await getRunContext(WORKFLOW_EMPTY, RUN_ID);
expect(context).toEqual(TFX_CONTEXT);
});

it('gets KFP v1 context', async () => {
const verify = expectWarnings();
mockGetContextByTypeAndName([V1_CONTEXT]);
const context = await getRunContext(WORKFLOW_EMPTY);
const context = await getRunContext(WORKFLOW_EMPTY, RUN_ID);
expect(context).toEqual(V1_CONTEXT);
verify();
});

it('throws error when not found', async () => {
const verify = expectWarnings();
mockGetContextByTypeAndName([]);
await expect(getRunContext(WORKFLOW_EMPTY)).rejects.toThrow();
await expect(getRunContext(WORKFLOW_EMPTY, RUN_ID)).rejects.toThrow();
verify();
});
});
Expand Down
42 changes: 32 additions & 10 deletions frontend/src/mlmd/MlmdUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { logger } from 'src/lib/Utils';
import { isV2Pipeline } from 'src/lib/v2/WorkflowUtils';
import {
Api,
ArtifactCustomProperties,
ArtifactProperties,
ExecutionCustomProperties,
ExecutionProperties,
getResourceProperty,
Expand Down Expand Up @@ -88,14 +90,15 @@ async function getKfpRunContext(argoWorkflowName: string): Promise<Context> {
return await getContext({ name: argoWorkflowName, type: 'KfpRun' });
}

async function getKfpV2RunContext(argoWorkflowName: string): Promise<Context> {
return await getContext({ name: argoWorkflowName, type: 'kfp.PipelineRun' });
async function getKfpV2RunContext(runID: string): Promise<Context> {
return await getContext({ name: runID, type: 'system.PipelineRun' });
}

export async function getRunContext(workflow: Workflow): Promise<Context> {
export async function getRunContext(workflow: Workflow, runID: string): Promise<Context> {
console.log(workflow, runID);
const workflowName = workflow?.metadata?.name || '';
if (isV2Pipeline(workflow)) {
return await getKfpV2RunContext(workflowName);
return await getKfpV2RunContext(runID);
}
try {
return await getTfxRunContext(workflowName);
Expand Down Expand Up @@ -127,7 +130,12 @@ export async function getExecutionsFromContext(context: Context): Promise<Execut
}

export enum KfpExecutionProperties {
// kfp_pod_name is kept for backward compatibility.
// KFP v1 and TFX logs kfp_pod_name property, but KFP v2 logs pod_name.
KFP_POD_NAME = 'kfp_pod_name',
POD_NAME = 'pod_name',
DISPLAY_NAME = 'display_name',
TASK_NAME = 'task_name',
}

const EXECUTION_PROPERTY_REPOS = [ExecutionProperties, ExecutionCustomProperties];
Expand All @@ -141,28 +149,42 @@ export const ExecutionHelpers = {
undefined
);
},
getName(execution: Execution): string | number | undefined {
return (
// TODO(Bobgy): move task_name to a const when ExecutionCustomProperties are moved back to this repo.
getStringProperty(execution, 'task_name', true) ||
getName(execution: Execution): string {
return `${getStringProperty(execution, KfpExecutionProperties.DISPLAY_NAME, true) ||
getStringProperty(execution, KfpExecutionProperties.TASK_NAME, true) ||
getStringProperty(execution, ExecutionProperties.NAME) ||
getStringProperty(execution, ExecutionProperties.COMPONENT_ID) ||
getStringProperty(execution, ExecutionCustomProperties.TASK_ID, true) ||
undefined
);
'(No name)'}`;
},
getState(execution: Execution): string | number | undefined {
return getStringProperty(execution, ExecutionProperties.STATE) || undefined;
},
getKfpPod(execution: Execution): string | number | undefined {
return (
getStringProperty(execution, KfpExecutionProperties.POD_NAME, true) ||
getStringProperty(execution, KfpExecutionProperties.KFP_POD_NAME) ||
getStringProperty(execution, KfpExecutionProperties.KFP_POD_NAME, true) ||
undefined
);
},
};

export enum KfpArtifactProperties {
DISPLAY_NAME = 'display_name',
}

export const ArtifactHelpers = {
getName(a: Artifact): string {
const name =
getResourceProperty(a, KfpArtifactProperties.DISPLAY_NAME, true) ||
getResourceProperty(a, ArtifactProperties.NAME) ||
getResourceProperty(a, ArtifactCustomProperties.NAME, true) ||
'(No name)';
return `${name}`;
},
};

function getStringProperty(
resource: Artifact | Execution,
propertyName: string,
Expand Down
23 changes: 3 additions & 20 deletions frontend/src/mlmd/Utils.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import { ArtifactTypeMap } from './LineageApi';
import { Artifact, Execution, Value } from 'src/third_party/mlmd';
import { LineageTypedResource } from './LineageTypes';
import { Struct } from 'google-protobuf/google/protobuf/struct_pb';
import { ArtifactHelpers, ExecutionHelpers } from './MlmdUtils';

const UNNAMED_RESOURCE_DISPLAY_NAME = '(unnamed)';
const ARTIFACT_FIELD_REPOS = [ArtifactProperties, ArtifactCustomProperties];
const EXECUTION_FIELD_REPOS = [ExecutionProperties, ExecutionCustomProperties];

Expand Down Expand Up @@ -67,27 +67,10 @@ export function getResourcePropertyViaFallBack(
return prop as string;
}

export function getArtifactName(artifact: Artifact): string {
return (
getResourcePropertyViaFallBack(artifact, ARTIFACT_FIELD_REPOS, ['NAME']) ||
UNNAMED_RESOURCE_DISPLAY_NAME
);
}

function getExecutionName(execution: Execution): string {
return (
getResourcePropertyViaFallBack(execution, EXECUTION_FIELD_REPOS, [
'COMPONENT_ID',
'TASK_ID',
'NAME',
]) || UNNAMED_RESOURCE_DISPLAY_NAME
);
}

export function getResourceName(typedResource: LineageTypedResource): string {
return typedResource.type === 'artifact'
? getArtifactName(typedResource.resource)
: getExecutionName(typedResource.resource);
? ArtifactHelpers.getName(typedResource.resource)
: ExecutionHelpers.getName(typedResource.resource);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ exports[`LineageActionBar Adds the artifact to the history state and DOM when pu
key="breadcrumb-1"
onClick={[Function]}
>
(unnamed)
(No name)
</button>
</div>
<div>
Expand Down
9 changes: 3 additions & 6 deletions frontend/src/pages/ArtifactDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import {
Api,
ArtifactCustomProperties,
ArtifactProperties,
getResourceProperty,
LineageResource,
Expand All @@ -39,6 +38,7 @@ import { ToolbarProps } from '../components/Toolbar';
import { commonCss, padding } from '../Css';
import { logger, serviceErrorToString, titleCase } from '../lib/Utils';
import { Page, PageProps } from './Page';
import { ArtifactHelpers } from 'src/mlmd/MlmdUtils';

export enum ArtifactDetailsTab {
OVERVIEW = 0,
Expand Down Expand Up @@ -157,7 +157,7 @@ class ArtifactDetails extends Page<{}, ArtifactDetailsState> {
return {
actions: {},
breadcrumbs: [{ displayName: 'Artifacts', href: RoutePage.ARTIFACTS }],
pageTitle: `Artifact #${this.id} details`,
pageTitle: `Artifact #${this.id}`,
};
}

Expand Down Expand Up @@ -185,10 +185,7 @@ class ArtifactDetails extends Page<{}, ArtifactDetailsState> {
const typeResponse = await this.api.metadataStoreService.getArtifactTypesByID(typeRequest);
const artifactType = typeResponse.getArtifactTypesList()[0] || undefined;

const artifactName =
getResourceProperty(artifact, ArtifactProperties.NAME) ||
getResourceProperty(artifact, ArtifactCustomProperties.NAME, true);
let title = artifactName ? artifactName.toString() : '';
let title = ArtifactHelpers.getName(artifact);
const version = getResourceProperty(artifact, ArtifactProperties.VERSION);
if (version) {
title += ` (version: ${version})`;
Expand Down
17 changes: 4 additions & 13 deletions frontend/src/pages/ExecutionDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@
import { CircularProgress } from '@material-ui/core';
import React, { Component } from 'react';
import { Link } from 'react-router-dom';
import { getArtifactName, getLinkedArtifactsByEvents } from 'src/mlmd/MlmdUtils';
import {
Api,
ExecutionCustomProperties,
ExecutionProperties,
getArtifactTypes,
getResourceProperty,
} from 'src/mlmd/library';
import { ExecutionHelpers, getArtifactName, getLinkedArtifactsByEvents } from 'src/mlmd/MlmdUtils';
import { Api, getArtifactTypes } from 'src/mlmd/library';
import {
ArtifactType,
Event,
Expand Down Expand Up @@ -78,7 +72,7 @@ export default class ExecutionDetails extends Page<{}, ExecutionDetailsState> {
return {
actions: {},
breadcrumbs: [{ displayName: 'Executions', href: RoutePage.EXECUTIONS }],
pageTitle: `${this.id} details`,
pageTitle: `Execution #${this.id}`,
};
}

Expand Down Expand Up @@ -209,10 +203,7 @@ export class ExecutionDetailsContent extends Component<
}

const execution = executionResponse.getExecutionsList()[0];
const executionName =
getResourceProperty(execution, ExecutionProperties.COMPONENT_ID) ||
getResourceProperty(execution, ExecutionCustomProperties.TASK_ID, true);
this.props.onTitleUpdate(executionName ? executionName.toString() : '');
this.props.onTitleUpdate(ExecutionHelpers.getName(execution));

const typeRequest = new GetExecutionTypesByIDRequest();
typeRequest.setTypeIdsList([execution.getTypeId()]);
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/pages/RunDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ class RunDetails extends Page<RunDetailsInternalProps, RunDetailsState> {
let mlmdExecutions: Execution[] | undefined;
// Get data about this workflow from MLMD
try {
mlmdRunContext = await getRunContext(workflow);
mlmdRunContext = await getRunContext(workflow, runId);
mlmdExecutions = await getExecutionsFromContext(mlmdRunContext);
} catch (err) {
// Data in MLMD may not exist depending on this pipeline is a TFX pipeline.
Expand Down
11 changes: 5 additions & 6 deletions samples/core/exit_handler/exit_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,29 @@
from .exit_handler import pipeline_exit_handler
from ...test.util import run_pipeline_func, TestCase, KfpMlmdClient

def verify(
argo_workflow_name: str, mlmd_connection_config, run: kfp_server_api.ApiRun,
**kwargs
):

def verify(mlmd_connection_config, run: kfp_server_api.ApiRun, **kwargs):
t = unittest.TestCase()
t.maxDiff = None # we always want to see full diff

t.assertEqual(run.status, 'Succeeded')

# Verify MLMD state
client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config)
tasks = client.get_tasks(argo_workflow_name=argo_workflow_name)
tasks = client.get_tasks(run_id=run.id)
task_names = [*tasks.keys()]
t.assertEqual(task_names, ['echo-msg', 'print-file', 'download-from-gcs'])

for task in task_names:
pprint(f'======= {task} =======')
pprint(tasks.get(task).get_dict())

t.assertEqual(
tasks.get('echo-msg').inputs.parameters.get('msg'),
'exit!',
)


# %%

if __name__ == '__main__':
Expand Down
Loading

0 comments on commit ee663d9

Please sign in to comment.