Skip to content

Commit

Permalink
Implement basic Storage Indexer (#814)
Browse files Browse the repository at this point in the history
* Prepare code for storage indexer

* Fix

* Changelog

* Import cloud.google.com/go/storage

* Prepare tests

* FakeServer prepared

* WIP

* Tests

* Fix

* Changelog

* fix

* WIP

* WIP

* WIP

* WIP

* Fix: changelog

* Typo

* Fix: imports

* Load cursor

* Load package index

* Fixes

* WIP

* Use json.RawMessage

* Lack of data stream manifest name

* Fix: go.mod

* Fix tests

* Fix

* Fix

* Fix

* Fix: go.mod

* Fix
  • Loading branch information
mtojek authored Jun 7, 2022
1 parent 4499a1c commit 9785e9b
Show file tree
Hide file tree
Showing 11 changed files with 584,976 additions and 407,120 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

* Prepare stub for Storage Indexer. Disable fetching packages from Package Storage v1. [#811](https://github.com/elastic/package-registry/pull/811)
* Support input packages. [#809](https://github.com/elastic/package-registry/pull/809)
* Implement storage indexer. [#814](https://github.com/elastic/package-registry/pull/814)

### Deprecated

Expand Down
25 changes: 20 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
"syscall"
"time"

gstorage "cloud.google.com/go/storage"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"go.uber.org/zap"

"go.elastic.co/apm"
"go.elastic.co/apm/module/apmgorilla"
"go.uber.org/zap"

ucfgYAML "github.com/elastic/go-ucfg/yaml"

Expand All @@ -45,7 +45,10 @@ var (
dryRun bool
configPath string

featureStorageIndexer bool
featureStorageIndexer bool
storageIndexerBucketInternal string
storageIndexerBucketPublic string
storageIndexerWatchInterval time.Duration

defaultConfig = Config{
CacheTimeIndex: 10 * time.Second,
Expand All @@ -64,8 +67,12 @@ func init() {
// This flag is experimental and might be removed in the future or renamed
flag.BoolVar(&dryRun, "dry-run", false, "Runs a dry-run of the registry without starting the web service (experimental).")
flag.BoolVar(&packages.ValidationDisabled, "disable-package-validation", false, "Disable package content validation.")
// This flag is a technical preview and might be removed in the future or renamed
// The following storage related flags are technical preview and might be removed in the future or renamed
flag.BoolVar(&featureStorageIndexer, "feature-storage-indexer", false, "Enable storage indexer to include packages from Package Storage v2 (technical preview).")
flag.StringVar(&storageIndexerBucketInternal, "storage-indexer-bucket-internal", "", "Path to the internal Package Storage bucket (with gs:// prefix).")
flag.StringVar(&storageIndexerBucketPublic, "storage-indexer-bucket-public", "", "Path to the public Package Storage bucket (with gs:// prefix).")
flag.DurationVar(&storageIndexerWatchInterval, "storage-indexer-watch-interval", 1*time.Minute, "Address of the package-registry service.")

}

type Config struct {
Expand Down Expand Up @@ -131,7 +138,15 @@ func initServer(logger *zap.Logger) *http.Server {

var indexers []Indexer
if featureStorageIndexer {
indexers = append(indexers, storage.NewIndexer())
storageClient, err := gstorage.NewClient(ctx)
if err != nil {
logger.Fatal("can't initialize storage client", zap.Error(err))
}
indexers = append(indexers, storage.NewIndexer(storageClient, storage.IndexerOptions{
PackageStorageBucketInternal: storageIndexerBucketInternal,
PackageStorageBucketPublic: storageIndexerBucketPublic,
WatchInterval: storageIndexerWatchInterval,
}))
} else {
indexers = append(indexers, packages.NewFileSystemIndexer(packagesBasePaths...))
indexers = append(indexers, packages.NewZipFileSystemIndexer(packagesBasePaths...))
Expand Down
12 changes: 8 additions & 4 deletions storage/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"context"
"encoding/json"
"io/ioutil"
"log"

"cloud.google.com/go/storage"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/elastic/package-registry/util"
)

type cursor struct {
Expand All @@ -27,12 +29,13 @@ func (c *cursor) String() string {
}

func loadCursor(ctx context.Context, storageClient *storage.Client, bucketName, rootStoragePath string) (*cursor, error) {
log.Println("Load cursor file")
logger := util.Logger()
logger.Debug("load cursor file")

rootedCursorStoragePath := joinObjectPaths(rootStoragePath, cursorStoragePath)
objectReader, err := storageClient.Bucket(bucketName).Object(rootedCursorStoragePath).NewReader(ctx)
if err == storage.ErrObjectNotExist {
log.Printf("Cursor file doesn't exist, most likely a first run (path: %s)", rootedCursorStoragePath)
logger.Debug("cursor file doesn't exist, most likely a first run", zap.String("path", rootedCursorStoragePath))
return new(cursor), nil
}
if err != nil {
Expand All @@ -51,6 +54,7 @@ func loadCursor(ctx context.Context, storageClient *storage.Client, bucketName,
return nil, errors.Wrapf(err, "can't unmarshal the cursor file")
}

log.Printf("Loaded cursor file: %s", c.String())
logger.Debug("cursor file doesn't exist, most likely a first run", zap.String("path", rootedCursorStoragePath))
logger.Debug("loaded cursor file", zap.String("cursor", c.String()))
return &c, nil
}
55 changes: 41 additions & 14 deletions storage/fakestorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,39 @@ func prepareFakeServer(t *testing.T, indexPath string) *fakestorage.Server {
indexContent, err := ioutil.ReadFile(indexPath)
require.NoError(t, err, "index file must be populated")

const firstRevision = "1"
serverObjects := prepareServerObjects(t, firstRevision, indexContent)
return fakestorage.NewServer(serverObjects)
}

// updateFakeServer uploads a new cursor, index file and package objects for
// the given revision to an already running fake GCS server, simulating a
// new publish in the storage bucket.
func updateFakeServer(t *testing.T, server *fakestorage.Server, revision, indexPath string) {
	content, err := ioutil.ReadFile(indexPath)
	require.NoError(t, err, "index file must be populated")

	for _, object := range prepareServerObjects(t, revision, content) {
		server.CreateObject(object)
	}
}

func prepareServerObjects(t *testing.T, revision string, indexContent []byte) []fakestorage.Object {
var index searchIndexAll
err = json.Unmarshal(indexContent, &index)
err := json.Unmarshal(indexContent, &index)
require.NoError(t, err, "index file must be valid")
require.NotEmpty(t, index.Packages, "index file must contain some package entries")

const firstRevision = "1"

var serverObjects []fakestorage.Object
// Add cursor and index file
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketInternal, Name: cursorStoragePath,
},
Content: []byte(`{"cursor":"` + firstRevision + `"}`),
Content: []byte(`{"current":"` + revision + `"}`),
})
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketInternal, Name: joinObjectPaths(v2MetadataStoragePath, firstRevision, searchIndexAllFile),
BucketName: fakePackageStorageBucketInternal, Name: joinObjectPaths(v2MetadataStoragePath, revision, searchIndexAllFile),
},
Content: indexContent,
})
Expand All @@ -56,13 +71,15 @@ func prepareFakeServer(t *testing.T, indexPath string) *fakestorage.Server {
nameVersion := fmt.Sprintf("%s-%s", aPackage.PackageManifest.Name, aPackage.PackageManifest.Version)

// Add fake static resources: docs, img
for _, asset := range aPackage.Assets {
if !strings.HasPrefix(asset, "docs") &&
!strings.HasPrefix(asset, "img") {
for _, asset := range aPackage.PackageManifest.Assets {
assetPath, err := filepath.Rel(filepath.Join("/package", aPackage.PackageManifest.Name, aPackage.PackageManifest.Version), asset)
require.NoError(t, err, "relative path expected")
if !strings.HasPrefix(assetPath, "docs") &&
!strings.HasPrefix(assetPath, "img") {
continue
}

path := joinObjectPaths(artifactsStaticStoragePath, nameVersion, asset)
path := joinObjectPaths(artifactsStaticStoragePath, nameVersion, assetPath)
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketPublic, Name: path,
Expand All @@ -71,23 +88,31 @@ func prepareFakeServer(t *testing.T, indexPath string) *fakestorage.Server {
})
}

// Add fake .zip.sig
path := joinObjectPaths(artifactsPackagesStoragePath, nameVersion+".zip.sig")
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketPublic, Name: path,
},
Content: []byte(filepath.Base(path)),
})

// Add fake .zip package
path := joinObjectPaths(artifactsPackagesStoragePath, nameVersion+".zip")
path = joinObjectPaths(artifactsPackagesStoragePath, nameVersion+".zip")
serverObjects = append(serverObjects, fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: fakePackageStorageBucketPublic, Name: path,
},
Content: []byte(filepath.Base(path)),
})
}

t.Logf("Prepared %d packages with total %d server objects.", len(index.Packages), len(serverObjects))
return fakestorage.NewServer(serverObjects)
return serverObjects
}

func TestPrepareFakeServer(t *testing.T) {
// given
indexFile := "testdata/search-index-all-1.json"
indexFile := "testdata/search-index-all-full.json"
testIndexFile, err := os.ReadFile(indexFile)
require.NoErrorf(t, err, "index file should be present in testdata")

Expand All @@ -100,11 +125,13 @@ func TestPrepareFakeServer(t *testing.T) {
require.NotNil(t, client, "client should be initialized")

aCursor := readObject(t, client.Bucket(fakePackageStorageBucketInternal).Object(cursorStoragePath))
assert.Equal(t, []byte(`{"cursor":"1"}`), aCursor)
assert.Equal(t, []byte(`{"current":"1"}`), aCursor)
anIndex := readObject(t, client.Bucket(fakePackageStorageBucketInternal).Object(joinObjectPaths(v2MetadataStoragePath, "1", searchIndexAllFile)))
assert.Equal(t, testIndexFile, anIndex)
packageZip := readObject(t, client.Bucket(fakePackageStorageBucketPublic).Object(joinObjectPaths(artifactsPackagesStoragePath, "1password-1.1.1.zip")))
assert.NotZero(t, len(packageZip), ".zip package must have fake content")
packageSig := readObject(t, client.Bucket(fakePackageStorageBucketPublic).Object(joinObjectPaths(artifactsPackagesStoragePath, "1password-1.1.1.zip.sig")))
assert.NotZero(t, len(packageSig), ".zip.sig must have fake content")

// check few static files
readme := readObject(t, client.Bucket(fakePackageStorageBucketPublic).Object(joinObjectPaths(artifactsStaticStoragePath, "1password-1.1.1", "docs/README.md")))
Expand Down
145 changes: 60 additions & 85 deletions storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,102 +4,77 @@

package storage

import (
"context"
"encoding/json"
"io/ioutil"

"cloud.google.com/go/storage"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/elastic/package-registry/packages"
"github.com/elastic/package-registry/util"
)

// searchIndexAll models the search-index-all JSON document read from the
// storage bucket. It is the top-level index listing all package entries.
type searchIndexAll struct {
	Packages []packageIndex `json:"packages"`
}

type packageIndex struct {
PackageManifest packageManifest `json:"package"`
DataStreamManifests []dataStreamManifest `json:"data_streams"`

Assets []string `json:"assets"`
PackageManifest packages.Package `json:"package_manifest"`
}

type packageManifest struct {
FormatVersion string `json:"format_version,omitempty"`
Name string `json:"name,omitempty"`
Title string `json:"title,omitempty"`
Version string `json:"version,omitempty"`
Release string `json:"release,omitempty"`
License string `json:"license,omitempty"`
Description string `json:"description,omitempty"`
Type string `json:"type,omitempty"`
Icons []image `json:"icons,omitempty"`
Screenshots []image `json:"screenshots,omitempty"`
Conditions *struct {
Kibana *struct {
Version string `json:"version,omitempty"`
} `json:"kibana,omitempty"`
} `json:"conditions,omitempty"`
Owner *struct {
Github string `json:"github,omitempty"`
} `json:"owner,omitempty"`
Categories []string `json:"categories,omitempty"`

PolicyTemplates []struct {
Name string `json:"name,omitempty"`
Title string `json:"title,omitempty"`
Categories []string `json:"categories,omitempty"`
DataStreams []string `json:"data_streams,omitempty"`
Description string `json:"description,omitempty"`
Icons []image `json:"icons,omitempty"`
Input struct {
Title string `json:"title,omitempty"`
Type string `json:"type,omitempty"`
Description string `json:"description,omitempty"`
InputGroup string `json:"input_group,omitempty"`
TemplatePath string `json:"template_path,omitempty"`
Vars []variable `json:"vars,omitempty"`
} `json:"input,omitempty"`
Screenshots []image `json:"screenshots,omitempty"`
Vars []variable `json:"vars,omitempty"`
} `json:"policy_templates,omitempty"`
// loadSearchIndexAll reads and decodes the search-index-all index file for
// the metadata revision the given cursor points at. A nil index content
// yields an empty (zero-value) index rather than an error.
func loadSearchIndexAll(ctx context.Context, storageClient *storage.Client, bucketName, rootStoragePath string, aCursor cursor) (*searchIndexAll, error) {
	logger := util.Logger()
	logger.Debug("load search-index-all index", zap.String("index.file", searchIndexAllFile))

	content, err := loadIndexContent(ctx, storageClient, searchIndexAllFile, bucketName, rootStoragePath, aCursor)
	if err != nil {
		return nil, errors.Wrap(err, "can't load search-index-all content")
	}

	index := new(searchIndexAll)
	if content == nil {
		return index, nil
	}

	if err := json.Unmarshal(content, index); err != nil {
		return nil, errors.Wrap(err, "can't unmarshal search-index-all")
	}
	return index, nil
}

type image struct {
Src string `json:"src,omitempty"`
Title string `json:"title,omitempty"`
Size string `json:"size,omitempty"`
Type string `json:"type,omitempty"`
// loadIndexContent downloads the raw bytes of the given index file from the
// storage bucket. The object path is derived from rootStoragePath and the
// revision currently referenced by the cursor.
func loadIndexContent(ctx context.Context, storageClient *storage.Client, indexFile, bucketName, rootStoragePath string, aCursor cursor) ([]byte, error) {
	logger := util.Logger()
	logger.Debug("load index content", zap.String("index.file", indexFile))

	rootedIndexStoragePath := buildIndexStoragePath(rootStoragePath, aCursor, indexFile)
	objectReader, err := storageClient.Bucket(bucketName).Object(rootedIndexStoragePath).NewReader(ctx)
	if err != nil {
		return nil, errors.Wrapf(err, "can't read the index file (path: %s)", rootedIndexStoragePath)
	}
	defer objectReader.Close()

	b, err := ioutil.ReadAll(objectReader)
	if err != nil {
		// Wrap (not Wrapf) — no format arguments; describe what this layer
		// was doing instead of naming the failing stdlib call.
		return nil, errors.Wrapf(err, "can't read the index content (path: %s)", rootedIndexStoragePath)
	}

	return b, nil
}

type variable struct {
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Multi bool `json:"multi,omitempty"`
Required bool `json:"required,omitempty"`
ShowUser bool `json:"show_user,omitempty"`
Default interface{} `json:"default,omitempty"`
// buildIndexStoragePath joins the root storage path, the v2 metadata path,
// the cursor's current revision and the index file name into the full object
// path of the index file in the bucket.
func buildIndexStoragePath(rootStoragePath string, aCursor cursor, indexFile string) string {
	return joinObjectPaths(rootStoragePath, v2MetadataStoragePath, aCursor.Current, indexFile)
}

type dataStreamManifest struct {
Title string `json:"title,omitempty"`
Type string `type:"type,omitempty"`
Dataset string `json:"dataset,omitempty"`
Hidden bool `json:"hidden,omitempty"`
IlmPolicy string `json:"ilm_policy,omitempty"`
DatasetIsPrefix bool `json:"dataset_is_prefix,omitempty"`
Release string `json:"release,omitempty"`
Streams []struct {
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Enabled bool `json:"enabled,omitempty"`
Input string `json:"input,omitempty"`
TemplatePath string `json:"template_path,omitempty"`
Vars []variable `json:"vars,omitempty"`
} `json:"streams,omitempty" `
Elasticsearch *struct {
IndexTemplate *struct {
Settings map[string]interface{} `json:"settings,omitempty"`
Mappings map[string]interface{} `json:"mappings,omitempty"`
}
IngestPipeline *struct {
Name string `json:"name,omitempty"`
} `json:"ingest_pipeline,omitempty"`
Privileges *struct {
Indices []string `json:"indices,omitempty"`
} `json:"privileges,omitempty"`
} `json:"elasticsearch,omitempty"`
// transformSearchIndexAllToPackages extracts the embedded package manifests
// from the search-index-all document into the registry's packages slice.
// The returned packages alias the index entries (addresses of slice
// elements), so sia must outlive the result. The error is always nil; the
// signature is kept for caller compatibility.
func transformSearchIndexAllToPackages(sia searchIndexAll) (packages.Packages, error) {
	// Preserve the original nil result for an empty index (nil and empty
	// slices can serialize differently).
	if len(sia.Packages) == 0 {
		return nil, nil
	}

	// Final length is known — pre-size to avoid repeated growth copies.
	transformedPackages := make(packages.Packages, 0, len(sia.Packages))
	for i := range sia.Packages {
		transformedPackages = append(transformedPackages, &sia.Packages[i].PackageManifest)
	}
	return transformedPackages, nil
}
Loading

0 comments on commit 9785e9b

Please sign in to comment.