diff --git a/backend/src/v2/config/env.go b/backend/src/v2/config/env.go index b5f0fdc5469a..a0e419dab405 100644 --- a/backend/src/v2/config/env.go +++ b/backend/src/v2/config/env.go @@ -19,11 +19,12 @@ package config import ( "context" "fmt" - "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "io/ioutil" - "sigs.k8s.io/yaml" "strings" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" + "sigs.k8s.io/yaml" + "github.com/golang/glog" k8errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -123,8 +124,7 @@ type SecretRef struct { SecretKeyKey string `json:"secretKeyKey"` } -func (c *Config) GetBucketSessionInfo() (objectstore.SessionInfo, error) { - path := c.DefaultPipelineRoot() +func (c *Config) GetBucketSessionInfo(path string) (objectstore.SessionInfo, error) { bucketConfig, err := objectstore.ParseBucketPathToConfig(path) if err != nil { return objectstore.SessionInfo{}, err diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 4f8615a2fbbd..1433cd33b49d 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -22,8 +22,6 @@ import ( "strings" "time" - "github.com/kubeflow/pipelines/backend/src/v2/objectstore" - "github.com/golang/glog" "github.com/golang/protobuf/ptypes/timestamp" "github.com/google/uuid" @@ -34,6 +32,7 @@ import ( "github.com/kubeflow/pipelines/backend/src/v2/config" "github.com/kubeflow/pipelines/backend/src/v2/expression" "github.com/kubeflow/pipelines/backend/src/v2/metadata" + "github.com/kubeflow/pipelines/backend/src/v2/objectstore" "github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform" pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata" "google.golang.org/protobuf/encoding/protojson" @@ -134,28 +133,30 @@ func RootDAG(ctx context.Context, opts Options, mlmd *metadata.Client) (executio } // TODO(v2): in pipeline spec, rename GCS output directory to pipeline root. pipelineRoot := opts.RuntimeConfig.GetGcsOutputDirectory() + + restConfig, err := rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("failed to initialize kubernetes client: %w", err) + } + k8sClient, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize kubernetes client set: %w", err) + } + cfg, err := config.FromConfigMap(ctx, k8sClient, opts.Namespace) + if err != nil { + return nil, err + } + pipelineBucketSessionInfo := objectstore.SessionInfo{} if pipelineRoot != "" { glog.Infof("PipelineRoot=%q", pipelineRoot) } else { - restConfig, err := rest.InClusterConfig() - if err != nil { - return nil, fmt.Errorf("failed to initialize kubernetes client: %w", err) - } - k8sClient, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to initialize kubernetes client set: %w", err) - } - cfg, err := config.FromConfigMap(ctx, k8sClient, opts.Namespace) - if err != nil { - return nil, err - } pipelineRoot = cfg.DefaultPipelineRoot() glog.Infof("PipelineRoot=%q from default config", pipelineRoot) - pipelineBucketSessionInfo, err = cfg.GetBucketSessionInfo() - if err != nil { - return nil, err - } + } + pipelineBucketSessionInfo, err = cfg.GetBucketSessionInfo(pipelineRoot) + if err != nil { + return nil, err } bucketSessionInfo, err := json.Marshal(pipelineBucketSessionInfo) if err != nil {