Skip to content
Open

wip #4846

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions charts/thin/templates/config/runtime.yaml

This file was deleted.

10 changes: 0 additions & 10 deletions charts/thin/templates/fuse/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ spec:
- mountPath: /etc/fluid/config
name: thin-conf
readOnly: true
- mountPath: /etc/fluid/runtime
name: runtime
readOnly: true
{{- if .Values.fuse.cacheDir }}
- name: cache-dir
mountPath: {{ .Values.fuse.cacheDir }}
Expand Down Expand Up @@ -155,13 +152,6 @@ spec:
path: config.json
defaultMode: 0444
{{- end }}
- name: runtime
configMap:
name: {{ template "thin.fullname" . }}-runtimeset
items:
- key: runtime.json
path: runtime.json
defaultMode: 0444
{{- if .Values.fuse.volumes }}
{{ toYaml .Values.fuse.volumes | indent 8 }}
{{- end }}
Expand Down
2 changes: 2 additions & 0 deletions charts/thin/templates/worker/statefuleset.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{{ if .Values.worker.enabled -}}
apiVersion: apps/v1
kind: StatefulSet
metadata:
Expand Down Expand Up @@ -107,3 +108,4 @@ spec:
{{- if .Values.worker.volumes }}
{{ toYaml .Values.worker.volumes | indent 8 }}
{{- end }}
{{- end -}}
1 change: 1 addition & 0 deletions charts/thin/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ runtimeValue: ""

## WORKER ##
worker:
enabled: false
image: ""
imageTag: ""
imagePullPolicy: ""
Expand Down
13 changes: 2 additions & 11 deletions pkg/application/inject/fuse/mutator/mutator_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -331,24 +330,16 @@ func (helper *defaultMutatorHelper) prepareFuseContainerPostStartScript() error
return err
}

ownerReference := metav1.OwnerReference{
APIVersion: dataset.APIVersion,
Kind: dataset.Kind,
Name: dataset.Name,
UID: dataset.UID,
}

// Fluid assumes pvc name is the same with runtime's name
gen := poststart.NewDefaultPostStartScriptGenerator()
cmKey := gen.GetConfigMapKeyByOwner(types.NamespacedName{Namespace: datasetNamespace, Name: datasetName}, template.FuseMountInfo.FsType)
cm := gen.BuildConfigMap(ownerReference, cmKey)

cmKey := gen.GetNamespacedConfigMapKey(types.NamespacedName{Namespace: datasetNamespace, Name: datasetName}, template.FuseMountInfo.FsType)
found, err := kubeclient.IsConfigMapExist(helper.client, cmKey.Name, cmKey.Namespace)
if err != nil {
return err
}

if !found {
cm := gen.BuildConfigMap(dataset, cmKey)
err = helper.client.Create(context.TODO(), cm)
if err != nil {
// If ConfigMap creation succeeds concurrently, continue to mutate
Expand Down
12 changes: 2 additions & 10 deletions pkg/application/inject/fuse/mutator/mutator_unprivileged.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

Expand Down Expand Up @@ -130,17 +129,10 @@ func (mutator *unprivilegedMutatorHelper) prepareFuseContainerPostStartScript()
return err
}

ownerReference := metav1.OwnerReference{
APIVersion: dataset.APIVersion,
Kind: dataset.Kind,
Name: dataset.Name,
UID: dataset.UID,
}

// Fluid assumes pvc name is the same with runtime's name
gen := poststart.NewDefaultPostStartScriptGenerator()
cmKey := gen.GetConfigMapKeyByOwner(types.NamespacedName{Namespace: datasetNamespace, Name: datasetName}, template.FuseMountInfo.FsType)
cm := gen.BuildConfigMap(ownerReference, cmKey)
cmKey := gen.GetNamespacedConfigMapKey(types.NamespacedName{Namespace: datasetNamespace, Name: datasetName}, template.FuseMountInfo.FsType)
cm := gen.BuildConfigMap(dataset, cmKey)

found, err := kubeclient.IsConfigMapExist(mutator.client, cmKey.Name, cmKey.Namespace)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type defaultPostStartScriptGenerator struct {
func NewDefaultPostStartScriptGenerator() *defaultPostStartScriptGenerator {
return &defaultPostStartScriptGenerator{
scriptGeneratorHelper: scriptGeneratorHelper{
configMapName: "check-mount",
configMapName: "default-check-mount",
scriptFileName: "check-mount.sh",
scriptMountPath: "/check-mount.sh",
scriptContent: replacer.Replace(contentPrivilegedSidecar),
Expand Down
14 changes: 7 additions & 7 deletions pkg/application/inject/fuse/poststart/script_gen_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package poststart

import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"strings"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,27 +36,26 @@ type scriptGeneratorHelper struct {
scriptMountPath string
}

func (helper *scriptGeneratorHelper) BuildConfigMap(ownerReference metav1.OwnerReference, configMapKey types.NamespacedName) *corev1.ConfigMap {
func (helper *scriptGeneratorHelper) BuildConfigMap(dataset *datav1alpha1.Dataset, configMapKey types.NamespacedName) *corev1.ConfigMap {
data := map[string]string{}
data[helper.scriptFileName] = helper.scriptContent
// data[helper.scriptFileName] = replacer.Replace(helper.scriptContent)
return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapKey.Name,
Namespace: configMapKey.Namespace,
OwnerReferences: []metav1.OwnerReference{ownerReference},
Name: configMapKey.Name,
Namespace: configMapKey.Namespace,
Labels: map[string]string{
common.LabelAnnotationDatasetId: utils.GetDatasetId(configMapKey.Namespace, ownerReference.Name, string(ownerReference.UID)),
common.LabelAnnotationDatasetId: utils.GetDatasetId(configMapKey.Namespace, dataset.Name, string(dataset.UID)),
},
},
Data: data,
}
}

func (helper *scriptGeneratorHelper) GetConfigMapKeyByOwner(datasetKey types.NamespacedName, runtimeType string) types.NamespacedName {
func (helper *scriptGeneratorHelper) GetNamespacedConfigMapKey(datasetKey types.NamespacedName, runtimeType string) types.NamespacedName {
return types.NamespacedName{
Namespace: datasetKey.Namespace,
Name: datasetKey.Name + "-" + strings.ToLower(runtimeType) + "-" + helper.configMapName,
Name: strings.ToLower(runtimeType) + "-" + helper.configMapName,
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ const (
// "Sidecar": for only sidecar to skip check mount ready,
AnnotationSkipCheckMountReadyTarget = LabelAnnotationPrefix + "skip-check-mount-ready-target"

// AnnotationEnableRuntimeSetConfigMap
AnnotationEnableRuntimeSetConfig = "runtime." + LabelAnnotationPrefix + "enable-set-config"

// AnnotationEnableRuntimeSetConfigMap
AnnotationEnableRuntimeHelmValueConfig = "runtime." + LabelAnnotationPrefix + "enable-helm-value-config"

// LabelAnnotationMountingDatasets is a label/annotation key indicating which datasets are currently being used by a pod.
// i.e. fluid.io/datasets-in-use
LabelAnnotationDatasetsInUse = LabelAnnotationPrefix + "datasets-in-use"
Expand Down
39 changes: 0 additions & 39 deletions pkg/ddc/thin/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ import (
)

func (t *ThinEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (err error) {
runtime, err := t.getRuntime()
if err != nil {
return err
}

// 2. update the dataset status
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
dataset, err := utils.GetDataset(t.Client, t.name, t.namespace)
Expand Down Expand Up @@ -85,7 +80,6 @@ func (t *ThinEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (err e
cond)
}

datasetToUpdate.Status.CacheStates = runtime.Status.CacheStates
if !reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
t.Log.V(1).Info("Update DatasetStatus", "dataset", fmt.Sprintf("%s/%s", datasetToUpdate.GetNamespace(), datasetToUpdate.GetName()))
err = t.Client.Status().Update(context.TODO(), datasetToUpdate)
Expand All @@ -108,39 +102,6 @@ func (t *ThinEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (err e
}

func (t *ThinEngine) UpdateCacheOfDataset() (err error) {
runtime, err := t.getRuntime()
if err != nil {
return err
}

// 2.update the dataset status
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
dataset, err := utils.GetDataset(t.Client, t.name, t.namespace)
if err != nil {
return err
}
datasetToUpdate := dataset.DeepCopy()

datasetToUpdate.Status.CacheStates = runtime.Status.CacheStates

t.Log.Info("the dataset status", "status", datasetToUpdate.Status)

if !reflect.DeepEqual(dataset.Status, datasetToUpdate.Status) {
t.Log.V(1).Info("Update RuntimeStatus", "runtime", fmt.Sprintf("%s/%s", runtime.GetNamespace(), runtime.GetName()))
err = t.Client.Status().Update(context.TODO(), datasetToUpdate)
return err
} else {
t.Log.Info("No need to update the cache of the data")
}

return nil
})

if err != nil {
return utils.LoggingErrorExceptConflict(t.Log, err, "Failed to Update dataset",
types.NamespacedName{Namespace: t.namespace, Name: t.name})
}

return
}

Expand Down
112 changes: 2 additions & 110 deletions pkg/ddc/thin/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,128 +17,20 @@
package thin

import (
"context"
"fmt"
"reflect"

data "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/retry"
)

func (t ThinEngine) CheckRuntimeHealthy() (err error) {
// 1. Check the healthy of the workers
err = t.checkWorkersHealthy()
if err != nil {
t.Log.Error(err, "The workers are not healthy")
updateErr := t.UpdateDatasetStatus(data.FailedDatasetPhase)
if updateErr != nil {
t.Log.Error(updateErr, "Failed to update dataset")
}
return
}

// 2. Check the healthy of the fuse
// Check the healthy of the fuse
err = t.checkFuseHealthy()
if err != nil {
t.Log.Error(err, "The fuse is not healthy")
updateErr := t.UpdateDatasetStatus(data.FailedDatasetPhase)
if updateErr != nil {
t.Log.Error(updateErr, "Failed to update dataset")
}
t.Log.Error(err, "checkFuseHealthy failed")
return
}

updateErr := t.UpdateDatasetStatus(data.BoundDatasetPhase)
if updateErr != nil {
t.Log.Error(updateErr, "Failed to update dataset")
}

return
}

// checkWorkersHealthy check workers number changed
func (t *ThinEngine) checkWorkersHealthy() (err error) {
workerName := t.getWorkerName()

// Check the status of workers
workers, err := kubeclient.GetStatefulSet(t.Client, workerName, t.namespace)
if err != nil {
return err
}

healthy := false
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {

runtime, err := t.getRuntime()
if err != nil {
return err
}

runtimeToUpdate := runtime.DeepCopy()
if workers.Status.ReadyReplicas == 0 && *workers.Spec.Replicas > 0 {
if len(runtimeToUpdate.Status.Conditions) == 0 {
runtimeToUpdate.Status.Conditions = []data.RuntimeCondition{}
}
cond := utils.NewRuntimeCondition(data.RuntimeWorkersReady, "The workers are not ready.",
fmt.Sprintf("The statefulset %s in %s are not ready, the Unavailable number is %d, please fix it.",
workers.Name,
workers.Namespace,
*workers.Spec.Replicas-workers.Status.ReadyReplicas), v1.ConditionFalse)

_, oldCond := utils.GetRuntimeCondition(runtimeToUpdate.Status.Conditions, cond.Type)

if oldCond == nil || oldCond.Type != cond.Type {
runtimeToUpdate.Status.Conditions =
utils.UpdateRuntimeCondition(runtimeToUpdate.Status.Conditions,
cond)
}

runtimeToUpdate.Status.WorkerPhase = data.RuntimePhaseNotReady

t.Log.Error(err, "the workers are not ready")
} else {
healthy = true
cond := utils.NewRuntimeCondition(data.RuntimeWorkersReady, "The workers are ready.",
"The workers are ready", v1.ConditionTrue)

_, oldCond := utils.GetRuntimeCondition(runtimeToUpdate.Status.Conditions, cond.Type)

if oldCond == nil || oldCond.Type != cond.Type {
runtimeToUpdate.Status.Conditions =
utils.UpdateRuntimeCondition(runtimeToUpdate.Status.Conditions,
cond)
}
}
runtimeToUpdate.Status.WorkerNumberReady = workers.Status.ReadyReplicas
runtimeToUpdate.Status.WorkerNumberAvailable = workers.Status.CurrentReplicas
if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) {
updateErr := t.Client.Status().Update(context.TODO(), runtimeToUpdate)
if updateErr != nil {
return updateErr
}
}

return err
})

if err != nil {
t.Log.Error(err, "Failed update runtime")
return err
}

if !healthy {
err = fmt.Errorf("the workers %s in %s are not ready, the unhealthy number %d",
workers.Name,
workers.Namespace,
*workers.Spec.Replicas-workers.Status.ReadyReplicas)
}

return err
}

// checkFuseHealthy check fuses number changed
func (t *ThinEngine) checkFuseHealthy() error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
Expand Down
Loading
Loading