Skip to content

Commit

Permalink
Scorecard: Implement GetCacheScoreCard RPC (#1907)
Browse files Browse the repository at this point in the history
  • Loading branch information
bduffany authored Apr 22, 2022
1 parent eee0e01 commit b098738
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 38 deletions.
1 change: 1 addition & 0 deletions server/build_event_protocol/build_event_handler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//server/interfaces",
"//server/metrics",
"//server/remote_cache/hit_tracker",
"//server/remote_cache/scorecard",
"//server/tables",
"//server/terminal",
"//server/util/alert",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"flag"
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"sync"
Expand All @@ -24,6 +23,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/metrics"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/hit_tracker"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/scorecard"
"github.com/buildbuddy-io/buildbuddy/server/tables"
"github.com/buildbuddy-io/buildbuddy/server/terminal"
"github.com/buildbuddy-io/buildbuddy/server/util/alert"
Expand Down Expand Up @@ -213,34 +213,6 @@ func (r *statsRecorder) Enqueue(ctx context.Context, invocation *inpb.Invocation
}
}

func scoreCardBlobName(invocationID string) string {
blobFileName := invocationID + "-scorecard.pb"
return filepath.Join(invocationID, blobFileName)
}

func writeScoreCard(ctx context.Context, env environment.Env, invocationID string, scoreCard *capb.ScoreCard) error {
scoreCardBuf, err := proto.Marshal(scoreCard)
if err != nil {
return err
}
blobStore := env.GetBlobstore()
_, err = blobStore.WriteBlob(ctx, scoreCardBlobName(invocationID), scoreCardBuf)
return err
}

func readScoreCard(ctx context.Context, env environment.Env, invocationID string) (*capb.ScoreCard, error) {
blobStore := env.GetBlobstore()
buf, err := blobStore.ReadBlob(ctx, scoreCardBlobName(invocationID))
if err != nil {
return nil, err
}
sc := &capb.ScoreCard{}
if err := proto.Unmarshal(buf, sc); err != nil {
return nil, err
}
return sc, nil
}

func (r *statsRecorder) Start() {
ctx := r.env.GetServerContext()
for i := 0; i < numStatsRecorderWorkers; i++ {
Expand Down Expand Up @@ -270,7 +242,10 @@ func (r *statsRecorder) handleTask(ctx context.Context, task *recordStatsTask) {
fillInvocationFromCacheStats(stats, ti)
}
if scoreCard := hit_tracker.ScoreCard(ctx, r.env, task.invocationJWT.id); scoreCard != nil {
if err := writeScoreCard(ctx, r.env, task.invocationJWT.id, scoreCard); err != nil {
// Store results using the default sort order so we don't have to sort on
// each request.
scorecard.SortResults(scoreCard.Results)
if err := scorecard.Write(ctx, r.env, task.invocationJWT.id, scoreCard); err != nil {
log.Errorf("Error writing scorecard blob: %s", err)
}
}
Expand Down Expand Up @@ -979,13 +954,20 @@ func LookupInvocation(env environment.Env, ctx context.Context, iid string) (*in
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
var scoreCard *capb.ScoreCard
// The cache ScoreCard is not stored in the table invocation, so we do this lookup
// after converting the table invocation to a proto invocation.
if ti.InvocationStatus == int64(inpb.Invocation_PARTIAL_INVOCATION_STATUS) {
scoreCard = hit_tracker.ScoreCard(ctx, env, iid)
} else {
if sc, err := readScoreCard(ctx, env, iid); err == nil {
scoreCard = sc
// When detailed stats are enabled, the scorecard is not inlined in the
// invocation.
if !hit_tracker.DetailedStatsEnabled() {
// The cache ScoreCard is not stored in the table invocation, so we do this lookup
// after converting the table invocation to a proto invocation.
if ti.InvocationStatus == int64(inpb.Invocation_PARTIAL_INVOCATION_STATUS) {
scoreCard = hit_tracker.ScoreCard(ctx, env, iid)
} else {
sc, err := scorecard.Read(ctx, env, iid)
if err != nil {
log.Warningf("Failed to read scorecard for invocation %s: %s", iid, err)
} else {
scoreCard = sc
}
}
}
if scoreCard != nil {
Expand Down
1 change: 1 addition & 0 deletions server/buildbuddy_server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//server/environment",
"//server/eventlog",
"//server/interfaces",
"//server/remote_cache/scorecard",
"//server/role_filter",
"//server/tables",
"//server/target",
Expand Down
3 changes: 2 additions & 1 deletion server/buildbuddy_server/buildbuddy_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/eventlog"
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/scorecard"
"github.com/buildbuddy-io/buildbuddy/server/role_filter"
"github.com/buildbuddy-io/buildbuddy/server/tables"
"github.com/buildbuddy-io/buildbuddy/server/target"
Expand Down Expand Up @@ -843,7 +844,7 @@ func (s *BuildBuddyServer) GetUsage(ctx context.Context, req *usagepb.GetUsageRe
}

func (s *BuildBuddyServer) GetCacheScoreCard(ctx context.Context, req *capb.GetCacheScoreCardRequest) (*capb.GetCacheScoreCardResponse, error) {
return nil, status.UnimplementedError("Not implemented")
return scorecard.GetCacheScoreCard(ctx, s.env, req)
}

type bsLookup struct {
Expand Down
4 changes: 4 additions & 0 deletions server/remote_cache/hit_tracker/hit_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ const (
// New counter types go here!
)

func DetailedStatsEnabled() bool {
return *detailedStatsEnabled
}

func cacheTypePrefix(actionCache bool, name string) string {
if actionCache {
return "action-cache-" + name
Expand Down
16 changes: 16 additions & 0 deletions server/remote_cache/scorecard/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "scorecard",
srcs = ["scorecard.go"],
importpath = "github.com/buildbuddy-io/buildbuddy/server/remote_cache/scorecard",
visibility = ["//visibility:public"],
deps = [
"//proto:cache_go_proto",
"//proto:pagination_go_proto",
"//server/environment",
"//server/util/paging",
"//server/util/status",
"@com_github_golang_protobuf//proto:go_default_library",
],
)
138 changes: 138 additions & 0 deletions server/remote_cache/scorecard/scorecard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package scorecard

import (
"context"
"path/filepath"
"sort"
"time"

"github.com/buildbuddy-io/buildbuddy/server/environment"
"github.com/buildbuddy-io/buildbuddy/server/util/paging"
"github.com/buildbuddy-io/buildbuddy/server/util/status"
"github.com/golang/protobuf/proto"

capb "github.com/buildbuddy-io/buildbuddy/proto/cache"
pgpb "github.com/buildbuddy-io/buildbuddy/proto/pagination"
)

const (
// Default page size for the scorecard when a page token is not included in
// the request.
defaultScoreCardPageSize = 250
)

// GetCacheScoreCard returns a list of detailed, per-request cache stats.
func GetCacheScoreCard(ctx context.Context, env environment.Env, req *capb.GetCacheScoreCardRequest) (*capb.GetCacheScoreCardResponse, error) {
page := &pgpb.OffsetLimit{Offset: 0, Limit: defaultScoreCardPageSize}
if req.PageToken != "" {
reqPage, err := paging.DecodeOffsetLimit(req.PageToken)
if err != nil {
return nil, err
}
page = reqPage
}

if page.Offset < 0 || page.Limit < 0 {
return nil, status.InvalidArgumentError("invalid page token")
}

scorecard, err := Read(ctx, env, req.InvocationId)
if err != nil {
return nil, err
}

start := page.Offset
if start > int64(len(scorecard.Results)) {
start = int64(len(scorecard.Results))
}
end := start + page.Limit
if end > int64(len(scorecard.Results)) {
end = int64(len(scorecard.Results))
}

nextPageToken := ""
if end < int64(len(scorecard.Results)) {
next, err := paging.EncodeOffsetLimit(&pgpb.OffsetLimit{
Offset: int64(end),
Limit: defaultScoreCardPageSize,
})
if err != nil {
return nil, err
}
nextPageToken = next
}

return &capb.GetCacheScoreCardResponse{
Results: scorecard.Results[start:end],
NextPageToken: nextPageToken,
}, nil
}

// SortResults sorts scorecard results, grouping by actions sorted by min
// request start timestamp, and sorting the requests within each action by start
// time.
// TODO(bduffany): More sorting options
func SortResults(results []*capb.ScoreCard_Result) {
// Compute the min result start for each action. (Each result belongs to
// an action, identified by action ID).
minStartTime := make(map[string]time.Time)
for _, result := range results {
t := result.StartTime.AsTime()
existing, ok := minStartTime[result.ActionId]
if !ok {
minStartTime[result.ActionId] = t
continue
}
if result.StartTime.AsTime().Before(existing) {
minStartTime[result.ActionId] = t
}
}
sort.Slice(results, func(i, j int) bool {
// For results with different parent actions, sort results earlier if
// their parent action's min start time comes first.
if results[i].ActionId != results[j].ActionId {
ti := minStartTime[results[i].ActionId]
tj := minStartTime[results[j].ActionId]
if ti.Equal(tj) {
// If two actions happen to have exactly the same start time, break the
// tie by action ID so that the sort is deterministic.
return results[i].ActionId < results[j].ActionId
}
return ti.Before(tj)
}
// Within actions, sort by start time.
return results[i].StartTime.AsTime().Before(results[j].StartTime.AsTime())
})
}

func blobName(invocationID string) string {
// WARNING: Things will break if this is changed, because we use this name
// to lookup data from historical invocations.
blobFileName := invocationID + "-scorecard.pb"
return filepath.Join(invocationID, blobFileName)
}

// Read reads the invocation cache scorecard from the configured blobstore.
func Read(ctx context.Context, env environment.Env, invocationID string) (*capb.ScoreCard, error) {
blobStore := env.GetBlobstore()
buf, err := blobStore.ReadBlob(ctx, blobName(invocationID))
if err != nil {
return nil, err
}
sc := &capb.ScoreCard{}
if err := proto.Unmarshal(buf, sc); err != nil {
return nil, err
}
return sc, nil
}

// Write writes the invocation cache scorecard to the configured blobstore.
func Write(ctx context.Context, env environment.Env, invocationID string, scoreCard *capb.ScoreCard) error {
scoreCardBuf, err := proto.Marshal(scoreCard)
if err != nil {
return err
}
blobStore := env.GetBlobstore()
_, err = blobStore.WriteBlob(ctx, blobName(invocationID), scoreCardBuf)
return err
}

0 comments on commit b098738

Please sign in to comment.