diff --git a/README.md b/README.md index 94218b0..8cc6394 100644 --- a/README.md +++ b/README.md @@ -57,11 +57,85 @@ spec: - --configmap-name=thanos-receive - --configmap-generated-name=thanos-receive-generated - --file-name=hashrings.json - image: quat.io/observatorium/thanos-receive-controller + image: quay.io/observatorium/thanos-receive-controller name: thanos-receive-controller +EOF ``` -Finally, deploy StatefulSets of Thanos receivers labeled with `controller.receive.thanos.io=thanos-receive-controller`. -The controller lists all of the StatefulSets with that label and matches the value of their `controller.receive.thanos.io/hashring` labels to the hashring names in the configuration file. +Finally, deploy StatefulSets of Thanos receivers labeled with `controller.receive.thanos.io=thanos-receive-controller`, and with the hashring name in the `controller.receive.thanos.io/hashring` label, e.g.: + +```shell +cat <<'EOF' | kubectl apply -f - +apiVersion: apps/v1 +kind: StatefulSet +metadata: + labels: + app.kubernetes.io/instance: hashring0 + app.kubernetes.io/name: thanos-receive + controller.receive.thanos.io: thanos-receive-controller + controller.receive.thanos.io/hashring: hashring0 + name: thanos-receive-hashring0 +spec: + replicas: 3 + selector: + matchLabels: + app.kubernetes.io/instance: hashring0 + app.kubernetes.io/name: thanos-receive + serviceName: thanos-receive-hashring0 + template: + metadata: + labels: + app.kubernetes.io/instance: hashring0 + app.kubernetes.io/name: thanos-receive + spec: + containers: + - args: + - receive + - --grpc-address=0.0.0.0:10901 + - --http-address=0.0.0.0:10902 + - --remote-write.address=0.0.0.0:19291 + - --tsdb.path=/var/thanos/receive + - --label=replica="$(NAME)" + - --label=receive="true" + - --tsdb.retention=6h + - --receive.hashrings-file=/var/lib/thanos-receive/hashrings.json + - --receive.local-endpoint=$(NAME).thanos-receive-hashring0.$(NAMESPACE).svc.cluster.local:10901 + env: + - name: NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + image: quay.io/thanos/thanos + name: thanos-receive + ports: + - containerPort: 10901 + name: grpc + - containerPort: 10902 + name: http + - containerPort: 19291 + name: remote-write + volumeMounts: + - mountPath: /var/lib/thanos-receive + name: hashring-config + volumes: + - configMap: + name: thanos-receive-generated + name: hashring-config +EOF +``` + +The controller lists all of the StatefulSets with the `controller.receive.thanos.io=thanos-receive-controller` label and matches the value of their `controller.receive.thanos.io/hashring` labels to the hashring names in the configuration file. The endpoints for each hashring will be populated automatically by the controller and the complete configuration file will be placed in a ConfigMap named `thanos-receive-generated`. This configuration should be consumed as a ConfigMap volume by the Thanos receivers. + +## Advanced + +Thanos receivers can handle potentially private data for various tenants. +When a Thanos receiver Pod is deleted, or the StatefulSet is otherwise scaled down, PersistentVolumes holding this potentially sensitive data may be left in the cluster. +In order to ensure that the PersistentVolume used by a Thanos receiver can be safely reused, the Thanos Receive Controller will automatically launch a short-lived Job that mounts these PersistentVolumes and cleans them up. +The cleanup process consists of: +1. 
ensuring any leftover TSDB blocks are backed up to object storage by running a [thanos-replicate](https://github.com/observatorium/thanos-replicate/) container for each PersistentVolume; +in order to run this container, the Thanos Receive Controller expects the Thanos receiver StatefulSet to be labeled with `controller.receive.thanos.io/objstore-secret`, pointing to a Secret containing the Thanos object storage configuration file, and `controller.receive.thanos.io/objstore-secret-key`, specifying which key in the Secret holds the file; +additional environment variables for the `thanos-replicate` container, e.g. `AWS_SECRET_ACCESS_KEY`, can be provided by adding those variables as keys in a Secret and specifying that Secret's name in the `controller.receive.thanos.io/env-var-secret` label; +if any of the replication processes fails to run, the cleanup process is aborted and is retried after some backoff +1. removing all data in the PersistentVolumes by running a container that mounts all of the PersistentVolumes and runs `rm -rf` on each mount. diff --git a/main.go b/main.go index ead37f9..134fbfe 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "net/http/pprof" "os" "os/signal" + "path/filepath" "strings" "sync" "syscall" @@ -25,32 +26,40 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/workqueue" ) -type label = string - const ( resyncPeriod = 5 * time.Minute + pollPeriod = 1 * time.Second + pollTimeout = 5 * time.Minute internalServerShutdownTimeout = time.Second hashringLabelKey = "controller.receive.thanos.io/hashring" + objstoreSecretLabelKey = "controller.receive.thanos.io/objstore-secret" + objstoreSecretKeyLabelKey = "controller.receive.thanos.io/objstore-secret-key" + envVarSecretLabelKey = "controller.receive.thanos.io/env-var-secret" + fromMountPath = "/pvc/from" // Metric label values - fetch label = "fetch" - decode label = "decode" - save label = "save" - create label = "create" - update label = "update" - other label = "other" + fetchLabel = "fetch" + decodeLabel = "decode" + saveLabel = "save" + createLabel = "create" + deleteLabel = "delete" + updateLabel = "update" + otherLabel = "other" ) func main() { @@ -65,6 +74,8 @@ func main() { Port int Scheme string InternalAddr string + CleanupImage string + CleanUp bool }{} flag.StringVar(&config.KubeConfig, "kubeconfig", "", "Path to kubeconfig") @@ -77,6 +88,8 @@ func main() { flag.IntVar(&config.Port, "port", 10901, "The port on which receive components are listening for write requests") flag.StringVar(&config.Scheme, "scheme", "http", "The URL scheme on which receive components accept write requests") flag.StringVar(&config.InternalAddr, "internal-addr", ":8080", "The address on which internal server runs") + flag.StringVar(&config.CleanupImage, "cleanup-image", "quay.io/observatorium/thanos-replicate", "The container image to use for cleanup operations") + flag.BoolVar(&config.CleanUp, "cleanup", true, "Should the controller clean up 
PVCs?") flag.Parse() logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) @@ -131,6 +144,8 @@ func main() { scheme: config.Scheme, labelKey: labelKey, labelValue: labelValue, + cleanupImage: config.CleanupImage, + cleanUp: config.CleanUp, } c := newController(klient, logger, opt) c.registerMetrics(reg) @@ -297,19 +312,25 @@ type options struct { scheme string labelKey string labelValue string + cleanupImage string + cleanUp bool } type controller struct { options *options - queue *queue + podQ workqueue.RateLimitingInterface + stsQ *queue logger log.Logger klient kubernetes.Interface cmapInf cache.SharedIndexInformer + podInf cache.SharedIndexInformer ssetInf cache.SharedIndexInformer reconcileAttempts prometheus.Counter reconcileErrors *prometheus.CounterVec + cleanupAttempts prometheus.Counter + cleanupErrors *prometheus.CounterVec configmapChangeAttempts prometheus.Counter configmapChangeErrors *prometheus.CounterVec configmapHash prometheus.Gauge @@ -325,11 +346,13 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) * return &controller{ options: o, - queue: newQueue(), + podQ: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pod"), + stsQ: newQueue(), logger: logger, klient: klient, cmapInf: coreinformers.NewConfigMapInformer(klient, o.namespace, resyncPeriod, nil), + podInf: coreinformers.NewPodInformer(klient, o.namespace, resyncPeriod, nil), ssetInf: appsinformers.NewFilteredStatefulSetInformer(klient, o.namespace, resyncPeriod, nil, func(lo *v1.ListOptions) { lo.LabelSelector = labels.Set{o.labelKey: o.labelValue}.String() }), @@ -344,6 +367,16 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) * }, []string{"type"}, ), + cleanupAttempts: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_receive_controller_cleanup_attempts_total", + Help: "Total number of cleanups.", + }), + cleanupErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_receive_controller_cleanup_errors_total", + Help: "Total number of cleanup errors.", + }, + []string{"type"}, + ), configmapChangeAttempts: prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_receive_controller_configmap_change_attempts_total", Help: "Total number of configmap change attempts.", @@ -384,13 +417,17 @@ func newController(klient kubernetes.Interface, logger log.Logger, o *options) * func (c *controller) registerMetrics(reg *prometheus.Registry) { if reg != nil { c.reconcileAttempts.Add(0) - c.reconcileErrors.WithLabelValues(fetch).Add(0) - c.reconcileErrors.WithLabelValues(decode).Add(0) - c.reconcileErrors.WithLabelValues(save).Add(0) + c.reconcileErrors.WithLabelValues(fetchLabel).Add(0) + c.reconcileErrors.WithLabelValues(decodeLabel).Add(0) + c.reconcileErrors.WithLabelValues(saveLabel).Add(0) c.configmapChangeAttempts.Add(0) - c.configmapChangeErrors.WithLabelValues(create).Add(0) - c.configmapChangeErrors.WithLabelValues(update).Add(0) - c.configmapChangeErrors.WithLabelValues(other).Add(0) + c.configmapChangeErrors.WithLabelValues(createLabel).Add(0) + c.configmapChangeErrors.WithLabelValues(updateLabel).Add(0) + c.configmapChangeErrors.WithLabelValues(otherLabel).Add(0) + c.cleanupErrors.WithLabelValues(createLabel).Add(0) + c.cleanupErrors.WithLabelValues(decodeLabel).Add(0) + c.cleanupErrors.WithLabelValues(deleteLabel).Add(0) + c.cleanupErrors.WithLabelValues(fetchLabel).Add(0) reg.MustRegister( c.reconcileAttempts, c.reconcileErrors, @@ -400,29 +437,41 @@ func (c *controller) 
registerMetrics(reg *prometheus.Registry) { c.configmapLastSuccessfulChangeTime, c.hashringNodes, c.hashringTenants, + c.cleanupErrors, ) } } func (c *controller) run(stop <-chan struct{}) error { - defer c.queue.stop() + defer c.podQ.ShutDown() + defer c.stsQ.stop() go c.cmapInf.Run(stop) go c.ssetInf.Run(stop) + if c.options.cleanUp { + go c.podInf.Run(stop) + } if err := c.waitForCacheSync(stop); err != nil { return err } c.cmapInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(_ interface{}) { c.queue.add() }, - DeleteFunc: func(_ interface{}) { c.queue.add() }, - UpdateFunc: func(_, _ interface{}) { c.queue.add() }, + AddFunc: func(_ interface{}) { c.stsQ.add() }, + DeleteFunc: func(_ interface{}) { c.stsQ.add() }, + UpdateFunc: func(_, _ interface{}) { c.stsQ.add() }, }) c.ssetInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(_ interface{}) { c.queue.add() }, - DeleteFunc: func(_ interface{}) { c.queue.add() }, - UpdateFunc: func(_, _ interface{}) { c.queue.add() }, + AddFunc: func(_ interface{}) { c.stsQ.add() }, + DeleteFunc: func(_ interface{}) { c.stsQ.add() }, + UpdateFunc: func(_, _ interface{}) { c.stsQ.add() }, }) - go c.worker() + go c.stsWorker() + + if c.options.cleanUp { + c.podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { c.podQ.Add(obj) }, + }) + go c.podWorker() + } <-stop return nil @@ -437,6 +486,10 @@ func (c *controller) waitForCacheSync(stop <-chan struct{}) error { }{ {"ConfigMap", c.cmapInf}, {"StatefulSet", c.ssetInf}, + {"Pod", c.podInf}, + } + if !c.options.cleanUp { + informers = informers[:1] } for _, inf := range informers { if !cache.WaitForCacheSync(stop, inf.informer.HasSynced) { @@ -453,17 +506,217 @@ func (c *controller) waitForCacheSync(stop <-chan struct{}) error { return nil } -func (c *controller) worker() { - for c.queue.get() { +func (c *controller) podWorker() { + fn := func() bool { + key, quit := c.podQ.Get() + if quit { + return false + } + defer c.podQ.Done(key) + if err := c.cleanUp(key.(*corev1.Pod)); err != nil { + level.Error(c.logger).Log("msg", "unable to clean up PVC", "err", err) + c.podQ.AddRateLimited(key) + return true + } + c.podQ.Forget(key) + return true + } + for fn() { + } +} + +func (c *controller) stsWorker() { + for c.stsQ.get() { c.sync() } } +func (c *controller) resolvePodOwnerRef(namespace string, refs []metav1.OwnerReference) (*appsv1.StatefulSet, error) { + for _, ref := range refs { + // If the owner reference points at the wrong kind of object, skip. + if ref.Kind != "StatefulSet" { + continue + } + // If the owner reference points at something that we don't have, then skip. 
+ obj, ok, err := c.ssetInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", namespace, ref.Name)) + if !ok { + continue + } + if err != nil { + return nil, err + } + sts := obj.(*appsv1.StatefulSet) + if sts.UID != ref.UID { + return nil, errors.New("owner reference UID does not match StatefulSet") + } + return sts, nil + } + return nil, nil +} + +func (c *controller) generateHelper(name string, sts *appsv1.StatefulSet) *batchv1.Job { + initContainerTemplate := corev1.Container{ + Name: "replicate", + Image: c.options.cleanupImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "run", + "--single-run", + "--objstorefrom.config=$(OBJSTORE_CONFIG_FROM)", + "--objstoreto.config=$(OBJSTORE_CONFIG_TO)", + "--log.level=debug", + }, + Env: []corev1.EnvVar{ + { + Name: "OBJSTORE_CONFIG_FROM", + Value: "type: FILESYSTEM\nconfig:\n directory: " + fromMountPath, + }, + { + Name: "OBJSTORE_CONFIG_TO", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: sts.Labels[objstoreSecretLabelKey], + }, + Key: sts.Labels[objstoreSecretKeyLabelKey], + }, + }, + }, + }, + } + // Inject extra environment variables into the cleanup Pod if provided. + if _, ok := sts.Labels[envVarSecretLabelKey]; ok { + initContainerTemplate.EnvFrom = []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: sts.Labels[envVarSecretLabelKey], + }, + }, + }, + } + } + + helper := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cleanup-" + name, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + InitContainers: make([]corev1.Container, len(sts.Spec.VolumeClaimTemplates)), + Containers: []corev1.Container{ + { + Name: "cleanup", + Image: c.options.cleanupImage, + Command: []string{"rm", "-rf"}, + ImagePullPolicy: corev1.PullIfNotPresent, + VolumeMounts: make([]corev1.VolumeMount, len(sts.Spec.VolumeClaimTemplates)), + }, + }, + Volumes: make([]corev1.Volume, len(sts.Spec.VolumeClaimTemplates)), + }, + }, + }, + } + var v corev1.Volume + var vname, mountPath string + for i, t := range sts.Spec.VolumeClaimTemplates { + // Create a new Volume for this template. + vname = fmt.Sprintf("%s-%s", t.Name, name) + v = corev1.Volume{ + Name: vname, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: vname, + }, + }, + } + helper.Spec.Template.Spec.Volumes[i] = v + // Create an init container to replicate this Volume. + helper.Spec.Template.Spec.InitContainers[i] = *initContainerTemplate.DeepCopy() + helper.Spec.Template.Spec.InitContainers[i].VolumeMounts = []corev1.VolumeMount{{ + Name: v.Name, + MountPath: fromMountPath, + }} + // Add this Volume to the container that removes files. 
+ mountPath = filepath.Join("/pvc", vname) + helper.Spec.Template.Spec.Containers[0].VolumeMounts[i] = corev1.VolumeMount{ + Name: v.Name, + MountPath: mountPath, + } + helper.Spec.Template.Spec.Containers[0].Command = append(helper.Spec.Template.Spec.Containers[0].Command, filepath.Join(mountPath, "*")) + } + + return helper +} + +func (c *controller) cleanUp(pod *corev1.Pod) error { + c.cleanupAttempts.Inc() + sts, err := c.resolvePodOwnerRef(pod.Namespace, pod.GetOwnerReferences()) + if err != nil { + c.cleanupErrors.WithLabelValues(fetchLabel).Inc() + return errors.Wrap(err, "could not get StatefulSet") + } + // This probably means that the Pod did not belong to a StatefulSet with + // our label selector, i.e. not a StatefulSet we are watching. + if sts == nil { + return nil + } + // Nothing to clean up. + if len(sts.Spec.VolumeClaimTemplates) == 0 { + return nil + } + + _, secretOK := sts.Labels[objstoreSecretLabelKey] + _, keyOK := sts.Labels[objstoreSecretKeyLabelKey] + if !secretOK || !keyOK { + c.cleanupErrors.WithLabelValues(decodeLabel).Inc() + return fmt.Errorf("StatefulSet %s/%s is missing either the %s or %s label", sts.Namespace, sts.Name, objstoreSecretLabelKey, objstoreSecretKeyLabelKey) + } + helper := c.generateHelper(pod.Name, sts) + + if _, err := c.klient.BatchV1().Jobs(pod.Namespace).Create(helper); err != nil && !kerrors.IsAlreadyExists(err) { + c.cleanupErrors.WithLabelValues(createLabel).Inc() + return errors.Wrap(err, "unable to create the cleanup Pod") + } + + defer func() { + policy := metav1.DeletePropagationForeground + err := c.klient.BatchV1().Jobs(pod.Namespace).Delete(helper.Name, &metav1.DeleteOptions{PropagationPolicy: &policy}) + if err != nil { + level.Error(c.logger).Log("msg", "unable to delete the cleanup Job", "err", err) + c.cleanupErrors.WithLabelValues(deleteLabel).Inc() + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), pollTimeout) + defer cancel() + + err = wait.PollUntil(pollPeriod, + func() (bool, error) { + if j, err := c.klient.BatchV1().Jobs(pod.Namespace).Get(helper.Name, v1.GetOptions{}); err != nil { + return false, err + } else if j.Status.Succeeded == 1 { + return true, nil + } + return false, nil + }, + ctx.Done()) + if err != nil { + c.cleanupErrors.WithLabelValues(fetchLabel).Inc() + return errors.Wrap(err, "clean up PersistentVolumeClaim") + } + + return nil +} + func (c *controller) sync() { c.reconcileAttempts.Inc() configMap, ok, err := c.cmapInf.GetStore().GetByKey(fmt.Sprintf("%s/%s", c.options.namespace, c.options.configMapName)) if !ok || err != nil { - c.reconcileErrors.WithLabelValues(fetch).Inc() + c.reconcileErrors.WithLabelValues(fetchLabel).Inc() level.Warn(c.logger).Log("msg", "could not fetch ConfigMap", "err", err, "name", c.options.configMapName) return } @@ -471,7 +724,7 @@ func (c *controller) sync() { var hashrings []receive.HashringConfig if err := json.Unmarshal([]byte(cm.Data[c.options.fileName]), &hashrings); err != nil { - c.reconcileErrors.WithLabelValues(decode).Inc() + c.reconcileErrors.WithLabelValues(decodeLabel).Inc() level.Warn(c.logger).Log("msg", "failed to decode configuration", "err", err) return } @@ -488,7 +741,7 @@ func (c *controller) sync() { c.populate(hashrings, statefulsets) if err := c.saveHashring(hashrings); err != nil { - c.reconcileErrors.WithLabelValues(save).Inc() + c.reconcileErrors.WithLabelValues(saveLabel).Inc() level.Error(c.logger).Log("msg", "failed to save hashrings") } } @@ -539,14 +792,14 @@ func (c *controller) saveHashring(hashring 
[]receive.HashringConfig) error { if kerrors.IsNotFound(err) { _, err = c.klient.CoreV1().ConfigMaps(c.options.namespace).Create(cm) if err != nil { - c.configmapChangeErrors.WithLabelValues(create).Inc() + c.configmapChangeErrors.WithLabelValues(createLabel).Inc() return err } c.configmapLastSuccessfulChangeTime.Set(float64(time.Now().Unix())) return nil } if err != nil { - c.configmapChangeErrors.WithLabelValues(other).Inc() + c.configmapChangeErrors.WithLabelValues(otherLabel).Inc() return err } @@ -556,7 +809,7 @@ func (c *controller) saveHashring(hashring []receive.HashringConfig) error { _, err = c.klient.CoreV1().ConfigMaps(c.options.namespace).Update(cm) if err != nil { - c.configmapChangeErrors.WithLabelValues(update).Inc() + c.configmapChangeErrors.WithLabelValues(updateLabel).Inc() return err }
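
For the cleanup feature described in the README's Advanced section above, the controller reads three labels from the receiver StatefulSet: `controller.receive.thanos.io/objstore-secret`, `controller.receive.thanos.io/objstore-secret-key`, and, optionally, `controller.receive.thanos.io/env-var-secret`. A minimal sketch of wiring them up, assuming a Secret named `thanos-objectstorage` that stores the Thanos object storage configuration under the key `thanos.yaml` (the Secret name, key, and bucket settings are illustrative, not part of this change):

```shell
# Sketch only: the Secret name, key, and bucket configuration are assumptions.
cat <<'EOF' | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: thanos-objectstorage
stringData:
  thanos.yaml: |
    type: S3
    config:
      bucket: example-bucket
      endpoint: s3.example.com
EOF

# Point the controller at the Secret by labeling the receiver StatefulSet.
kubectl label statefulset thanos-receive-hashring0 \
  controller.receive.thanos.io/objstore-secret=thanos-objectstorage \
  controller.receive.thanos.io/objstore-secret-key=thanos.yaml
```

With these labels in place, the Job created for a deleted Pod runs one `thanos-replicate` init container per PersistentVolumeClaim before the final container removes the data; extra environment variables such as `AWS_SECRET_ACCESS_KEY` can be supplied via a second Secret referenced by the `controller.receive.thanos.io/env-var-secret` label.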
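
The hashring configuration the controller writes out can be pictured as follows. This is only a sketch of the generated ConfigMap for the example `thanos-receive-hashring0` StatefulSet with three replicas; the field names follow the Thanos hashring file format, and the namespace (`default`) is an assumption:

```yaml
# Illustrative sketch of the controller's output, assuming namespace "default".
apiVersion: v1
kind: ConfigMap
metadata:
  name: thanos-receive-generated
data:
  hashrings.json: |-
    [
      {
        "hashring": "hashring0",
        "endpoints": [
          "thanos-receive-hashring0-0.thanos-receive-hashring0.default.svc.cluster.local:10901",
          "thanos-receive-hashring0-1.thanos-receive-hashring0.default.svc.cluster.local:10901",
          "thanos-receive-hashring0-2.thanos-receive-hashring0.default.svc.cluster.local:10901"
        ]
      }
    ]
```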