Skip to content

Commit

Permalink
Refactor debuginfo/metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun committed Jun 9, 2022
1 parent 2d37e5d commit 3fb208d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 42 deletions.
61 changes: 33 additions & 28 deletions pkg/debuginfo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,25 @@ import (
)

var (
logger log.Logger
ErrDebugInfoMetadataShouldExist = errors.New("debug info metadata should exist")
ErrDebugInfoMetadataExpectedStateUploading = errors.New("debug info metadata state should be uploading")
)

type metadataState int64

const (
// There was an unexpected error. The error will be filled in in the return
// There was an unexpected error. The error will be filled in the return
// value
metadataStateError metadataState = iota
// There's no debug info metadata. This could mean that an older Parca version
// uploaded the debug info files, but there's not record of the metadata, yet.
// There's no debug info metadata. This could mean that an older version
// uploaded the debug info files, but there's no record of the metadata, yet.
metadataStateEmpty
// The debug info file is being uploaded.
metadataStateUploading
// The debug info file is fully uploaded.
metadataStateUploaded
)

func setMetadataLogger(l log.Logger) {
logger = log.With(l, "component", "debuginfo-metadata")
}

func (m metadataState) String() string {
d := map[metadataState]string{
metadataStateError: "METADATA_STATE_ERROR",
Expand All @@ -66,58 +61,68 @@ func (m metadataState) String() string {
return val
}

type DebugInfoMetadata struct {
type metadataManager struct {
logger log.Logger

bucket objstore.Bucket
}

func newMetadataManager(logger log.Logger, bucket objstore.Bucket) *metadataManager {
return &metadataManager{logger: log.With(logger, "component", "debuginfo-metadata"), bucket: bucket}
}

type metadata struct {
State metadataState `json:"state"`
StartedUploadAt int64 `json:"started_upload_at"`
FinishedUploadAt int64 `json:"finished_upload_at"`
}

func metadataUpdate(ctx context.Context, bucket objstore.Bucket, buildID string, state metadataState) error {
level.Debug(logger).Log("msg", "Attempting state update to", "state", state)
func (m *metadataManager) update(ctx context.Context, buildID string, state metadataState) error {
level.Debug(m.logger).Log("msg", "Attempting state update to", "state", state)

switch state {
case metadataStateUploading:
_, err := bucket.Get(ctx, metadataPath(buildID))
_, err := m.bucket.Get(ctx, metadataObjectPath(buildID))
// The metadata file should not exist yet. Not erroring here because there's
// room for a race condition.
if err == nil {
level.Info(logger).Log("msg", "There should not be a metadata file")
level.Info(m.logger).Log("msg", "There should not be a metadata file")
return nil
}

if !bucket.IsObjNotFoundErr(err) {
level.Error(logger).Log("msg", "Expected IsObjNotFoundErr but got", "err", err)
if !m.bucket.IsObjNotFoundErr(err) {
level.Error(m.logger).Log("msg", "Expected IsObjNotFoundErr but got", "err", err)
return err
}

// Let's write the metadata.
metadataBytes, _ := json.MarshalIndent(&DebugInfoMetadata{
metadataBytes, _ := json.MarshalIndent(&metadata{
State: metadataStateUploading,
StartedUploadAt: time.Now().Unix(),
}, "", "\t")
r := bytes.NewReader(metadataBytes)
if err := bucket.Upload(ctx, metadataPath(buildID), r); err != nil {
level.Error(logger).Log("msg", "Creating the metadata file failed", "err", err)
if err := m.bucket.Upload(ctx, metadataObjectPath(buildID), r); err != nil {
level.Error(m.logger).Log("msg", "Creating the metadata file failed", "err", err)
return err
}

case metadataStateUploaded:
r, err := bucket.Get(ctx, metadataPath(buildID))
r, err := m.bucket.Get(ctx, metadataObjectPath(buildID))
if err != nil {
level.Error(logger).Log("msg", "Expected metadata file", "err", err)
level.Error(m.logger).Log("msg", "Expected metadata file", "err", err)
return ErrDebugInfoMetadataShouldExist
}
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(r)
if err != nil {
level.Error(logger).Log("msg", "ReadFrom failed", "err", err)
level.Error(m.logger).Log("msg", "ReadFrom failed", "err", err)
return err
}

metaData := &DebugInfoMetadata{}
metaData := &metadata{}

if err := json.Unmarshal(buf.Bytes(), metaData); err != nil {
level.Error(logger).Log("msg", "Parsing JSON metadata failed", "err", err)
level.Error(m.logger).Log("msg", "Parsing JSON metadata failed", "err", err)
return err
}

Expand All @@ -136,30 +141,30 @@ func metadataUpdate(ctx context.Context, bucket objstore.Bucket, buildID string,
metadataBytes, _ := json.MarshalIndent(&metaData, "", "\t")
newData := bytes.NewReader(metadataBytes)

if err := bucket.Upload(ctx, metadataPath(buildID), newData); err != nil {
if err := m.bucket.Upload(ctx, metadataObjectPath(buildID), newData); err != nil {
return err
}

}
return nil
}

func fetchMetadataState(ctx context.Context, bucket objstore.Bucket, buildID string) (metadataState, error) {
r, err := bucket.Get(ctx, metadataPath(buildID))
func (m *metadataManager) fetch(ctx context.Context, buildID string) (metadataState, error) {
r, err := m.bucket.Get(ctx, metadataObjectPath(buildID))
if err != nil {
return metadataStateEmpty, nil
}

buf := new(bytes.Buffer)
buf.ReadFrom(r)

metaData := &DebugInfoMetadata{}
metaData := &metadata{}
if err := json.Unmarshal(buf.Bytes(), metaData); err != nil {
return metadataStateError, err
}
return metaData.State, nil
}

func metadataPath(buildID string) string {
func metadataObjectPath(buildID string) string {
return path.Join(buildID, "metadata")
}
10 changes: 5 additions & 5 deletions pkg/debuginfo/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"testing"

"github.com/go-kit/log"
"github.com/parca-dev/parca/pkg/symbol"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore/client"
"github.com/thanos-io/objstore/filesystem"

"github.com/parca-dev/parca/pkg/symbol"
)

func TestMetadata(t *testing.T) {
Expand Down Expand Up @@ -63,16 +64,15 @@ func TestMetadata(t *testing.T) {
require.NoError(t, err)

// Test that the initial state should be empty.
setMetadataLogger(logger)
state, err := fetchMetadataState(context.Background(), store.bucket, "fake-build-id")
state, err := store.metadataManager.fetch(context.Background(), "fake-build-id")
require.NoError(t, err)
require.Equal(t, metadataStateEmpty, state)

// Updating the state should be written to blob storage.
err = metadataUpdate(context.Background(), store.bucket, "fake-build-id", metadataStateUploading)
err = store.metadataManager.update(context.Background(), "fake-build-id", metadataStateUploading)
require.NoError(t, err)

state, err = fetchMetadataState(context.Background(), store.bucket, "fake-build-id")
state, err = store.metadataManager.fetch(context.Background(), "fake-build-id")
require.NoError(t, err)
require.Equal(t, metadataStateUploading, state)
}
16 changes: 7 additions & 9 deletions pkg/debuginfo/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type Store struct {
bucket objstore.Bucket
debuginfodClient DebugInfodClient

symbolizer *symbol.Symbolizer
symbolizer *symbol.Symbolizer
metadataManager *metadataManager
}

// NewStore returns a new debug info store.
Expand All @@ -96,14 +97,13 @@ func NewStore(logger log.Logger, symbolizer *symbol.Symbolizer, config *Config,
return nil, fmt.Errorf("instantiate cache: %w", err)
}

setMetadataLogger(logger)

return &Store{
logger: log.With(logger, "component", "debuginfo"),
bucket: bucket,
cacheDir: cache.Directory,
symbolizer: symbolizer,
debuginfodClient: debuginfodClient,
metadataManager: newMetadataManager(logger, bucket),
}, nil
}

Expand Down Expand Up @@ -192,7 +192,7 @@ func (s *Store) Upload(stream debuginfopb.DebugInfoService_UploadServer) error {
level.Debug(s.logger).Log("msg", "trying to upload debug info", "buildid", buildID)
ctx := stream.Context()

result, err := fetchMetadataState(ctx, s.bucket, buildID)
result, err := s.metadataManager.fetch(ctx, buildID)
if err == nil {
level.Debug(s.logger).Log("msg", "fetchMetadataState", "result", result)

Expand All @@ -205,7 +205,7 @@ func (s *Store) Upload(stream debuginfopb.DebugInfoService_UploadServer) error {
return status.Error(codes.AlreadyExists, "debuginfo already exists, being uploaded right now")
}
} else {
level.Error(s.logger).Log("msg", "fetchMetadataState failed with", "err", err)
level.Error(s.logger).Log("msg", "fetchMetadataState failed", "err", err)
}

found, err := s.find(ctx, buildID)
Expand Down Expand Up @@ -242,8 +242,7 @@ func (s *Store) Upload(stream debuginfopb.DebugInfoService_UploadServer) error {
}
}

err = metadataUpdate(ctx, s.bucket, buildID, metadataStateUploading)
if err != nil {
if err := s.metadataManager.update(ctx, buildID, metadataStateUploading); err != nil {
level.Error(s.logger).Log("msg", "metadataUpdate failed with", "err", err)
}

Expand All @@ -256,8 +255,7 @@ func (s *Store) Upload(stream debuginfopb.DebugInfoService_UploadServer) error {
return status.Errorf(codes.Unknown, msg)
}

err = metadataUpdate(ctx, s.bucket, buildID, metadataStateUploaded)
if err != nil {
if err := s.metadataManager.update(ctx, buildID, metadataStateUploaded); err != nil {
level.Error(s.logger).Log("msg", "metadataUpdate failed with", "err", err)
}
level.Debug(s.logger).Log("msg", "debug info uploaded", "buildid", buildID)
Expand Down

0 comments on commit 3fb208d

Please sign in to comment.