rename pool-coordinator to yurt-coordinator for yurthub #1532

Merged
rename pool-coordinator to yurt-coordinator for yurthub
JameKeal committed Jun 8, 2023
commit c489a41e798e712cbd11133bd96483bb3afa2676
2 changes: 1 addition & 1 deletion cmd/yurthub/app/config/config.go
@@ -170,7 +170,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
         YurtHubNamespace:         options.YurtHubNamespace,
         ProxiedClient:            proxiedClient,
         DiskCachePath:            options.DiskCachePath,
-        CoordinatorPKIDir:        filepath.Join(options.RootDir, "poolcoordinator"),
+        CoordinatorPKIDir:        filepath.Join(options.RootDir, "yurtcoordinator"),
         EnableCoordinator:        options.EnableCoordinator,
         CoordinatorServerURL:     coordinatorServerURL,
         CoordinatorStoragePrefix: options.CoordinatorStoragePrefix,
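
For orientation, the only functional effect of this first hunk is the on-disk PKI location. A minimal sketch of what the renamed path resolves to, where the root dir value is an assumption (yurthub takes the real one from options.RootDir):

    package main

    import (
        "fmt"
        "path/filepath"
    )

    func main() {
        // Assumed example value; the real root dir comes from options.RootDir.
        rootDir := "/var/lib/yurthub"
        // After this change, coordinator certificates live under "yurtcoordinator".
        fmt.Println(filepath.Join(rootDir, "yurtcoordinator")) // /var/lib/yurthub/yurtcoordinator
    }

Presumably, nodes upgraded in place will re-provision coordinator certificates under the new directory, since the old poolcoordinator path is no longer referenced.
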
12 changes: 6 additions & 6 deletions cmd/yurthub/app/options/options.go
@@ -120,8 +120,8 @@ func NewYurtHubOptions() *YurtHubOptions {
         MinRequestTimeout:        time.Second * 1800,
         CACertHashes:             make([]string, 0),
         UnsafeSkipCAVerification: true,
-        CoordinatorServerAddr:    fmt.Sprintf("https://%s:%s", util.DefaultPoolCoordinatorAPIServerSvcName, util.DefaultPoolCoordinatorAPIServerSvcPort),
-        CoordinatorStorageAddr:   fmt.Sprintf("https://%s:%s", util.DefaultPoolCoordinatorEtcdSvcName, util.DefaultPoolCoordinatorEtcdSvcPort),
+        CoordinatorServerAddr:    fmt.Sprintf("https://%s:%s", util.DefaultYurtCoordinatorAPIServerSvcName, util.DefaultYurtCoordinatorAPIServerSvcPort),
+        CoordinatorStorageAddr:   fmt.Sprintf("https://%s:%s", util.DefaultYurtCoordinatorEtcdSvcName, util.DefaultYurtCoordinatorEtcdSvcPort),
         CoordinatorStoragePrefix: "/registry",
         LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
             LeaderElect: true,
@@ -208,17 +208,17 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
     fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.")
     fs.StringSliceVar(&o.CACertHashes, "discovery-token-ca-cert-hash", o.CACertHashes, "For token-based discovery, validate that the root CA public key matches this hash (format: \"<type>:<value>\").")
     fs.BoolVar(&o.UnsafeSkipCAVerification, "discovery-token-unsafe-skip-ca-verification", o.UnsafeSkipCAVerification, "For token-based discovery, allow joining without --discovery-token-ca-cert-hash pinning.")
-    fs.BoolVar(&o.EnableCoordinator, "enable-coordinator", o.EnableCoordinator, "make yurthub aware of the pool coordinator")
+    fs.BoolVar(&o.EnableCoordinator, "enable-coordinator", o.EnableCoordinator, "make yurthub aware of the yurt coordinator")
     fs.StringVar(&o.CoordinatorServerAddr, "coordinator-server-addr", o.CoordinatorServerAddr, "Coordinator APIServer address in format https://host:port")
-    fs.StringVar(&o.CoordinatorStoragePrefix, "coordinator-storage-prefix", o.CoordinatorStoragePrefix, "Pool-Coordinator etcd storage prefix, same as etcd-prefix of Kube-APIServer")
-    fs.StringVar(&o.CoordinatorStorageAddr, "coordinator-storage-addr", o.CoordinatorStorageAddr, "Address of Pool-Coordinator etcd, in the format host:port")
+    fs.StringVar(&o.CoordinatorStoragePrefix, "coordinator-storage-prefix", o.CoordinatorStoragePrefix, "Yurt-Coordinator etcd storage prefix, same as etcd-prefix of Kube-APIServer")
+    fs.StringVar(&o.CoordinatorStorageAddr, "coordinator-storage-addr", o.CoordinatorStorageAddr, "Address of Yurt-Coordinator etcd, in the format host:port")
     bindFlags(&o.LeaderElection, fs)
 }

 // bindFlags binds the LeaderElectionConfiguration struct fields to a flagset
 func bindFlags(l *componentbaseconfig.LeaderElectionConfiguration, fs *pflag.FlagSet) {
     fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+
-        "Start a leader election client and gain leadership based on pool coordinator")
+        "Start a leader election client and gain leadership based on yurt coordinator")
     fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+
         "The duration that non-leader candidates will wait after observing a leadership "+
         "renewal until attempting to acquire leadership of a led but unrenewed leader "+
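
A small sketch of how the renamed default endpoints above are composed; the service names and ports here are placeholders standing in for the util.DefaultYurtCoordinator* constants, whose real values live in pkg/yurthub/util:

    package main

    import "fmt"

    func main() {
        // Placeholder values; the actual defaults come from the
        // util.DefaultYurtCoordinator* constants referenced in options.go.
        apiServerSvcName, apiServerSvcPort := "yurt-coordinator-apiserver", "443"
        etcdSvcName, etcdSvcPort := "yurt-coordinator-etcd", "12379"

        coordinatorServerAddr := fmt.Sprintf("https://%s:%s", apiServerSvcName, apiServerSvcPort)
        coordinatorStorageAddr := fmt.Sprintf("https://%s:%s", etcdSvcName, etcdSvcPort)
        fmt.Println(coordinatorServerAddr)  // https://yurt-coordinator-apiserver:443
        fmt.Println(coordinatorStorageAddr) // https://yurt-coordinator-etcd:12379
    }
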
4 changes: 2 additions & 2 deletions cmd/yurthub/app/options/options_test.go
@@ -64,8 +64,8 @@ func TestNewYurtHubOptions(t *testing.T) {
         MinRequestTimeout:        time.Second * 1800,
         CACertHashes:             make([]string, 0),
         UnsafeSkipCAVerification: true,
-        CoordinatorServerAddr:    fmt.Sprintf("https://%s:%s", util.DefaultPoolCoordinatorAPIServerSvcName, util.DefaultPoolCoordinatorAPIServerSvcPort),
-        CoordinatorStorageAddr:   fmt.Sprintf("https://%s:%s", util.DefaultPoolCoordinatorEtcdSvcName, util.DefaultPoolCoordinatorEtcdSvcPort),
+        CoordinatorServerAddr:    fmt.Sprintf("https://%s:%s", util.DefaultYurtCoordinatorAPIServerSvcName, util.DefaultYurtCoordinatorAPIServerSvcPort),
+        CoordinatorStorageAddr:   fmt.Sprintf("https://%s:%s", util.DefaultYurtCoordinatorEtcdSvcName, util.DefaultYurtCoordinatorEtcdSvcPort),
         CoordinatorStoragePrefix: "/registry",
         LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
             LeaderElect: true,
38 changes: 19 additions & 19 deletions cmd/yurthub/app/start.go
@@ -38,13 +38,13 @@ import (
     "github.com/openyurtio/openyurt/pkg/yurthub/gc"
     "github.com/openyurtio/openyurt/pkg/yurthub/healthchecker"
     hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest"
-    "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator"
-    coordinatorcertmgr "github.com/openyurtio/openyurt/pkg/yurthub/poolcoordinator/certmanager"
     "github.com/openyurtio/openyurt/pkg/yurthub/proxy"
     "github.com/openyurtio/openyurt/pkg/yurthub/server"
     "github.com/openyurtio/openyurt/pkg/yurthub/tenant"
     "github.com/openyurtio/openyurt/pkg/yurthub/transport"
     "github.com/openyurtio/openyurt/pkg/yurthub/util"
+    "github.com/openyurtio/openyurt/pkg/yurthub/yurtcoordinator"
+    coordinatorcertmgr "github.com/openyurtio/openyurt/pkg/yurthub/yurtcoordinator/certmanager"
 )

 // NewCmdStartYurtHub creates a *cobra.Command object with default parameters
@@ -108,15 +108,15 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {

     var cloudHealthChecker healthchecker.MultipleBackendsHealthChecker
     if cfg.WorkingMode == util.WorkingModeEdge {
-        klog.Infof("%d. create health checkers for remote servers and pool coordinator", trace)
+        klog.Infof("%d. create health checkers for remote servers and yurt coordinator", trace)
         cloudHealthChecker, err = healthchecker.NewCloudAPIServerHealthChecker(cfg, cloudClients, ctx.Done())
         if err != nil {
             return fmt.Errorf("could not new cloud health checker, %w", err)
         }
     } else {
         klog.Infof("%d. disable health checker for node %s because it is a cloud node", trace, cfg.NodeName)
         // In cloud mode, cloud health checker is not needed.
-        // This fake checker will always report that the cloud is healthy and pool coordinator is unhealthy.
+        // This fake checker will always report that the cloud is healthy and yurt coordinator is unhealthy.
         cloudHealthChecker = healthchecker.NewFakeChecker(true, make(map[string]int))
     }
     trace++
@@ -155,7 +155,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {

     var coordinatorHealthCheckerGetter func() healthchecker.HealthChecker = getFakeCoordinatorHealthChecker
     var coordinatorTransportManagerGetter func() transport.Interface = getFakeCoordinatorTransportManager
-    var coordinatorGetter func() poolcoordinator.Coordinator = getFakeCoordinator
+    var coordinatorGetter func() yurtcoordinator.Coordinator = getFakeCoordinator
     var coordinatorServerURLGetter func() *url.URL = getFakeCoordinatorServerURL

     if cfg.EnableCoordinator {
@@ -237,12 +237,12 @@ func coordinatorRun(ctx context.Context,
     coordinatorInformerRegistryChan chan struct{}) (
     func() healthchecker.HealthChecker,
     func() transport.Interface,
-    func() poolcoordinator.Coordinator,
+    func() yurtcoordinator.Coordinator,
     func() *url.URL) {

     var coordinatorHealthChecker healthchecker.HealthChecker
     var coordinatorTransportMgr transport.Interface
-    var coordinator poolcoordinator.Coordinator
+    var coordinator yurtcoordinator.Coordinator
     var coordinatorServiceUrl *url.URL

     go func() {
@@ -261,9 +261,9 @@ func coordinatorRun(ctx context.Context,
         }
         klog.Info("coordinatorRun sync service complete")

-        // resolve pool-coordinator-apiserver and etcd from domain to ips
+        // resolve yurt-coordinator-apiserver and etcd from domain to ips
         serviceList := cfg.SharedFactory.Core().V1().Services().Lister()
-        // if pool-coordinator-apiserver and pool-coordinator-etcd address is ip, don't need to resolve
+        // if yurt-coordinator-apiserver and yurt-coordinator-etcd address is ip, don't need to resolve
         apiServerIP := net.ParseIP(cfg.CoordinatorServerURL.Hostname())
         etcdUrl, err := url.Parse(cfg.CoordinatorStorageAddr)
         if err != nil {
@@ -295,7 +295,7 @@ func coordinatorRun(ctx context.Context,
             cfg.CoordinatorStorageAddr = fmt.Sprintf("https://%s:%s", etcdService.Spec.ClusterIP, etcdUrl.Port())
         }

-        coorTransportMgr, err := poolCoordinatorTransportMgrGetter(coorCertManager, ctx.Done())
+        coorTransportMgr, err := yurtCoordinatorTransportMgrGetter(coorCertManager, ctx.Done())
         if err != nil {
             klog.Errorf("coordinator failed to create coordinator transport manager, %v", err)
             return
@@ -307,7 +307,7 @@ func coordinatorRun(ctx context.Context,
             Timeout: time.Duration(cfg.HeartbeatTimeoutSeconds) * time.Second,
         })
         if err != nil {
-            klog.Errorf("coordinator failed to get coordinator client for pool coordinator, %v", err)
+            klog.Errorf("coordinator failed to get coordinator client for yurt coordinator, %v", err)
             return
         }

@@ -317,15 +317,15 @@ func coordinatorRun(ctx context.Context,
             return
         }

-        var elector *poolcoordinator.HubElector
-        elector, err = poolcoordinator.NewHubElector(cfg, coordinatorClient, coorHealthChecker, cloudHealthChecker, ctx.Done())
+        var elector *yurtcoordinator.HubElector
+        elector, err = yurtcoordinator.NewHubElector(cfg, coordinatorClient, coorHealthChecker, cloudHealthChecker, ctx.Done())
         if err != nil {
             klog.Errorf("coordinator failed to create hub elector, %v", err)
             return
         }
         go elector.Run(ctx.Done())

-        coor, err := poolcoordinator.NewCoordinator(ctx, cfg, cloudHealthChecker, restConfigMgr, coorCertManager, coorTransportMgr, elector)
+        coor, err := yurtcoordinator.NewCoordinator(ctx, cfg, cloudHealthChecker, restConfigMgr, coorCertManager, coorTransportMgr, elector)
         if err != nil {
             klog.Errorf("coordinator failed to create coordinator, %v", err)
             return
@@ -342,14 +342,14 @@ func coordinatorRun(ctx context.Context,
         return coordinatorHealthChecker
     }, func() transport.Interface {
         return coordinatorTransportMgr
-    }, func() poolcoordinator.Coordinator {
+    }, func() yurtcoordinator.Coordinator {
         return coordinator
     }, func() *url.URL {
         return coordinatorServiceUrl
     }
 }

-func poolCoordinatorTransportMgrGetter(coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
+func yurtCoordinatorTransportMgrGetter(coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {
     err := wait.PollImmediate(5*time.Second, 4*time.Minute, func() (done bool, err error) {
         klog.Info("waiting for preparing certificates for coordinator client and node lease proxy client")
         if coordinatorCertMgr.GetAPIServerClientCert() == nil {
@@ -366,13 +366,13 @@ func poolCoordinatorTransportMgrGetter(coordinatorCertMgr *coordinatorcertmgr.CertManager, stopCh <-chan struct{}) (transport.Interface, error) {

     coordinatorTransportMgr, err := transport.NewTransportManager(coordinatorCertMgr, stopCh)
     if err != nil {
-        return nil, fmt.Errorf("failed to create transport manager for pool coordinator, %v", err)
+        return nil, fmt.Errorf("failed to create transport manager for yurt coordinator, %v", err)
     }
     return coordinatorTransportMgr, nil
 }

-func getFakeCoordinator() poolcoordinator.Coordinator {
-    return &poolcoordinator.FakeCoordinator{}
+func getFakeCoordinator() yurtcoordinator.Coordinator {
+    return &yurtcoordinator.FakeCoordinator{}
 }

 func getFakeCoordinatorHealthChecker() healthchecker.HealthChecker {
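
start.go wires the coordinator into the proxy through getter closures: each getter initially returns a fake, and the asynchronous coordinatorRun goroutine later swaps in the real instances. A stripped-down sketch of that pattern, with simplified types (the real code synchronizes the hand-off through coordinatorInformerRegistryChan and related channels):

    package main

    import "fmt"

    // Coordinator stands in for yurtcoordinator.Coordinator.
    type Coordinator interface{ IsReady() bool }

    type fakeCoordinator struct{}

    func (fakeCoordinator) IsReady() bool { return false }

    type realCoordinator struct{}

    func (realCoordinator) IsReady() bool { return true }

    func main() {
        // Start with a fake, as getFakeCoordinator does.
        var coordinator Coordinator = fakeCoordinator{}
        coordinatorGetter := func() Coordinator { return coordinator }

        // An initialization goroutine later replaces the fake; callers that
        // captured the getter observe the real instance from then on.
        coordinator = realCoordinator{}
        fmt.Println(coordinatorGetter().IsReady()) // true
    }
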
2 changes: 1 addition & 1 deletion pkg/yurthub/cachemanager/cache_manager.go
@@ -268,7 +268,7 @@ func (cm *cacheManager) queryOneObject(req *http.Request) (runtime.Object, error) {
     // Note:
     // When cloud-edge network is healthy, the inMemoryCache can be updated with response from cloud side.
     // While cloud-edge network is broken, the inMemoryCache can only be filled with data from edge cache,
-    // such as local disk and pool-coordinator.
+    // such as local disk and yurt-coordinator.
     if isInMemoryCacheMiss {
         if inMemoryCacheKey, err := inMemoryCacheKeyFunc(info); err != nil {
             klog.Errorf("cannot generate in-memory cache key for req %s, %v", util.ReqString(req), err)
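
The note above describes a two-tier lookup: the in-memory cache is refreshed from the cloud while the network is healthy, and edge-side stores (local disk, yurt-coordinator) are the only refill source during an outage. A hypothetical illustration of that fallback; none of these helper names exist in yurthub itself:

    package cache

    // fetchObject sketches the lookup order described in the note above:
    // in-memory cache first, then the edge cache on a miss.
    func fetchObject(key string, inMemory map[string][]byte, edgeLookup func(string) ([]byte, bool)) ([]byte, bool) {
        if data, ok := inMemory[key]; ok {
            return data, true // in-memory hit, served directly
        }
        // In-memory miss: fall back to the edge cache (local disk or
        // yurt-coordinator) and refill the in-memory copy.
        if data, ok := edgeLookup(key); ok {
            inMemory[key] = data
            return data, true
        }
        return nil, false
    }
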
2 changes: 1 addition & 1 deletion pkg/yurthub/healthchecker/health_checker.go
@@ -58,7 +58,7 @@ type coordinatorHealthChecker struct {
     heartbeatInterval int
 }

-// NewCoordinatorHealthChecker returns a health checker for verifying pool coordinator status.
+// NewCoordinatorHealthChecker returns a health checker for verifying yurt coordinator status.
 func NewCoordinatorHealthChecker(cfg *config.YurtHubConfiguration, checkerClient kubernetes.Interface, cloudServerHealthChecker HealthChecker, stopCh <-chan struct{}) (HealthChecker, error) {
     chc := &coordinatorHealthChecker{
         cloudServerHealthChecker: cloudServerHealthChecker,
48 changes: 24 additions & 24 deletions pkg/yurthub/metrics/metrics.go
@@ -51,9 +51,9 @@ type HubMetrics struct {
     closableConnsCollector                *prometheus.GaugeVec
     proxyTrafficCollector                 *prometheus.CounterVec
     proxyLatencyCollector                 *prometheus.GaugeVec
-    poolCoordinatorYurthubRoleCollector   *prometheus.GaugeVec
-    poolCoordinatorHealthyStatusCollector *prometheus.GaugeVec
-    poolCoordinatorReadyStatusCollector   *prometheus.GaugeVec
+    yurtCoordinatorYurthubRoleCollector   *prometheus.GaugeVec
+    yurtCoordinatorHealthyStatusCollector *prometheus.GaugeVec
+    yurtCoordinatorReadyStatusCollector   *prometheus.GaugeVec
 }

 func newHubMetrics() *HubMetrics {
@@ -111,28 +111,28 @@ func newHubMetrics() *HubMetrics {
             Help: "collector of proxy latency of incoming requests(unit: ms)",
         },
         []string{"client", "verb", "resource", "subresources", "type"})
-    poolCoordinatorYurthubRoleCollector := prometheus.NewGaugeVec(
+    yurtCoordinatorYurthubRoleCollector := prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
-            Name:      "pool_coordinator_yurthub_role",
-            Help:      "pool coordinator status of yurthub. 1: LeaderHub, 2: FollowerHub, 3: Pending",
+            Name:      "yurt_coordinator_yurthub_role",
+            Help:      "yurt coordinator status of yurthub. 1: LeaderHub, 2: FollowerHub, 3: Pending",
         },
         []string{})
-    poolCoordinatorHealthyStatusCollector := prometheus.NewGaugeVec(
+    yurtCoordinatorHealthyStatusCollector := prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
-            Name:      "pool_coordinator_healthy_status",
-            Help:      "pool coordinator healthy status 1: healthy, 0: unhealthy",
+            Name:      "yurt_coordinator_healthy_status",
+            Help:      "yurt coordinator healthy status 1: healthy, 0: unhealthy",
         },
         []string{})
-    poolCoordinatorReadyStatusCollector := prometheus.NewGaugeVec(
+    yurtCoordinatorReadyStatusCollector := prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: namespace,
             Subsystem: subsystem,
-            Name:      "pool_coordinator_ready_status",
-            Help:      "pool coordinator ready status 1: ready, 0: notReady",
+            Name:      "yurt_coordinator_ready_status",
+            Help:      "yurt coordinator ready status 1: ready, 0: notReady",
         },
         []string{})
     prometheus.MustRegister(serversHealthyCollector)
@@ -142,9 +142,9 @@ func newHubMetrics() *HubMetrics {
     prometheus.MustRegister(closableConnsCollector)
     prometheus.MustRegister(proxyTrafficCollector)
     prometheus.MustRegister(proxyLatencyCollector)
-    prometheus.MustRegister(poolCoordinatorYurthubRoleCollector)
-    prometheus.MustRegister(poolCoordinatorHealthyStatusCollector)
-    prometheus.MustRegister(poolCoordinatorReadyStatusCollector)
+    prometheus.MustRegister(yurtCoordinatorYurthubRoleCollector)
+    prometheus.MustRegister(yurtCoordinatorHealthyStatusCollector)
+    prometheus.MustRegister(yurtCoordinatorReadyStatusCollector)
     return &HubMetrics{
         serversHealthyCollector:   serversHealthyCollector,
         inFlightRequestsCollector: inFlightRequestsCollector,
@@ -153,9 +153,9 @@ func newHubMetrics() *HubMetrics {
         closableConnsCollector:   closableConnsCollector,
         proxyTrafficCollector:    proxyTrafficCollector,
         proxyLatencyCollector:    proxyLatencyCollector,
-        poolCoordinatorHealthyStatusCollector: poolCoordinatorHealthyStatusCollector,
-        poolCoordinatorReadyStatusCollector:   poolCoordinatorReadyStatusCollector,
-        poolCoordinatorYurthubRoleCollector:   poolCoordinatorYurthubRoleCollector,
+        yurtCoordinatorHealthyStatusCollector: yurtCoordinatorHealthyStatusCollector,
+        yurtCoordinatorReadyStatusCollector:   yurtCoordinatorReadyStatusCollector,
+        yurtCoordinatorYurthubRoleCollector:   yurtCoordinatorYurthubRoleCollector,
     }
 }

@@ -172,16 +172,16 @@ func (hm *HubMetrics) ObserveServerHealthy(server string, status int) {
     hm.serversHealthyCollector.WithLabelValues(server).Set(float64(status))
 }

-func (hm *HubMetrics) ObservePoolCoordinatorYurthubRole(status int32) {
-    hm.poolCoordinatorYurthubRoleCollector.WithLabelValues().Set(float64(status))
+func (hm *HubMetrics) ObserveYurtCoordinatorYurthubRole(status int32) {
+    hm.yurtCoordinatorYurthubRoleCollector.WithLabelValues().Set(float64(status))
 }

-func (hm *HubMetrics) ObservePoolCoordinatorReadyStatus(status int32) {
-    hm.poolCoordinatorReadyStatusCollector.WithLabelValues().Set(float64(status))
+func (hm *HubMetrics) ObserveYurtCoordinatorReadyStatus(status int32) {
+    hm.yurtCoordinatorReadyStatusCollector.WithLabelValues().Set(float64(status))
 }

-func (hm *HubMetrics) ObservePoolCoordinatorHealthyStatus(status int32) {
-    hm.poolCoordinatorHealthyStatusCollector.WithLabelValues().Set(float64(status))
+func (hm *HubMetrics) ObserveYurtCoordinatorHealthyStatus(status int32) {
+    hm.yurtCoordinatorHealthyStatusCollector.WithLabelValues().Set(float64(status))
 }

 func (hm *HubMetrics) IncInFlightRequests(verb, resource, subresource, client string) {
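
Because Prometheus derives the exported metric name from Namespace, Subsystem, and Name, this rename also changes the names yurthub exposes (pool_coordinator_yurthub_role becomes yurt_coordinator_yurthub_role, and likewise for the healthy/ready gauges), so dashboards and alerts keyed on the old names need updating. A small sketch of the derivation, where the "node" and "yurthub" values are assumptions standing in for this file's namespace and subsystem constants:

    package main

    import (
        "fmt"

        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        // Assumed values; the real constants are defined in metrics.go.
        namespace, subsystem := "node", "yurthub"
        fqName := prometheus.BuildFQName(namespace, subsystem, "yurt_coordinator_yurthub_role")
        fmt.Println(fqName) // node_yurthub_yurt_coordinator_yurthub_role
    }
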
2 changes: 1 addition & 1 deletion pkg/yurthub/proxy/local/local.go
@@ -204,7 +204,7 @@ func (lp *LocalProxy) localWatch(w http.ResponseWriter, req *http.Request) error {
         return nil
     }

-    // if poolcoordinator becomes healthy, exit the watch wait
+    // if yurtcoordinator becomes healthy, exit the watch wait
     if isPoolScopedListWatch && lp.isCoordinatorReady() {
         return nil
     }