Skip to content

Commit

Permalink
WIP embed spegel
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Dec 1, 2023
1 parent 78975ca commit 5c4eb23
Show file tree
Hide file tree
Showing 9 changed files with 631 additions and 101 deletions.
157 changes: 120 additions & 37 deletions go.mod

Large diffs are not rendered by default.

396 changes: 345 additions & 51 deletions go.sum

Large diffs are not rendered by default.

46 changes: 42 additions & 4 deletions pkg/agent/containerd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"strings"

"github.com/containerd/containerd/remotes/docker"
"github.com/k3s-io/k3s/pkg/agent/proxy"
"github.com/k3s-io/k3s/pkg/agent/templates"
util2 "github.com/k3s-io/k3s/pkg/agent/util"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/spegel"
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/wharfie/pkg/registries"
"github.com/sirupsen/logrus"
)

Expand All @@ -35,10 +38,40 @@ func writeContainerdConfig(cfg *config.Node, containerdConfig templates.Containe
}

// writeContainerdHosts merges registry mirrors/configs, and renders and saves hosts.toml from the filled template
func writeContainerdHosts(cfg *config.Node, containerdConfig templates.ContainerdConfig) error {
func writeContainerdHosts(cfg *config.Node, containerdConfig templates.ContainerdConfig, proxy proxy.Proxy) error {
registry := containerdConfig.PrivateRegistryConfig
hosts := map[string]templates.HostConfig{}

// TODO: move mutation out of this function
spegelAddr := spegel.LocalAddr
if proxy.IsSupervisorLBEnabled() {
u, _ := url.Parse(proxy.SupervisorURL())
spegelAddr = u.Host
}

// FIXME inject local mirror CA
if registry.Configs == nil {
registry.Configs = map[string]registries.RegistryConfig{}
}
registry.Configs[spegelAddr] = registries.RegistryConfig{
TLS: &registries.TLSConfig{
CAFile: "/var/lib/rancher/k3s/agent/server-ca.crt",
},
}

// FIXME inject local mirror endpoint in first position
if registry.Mirrors == nil {
registry.Mirrors = map[string]registries.Mirror{}
}
for _, host := range spegel.Registries {
mirror, ok := registry.Mirrors[host]
if !ok {
mirror = registries.Mirror{}
}
mirror.Endpoints = append([]string{"https://" + spegelAddr}, mirror.Endpoints...)
registry.Mirrors[host] = mirror
}

hosts := map[string]templates.HostConfig{}
for host, mirror := range registry.Mirrors {
defaultHost, _ := docker.DefaultHost(host)
config := templates.HostConfig{
Expand All @@ -57,12 +90,17 @@ func writeContainerdHosts(cfg *config.Node, containerdConfig templates.Container
// structure, which is defined in rancher/wharfie.
for _, endpoint := range mirror.Endpoints {
if endpointURL, err := url.Parse(endpoint); err == nil {
config.Endpoints = append(config.Endpoints, templates.RegistryEndpoint{
registryEndpoint := templates.RegistryEndpoint{
OverridePath: endpointURL.Path != "" && endpointURL.Path != "/" && !strings.HasSuffix(endpointURL.Path, "/v2"),
Config: registry.Configs[endpointURL.Host],
Rewrites: mirror.Rewrites,
URI: endpoint,
})
}
// FIXME disable local mirror rewrites
if endpointURL.Host == spegelAddr {
registryEndpoint.Rewrites = nil
}
config.Endpoints = append(config.Endpoints, registryEndpoint)
}
}
hosts[host] = config
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/containerd/config_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
fuseoverlayfs "github.com/containerd/fuse-overlayfs-snapshotter"
stargz "github.com/containerd/stargz-snapshotter/service"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/k3s-io/k3s/pkg/agent/proxy"
"github.com/k3s-io/k3s/pkg/agent/templates"
"github.com/k3s-io/k3s/pkg/cgroups"
"github.com/k3s-io/k3s/pkg/daemons/config"
Expand Down Expand Up @@ -39,7 +40,7 @@ func getContainerdArgs(cfg *config.Node) []string {

// setupContainerdConfig generates the containerd.toml, using a template combined with various
// runtime configurations and registry mirror settings provided by the administrator.
func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
func setupContainerdConfig(ctx context.Context, cfg *config.Node, proxy proxy.Proxy) error {
privRegistries, err := registries.GetPrivateRegistries(cfg.AgentConfig.PrivateRegistry)
if err != nil {
return err
Expand Down Expand Up @@ -86,11 +87,11 @@ func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
logrus.Warnf("SELinux is enabled for "+version.Program+" but process is not running in context '%s', "+version.Program+"-selinux policy may need to be applied", SELinuxContextType)
}

if err := writeContainerdConfig(cfg, containerdConfig); err != nil {
if err := writeContainerdHosts(cfg, containerdConfig, proxy); err != nil {
return err
}

return writeContainerdHosts(cfg, containerdConfig)
return writeContainerdConfig(cfg, containerdConfig)
}

func Client(address string) (*containerd.Client, error) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/containerd/config_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func getContainerdArgs(cfg *config.Node) []string {

// setupContainerdConfig generates the containerd.toml, using a template combined with various
// runtime configurations and registry mirror settings provided by the administrator.
func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
func setupContainerdConfig(ctx context.Context, cfg *config.Node, proxy proxy.Proxy) error {
privRegistries, err := registries.GetPrivateRegistries(cfg.AgentConfig.PrivateRegistry)
if err != nil {
return err
Expand All @@ -45,11 +45,11 @@ func setupContainerdConfig(ctx context.Context, cfg *config.Node) error {
NoDefaultEndpoint: cfg.Containerd.NoDefault,
}

if err := writeContainerdConfig(cfg, containerdConfig); err != nil {
if err := writeContainerdHosts(cfg, containerdConfig, proxy); err != nil {
return err
}

return writeContainerdHosts(cfg, containerdConfig)
return writeContainerdConfig(cfg, containerdConfig)
}

func Client(address string) (*containerd.Client, error) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/containerd/containerd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/containerd/containerd/pkg/cri/constants"
"github.com/containerd/containerd/reference/docker"
"github.com/k3s-io/k3s/pkg/agent/cri"
"github.com/k3s-io/k3s/pkg/agent/proxy"
util2 "github.com/k3s-io/k3s/pkg/agent/util"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/version"
Expand All @@ -33,8 +34,8 @@ import (

// Run configures and starts containerd as a child process. Once it is up, images are preloaded
// or pulled from files found in the agent images directory.
func Run(ctx context.Context, cfg *config.Node) error {
if err := setupContainerdConfig(ctx, cfg); err != nil {
func Run(ctx context.Context, cfg *config.Node, proxy proxy.Proxy) error {
if err := setupContainerdConfig(ctx, cfg, proxy); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
return err
}
} else if nodeConfig.ContainerRuntimeEndpoint == "" {
if err := containerd.Run(ctx, nodeConfig); err != nil {
if err := containerd.Run(ctx, nodeConfig, proxy); err != nil {
return err
}
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/daemons/control/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/deps"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/spegel"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"github.com/pkg/errors"
Expand Down Expand Up @@ -60,6 +61,8 @@ func Server(ctx context.Context, cfg *config.Control) error {
if err := apiServer(ctx, cfg); err != nil {
return err
}

setupRegistry(cfg)
}

// Wait for an apiserver to become available before starting additional controllers,
Expand Down Expand Up @@ -222,6 +225,14 @@ func apiServer(ctx context.Context, cfg *config.Control) error {
return executor.APIServer(ctx, runtime.ETCDReady, args)
}

func setupRegistry(config *config.Control) {
config.Runtime.ClusterControllerStarts["spegel"] = func(ctx context.Context) {
if err := spegel.Register(ctx, config); err != nil {
logrus.Fatalf("Failed to start spegel distributed registry mirror: %v", err)
}
}
}

func defaults(config *config.Control) {
if config.AdvertisePort == 0 {
config.AdvertisePort = config.HTTPSPort
Expand Down
102 changes: 102 additions & 0 deletions pkg/spegel/spegel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package spegel

import (
"context"
"log"
"net/url"
"os"
"strconv"
"time"

"github.com/go-logr/logr"
"github.com/go-logr/stdr"
"github.com/gorilla/mux"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/sirupsen/logrus"
"github.com/xenitab/spegel/pkg/oci"
"github.com/xenitab/spegel/pkg/registry"
"github.com/xenitab/spegel/pkg/routing"
"github.com/xenitab/spegel/pkg/state"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/pointer"
)

var (
Registries = []string{"docker.io", "registry.k8s.io"}
LocalAddr = "127.0.0.1:6443"
)

func Register(ctx context.Context, controlConfig *config.Control) error {
logger := stdr.NewWithOptions(log.New(logrus.StandardLogger().Writer(), "spegel ", log.LstdFlags), stdr.Options{Verbosity: pointer.Int(10)})
ctx = logr.NewContext(ctx, logger)
containerdSock := "/run/k3s/containerd/containerd.sock"
configPath := "/var/lib/rancher/k3s/agent/etc/containerd/certs.d"
registryNS := "k8s.io"

urls := []url.URL{}
for _, r := range Registries {
if u, err := url.Parse("https://" + r); err == nil {
urls = append(urls, *u)
}
}

ociClient, err := oci.NewContainerd(containerdSock, registryNS, configPath, urls)
if err != nil {
return err
}
err = ociClient.Verify(ctx)
if err != nil {
return err
}

restConfig, err := clientcmd.BuildConfigFromFlags("", controlConfig.Runtime.KubeConfigSupervisor)
if err != nil {
return err
}
restConfig.UserAgent = util.GetUserAgent("spegel")

k8s, err := clientset.NewForConfig(restConfig)
if err != nil {
return err
}

resolveLatestTag := false
resolveRetries, _ := strconv.ParseInt(os.Getenv("SPEGEL_RESOLVE_RETRIES"), 10, 64)
resolveTimeout := time.Second * 5
registryScheme := "https"
registryPort := strconv.Itoa(controlConfig.SupervisorPort)
registryAddr := ":" + registryPort
routerAddr := ":5001"

bootstrapper := routing.NewKubernetesBootstrapper(k8s, "kube-system", "spegel-leader-election")
router, err := routing.NewP2PRouter(ctx, routerAddr, bootstrapper, registryPort, registryScheme)
if err != nil {
return err
}
go func() {
<-ctx.Done()
router.Close()
}()

go func() {
state.Track(ctx, ociClient, router, resolveLatestTag)
}()

caCert, err := os.ReadFile("/var/lib/rancher/k3s/agent/server-ca.crt")
if err != nil {
return err
}
client := clientaccess.GetHTTPClient(caCert, "", "")
reg := registry.NewRegistry(ociClient, router, LocalAddr, int(resolveRetries), resolveTimeout, resolveLatestTag, client.Transport)
regSvr := reg.Server(registryAddr, logr.FromContextOrDiscard(ctx))

muxer := mux.NewRouter().SkipClean(true)
muxer.PathPrefix("/v2").Handler(regSvr.Handler)
muxer.NotFoundHandler = controlConfig.Runtime.Handler
controlConfig.Runtime.Handler = muxer

return nil
}

0 comments on commit 5c4eb23

Please sign in to comment.