Skip to content

Commit

Permalink
Add embedded registry implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Dec 6, 2023
1 parent 75abb19 commit 18656bf
Show file tree
Hide file tree
Showing 10 changed files with 919 additions and 90 deletions.
162 changes: 125 additions & 37 deletions go.mod

Large diffs are not rendered by default.

414 changes: 363 additions & 51 deletions go.sum

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions pkg/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/deps"
"github.com/k3s-io/k3s/pkg/spegel"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/k3s-io/k3s/pkg/vpn"
Expand Down Expand Up @@ -679,6 +681,18 @@ func get(ctx context.Context, envInfo *cmds.Agent, proxy proxy.Proxy) (*config.N
}
nodeConfig.AgentConfig.Registry = privRegistries.Registry

if nodeConfig.EmbeddedRegistry {
conf := spegel.DefaultRegistry
conf.Address = net.JoinHostPort(nodeName, strconv.Itoa(controlConfig.SupervisorPort))
conf.ClientCAFile = clientCAFile
conf.ClientCertFile = clientK3sControllerCert
conf.ClientKeyFile = clientK3sControllerKey
conf.ServerCAFile = serverCAFile
conf.ServerCertFile = servingKubeletCert
conf.ServerKeyFile = servingKubeletKey
conf.InjectMirror(nodeConfig)
}

if err := validateNetworkConfig(nodeConfig); err != nil {
return nil, err
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/agent/containerd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/k3s-io/k3s/pkg/agent/templates"
util2 "github.com/k3s-io/k3s/pkg/agent/util"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/spegel"
"github.com/k3s-io/k3s/pkg/version"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -37,6 +38,7 @@ func writeContainerdConfig(cfg *config.Node, containerdConfig templates.Containe
// writeContainerdHosts merges registry mirrors/configs, and renders and saves hosts.toml from the filled template
func writeContainerdHosts(cfg *config.Node, containerdConfig templates.ContainerdConfig) error {
registry := containerdConfig.PrivateRegistryConfig
conf := spegel.DefaultRegistry
hosts := map[string]templates.HostConfig{}

for host, mirror := range registry.Mirrors {
Expand All @@ -57,12 +59,17 @@ func writeContainerdHosts(cfg *config.Node, containerdConfig templates.Container
// structure, which is defined in rancher/wharfie.
for _, endpoint := range mirror.Endpoints {
if endpointURL, err := url.Parse(endpoint); err == nil {
config.Endpoints = append(config.Endpoints, templates.RegistryEndpoint{
re := templates.RegistryEndpoint{
OverridePath: endpointURL.Path != "" && endpointURL.Path != "/" && !strings.HasSuffix(endpointURL.Path, "/v2"),
Config: registry.Configs[endpointURL.Host],
Rewrites: mirror.Rewrites,
URI: endpoint,
})
}
// Do not apply rewrites to the embedded registry endpoint
if endpointURL.Host == conf.Address {
re.Rewrites = nil
}
config.Endpoints = append(config.Endpoints, re)
}
}
hosts[host] = config
Expand Down
11 changes: 11 additions & 0 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/nodeconfig"
"github.com/k3s-io/k3s/pkg/rootless"
"github.com/k3s-io/k3s/pkg/spegel"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/pkg/errors"
Expand Down Expand Up @@ -97,6 +98,16 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
nodeConfig.AgentConfig.EnableIPv4 = enableIPv4
nodeConfig.AgentConfig.EnableIPv6 = enableIPv6

if nodeConfig.EmbeddedRegistry {
if nodeConfig.Docker || nodeConfig.ContainerRuntimeEndpoint != "" {
return errors.New("embedded registry mirror requires embedded containerd")
}

if err := spegel.DefaultRegistry.Start(ctx, nodeConfig); err != nil {
return errors.Wrap(err, "failed to start embedded registry")
}
}

if err := setupCriCtlConfig(cfg, nodeConfig); err != nil {
return err
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/cli/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package agent

import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"runtime"

"github.com/erikdubbelboer/gspt"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/agent"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/datadir"
"github.com/k3s-io/k3s/pkg/spegel"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/k3s-io/k3s/pkg/vpn"
Expand Down Expand Up @@ -96,5 +100,19 @@ func Run(ctx *cli.Context) error {
}
}

spegel.DefaultRegistry.Bootstrapper = spegel.NewAgentBootstrapper(cfg.ServerURL, cfg.Token, cfg.DataDir)
spegel.DefaultRegistry.HandlerFunc = func(conf *spegel.Config, router *mux.Router) error {
server := &http.Server{
Handler: router,
Addr: conf.Address,
}
go func() {
if err := server.ListenAndServeTLS(conf.ServerCertFile, conf.ServerKeyFile); err != nil && !errors.Is(err, http.ErrServerClosed) {
logrus.Fatalf("registry server failed: %v", err)
}
}()
return nil
}

return agent.Run(contextCtx, cfg)
}
17 changes: 17 additions & 0 deletions pkg/cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

systemd "github.com/coreos/go-systemd/daemon"
"github.com/erikdubbelboer/gspt"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/agent"
"github.com/k3s-io/k3s/pkg/agent/loadbalancer"
"github.com/k3s-io/k3s/pkg/cli/cmds"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/k3s-io/k3s/pkg/etcd"
"github.com/k3s-io/k3s/pkg/rootless"
"github.com/k3s-io/k3s/pkg/server"
"github.com/k3s-io/k3s/pkg/spegel"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/k3s-io/k3s/pkg/vpn"
Expand Down Expand Up @@ -548,6 +550,21 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
return agent.RunStandalone(ctx, agentConfig)
}

if cfg.EmbeddedRegistry {
conf := spegel.DefaultRegistry
conf.Bootstrapper = spegel.NewChainingBootstrapper(
spegel.NewServerBootstrapper(&serverConfig.ControlConfig),
spegel.NewAgentBootstrapper(cfg.ServerURL, token, agentConfig.DataDir),
spegel.NewSelfBootstrapper(),
)

conf.HandlerFunc = func(_ *spegel.Config, router *mux.Router) error {
router.NotFoundHandler = serverConfig.ControlConfig.Runtime.Handler
serverConfig.ControlConfig.Runtime.Handler = router
return nil
}
}

return agent.Run(ctx, agentConfig)
}

Expand Down
177 changes: 177 additions & 0 deletions pkg/spegel/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package spegel

import (
"context"
"os"
"path/filepath"
"time"

"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/rancher/wrangler/pkg/merr"
"github.com/sirupsen/logrus"
"github.com/xenitab/spegel/pkg/routing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
)

// explicit interface checks
var _ routing.Bootstrapper = &selfBootstrapper{}
var _ routing.Bootstrapper = &agentBootstrapper{}
var _ routing.Bootstrapper = &serverBootstrapper{}
var _ routing.Bootstrapper = &chainingBootstrapper{}

type selfBootstrapper struct {
id string
}

// NewSelfBootstrapper returns a stub p2p bootstrapper that just returns its own ID
func NewSelfBootstrapper() routing.Bootstrapper {
return &selfBootstrapper{}
}

func (s *selfBootstrapper) Run(_ context.Context, id string) error {
s.id = id
return nil
}

func (s *selfBootstrapper) GetAddress() (*peer.AddrInfo, error) {
return peer.AddrInfoFromString(s.id)
}

type agentBootstrapper struct {
server string
token string
clientCert string
clientKey string
}

// NewAgentBootstrapper returns a p2p bootstrapper that retrieves a peer address from its server
func NewAgentBootstrapper(server, token, dataDir string) routing.Bootstrapper {
return &agentBootstrapper{
clientCert: filepath.Join(dataDir, "agent", "client-kubelet.crt"),
clientKey: filepath.Join(dataDir, "agent", "client-kubelet.key"),
server: server,
token: token,
}
}

func (c *agentBootstrapper) Run(ctx context.Context, _ string) error {
return nil
}

func (c *agentBootstrapper) GetAddress() (*peer.AddrInfo, error) {
if c.server == "" || c.token == "" {
return nil, errors.New("cannot get addresses without server and token")
}

withCert := clientaccess.WithClientCertificate(c.clientCert, c.clientKey)
info, err := clientaccess.ParseAndValidateToken(c.server, c.token, withCert)
if err != nil {
return nil, err
}

addr, err := info.Get("/v1-" + version.Program + "/p2p")
if err != nil {
return nil, err
}

return peer.AddrInfoFromString(string(addr))
}

type serverBootstrapper struct {
controlConfig *config.Control
}

// NewServerBootstrapper returns a p2p bootstrapper that returns an address from a random cluster member.
func NewServerBootstrapper(controlConfig *config.Control) routing.Bootstrapper {
return &serverBootstrapper{
controlConfig: controlConfig,
}
}

func (s *serverBootstrapper) Run(_ context.Context, id string) error {
s.controlConfig.Runtime.ClusterControllerStarts["spegel-p2p"] = func(ctx context.Context) {
nodes := s.controlConfig.Runtime.Core.Core().V1().Node()
wait.PollImmediateUntilWithContext(ctx, 1*time.Second, func(ctx context.Context) (bool, error) {
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
return false, nil
}
node, err := nodes.Get(nodeName, metav1.GetOptions{})
if err != nil {
return false, nil
}
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[P2pAddressAnnotation] = id
if _, err = nodes.Update(node); err != nil {
return false, nil
}
logrus.Infof("Node P2P address annotation added: %s", id)
return true, nil
})
}
return nil
}

func (s *serverBootstrapper) GetAddress() (*peer.AddrInfo, error) {
if s.controlConfig.Runtime.Core == nil {
return nil, errors.New("runtime core not ready")
}

nodes := s.controlConfig.Runtime.Core.Core().V1().Node()
labelSelector := labels.Set{util.ControlPlaneRoleLabelKey: "true"}.String()
nodeList, err := nodes.List(metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
return nil, err
}
for _, node := range nodeList.Items {
if addr, ok := node.Annotations[P2pAddressAnnotation]; ok {
if info, err := peer.AddrInfoFromString(addr); err == nil {
return info, nil
}
}
}

return nil, errors.New("no p2p control-plane nodes found")
}

type chainingBootstrapper struct {
bootstrappers []routing.Bootstrapper
}

// NewChainingBoostrapper returns a p2p bootstrapper that passes through to a list of bootstrappers.
func NewChainingBootstrapper(bootstrappers ...routing.Bootstrapper) routing.Bootstrapper {
return &chainingBootstrapper{
bootstrappers: bootstrappers,
}
}

func (c *chainingBootstrapper) Run(ctx context.Context, id string) error {
errs := merr.Errors{}
for _, b := range c.bootstrappers {
if err := b.Run(ctx, id); err != nil {
errs = append(errs, err)
}
}
return merr.NewErrors(errs...)
}

func (c *chainingBootstrapper) GetAddress() (*peer.AddrInfo, error) {
errs := merr.Errors{}
for _, b := range c.bootstrappers {
addr, err := b.GetAddress()
if err == nil {
return addr, nil
}
errs = append(errs, err)
}
return nil, merr.NewErrors(errs...)
}
33 changes: 33 additions & 0 deletions pkg/spegel/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package spegel

import (
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/rancher/wharfie/pkg/registries"
)

// InjectMirror configures TLS for the registry mirror client, and adds the mirror address as an endpoint
// to all configured registries.
func (c *Config) InjectMirror(nodeConfig *config.Node) error {
registry := nodeConfig.AgentConfig.Registry

if registry.Configs == nil {
registry.Configs = map[string]registries.RegistryConfig{}
}
registry.Configs[c.Address] = registries.RegistryConfig{
TLS: &registries.TLSConfig{
CAFile: c.ServerCAFile,
CertFile: c.ClientCertFile,
KeyFile: c.ClientKeyFile,
},
}

if registry.Mirrors == nil {
registry.Mirrors = map[string]registries.Mirror{}
}
for host, mirror := range registry.Mirrors {
mirror.Endpoints = append([]string{"https://" + c.Address}, mirror.Endpoints...)
registry.Mirrors[host] = mirror
}

return nil
}
Loading

0 comments on commit 18656bf

Please sign in to comment.