Skip to content

Commit

Permalink
feat(backend): Refactor apiserver, add Pipeline and other v2beta serv…
Browse files Browse the repository at this point in the history
…ices (#8633)

* Staging WIP

* Fix recurring run, switch to pipeline_version_id

* Regenerate python client

* Refactor packages for pipeline service v2

* Stage progress

* Add support for pipeline service v2beta1

* Go mod tidy

* Update licenses

* Set default namespace to be empty

* Separate deletion of pipeline versions

* Change report ReportRunMetrics to obey AIP

* Use pipeline version id in run proto

* Update run.proto

* Upgrade storage

* Remove dependencies on api

* Stage refactoring progress

* Refactor apiserver

* Update v1beta1 unit test

* Update dependencies

* Fix integration test

* Revert visualization server changes (workaround)

* Address comments

* Basic lint the api server

* Update licenses file

* Adjust intergtaion tests

* Fix typos in the tests

* Update tests

* Update test

* Add support v1 delepe pipeline

* Remove gorm.Model to split into a different PR

* Lint api server

* Fix default experiment in upgrade test

* Update storage states

* Testing upgrade test run

* Add copying pod logs to the GCS bucket

* Fix upgrade test

* Adjust backend test for list

* Fix the tests

* Fix workflow manifest discrepancy at create

* Add a few unit-tests

* Add misc service and fix persistence agent

* Remove default values from TEXT columns

* Address comments. Fixes #8702.

* Address comment, reorder resource, server, storage

* Fix front end integration test

* Fix FE integration test

* Fix tabs, add pauses, make more explicit FE test

* Revert FE test changes

* Enable default experiment and disable its deletion

* Replace empty namespace with POD_NAMESPACE

* Default experiment is now the oldest

* Emplace empty with POD_NAMESPACE for argo only

* Fix api integration test

* Revert changes to default experiment's description

* Improve FE test flakiness

* Force refresh in the FE test

* Fix FE integration test

* Add delay in FE test

* Add local debug to FE test

* Update tests

* Fix typo

* Address comments

* Enforce template version verification

* Fix workflow manifest discrepancy with v1

* Fix FE test

* Revert changes in persistence agent

* Limit to one resource owner

* Change default run state to unknown

* Revert changes to FE integration test

* Make states consistent

* Remove links from error messages

* Address comments

* Address comments

* Change the order for teardown

---------

Co-authored-by: Chen Sun <chensun@users.noreply.github.com>
  • Loading branch information
gkcalat and chensun authored Feb 3, 2023
1 parent c4a2a35 commit 135dc64
Show file tree
Hide file tree
Showing 279 changed files with 24,369 additions and 16,046 deletions.
40 changes: 19 additions & 21 deletions backend/api/hack/generator.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

# Copyright 2018-2022 The Kubeflow Authors
# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,7 +67,7 @@ ${PROTOCCOMPILER} -I. -Ibackend/api/${API_VERSION} \
cp -a ${TMP_OUTPUT}/backend/api/${API_VERSION}/*.swagger.json ./backend/api/${API_VERSION}/swagger
# Generate a single swagger json file from the swagger json files of all models.
# Note: use backend/backend/api/${API_VERSION}/swagger/{run,job,experiment,pipeline,pipeline.upload,healthz}.swagger.json when apt-get can install jq-1.6
if [ $API_VERSION -eq v1beta1 ]; then
if [[ "$API_VERSION" == "v1beta1" ]]; then
jq -s 'reduce .[] as $item ({}; . * $item) | .info.title = "Kubeflow Pipelines API" | .info.description = "This file contains REST API specification for Kubeflow Pipelines. The file is autogenerated from the swagger definition." | .info.version = "'$KFP_VERSION'" | .info.contact = { "name": "google", "email": "kubeflow-pipelines@google.com", "url": "https://www.google.com" } | .info.license = { "name": "Apache 2.0", "url": "https://raw.githubusercontent.com/kubeflow/pipelines/master/LICENSE" }' \
backend/api/${API_VERSION}/swagger/experiment.swagger.json \
backend/api/${API_VERSION}/swagger/run.swagger.json \
Expand All @@ -83,7 +83,7 @@ else
> "backend/api/${API_VERSION}/swagger/kfp_api_single_file.swagger.json"
fi
# Generate go_http_client from swagger json.
if [ $API_VERSION -eq v1beta1 ]; then
if [[ "$API_VERSION" == "v1beta1" ]]; then
swagger generate client \
-f backend/api/${API_VERSION}/swagger/job.swagger.json \
-A job \
Expand Down Expand Up @@ -128,26 +128,24 @@ swagger generate client \
-c pipeline_upload_client \
-m pipeline_upload_model \
-t backend/api/${API_VERSION}/go_http_client
# TODO(gkcalat): remove if-statement as new protos are added.
if [ $API_VERSION -eq v1beta1 ]; then
swagger generate client \
-f backend/api/${API_VERSION}/swagger/visualization.swagger.json \
-A visualization \
--principal models.Principal \
-c visualization_client \
-m visualization_model \
-t backend/api/${API_VERSION}/go_http_client
swagger generate client \
-f backend/api/${API_VERSION}/swagger/healthz.swagger.json \
-A healthz \
--principal models.Principal \
-c healthz_client \
-m healthz_model \
-t backend/api/${API_VERSION}/go_http_client
fi
swagger generate client \
-f backend/api/${API_VERSION}/swagger/visualization.swagger.json \
-A visualization \
--principal models.Principal \
-c visualization_client \
-m visualization_model \
-t backend/api/${API_VERSION}/go_http_client
swagger generate client \
-f backend/api/${API_VERSION}/swagger/healthz.swagger.json \
-A healthz \
--principal models.Principal \
-c healthz_client \
-m healthz_model \
-t backend/api/${API_VERSION}/go_http_client

# Hack to fix an issue with go-swagger
# See https://github.com/go-swagger/go-swagger/issues/1381 for details.
if [ $API_VERSION -eq v1beta1 ]; then
if [[ "$API_VERSION" == "v1beta1" ]]; then
sed -i -- 's/MaxConcurrency int64 `json:"max_concurrency,omitempty"`/MaxConcurrency int64 `json:"max_concurrency,omitempty,string"`/g' backend/api/${API_VERSION}/go_http_client/job_model/api_job.go
sed -i -- 's/IntervalSecond int64 `json:"interval_second,omitempty"`/IntervalSecond int64 `json:"interval_second,omitempty,string"`/g' backend/api/${API_VERSION}/go_http_client/job_model/api_periodic_schedule.go
sed -i -- 's/MaxConcurrency string `json:"max_concurrency,omitempty"`/MaxConcurrency int64 `json:"max_concurrency,omitempty,string"`/g' backend/api/${API_VERSION}/go_http_client/job_model/api_job.go
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Method | HTTP request | Description


# **upload_pipeline**
> ApiPipeline upload_pipeline(uploadfile, name=name, description=description)
> ApiPipeline upload_pipeline(uploadfile, name=name, description=description, namespace=namespace)


Expand Down Expand Up @@ -50,9 +50,10 @@ with kfp_server_api.ApiClient(configuration) as api_client:
uploadfile = '/path/to/file' # file | The pipeline to upload. Maximum size of 32MB is supported.
name = 'name_example' # str | (optional)
description = 'description_example' # str | (optional)
namespace = 'namespace_example' # str | (optional)

try:
api_response = api_instance.upload_pipeline(uploadfile, name=name, description=description)
api_response = api_instance.upload_pipeline(uploadfile, name=name, description=description, namespace=namespace)
pprint(api_response)
except ApiException as e:
print("Exception when calling PipelineUploadServiceApi->upload_pipeline: %s\n" % e)
Expand All @@ -65,6 +66,7 @@ Name | Type | Description | Notes
**uploadfile** | **file**| The pipeline to upload. Maximum size of 32MB is supported. |
**name** | **str**| | [optional]
**description** | **str**| | [optional]
**namespace** | **str**| | [optional]

### Return type

Expand All @@ -88,7 +90,7 @@ Name | Type | Description | Notes
[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)

# **upload_pipeline_version**
> ApiPipelineVersion upload_pipeline_version(uploadfile, name=name, pipelineid=pipelineid, description=description)
> ApiPipelineVersion upload_pipeline_version(uploadfile, name=name, pipelineid=pipelineid, description=description, namespace=namespace)


Expand Down Expand Up @@ -130,9 +132,10 @@ with kfp_server_api.ApiClient(configuration) as api_client:
name = 'name_example' # str | (optional)
pipelineid = 'pipelineid_example' # str | (optional)
description = 'description_example' # str | (optional)
namespace = 'namespace_example' # str | (optional)

try:
api_response = api_instance.upload_pipeline_version(uploadfile, name=name, pipelineid=pipelineid, description=description)
api_response = api_instance.upload_pipeline_version(uploadfile, name=name, pipelineid=pipelineid, description=description, namespace=namespace)
pprint(api_response)
except ApiException as e:
print("Exception when calling PipelineUploadServiceApi->upload_pipeline_version: %s\n" % e)
Expand All @@ -146,6 +149,7 @@ Name | Type | Description | Notes
**name** | **str**| | [optional]
**pipelineid** | **str**| | [optional]
**description** | **str**| | [optional]
**namespace** | **str**| | [optional]

### Return type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def upload_pipeline(self, uploadfile, **kwargs): # noqa: E501
:type name: str
:param description:
:type description: str
:param namespace:
:type namespace: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the urllib3.HTTPResponse object will
Expand Down Expand Up @@ -83,6 +85,8 @@ def upload_pipeline_with_http_info(self, uploadfile, **kwargs): # noqa: E501
:type name: str
:param description:
:type description: str
:param namespace:
:type namespace: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _return_http_data_only: response data without head status code
Expand All @@ -107,7 +111,8 @@ def upload_pipeline_with_http_info(self, uploadfile, **kwargs): # noqa: E501
all_params = [
'uploadfile',
'name',
'description'
'description',
'namespace'
]
all_params.extend(
[
Expand Down Expand Up @@ -140,6 +145,8 @@ def upload_pipeline_with_http_info(self, uploadfile, **kwargs): # noqa: E501
query_params.append(('name', local_var_params['name'])) # noqa: E501
if 'description' in local_var_params and local_var_params['description'] is not None: # noqa: E501
query_params.append(('description', local_var_params['description'])) # noqa: E501
if 'namespace' in local_var_params and local_var_params['namespace'] is not None: # noqa: E501
query_params.append(('namespace', local_var_params['namespace'])) # noqa: E501

header_params = {}

Expand Down Expand Up @@ -193,6 +200,8 @@ def upload_pipeline_version(self, uploadfile, **kwargs): # noqa: E501
:type pipelineid: str
:param description:
:type description: str
:param namespace:
:type namespace: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the urllib3.HTTPResponse object will
Expand Down Expand Up @@ -227,6 +236,8 @@ def upload_pipeline_version_with_http_info(self, uploadfile, **kwargs): # noqa:
:type pipelineid: str
:param description:
:type description: str
:param namespace:
:type namespace: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _return_http_data_only: response data without head status code
Expand All @@ -252,7 +263,8 @@ def upload_pipeline_version_with_http_info(self, uploadfile, **kwargs): # noqa:
'uploadfile',
'name',
'pipelineid',
'description'
'description',
'namespace'
]
all_params.extend(
[
Expand Down Expand Up @@ -287,6 +299,8 @@ def upload_pipeline_version_with_http_info(self, uploadfile, **kwargs): # noqa:
query_params.append(('pipelineid', local_var_params['pipelineid'])) # noqa: E501
if 'description' in local_var_params and local_var_params['description'] is not None: # noqa: E501
query_params.append(('description', local_var_params['description'])) # noqa: E501
if 'namespace' in local_var_params and local_var_params['namespace'] is not None: # noqa: E501
query_params.append(('namespace', local_var_params['namespace'])) # noqa: E501

header_params = {}

Expand Down
86 changes: 86 additions & 0 deletions backend/api/v2beta1/auth.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

option go_package = "github.com/kubeflow/pipelines/backend/api/v2beta1/go_client";
package kubeflow.pipelines.backend.api.v2beta1;

import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "protoc-gen-swagger/options/annotations.proto";
import "google/rpc/status.proto";

option (grpc.gateway.protoc_gen_swagger.options.openapiv2_swagger) = {
responses: {
key: "default";
value: {
schema: {
json_schema: {
ref: ".google.rpc.Status";
}
}
}
}
// Use bearer token for authorizing access to job service.
// Kubernetes client library(https://kubernetes.io/docs/reference/using-api/client-libraries/)
// uses bearer token as default for authorization. The section below
// ensures security definition object is generated in the swagger definition.
// For more details see https://github.com/OAI/OpenAPI-Specification/blob/3.0.0/versions/2.0.md#securityDefinitionsObject
security_definitions: {
security: {
key: "Bearer";
value: {
type: TYPE_API_KEY;
in: IN_HEADER;
name: "authorization";
}
}
}
security: {
security_requirement: {
key: "Bearer";
value: {};
}
}
};

service AuthService {
rpc Authorize(AuthorizeRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
get: "/apis/v2beta1/auth"
};
}
}

// Ask for authorization of an access by providing resource's namespace, type
// and verb. User identity is not part of the message, because it is expected
// to be parsed from request headers. Caller should proxy user request's headers.
message AuthorizeRequest {
// Type of resources in pipelines system.
enum Resources {
UNASSIGNED_RESOURCES = 0;
VIEWERS = 1;
}
// Type of verbs that act on the resources.
enum Verb {
UNASSIGNED_VERB = 0;
CREATE = 1;
GET = 2;
DELETE = 3;
}
string namespace = 1; // Namespace the resource belongs to.
Resources resources = 2; // Resource type asking for authorization.
Verb verb = 3; // Verb on the resource asking for authorization.
}
4 changes: 2 additions & 2 deletions backend/api/v2beta1/experiment.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2022 The Kubeflow Authors
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -88,7 +88,7 @@ message Experiment {
// Describes whether an entity is available or archived.
enum StorageState {
// Default state. This state in not used
STORAGESTATE_UNSPECIFIED = 0;
STORAGE_STATE_UNSPECIFIED = 0;

// Entity is available.
AVAILABLE = 1;
Expand Down
2 changes: 1 addition & 1 deletion backend/api/v2beta1/filter.proto
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2022 The Kubeflow Authors
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit 135dc64

Please sign in to comment.