Skip to content

Commit

Permalink
provider and consumer pod use different label
Browse files Browse the repository at this point in the history
  • Loading branch information
sxllwx committed Jun 3, 2020
1 parent 1669155 commit 667f4a0
Showing 1 changed file with 52 additions and 23 deletions.
75 changes: 52 additions & 23 deletions remoting/kubernetes/registry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/apache/dubbo-go/common"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"os"
Expand Down Expand Up @@ -42,8 +43,9 @@ const (
// all pod annotation key
DubboIOAnnotationKey = "dubbo.io/annotation"

DubboIOLabelKey = "dubbo.io/label"
DubboIOLabelValue = "dubbo.io-value"
DubboIOLabelKey = "dubbo.io/label"
DubboIOConsumerLabelValue = "dubbo.io.consumer"
DubboIOProviderLabelValue = "dubbo.io.provider"
)

var (
Expand All @@ -58,6 +60,8 @@ type dubboRegistryController struct {
// manage lifecycle
ctx context.Context

role common.RoleType

// protect patch current pod operation
lock sync.Mutex

Expand Down Expand Up @@ -85,6 +89,7 @@ func newDubboRegistryController(ctx context.Context, roleType common.RoleType, k

c := &dubboRegistryController{
ctx: ctx,
role: roleType,
watcherSet: newWatcherSet(ctx),
needWatchedNamespace: make(map[string]struct{}),
namespacedInformerFactory: make(map[string]informers.SharedInformerFactory),
Expand All @@ -104,13 +109,12 @@ func newDubboRegistryController(ctx context.Context, roleType common.RoleType, k
return nil, perrors.WithMessage(err, "init watch set")
}

if roleType == common.CONSUMER {
// only consumer need list && watch
if err := c.initPodInformer(); err != nil {
return nil, perrors.WithMessage(err, "init pod informer")
}
go c.run()
if err := c.initPodInformer(); err != nil {
return nil, perrors.WithMessage(err, "init pod informer")
}

go c.run()

return c, nil
}

Expand All @@ -132,9 +136,15 @@ func GetInClusterKubernetesClient() (kubernetes.Interface, error) {
// 2. put every element to watcherSet
// 3. refresh watch book-mark
func (c *dubboRegistryController) initWatchSet() error {

req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
if err != nil{
return perrors.WithMessage(err, "new requirement")
}

for ns := range c.needWatchedNamespace {
pods, err := c.kc.CoreV1().Pods(ns).List(metav1.ListOptions{
LabelSelector: fields.OneTermEqualSelector(DubboIOLabelKey, DubboIOLabelValue).String(),
LabelSelector: req.String(),
})
if err != nil {
return perrors.WithMessagef(err, "list pods in namespace (%s)", ns)
Expand Down Expand Up @@ -171,22 +181,19 @@ func (c *dubboRegistryController) readConfig() error {
return nil
}

func (c *dubboRegistryController) initNamespacedPodInformer(ns string) {
func (c *dubboRegistryController) initNamespacedPodInformer(ns string) error {

req, err := labels.NewRequirement(DubboIOLabelKey, selection.In, []string{DubboIOConsumerLabelValue, DubboIOProviderLabelValue})
if err != nil{
return perrors.WithMessage(err, "new requirement")
}

informersFactory := informers.NewSharedInformerFactoryWithOptions(
c.kc,
defaultResync,
informers.WithNamespace(ns),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
labelSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{DubboIOLabelKey: DubboIOLabelValue},
}
labelMap, err := metav1.LabelSelectorAsMap(labelSelector)
if err != nil {
logger.Errorf("label selector ad map: %v", err)
return
}
options.LabelSelector = labels.SelectorFromSet(labelMap).String()
options.LabelSelector = req.String()
options.ResourceVersion = strconv.FormatUint(c.listAndWatchStartResourceVersion, 10)
}),
)
Expand All @@ -200,10 +207,16 @@ func (c *dubboRegistryController) initNamespacedPodInformer(ns string) {

c.namespacedInformerFactory[ns] = informersFactory
c.namespacedPodInformers[ns] = podInformer

return nil
}

func (c *dubboRegistryController) initPodInformer() error {

if c.role == common.PROVIDER{
return nil
}

// read need watched namespaces list
needWatchedNameSpaceList := os.Getenv(needWatchedNameSpaceKey)
if len(needWatchedNameSpaceList) == 0 {
Expand All @@ -219,7 +232,9 @@ func (c *dubboRegistryController) initPodInformer() error {

// init all watch needed pod-informer
for watchedNS := range c.needWatchedNamespace {
c.initNamespacedPodInformer(watchedNS)
if err := c.initNamespacedPodInformer(watchedNS); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -282,8 +297,14 @@ func (c *dubboRegistryController) Run() {
logger.Debugf("finish start namespaced informer-factory")
}

// run
// controller process every event in work-queue
func (c *dubboRegistryController) run() {

if c.role == common.PROVIDER{
return
}

defer c.queue.ShutDown()

for ns, podInformer := range c.namespacedPodInformers {
Expand Down Expand Up @@ -438,7 +459,7 @@ func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Po
newPod.Labels = make(map[string]string, 8)

if p.GetLabels() != nil {
if p.GetLabels()[DubboIOLabelKey] == DubboIOLabelValue {
if _, ok := p.GetLabels()[DubboIOLabelKey]; ok {
// already have label
return nil, nil, ErrDubboLabelAlreadyExist
}
Expand All @@ -449,8 +470,16 @@ func (c *dubboRegistryController) assembleDUBBOLabel(p *v1.Pod) (*v1.Pod, *v1.Po
oldPod.Labels[k] = v
newPod.Labels[k] = v
}

// assign new label for current pod
newPod.Labels[DubboIOLabelKey] = DubboIOLabelValue
switch c.role {
case common.CONSUMER:
newPod.Labels[DubboIOLabelKey] = DubboIOConsumerLabelValue
case common.PROVIDER:
newPod.Labels[DubboIOLabelKey] = DubboIOProviderLabelValue
default:
return nil, nil, perrors.New(fmt.Sprintf("unknown role %s", c.role))
}
return oldPod, newPod, nil
}

Expand Down

0 comments on commit 667f4a0

Please sign in to comment.