Skip to content

Commit

Permalink
feat(cnpg-i): support remote plugins (cloudnative-pg#4970)
Browse files Browse the repository at this point in the history
This patch allows the operator to work with remote CNPG-i plugins that
are expected to be deployed in a different Pod in the same namespace of
the operator.

The discovery of plugins is based on a set of annotations and labels to
be added to services to be used as a plugin.

The plugin developer or distributor is expected to provide a set of
credentials that the operator will use to authenticate to the plugin via
mTLS and vice-versa.

Signed-off-by: Leonardo Cecchi <leonardo.cecchi@enterprisedb.com>
Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
Co-authored-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Co-authored-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
  • Loading branch information
3 people authored Jul 10, 2024
1 parent 07def5f commit a0648d4
Show file tree
Hide file tree
Showing 36 changed files with 1,305 additions and 462 deletions.
69 changes: 0 additions & 69 deletions api/v1/cluster_plugins.go

This file was deleted.

19 changes: 19 additions & 0 deletions api/v1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,16 @@ type ClusterSpec struct {
// configuration parameters
type PluginConfigurationList []PluginConfiguration

// GetNames gets the name of the plugins that are involved
// in the reconciliation of this cluster
func (pluginList PluginConfigurationList) GetNames() (result []string) {
pluginNames := make([]string, len(pluginList))
for i, pluginDeclaration := range pluginList {
pluginNames[i] = pluginDeclaration.Name
}
return pluginNames
}

const (
// PhaseSwitchover when a cluster is changing the primary node
PhaseSwitchover = "Switchover in progress"
Expand Down Expand Up @@ -551,6 +561,10 @@ const (
// PhaseHealthy for a cluster doing nothing
PhaseHealthy = "Cluster in healthy state"

// PhaseUnknownPlugin is triggered when the required CNPG-i plugin have not been
// loaded still
PhaseUnknownPlugin = "Unknown plugin"

// PhaseImageCatalogError is triggered when the cluster cannot select the image to
// apply because of an invalid or incomplete catalog
PhaseImageCatalogError = "Cluster has incomplete or invalid image catalog"
Expand Down Expand Up @@ -2781,6 +2795,11 @@ func (secretResourceVersion *SecretsResourceVersion) SetExternalClusterSecretVer
secretResourceVersion.ExternalClusterSecretVersions[secretName] = *version
}

// SetInContext records the cluster in the given context
func (cluster *Cluster) SetInContext(ctx context.Context) context.Context {
return context.WithValue(ctx, utils.ContextKeyCluster, cluster)
}

// GetImageName get the name of the image that should be used
// to create the pods
func (cluster *Cluster) GetImageName() string {
Expand Down
60 changes: 0 additions & 60 deletions api/v1/cluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package v1

import (
"context"
"encoding/json"
"fmt"
"slices"
Expand Down Expand Up @@ -164,27 +163,6 @@ func (r *Cluster) setDefaults(preserveUserSettings bool) {
if len(r.Spec.Tablespaces) > 0 {
r.defaultTablespaces()
}

ctx := context.Background()

// Call the plugins to help with defaulting this cluster
contextLogger := log.FromContext(ctx)
pluginClient, err := r.LoadPluginClient(ctx)
if err != nil {
contextLogger.Error(err, "Error invoking plugin in the defaulting webhook, skipping")
return
}
defer func() {
pluginClient.Close(ctx)
}()

var mutatedCluster Cluster
if err := pluginClient.MutateCluster(ctx, r, &mutatedCluster); err != nil {
contextLogger.Error(err, "Error invoking plugin in the defaulting webhook, skipping")
return
}

mutatedCluster.DeepCopyInto(r)
}

// defaultTablespaces adds the tablespace owner where the
Expand Down Expand Up @@ -309,25 +287,6 @@ var _ webhook.Validator = &Cluster{}
func (r *Cluster) ValidateCreate() (admission.Warnings, error) {
clusterLog.Info("validate create", "name", r.Name, "namespace", r.Namespace)
allErrs := r.Validate()

// Call the plugins to help validating this cluster creation
ctx := context.Background()
contextLogger := log.FromContext(ctx)
pluginClient, err := r.LoadPluginClient(ctx)
if err != nil {
contextLogger.Error(err, "Error invoking plugin in the validate/create webhook")
return nil, err
}
defer func() {
pluginClient.Close(ctx)
}()

pluginValidationResult, err := pluginClient.ValidateClusterCreate(ctx, r)
if err != nil {
contextLogger.Error(err, "Error invoking plugin in the validate/update webhook")
return nil, err
}
allErrs = append(allErrs, pluginValidationResult...)
allWarnings := r.getAdmissionWarnings()

if len(allErrs) == 0 {
Expand Down Expand Up @@ -403,25 +362,6 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) (admission.Warnings, error)
r.ValidateChanges(oldCluster)...,
)

// Call the plugins to help validating this cluster update
ctx := context.Background()
contextLogger := log.FromContext(ctx)
pluginClient, err := r.LoadPluginClient(ctx)
if err != nil {
contextLogger.Error(err, "Error invoking plugin in the validate/create webhook")
return nil, err
}
defer func() {
pluginClient.Close(ctx)
}()

pluginValidationResult, err := pluginClient.ValidateClusterUpdate(ctx, oldCluster, r)
if err != nil {
contextLogger.Error(err, "Error invoking plugin in the validate/update webhook")
return nil, err
}
allErrs = append(allErrs, pluginValidationResult...)

if len(allErrs) == 0 {
return r.getAdmissionWarnings(), nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/jackc/pgx/v5 v5.6.0
github.com/jackc/puddle/v2 v2.2.1
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/kubernetes-csi/external-snapshotter/client/v8 v8.0.0
github.com/lib/pq v1.10.9
Expand Down Expand Up @@ -77,7 +78,6 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
Expand Down
28 changes: 26 additions & 2 deletions internal/cmd/manager/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

// +kubebuilder:scaffold:imports
apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
"github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/repository"
"github.com/cloudnative-pg/cloudnative-pg/internal/configuration"
"github.com/cloudnative-pg/cloudnative-pg/internal/controller"
schemeBuilder "github.com/cloudnative-pg/cloudnative-pg/internal/scheme"
Expand Down Expand Up @@ -216,16 +217,39 @@ func RunController(
return err
}

if err = controller.NewClusterReconciler(mgr, discoveryClient).SetupWithManager(ctx, mgr); err != nil {
pluginRepository := repository.New()
if err := pluginRepository.RegisterUnixSocketPluginsInPath(
configuration.Current.PluginSocketDir,
); err != nil {
setupLog.Error(err, "Unable to load sidecar CNPG-i plugins, skipping")
}

if err = controller.NewClusterReconciler(
mgr,
discoveryClient,
pluginRepository,
).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
return err
}

if err = controller.NewBackupReconciler(mgr, discoveryClient).SetupWithManager(ctx, mgr); err != nil {
if err = controller.NewBackupReconciler(
mgr,
discoveryClient,
pluginRepository,
).SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Backup")
return err
}

if err = controller.NewPluginReconciler(
mgr,
pluginRepository,
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Plugin")
return err
}

if err = (&controller.ScheduledBackupReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down
17 changes: 13 additions & 4 deletions internal/cmd/manager/walarchive/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
pluginClient "github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/client"
"github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/repository"
"github.com/cloudnative-pg/cloudnative-pg/internal/configuration"
"github.com/cloudnative-pg/cloudnative-pg/internal/management/cache"
cacheClient "github.com/cloudnative-pg/cloudnative-pg/internal/management/cache/client"
"github.com/cloudnative-pg/cloudnative-pg/pkg/conditions"
Expand Down Expand Up @@ -250,14 +253,20 @@ func archiveWALViaPlugins(
) error {
contextLogger := log.FromContext(ctx)

pluginClient, err := cluster.LoadSelectedPluginsClient(ctx, cluster.GetWALPluginNames())
plugins := repository.New()
if err := plugins.RegisterUnixSocketPluginsInPath(configuration.Current.PluginSocketDir); err != nil {
contextLogger.Error(err, "Error while loading local plugins")
}
defer plugins.Close()

client, err := pluginClient.WithPlugins(ctx, plugins, cluster.Spec.Plugins.GetNames()...)
if err != nil {
contextLogger.Error(err, "Error loading plugins while archiving a WAL")
contextLogger.Error(err, "Error while loading required plugins")
return err
}
defer pluginClient.Close(ctx)
defer client.Close(ctx)

return pluginClient.ArchiveWAL(ctx, cluster, walName)
return client.ArchiveWAL(ctx, cluster, walName)
}

// gatherWALFilesToArchive reads from the archived status the list of WAL files
Expand Down
17 changes: 13 additions & 4 deletions internal/cmd/manager/walrestore/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"github.com/spf13/cobra"

apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
pluginClient "github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/client"
"github.com/cloudnative-pg/cloudnative-pg/internal/cnpi/plugin/repository"
"github.com/cloudnative-pg/cloudnative-pg/internal/configuration"
"github.com/cloudnative-pg/cloudnative-pg/internal/management/cache"
cacheClient "github.com/cloudnative-pg/cloudnative-pg/internal/management/cache/client"
"github.com/cloudnative-pg/cloudnative-pg/pkg/management/barman"
Expand Down Expand Up @@ -243,14 +246,20 @@ func restoreWALViaPlugins(
) error {
contextLogger := log.FromContext(ctx)

pluginClient, err := cluster.LoadSelectedPluginsClient(ctx, cluster.GetWALPluginNames())
plugins := repository.New()
if err := plugins.RegisterUnixSocketPluginsInPath(configuration.Current.PluginSocketDir); err != nil {
contextLogger.Error(err, "Error while loading local plugins")
}
defer plugins.Close()

client, err := pluginClient.WithPlugins(ctx, plugins, cluster.Spec.Plugins.GetNames()...)
if err != nil {
contextLogger.Error(err, "Error loading plugins while archiving a WAL")
contextLogger.Error(err, "Error while loading required plugins")
return err
}
defer pluginClient.Close(ctx)
defer client.Close(ctx)

return pluginClient.RestoreWAL(ctx, cluster, walName, destinationPathName)
return client.RestoreWAL(ctx, cluster, walName, destinationPathName)
}

// checkEndOfWALStreamFlag returns ErrEndOfWALStreamReached if the flag is set in the restorer
Expand Down
6 changes: 3 additions & 3 deletions internal/cnpi/plugin/client/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ func (data *data) Backup(
return nil, err
}

if !slices.Contains(plugin.capabilities, identity.PluginCapability_Service_TYPE_BACKUP_SERVICE) {
if !slices.Contains(plugin.PluginCapabilities(), identity.PluginCapability_Service_TYPE_BACKUP_SERVICE) {
return nil, ErrPluginNotSupportBackup
}

if !slices.Contains(plugin.backupCapabilities, backup.BackupCapability_RPC_TYPE_BACKUP) {
if !slices.Contains(plugin.BackupCapabilities(), backup.BackupCapability_RPC_TYPE_BACKUP) {
return nil, ErrPluginNotSupportBackupEndpoint
}

Expand All @@ -144,7 +144,7 @@ func (data *data) Backup(
"clusterDefinition", request.ClusterDefinition,
"parameters", parameters)

result, err := plugin.backupClient.Backup(ctx, &request)
result, err := plugin.BackupClient().Backup(ctx, &request)
if err != nil {
contextLogger.Error(err, "Error while calling Backup, failing")
return nil, err
Expand Down
Loading

0 comments on commit a0648d4

Please sign in to comment.