Skip to content

Commit

Permalink
Prepare soci-snapshotter to be linked into and run by the executor
Browse files Browse the repository at this point in the history
  • Loading branch information
iain-macdonald committed Aug 25, 2023
1 parent 56d2371 commit 5a4dbb7
Show file tree
Hide file tree
Showing 15 changed files with 79 additions and 57 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ build: $(CMD)

FORCE:

soci-snapshotter-grpc: FORCE
soci-snapshotter-grpc: proto
cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(OUTDIR)/$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) $(GO_TAGS) ./soci-snapshotter-grpc

soci: FORCE
soci: proto
cd cmd/ ; GO111MODULE=$(GO111MODULE_VALUE) go build -o $(OUTDIR)/$@ $(GO_BUILD_FLAGS) $(GO_LD_FLAGS) $(GO_TAGS) ./soci

soci-store: proto
Expand Down
18 changes: 10 additions & 8 deletions cmd/soci-store/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"os"
"os/signal"
"path/filepath"
"syscall"
"time"

socifs "github.com/awslabs/soci-snapshotter/fs"
Expand Down Expand Up @@ -68,9 +67,11 @@ const (
)

var (
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
configPath = flag.String("config", defaultConfigPath, "path to the configuration file")
logLevel = flag.String("log-level", defaultLogLevel.String(), "set the logging level [trace, debug, info, warn, error, fatal, panic]")
rootDir = flag.String("root", defaultRootDir, "path to the root directory for this snapshotter")
localKeychainPort = flag.Int("local_keychain_port", 0,
"Port on which to expose the local_keychain gRPC service that accepts username/password credentials for private images. If 0, the local_keychain service is not started/exposed.")
)

type Config struct {
Expand Down Expand Up @@ -127,7 +128,7 @@ func main() {
}

// Prepare kubeconfig-based keychain if required
credsFuncs := []resolver.Credential{local_keychain.Keychain(ctx).GetCredentials}
credsFuncs := []resolver.Credential{local_keychain.Keychain(ctx, *localKeychainPort).GetCredentials}
credsFuncs = append(credsFuncs, dockerconfig.NewDockerConfigKeychain(ctx))
if config.KubeconfigKeychainConfig.EnableKeychain {
var opts []kubeconfig.Option
Expand Down Expand Up @@ -160,7 +161,7 @@ func main() {
var fsOpts []socifs.Option
opq := layer.OverlayOpaqueTrusted
fsOpts = append(fsOpts, socifs.WithGetSources(
source.FromDefaultLabels(hosts), // provides source info based on default labels
source.FromDefaultLabels(hosts), // provides source info based on default labels
), socifs.WithOverlayOpaqueType(opq))
fs, err := socifs.NewFilesystem(ctx, defaultRootDir, config.Config, fsOpts...)
if err != nil {
Expand All @@ -171,11 +172,12 @@ func main() {
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to prepare pool")
}
if err := store.Mount(ctx, mountPoint, layerManager, config.Config.Debug); err != nil {
server, err := store.Mount(ctx, mountPoint, layerManager, config.Config.Debug)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to mount fs at %q", mountPoint)
}
defer func() {
syscall.Unmount(mountPoint, 0)
server.Unmount()
log.G(ctx).Info("Exiting")
}()

Expand Down
6 changes: 3 additions & 3 deletions cmd/soci/commands/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ var CreateCommand = cli.Command{
// Creating the snapshotter's root path first if it does not exist, since this ensures, that
// it has the limited permission set as drwx--x--x.
// The subsequent oci.New creates a root path dir with too broad permission set.
if _, err := os.Stat(config.SociSnapshotterRootPath); os.IsNotExist(err) {
if err = os.Mkdir(config.SociSnapshotterRootPath, 0711); err != nil {
if _, err := os.Stat(config.DefaultSociSnapshotterRootPath); os.IsNotExist(err) {
if err = os.Mkdir(config.DefaultSociSnapshotterRootPath, 0711); err != nil {
return err
}
} else if err != nil {
return err
}
blobStore, err := oci.New(config.SociContentStorePath)
blobStore, err := oci.New(config.DefaultSociContentStorePath)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soci/commands/index/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var infoCommand = cli.Command{
if artifactType == soci.ArtifactEntryTypeLayer {
return fmt.Errorf("the provided digest is of ztoc not SOCI index. Use \"soci ztoc info\" command to get detailed info of ztoc")
}
storage, err := oci.New(config.SociContentStorePath)
storage, err := oci.New(config.DefaultSociContentStorePath)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soci/commands/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ if they are available in the snapshotter's local content store.
}, nil
}

src, err := oci.New(config.SociContentStorePath)
src, err := oci.New(config.DefaultSociContentStorePath)
if err != nil {
return fmt.Errorf("cannot create OCI local store: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/soci/commands/rebuild_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ var RebuildDBCommand = cli.Command{
if err != nil {
return err
}
blobStore, err := oci.New(config.SociContentStorePath)
blobStore, err := oci.New(config.DefaultSociContentStorePath)
if err != nil {
return err
}
blobStorePath := filepath.Join(config.SociContentStorePath, "blobs")
blobStorePath := filepath.Join(config.DefaultSociContentStorePath, "blobs")
return artifactsDb.SyncWithLocalStore(ctx, blobStore, blobStorePath, containerdContentStore)
},
}
2 changes: 1 addition & 1 deletion cmd/soci/commands/ztoc/get-file.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var getFileCommand = cli.Command{
}

func getZtoc(ctx context.Context, d digest.Digest) (*ztoc.Ztoc, error) {
blobStore, err := oci.New(config.SociContentStorePath)
blobStore, err := oci.New(config.DefaultSociContentStorePath)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soci/commands/ztoc/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var infoCommand = cli.Command{
if entry.MediaType == soci.SociIndexArtifactType {
return fmt.Errorf("the provided digest belongs to a SOCI index. Use `soci index info` to get the detailed information about it")
}
storage, err := oci.New(config.SociContentStorePath)
storage, err := oci.New(config.DefaultSociContentStorePath)
if err != nil {
return err
}
Expand Down
29 changes: 17 additions & 12 deletions fs/artifact_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"os"
"path/filepath"

"github.com/awslabs/soci-snapshotter/fs/config"
"github.com/awslabs/soci-snapshotter/service/keychain/dockerconfig"
"github.com/awslabs/soci-snapshotter/service/keychain/local_keychain"
"github.com/awslabs/soci-snapshotter/soci"
Expand Down Expand Up @@ -56,18 +55,20 @@ type resolverStorage interface {

// artifactFetcher is responsible for fetching and storing artifacts in the provided artifact store.
type artifactFetcher struct {
remoteStore resolverStorage
localStore content.Storage
refspec reference.Spec
remoteStore resolverStorage
localStore content.Storage
refspec reference.Spec
contentStorePath string
}

// Constructs a new artifact fetcher
// Takes in the image reference, the local store and the resolver
func newArtifactFetcher(refspec reference.Spec, localStore content.Storage, remoteStore resolverStorage) (*artifactFetcher, error) {
func newArtifactFetcher(refspec reference.Spec, localStore content.Storage, remoteStore resolverStorage, contentStorePath string) (*artifactFetcher, error) {
return &artifactFetcher{
localStore: localStore,
remoteStore: remoteStore,
refspec: refspec,
localStore: localStore,
remoteStore: remoteStore,
refspec: refspec,
contentStorePath: contentStorePath,
}, nil
}

Expand All @@ -81,7 +82,11 @@ func newRemoteStore(refspec reference.Spec) (*remote.Repository, error) {
Client: socihttp.NewRetryableClient(socihttp.NewRetryableClientConfig()),
Cache: auth.DefaultCache,
Credential: func(ctx context.Context, host string) (auth.Credential, error) {
username, password, err := local_keychain.Keychain(ctx).GetCredentials(host, refspec)
keychain, err := local_keychain.Get()
if err != nil {
return auth.EmptyCredential, err
}
username, password, err := keychain.GetCredentials(host, refspec)
if err != nil {
return auth.Credential{
Username: username,
Expand Down Expand Up @@ -119,7 +124,7 @@ func (f *artifactFetcher) constructRef(desc ocispec.Descriptor) string {
func (f *artifactFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, bool, error) {
// Try to read the requested artifact from the local filesystem first.
// This is faster and lets us bypass all of the container registry interaction when available.
localFilename := filepath.Join(config.SociContentStorePath, "blobs", "sha256", desc.Digest.Encoded())
localFilename := filepath.Join(f.contentStorePath, "blobs", "sha256", desc.Digest.Encoded())
if _, err := os.Stat(localFilename); err == nil {
file, err := os.Open(localFilename)
if err != nil {
Expand Down Expand Up @@ -177,9 +182,9 @@ func (f *artifactFetcher) Store(ctx context.Context, desc ocispec.Descriptor, re
return nil
}

func FetchSociArtifacts(ctx context.Context, refspec reference.Spec, indexDesc ocispec.Descriptor, localStore content.Storage, remoteStore resolverStorage) (*soci.Index, error) {
func FetchSociArtifacts(ctx context.Context, refspec reference.Spec, indexDesc ocispec.Descriptor, localStore content.Storage, remoteStore resolverStorage, contentStorePath string) (*soci.Index, error) {

fetcher, err := newArtifactFetcher(refspec, localStore, remoteStore)
fetcher, err := newArtifactFetcher(refspec, localStore, remoteStore, contentStorePath)
if err != nil {
return nil, fmt.Errorf("could not create an artifact fetcher: %w", err)
}
Expand Down
10 changes: 7 additions & 3 deletions fs/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ package config

const (
// Default path to OCI-compliant CAS
SociContentStorePath = "/var/lib/soci-snapshotter-grpc/content/"
DefaultSociContentStorePath = "/var/lib/soci-snapshotter-grpc/content/"

// Default path to local SOCI Index storage
SociIndexStorePath = "/var/lib/soci-snapshotter-grpc/indexes/"
DefaultSociIndexStorePath = "/var/lib/soci-snapshotter-grpc/indexes/"

// Default path to snapshotter root dir
SociSnapshotterRootPath = "/var/lib/soci-snapshotter-grpc/"
DefaultSociSnapshotterRootPath = "/var/lib/soci-snapshotter-grpc/"
)

type Config struct {
Expand All @@ -61,6 +61,10 @@ type Config struct {
MountTimeoutSec int64 `toml:"mount_timeout_sec"`
FuseMetricsEmitWaitDurationSec int64 `toml:"fuse_metrics_emit_wait_duration_sec"`

RootPath string `toml:"root_path"`
ContentStorePath string `toml:"content_store_path"`
IndexStorePath string `toml:"index_store_path"`

// BlobConfig is config for layer blob management.
BlobConfig `toml:"blob"`

Expand Down
16 changes: 10 additions & 6 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func NewFilesystem(ctx context.Context, root string, cfg config.Config, opts ...
bgEmitMetricPeriod = defaultBgMetricEmitPeriod
}

store, err := oci.New(config.SociContentStorePath)
store, err := oci.New(cfg.ContentStorePath)
if err != nil {
return nil, fmt.Errorf("cannot create local store: %w", err)
}
Expand Down Expand Up @@ -266,6 +266,8 @@ func NewFilesystem(ctx context.Context, root string, cfg config.Config, opts ...
entryTimeout: entryTimeout,
negativeTimeout: negativeTimeout,
orasStore: store,
indexStorePath: cfg.IndexStorePath,
contentStorePath: cfg.ContentStorePath,
bgFetcher: bgFetcher,
mountTimeout: mountTimeout,
fuseMetricsEmitWaitDuration: fuseMetricsEmitWaitDuration,
Expand All @@ -282,7 +284,7 @@ type sociContext struct {
fuseOperationCounter *layer.FuseOperationCounter
}

func (c *sociContext) Init(fsCtx context.Context, ctx context.Context, imageRef, indexDigest, imageManifestDigest string, store orascontent.Storage, fuseOpEmitWaitDuration time.Duration) error {
func (c *sociContext) Init(fsCtx context.Context, ctx context.Context, imageRef, indexDigest, imageManifestDigest string, store orascontent.Storage, indexStorePath, contentStorePath string, fuseOpEmitWaitDuration time.Duration) error {
var retErr error
c.fetchOnce.Do(func() {
defer func() {
Expand Down Expand Up @@ -313,7 +315,7 @@ func (c *sociContext) Init(fsCtx context.Context, ctx context.Context, imageRef,
if indexDigest == "" {
imageManifestHash := strings.TrimPrefix(imageManifestDigest, "sha256:")
log.G(ctx).Debugf("soci index digest for image %s not provided, attempting to retrieve locally/remotely", imageManifestHash)
index, err := os.ReadFile(filepath.Join(config.SociIndexStorePath, imageManifestHash))
index, err := os.ReadFile(filepath.Join(indexStorePath, imageManifestHash))
if err == nil {
indexDigest = strings.TrimSpace(string(index))
indexDesc.Digest = digest.Digest(indexDigest)
Expand All @@ -340,7 +342,7 @@ func (c *sociContext) Init(fsCtx context.Context, ctx context.Context, imageRef,

log.G(ctx).WithField("digest", indexDesc.Digest.String()).Infof("fetching SOCI artifacts using index descriptor")

index, err := FetchSociArtifacts(ctx, refspec, indexDesc, store, remoteStore)
index, err := FetchSociArtifacts(ctx, refspec, indexDesc, store, remoteStore, contentStorePath)
if err != nil {
retErr = fmt.Errorf("error trying to fetch SOCI artifacts: %w", err)
return
Expand Down Expand Up @@ -382,6 +384,8 @@ type filesystem struct {
negativeTimeout time.Duration
sociContexts sync.Map
orasStore orascontent.Storage
indexStorePath string
contentStorePath string
bgFetcher *bf.BackgroundFetcher
mountTimeout time.Duration
fuseMetricsEmitWaitDuration time.Duration
Expand Down Expand Up @@ -418,7 +422,7 @@ func (fs *filesystem) MountLocal(ctx context.Context, mountpoint string, labels
if err != nil {
return fmt.Errorf("cannot create remote store: %w", err)
}
fetcher, err := newArtifactFetcher(refspec, fs.orasStore, remoteStore)
fetcher, err := newArtifactFetcher(refspec, fs.orasStore, remoteStore, fs.contentStorePath)
if err != nil {
return fmt.Errorf("cannot create fetcher: %w", err)
}
Expand All @@ -438,7 +442,7 @@ func (fs *filesystem) getSociContext(ctx context.Context, imageRef, indexDigest,
if !ok {
return nil, fmt.Errorf("could not load index: fs soci context is invalid type for %s", indexDigest)
}
err := c.Init(fs.ctx, ctx, imageRef, indexDigest, imageManifestDigest, fs.orasStore, fs.fuseMetricsEmitWaitDuration)
err := c.Init(fs.ctx, ctx, imageRef, indexDigest, imageManifestDigest, fs.orasStore, fs.indexStorePath, fs.contentStorePath, fs.fuseMetricsEmitWaitDuration)
return c, err
}

Expand Down
28 changes: 16 additions & 12 deletions service/keychain/local_keychain/local_keychain.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ package local_keychain

import (
"context"
"flag"
"fmt"
"net"
"sync"
Expand All @@ -48,11 +47,6 @@ import (
"google.golang.org/grpc/reflection"
)

var (
localKeychainPort = flag.Int("local_keychain_port", 0,
"Port on which to expose the local_keychain gRPC service that accepts username/password credentials for private images. If 0, the local_keychain service is not started/exposed.")
)

type credentials struct {
username string
password string
Expand All @@ -68,17 +62,17 @@ type keychain struct {
var singleton *keychain
var lock = &sync.Mutex{}

func (kc *keychain) init() {
if *localKeychainPort == 0 {
func (kc *keychain) init(port int) {
if port == 0 {
log.G(context.Background()).Info("no local_keychain_port specified, not starting local_keychain gRPC server")
return
}

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *localKeychainPort))
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
log.G(context.Background()).Fatalf("failed to listen: %v", err)
} else {
log.G(context.Background()).Infof("started local keychain server on localhost:%d", *localKeychainPort)
log.G(context.Background()).Infof("started local keychain server on localhost:%d", port)
}
// Set to avoid errors: Bandwidth exhausted HTTP/2 error code: ENHANCE_YOUR_CALM Received Goaway too_many_pings
opts := []grpc.ServerOption{
Expand Down Expand Up @@ -124,14 +118,24 @@ func (kc *keychain) GetCredentials(host string, refspec reference.Spec) (string,
return "", "", nil
}

func Keychain(ctx context.Context) *keychain {
func Keychain(ctx context.Context, port int) *keychain {
lock.Lock()
defer lock.Unlock()
if singleton == nil {
singleton = &keychain{
cache: map[string]credentials{},
}
singleton.init()
singleton.init(port)
}
return singleton
}

// Returns the singleton keychain, or an error if it hasn't been initialized.
func Get() (*keychain, error) {
lock.Lock()
defer lock.Unlock()
if singleton == nil {
return nil, fmt.Errorf("local keychain must be initialized (with Keychain()) before being gotten (with Get())")
}
return singleton, nil
}
2 changes: 1 addition & 1 deletion soci/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var (

// Get the default artifacts db path
func ArtifactsDbPath() string {
return path.Join(config.SociSnapshotterRootPath, artifactsDbName)
return path.Join(config.DefaultSociSnapshotterRootPath, artifactsDbName)
}

// ArtifactEntry is a metadata object for a SOCI artifact.
Expand Down
Loading

0 comments on commit 5a4dbb7

Please sign in to comment.