Skip to content

Commit

Permalink
feat(backend): Merge kfp-tekton backend code (#10678)
Browse files Browse the repository at this point in the history
* Merge kfp-tekton backend code

Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>

* Add swf work

Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>

---------

Signed-off-by: Ricardo M. Oliveira <rmartine@redhat.com>
  • Loading branch information
rimolive authored Apr 16, 2024
1 parent cb3b24b commit 60a443e
Show file tree
Hide file tree
Showing 58 changed files with 12,102 additions and 69 deletions.
2 changes: 1 addition & 1 deletion backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# 1. Build api server application
FROM golang:1.20.4-buster as builder
FROM golang:1.21.7-bookworm as builder
RUN apt-get update && apt-get install -y cmake clang musl-dev openssl
WORKDIR /go/src/github.com/kubeflow/pipelines
COPY . .
Expand Down
4 changes: 2 additions & 2 deletions backend/Dockerfile.cacheserver
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# Dockerfile for building the source code of cache_server
FROM golang:1.20.4-alpine3.17 as builder
FROM golang:1.21.7-alpine3.19 as builder

RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc musl-dev
Expand All @@ -31,7 +31,7 @@ RUN go-licenses csv ./backend/src/cache > /tmp/licenses.csv && \
diff /tmp/licenses.csv backend/third_party_licenses/cache_server.csv && \
go-licenses save ./backend/src/cache --save_path /tmp/NOTICES

FROM alpine:3.17
FROM alpine:3.19

RUN adduser -S appuser
USER appuser
Expand Down
4 changes: 2 additions & 2 deletions backend/Dockerfile.conformance
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# Dockerfile for building the source code of conformance tests
FROM golang:1.20.4-alpine3.17 as builder
FROM golang:1.21.7-alpine3.19 as builder

RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc musl-dev
Expand All @@ -40,4 +40,4 @@ COPY --from=builder /test.tar.gz /
RUN tar -xzvf /test.tar.gz
WORKDIR /test/integration

ENTRYPOINT [ "./run.sh" ]
ENTRYPOINT [ "./run.sh" ]
4 changes: 2 additions & 2 deletions backend/Dockerfile.driver
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.20.9-alpine3.17 as builder
FROM golang:1.21.7-alpine3.19 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines
COPY . .
Expand All @@ -27,7 +27,7 @@ RUN go-licenses csv ./backend/src/v2/cmd/driver > /tmp/licenses.csv && \
diff /tmp/licenses.csv backend/third_party_licenses/driver.csv && \
go-licenses save ./backend/src/v2/cmd/driver --save_path /tmp/NOTICES

FROM alpine:3.17
FROM alpine:3.19

RUN adduser -S appuser
USER appuser
Expand Down
4 changes: 2 additions & 2 deletions backend/Dockerfile.launcher
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.20.9-alpine3.17 as builder
FROM golang:1.21.7-alpine3.19 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines
COPY . .
Expand All @@ -27,7 +27,7 @@ RUN go-licenses csv ./backend/src/v2/cmd/launcher-v2 > /tmp/licenses.csv && \
diff /tmp/licenses.csv backend/third_party_licenses/launcher.csv && \
go-licenses save ./backend/src/v2/cmd/launcher-v2 --save_path /tmp/NOTICES

FROM alpine:3.17
FROM alpine:3.19

RUN adduser -S appuser
USER appuser
Expand Down
8 changes: 5 additions & 3 deletions backend/Dockerfile.persistenceagent
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.20.4-alpine3.17 as builder
FROM golang:1.21.7-alpine3.19 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines
COPY . .
Expand All @@ -30,7 +30,7 @@ RUN go-licenses csv ./backend/src/agent/persistence > /tmp/licenses.csv && \
diff /tmp/licenses.csv backend/third_party_licenses/persistence_agent.csv && \
go-licenses save ./backend/src/agent/persistence --save_path /tmp/NOTICES

FROM alpine:3.17
FROM alpine:3.19

RUN adduser -S appuser
USER appuser
Expand All @@ -51,4 +51,6 @@ ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH 86400
ENV NUM_WORKERS 2
ENV LOG_LEVEL info

CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH} --numWorker ${NUM_WORKERS} --logLevel=${LOG_LEVEL}
ENV EXECUTIONTYPE Workflow

CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH} --numWorker ${NUM_WORKERS} --executionType ${EXECUTIONTYPE} --logLevel=${LOG_LEVEL}
4 changes: 2 additions & 2 deletions backend/Dockerfile.scheduledworkflow
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.20.4-alpine3.17 as builder
FROM golang:1.21.7-alpine3.19 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines
COPY . .
Expand All @@ -30,7 +30,7 @@ RUN go-licenses csv ./backend/src/crd/controller/scheduledworkflow > /tmp/licens
diff /tmp/licenses.csv backend/third_party_licenses/swf.csv && \
go-licenses save ./backend/src/crd/controller/scheduledworkflow --save_path /tmp/NOTICES

FROM alpine:3.17
FROM alpine:3.19

RUN apk --no-cache add tzdata

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.viewercontroller
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.20.4-alpine3.17 as builder
FROM golang:1.21.7-alpine3.19 as builder

RUN apk update && apk upgrade
RUN apk add --no-cache git gcc musl-dev
Expand Down
2 changes: 1 addition & 1 deletion backend/src/agent/persistence/client/pipeline_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func NewPipelineClient(
return &PipelineClient{
initializeTimeout: initializeTimeout,
timeout: timeout,
tokenRefresher: tokenRefresher,
reportServiceClient: api.NewReportServiceClient(connection),
tokenRefresher: tokenRefresher,
runServiceClient: api.NewRunServiceClient(connection),
}, nil
}
Expand Down
8 changes: 7 additions & 1 deletion backend/src/agent/persistence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
numWorker int
clientQPS float64
clientBurst int
executionType string
saTokenRefreshIntervalInSecs int64
)

Expand All @@ -62,6 +63,7 @@ const (
numWorkerName = "numWorker"
clientQPSFlagName = "clientQPS"
clientBurstFlagName = "clientBurst"
executionTypeFlagName = "executionType"
saTokenRefreshIntervalFlagName = "saTokenRefreshIntervalInSecs"
)

Expand All @@ -76,6 +78,9 @@ func main() {
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

// Use the util to store the ExecutionType
util.SetExecutionType(util.ExecutionType(executionType))

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %s", err.Error())
Expand All @@ -99,7 +104,7 @@ func main() {
log.SetLevel(level)

clientParam := util.ClientParameters{QPS: float64(cfg.QPS), Burst: cfg.Burst}
execInformer := util.NewExecutionInformerOrFatal(util.ArgoWorkflow, namespace, time.Second*30, clientParam)
execInformer := util.NewExecutionInformerOrFatal(util.CurrentExecutionType(), namespace, time.Second*30, clientParam)

var swfInformerFactory swfinformers.SharedInformerFactory
if namespace == "" {
Expand Down Expand Up @@ -158,6 +163,7 @@ func init() {
// k8s.io/client-go/rest/config.go#RESTClientFor
flag.Float64Var(&clientQPS, clientQPSFlagName, 5, "The maximum QPS to the master from this client.")
flag.IntVar(&clientBurst, clientBurstFlagName, 10, "Maximum burst for throttle from this client.")
flag.StringVar(&executionType, executionTypeFlagName, "Workflow", "Custom Resource's name of the backend Orchestration Engine")
// TODO use viper/config file instead. Sync `saTokenRefreshIntervalFlagName` with the value from manifest file by using ENV var.
flag.Int64Var(&saTokenRefreshIntervalInSecs, saTokenRefreshIntervalFlagName, DefaultSATokenRefresherIntervalInSecs, "Persistence agent service account token read interval in seconds. "+
"Defines how often `/var/run/secrets/kubeflow/tokens/kubeflow-persistent_agent-api-token` to be read")
Expand Down
4 changes: 4 additions & 0 deletions backend/src/agent/persistence/worker/swf_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (c *ScheduledWorkflowSaver) Save(key string, namespace string, name string,

}

// TODO: wait for officially update to v2beta1
// temporally hack this to v2beta1
swf.APIVersion = "kubeflow.org/v2beta1"
swf.Kind = "ScheduledWorkflow"
// Save this Scheduled Workflow to the database.
err = c.pipelineClient.ReportScheduledWorkflow(swf)
retry := util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)
Expand Down
8 changes: 8 additions & 0 deletions backend/src/apiserver/client/argo_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (c *FakeExecClient) Execution(namespace string) util.ExecutionInterface {
return c.workflowClientFake
}

func (c *FakeExecClient) Compare(old, new interface{}) bool {
return false
}

func (c *FakeExecClient) GetWorkflowCount() int {
return len(c.workflowClientFake.workflows)
}
Expand Down Expand Up @@ -71,3 +75,7 @@ func NewFakeExecClientWithBadWorkflow() *FakeExecClientWithBadWorkflow {
func (c *FakeExecClientWithBadWorkflow) Execution(namespace string) util.ExecutionInterface {
return c.workflowClientFake
}

func (c *FakeExecClientWithBadWorkflow) Compare(old, new interface{}) bool {
return false
}
2 changes: 1 addition & 1 deletion backend/src/apiserver/client_manager/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c *ClientManager) init() {
Burst: common.GetIntConfigWithDefault(clientBurst, 10),
}

c.execClient = util.NewExecutionClientOrFatal(util.ArgoWorkflow, common.GetDurationConfig(initConnectionTimeout), clientParams)
c.execClient = util.NewExecutionClientOrFatal(util.CurrentExecutionType(), common.GetDurationConfig(initConnectionTimeout), clientParams)

c.swfClient = client.NewScheduledWorkflowClientOrFatal(common.GetDurationConfig(initConnectionTimeout), clientParams)

Expand Down
20 changes: 19 additions & 1 deletion backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@ import (
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/apiserver/server"
"github.com/kubeflow/pipelines/backend/src/apiserver/template"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

const (
executionTypeEnv = "ExecutionType"
launcherEnv = "Launcher"
)

var (
logLevelFlag = flag.String("logLevel", "", "Defines the log level for the application.")
rpcPortFlag = flag.String("rpcPortFlag", ":8887", "RPC Port")
Expand All @@ -62,6 +69,14 @@ func main() {
flag.Parse()

initConfig()
// check ExecutionType Settings if presents
if viper.IsSet(executionTypeEnv) {
util.SetExecutionType(util.ExecutionType(common.GetStringConfig(executionTypeEnv)))
}
if viper.IsSet(launcherEnv) {
template.Launcher = common.GetStringConfig(launcherEnv)
}

clientManager := cm.NewClientManager()
resourceManager := resource.NewResourceManager(
&clientManager,
Expand Down Expand Up @@ -122,13 +137,14 @@ func startRpcServer(resourceManager *resource.ResourceManager) {
)
sharedJobServer := server.NewJobServer(resourceManager, &server.JobServerOptions{CollectMetrics: *collectMetricsFlag})
sharedRunServer := server.NewRunServer(resourceManager, &server.RunServerOptions{CollectMetrics: *collectMetricsFlag})
sharedReportServer := server.NewReportServer(resourceManager)

apiv1beta1.RegisterExperimentServiceServer(s, sharedExperimentServer)
apiv1beta1.RegisterPipelineServiceServer(s, sharedPipelineServer)
apiv1beta1.RegisterJobServiceServer(s, sharedJobServer)
apiv1beta1.RegisterRunServiceServer(s, sharedRunServer)
apiv1beta1.RegisterTaskServiceServer(s, server.NewTaskServer(resourceManager))
apiv1beta1.RegisterReportServiceServer(s, server.NewReportServer(resourceManager))
apiv1beta1.RegisterReportServiceServer(s, sharedReportServer)

apiv1beta1.RegisterVisualizationServiceServer(
s,
Expand All @@ -143,6 +159,7 @@ func startRpcServer(resourceManager *resource.ResourceManager) {
apiv2beta1.RegisterPipelineServiceServer(s, sharedPipelineServer)
apiv2beta1.RegisterRecurringRunServiceServer(s, sharedJobServer)
apiv2beta1.RegisterRunServiceServer(s, sharedRunServer)
apiv2beta1.RegisterReportServiceServer(s, sharedReportServer)

// Register reflection service on gRPC server.
reflection.Register(s)
Expand Down Expand Up @@ -175,6 +192,7 @@ func startHttpProxy(resourceManager *resource.ResourceManager) {
registerHttpHandlerFromEndpoint(apiv2beta1.RegisterPipelineServiceHandlerFromEndpoint, "PipelineService", ctx, runtimeMux)
registerHttpHandlerFromEndpoint(apiv2beta1.RegisterRecurringRunServiceHandlerFromEndpoint, "RecurringRunService", ctx, runtimeMux)
registerHttpHandlerFromEndpoint(apiv2beta1.RegisterRunServiceHandlerFromEndpoint, "RunService", ctx, runtimeMux)
registerHttpHandlerFromEndpoint(apiv2beta1.RegisterReportServiceHandlerFromEndpoint, "ReportService", ctx, runtimeMux)

// Create a top level mux to include both pipeline upload server and gRPC servers.
topMux := mux.NewRouter()
Expand Down
22 changes: 16 additions & 6 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,20 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
return nil, util.NewInternalServerError(util.NewInvalidInputError("Namespace cannot be empty when creating an Argo workflow. Check if you have specified POD_NAMESPACE or try adding the parent namespace to the request"), "Failed to create a run due to empty namespace")
}
executionSpec.SetExecutionNamespace(k8sNamespace)

// assign OwnerReference to scheduledworkflow
if run.RecurringRunId != "" {
job, err := r.jobStore.GetJob(run.RecurringRunId)
if err != nil {
return nil, util.NewInternalServerError(util.NewInvalidInputError("RecurringRunId doesn't exist: %s", run.RecurringRunId), "Failed to create a run due to invalid recurring run id")
}
swf, err := r.swfClient.ScheduledWorkflow(job.Namespace).Get(ctx, job.K8SName, v1.GetOptions{})
if err != nil {
return nil, util.NewInternalServerError(util.NewInvalidInputError("ScheduledWorkflow doesn't exist: %s", job.K8SName), "Failed to create a run due to invalid name")
}
executionSpec.SetOwnerReferences(swf)
}

newExecSpec, err := r.getWorkflowClient(k8sNamespace).Create(ctx, executionSpec, v1.CreateOptions{})
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
Expand Down Expand Up @@ -708,11 +722,7 @@ func (r *ResourceManager) ListJobs(filterContext *model.FilterContext, opts *lis

// Terminates a workflow by setting its activeDeadlineSeconds to 0.
func TerminateWorkflow(ctx context.Context, wfClient util.ExecutionInterface, name string) error {
patchObj := map[string]interface{}{
"spec": map[string]interface{}{
"activeDeadlineSeconds": 0,
},
}
patchObj := util.GetTerminatePatch(util.CurrentExecutionType())
patch, err := json.Marshal(patchObj)
if err != nil {
return util.NewInternalServerError(err, "Failed to terminate workflow %s due to error parsing the patch", name)
Expand Down Expand Up @@ -881,7 +891,7 @@ func (r *ResourceManager) readRunLogFromArchive(workflowManifest string, nodeId
return util.NewInternalServerError(util.NewInvalidInputError("Runtime workflow manifest cannot empty"), "Failed to read logs from archive %v due to empty runtime workflow manifest", nodeId)
}

execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(workflowManifest))
execSpec, err := util.NewExecutionSpecJSON(util.CurrentExecutionType(), []byte(workflowManifest))
if err != nil {
return util.NewInternalServerError(err, "Failed to read logs from archive %v due error reading execution spec", nodeId)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/src/apiserver/server/api_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func toApiParametersV1(p string) []*apiv1beta1.Parameter {
if p == "" || p == "null" || p == "[]" {
return apiParams
}
params, err := util.UnmarshalParameters(util.ArgoWorkflow, p)
params, err := util.UnmarshalParameters(util.CurrentExecutionType(), p)
if err != nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion backend/src/apiserver/server/report_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *ReportServer) ReportScheduledWorkflow(ctx context.Context,
}

func validateReportWorkflowRequest(wfManifest string) (*util.ExecutionSpec, error) {
execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(wfManifest))
execSpec, err := util.NewExecutionSpecJSON(util.CurrentExecutionType(), []byte(wfManifest))
if err != nil {
return nil, util.NewInvalidInputError("Could not unmarshal workflow: %v: %v", err, wfManifest)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/src/apiserver/template/argo_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (t *Argo) ParametersJSON() (string, error) {
if t == nil {
return "", nil
}
return util.MarshalParameters(util.ArgoWorkflow, t.wf.SpecParameters())
return util.MarshalParameters(util.CurrentExecutionType(), t.wf.SpecParameters())
}

func NewArgoTemplateFromWorkflow(wf *workflowapi.Workflow) (*Argo, error) {
Expand Down
4 changes: 3 additions & 1 deletion backend/src/apiserver/template/template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ func TestScheduledWorkflow(t *testing.T) {
Parameters: []scheduledworkflow.Parameter{{Name: "y", Value: "\"world\""}},
Spec: "",
},
NoCatchup: util.BoolPointer(true),
PipelineId: "1",
PipelineName: "pipeline name",
NoCatchup: util.BoolPointer(true),
},
}

Expand Down
Loading

0 comments on commit 60a443e

Please sign in to comment.