Skip to content

Commit ffaa05d

Browse files
joelanfordgrokspawn
authored andcommitted
a few improvements and optimizations
Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
1 parent 10cbc91 commit ffaa05d

File tree

5 files changed

+50
-42
lines changed

5 files changed

+50
-42
lines changed

catalogd/cmd/catalogd/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,11 @@ func main() {
294294
os.Exit(1)
295295
}
296296

297-
localStorage = &storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL}
297+
localStorage = &storage.LocalDirV1{
298+
RootDir: storeDir,
299+
RootURL: baseStorageURL,
300+
EnableQueryHandler: features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler),
301+
}
298302

299303
// Config for the catalogd web server
300304
catalogServerConfig := serverutil.CatalogServerConfig{

catalogd/internal/storage/index.go

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package storage
33
import (
44
"cmp"
55
"encoding/json"
6-
"errors"
76
"fmt"
87
"io"
98
"slices"
@@ -107,30 +106,28 @@ func (i *index) getSectionSet(schema, packageName, name string) sets.Set[section
107106
}
108107
}
109108

110-
func newIndex(r io.Reader) (*index, error) {
109+
func newIndex(metasChan <-chan *declcfg.Meta) (*index, error) {
111110
idx := &index{
112111
BySchema: make(map[string][]section),
113112
ByPackage: make(map[string][]section),
114113
ByName: make(map[string][]section),
115114
}
116-
var meta declcfg.Meta
117-
dec := json.NewDecoder(r)
118-
for {
119-
i1 := dec.InputOffset()
120-
if err := dec.Decode(&meta); err != nil {
121-
if errors.Is(err, io.EOF) {
122-
break
123-
}
124-
return nil, err
125-
}
126-
i2 := dec.InputOffset()
127-
start := i1
128-
length := i2 - i1
115+
offset := int64(0)
116+
for meta := range metasChan {
117+
start := offset
118+
length := int64(len(meta.Blob))
119+
offset += length
129120

130121
s := section{offset: start, length: length}
131-
idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s)
132-
idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s)
133-
idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s)
122+
if meta.Schema != "" {
123+
idx.BySchema[meta.Schema] = append(idx.BySchema[meta.Schema], s)
124+
}
125+
if meta.Package != "" {
126+
idx.ByPackage[meta.Package] = append(idx.ByPackage[meta.Package], s)
127+
}
128+
if meta.Name != "" {
129+
idx.ByName[meta.Name] = append(idx.ByName[meta.Name], s)
130+
}
134131
}
135132
return idx, nil
136133
}

catalogd/internal/storage/localdir.go

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ import (
3131
// done so that clients accessing the content stored in RootDir/catalogName have
3232
// atomic view of the content for a catalog.
3333
type LocalDirV1 struct {
34-
RootDir string
35-
RootURL *url.URL
34+
RootDir string
35+
RootURL *url.URL
36+
EnableQueryHandler bool
3637

3738
m sync.RWMutex
3839
sf singleflight.Group
@@ -42,7 +43,7 @@ func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) erro
4243
s.m.Lock()
4344
defer s.m.Unlock()
4445

45-
if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) {
46+
if s.EnableQueryHandler {
4647
return s.storeCatalogFileAndIndex(ctx, catalog, fsys)
4748
}
4849
return s.storeCatalogFile(ctx, catalog, fsys)
@@ -88,30 +89,33 @@ func (s *LocalDirV1) storeCatalogFileAndIndex(ctx context.Context, catalog strin
8889
}
8990
defer os.Remove(tmpIndexFile.Name())
9091

91-
pr, pw := io.Pipe()
92-
mw := io.MultiWriter(tmpCatalogFile, pw)
92+
metasChan := make(chan *declcfg.Meta)
9393
eg, egCtx := errgroup.WithContext(ctx)
9494
eg.Go(func() error {
95+
defer close(metasChan)
9596
if err := declcfg.WalkMetasFS(egCtx, fsys, func(path string, meta *declcfg.Meta, err error) error {
9697
if err != nil {
9798
return err
9899
}
99-
_, err = mw.Write(meta.Blob)
100+
_, err = tmpCatalogFile.Write(meta.Blob)
100101
if err != nil {
101-
return pw.CloseWithError(err)
102+
return err
102103
}
104+
select {
105+
case <-egCtx.Done():
106+
return egCtx.Err()
107+
case metasChan <- meta:
108+
}
109+
103110
return nil
104111
}, declcfg.WithConcurrency(1)); err != nil {
105112
return fmt.Errorf("error walking FBC root: %w", err)
106113
}
107-
return pw.CloseWithError(tmpCatalogFile.Close())
114+
return tmpCatalogFile.Close()
108115
})
109116
eg.Go(func() error {
110-
idx, err := newIndex(pr)
117+
idx, err := newIndex(metasChan)
111118
if err != nil {
112-
return pr.CloseWithError(err)
113-
}
114-
if err := pr.Close(); err != nil {
115119
return err
116120
}
117121
enc := json.NewEncoder(tmpIndexFile)
@@ -142,7 +146,7 @@ func (s *LocalDirV1) Delete(catalog string) error {
142146
var errs []error
143147
errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))))
144148

145-
if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) {
149+
if s.EnableQueryHandler {
146150
errs = append(errs, os.RemoveAll(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))))
147151
}
148152
return errors.Join(errs...)
@@ -158,7 +162,7 @@ func (s *LocalDirV1) StorageServerHandler() http.Handler {
158162
v1AllPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "all").Path
159163
mux.Handle(v1AllPath, s.v1AllHandler())
160164

161-
if features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler) {
165+
if s.EnableQueryHandler {
162166
v1QueryPath := s.RootURL.JoinPath("{catalog}", "api", "v1", "query").Path
163167
mux.Handle(v1QueryPath, s.v1QueryHandler())
164168
}
@@ -171,16 +175,11 @@ func (s *LocalDirV1) v1AllHandler() http.Handler {
171175
defer s.m.RUnlock()
172176

173177
catalog := r.PathValue("catalog")
178+
w.Header().Add("Content-Type", "application/jsonl")
174179
http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)))
175180
})
176181
gzHandler := gzhttp.GzipHandler(catalogHandler)
177-
178-
typeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
179-
w.Header().Add("Content-Type", "application/jsonl")
180-
gzHandler.ServeHTTP(w, r)
181-
})
182-
183-
return newLoggingMiddleware(typeHandler)
182+
return newLoggingMiddleware(gzHandler)
184183
}
185184

186185
func (s *LocalDirV1) v1QueryHandler() http.Handler {
@@ -193,6 +192,13 @@ func (s *LocalDirV1) v1QueryHandler() http.Handler {
193192
pkg := r.URL.Query().Get("package")
194193
name := r.URL.Query().Get("name")
195194

195+
// If no parameters are provided, return the entire catalog (this is the same as /api/v1/all)
196+
if schema == "" && pkg == "" && name == "" {
197+
w.Header().Add("Content-Type", "application/jsonl")
198+
http.ServeFile(w, r, filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog)))
199+
return
200+
}
201+
196202
catalogFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.jsonl", catalog))
197203
catalogFileStat, err := os.Stat(catalogFilePath)
198204
if err != nil {

catalogd/internal/storage/localdir_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ var _ = Describe("LocalDir Storage Test", func() {
5656
rootDir = d
5757

5858
baseURL = &url.URL{Scheme: "http", Host: "test-addr", Path: urlPrefix}
59-
store = LocalDirV1{RootDir: rootDir, RootURL: baseURL}
59+
store = &LocalDirV1{RootDir: rootDir, RootURL: baseURL}
6060
unpackResultFS = &fstest.MapFS{
6161
"bundle.yaml": &fstest.MapFile{Data: []byte(testBundle), Mode: os.ModePerm},
6262
"package.yaml": &fstest.MapFile{Data: []byte(testPackage), Mode: os.ModePerm},

catalogd/internal/storage/storage.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import (
1313
type Instance interface {
1414
Store(ctx context.Context, catalog string, fsys fs.FS) error
1515
Delete(catalog string) error
16+
ContentExists(catalog string) bool
17+
1618
BaseURL(catalog string) string
1719
StorageServerHandler() http.Handler
18-
ContentExists(catalog string) bool
1920
}

0 commit comments

Comments
 (0)