Commit 6430173

Rework
bduffany committed Oct 28, 2024
1 parent 9994ff4 commit 6430173
Showing 9 changed files with 120 additions and 76 deletions.
1 change: 1 addition & 0 deletions MODULE.bazel
@@ -288,6 +288,7 @@ use_repo(
     "com_github_bazelbuild_rules_webtesting",
     "com_github_bduffany_godemon",
     "com_github_bits_and_blooms_bloom_v3",
+    "com_github_bmkessler_streamvbyte",
     "com_github_bojand_ghz",
     "com_github_bradfitz_gomemcache",
     "com_github_buildbuddy_io_tensorflow_proto",
6 changes: 6 additions & 0 deletions deps.bzl
@@ -561,6 +561,12 @@ def install_go_mod_dependencies(workspace_name = "buildbuddy"):
         sum = "h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=",
         version = "v0.0.0-20160611221934-b7ed37b82869",
     )
+    go_repository(
+        name = "com_github_bmkessler_streamvbyte",
+        importpath = "github.com/bmkessler/streamvbyte",
+        sum = "h1:QvzfNFkZD66P1f8s8FwMMVLxPd6Wa2/uqQh8gDhq3Ss=",
+        version = "v0.1.0",
+    )
     go_repository(
         name = "com_github_bojand_ghz",
         build_directives = [
1 change: 1 addition & 0 deletions go.mod
@@ -44,6 +44,7 @@ require (
 	github.com/bazelbuild/rules_webtesting v0.0.0-20210910170740-6b2ef24cfe95
 	github.com/bduffany/godemon v0.0.0-20221115232931-09721d48e30e
 	github.com/bits-and-blooms/bloom/v3 v3.7.0
+	github.com/bmkessler/streamvbyte v0.1.0
 	github.com/bojand/ghz v0.120.0
 	github.com/bradfitz/gomemcache v0.0.0-20230611145640-acc696258285
 	github.com/buildbuddy-io/tensorflow-proto v0.0.0-20220908151343-929b41ab4dc6
2 changes: 2 additions & 0 deletions go.sum
@@ -291,6 +291,8 @@ github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnweb
 github.com/bmatcuk/doublestar/v4 v4.6.1 h1:FH9SifrbvJhnlQpztAx++wlkk70QBf0iBWDwNy7PA4I=
 github.com/bmatcuk/doublestar/v4 v4.6.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
+github.com/bmkessler/streamvbyte v0.1.0 h1:QvzfNFkZD66P1f8s8FwMMVLxPd6Wa2/uqQh8gDhq3Ss=
+github.com/bmkessler/streamvbyte v0.1.0/go.mod h1:0SZTdGk1trQB2LOWZC7lNfF6SDN05cfv371DkrcUWeA=
 github.com/bojand/ghz v0.120.0 h1:6F4wsmZVwFg5UnD+/R+IABWk6sKE/0OKIBdUQUZnOdo=
 github.com/bojand/ghz v0.120.0/go.mod h1:HfECuBZj1v02XObGnRuoZgyB1PR24/25dIYiJIMjJnE=
 github.com/bradfitz/gomemcache v0.0.0-20230611145640-acc696258285 h1:Dr+ezPI5ivhMn/3WOoB86XzMhie146DNaBbhaQWZHMY=
15 changes: 15 additions & 0 deletions proto/BUILD
@@ -174,6 +174,11 @@ proto_library(
     ],
 )
 
+proto_library(
+    name = "timeseries_proto",
+    srcs = ["timeseries.proto"],
+)
+
 proto_library(
     name = "failure_details_proto",
     srcs = [
@@ -958,6 +963,16 @@ go_proto_library(
     ],
 )
 
+go_proto_library(
+    name = "timeseries_go_proto",
+    compilers = [
+        "@io_bazel_rules_go//proto:go_proto",
+        "//proto:vtprotobuf_compiler",
+    ],
+    importpath = "github.com/buildbuddy-io/buildbuddy/proto/timeseries",
+    proto = ":timeseries_proto",
+)
+
 go_proto_library(
     name = "failure_details_go_proto",
     compilers = [
19 changes: 19 additions & 0 deletions proto/timeseries.proto
@@ -0,0 +1,19 @@
+syntax = "proto3";
+
+package timeseries;
+
+// A compressed int64 sequence optimized for timeseries data, such as a sequence
+// of timestamps or a sequence of measurements taken over time. Use
+// server/util/timeseries to transcode between this message and []int64.
+message Timeseries {
+  // A streamvbyte delta-encoded int32 list representing the high bits of the
+  // int64 sequence.
+  bytes data_high = 1;
+
+  // A streamvbyte delta-encoded int32 list representing the low bits of the
+  // int64 sequence.
+  bytes data_low = 2;
+
+  // The number of integers in the uncompressed int64 list.
+  int64 length = 3;
+}
3 changes: 3 additions & 0 deletions server/util/timeseries/BUILD
@@ -6,6 +6,7 @@ go_library(
     importpath = "github.com/buildbuddy-io/buildbuddy/server/util/timeseries",
     visibility = ["//visibility:public"],
     deps = [
+        "//proto:timeseries_go_proto",
         "@com_github_bmkessler_streamvbyte//:streamvbyte",
     ],
 )
@@ -15,6 +16,8 @@ go_test(
     srcs = ["timeseries_test.go"],
     deps = [
         ":timeseries",
+        "//proto:timeseries_go_proto",
+        "//server/util/proto",
         "@com_github_stretchr_testify//require",
     ],
 )
96 changes: 33 additions & 63 deletions server/util/timeseries/timeseries.go
@@ -4,68 +4,35 @@ import (
 	"fmt"
 
 	"github.com/bmkessler/streamvbyte"
-)
 
-const (
-	chunkSize = 8 * 1024
+	timeseriespb "github.com/buildbuddy-io/buildbuddy/proto/timeseries"
 )
 
-// Buffer compactly buffers timeseries data using streamvbyte. It currently
-// only supports 32-bit ints.
-//
-// If this is used to represent millisecond-precision unix timestamps, the max
-// representable timestamp occurs in the year 2038. Consider representing these
-// timestamps as durations relative to a starting timestamp.
-type Buffer struct {
-	// Current chunk that has yet to be compressed.
-	buf []int32
-	// Compressed chunks.
-	chunks []Chunk
-	// Intermediate compression buffer.
-	chunk []byte
-}
-
-func (d *Buffer) Append(value int32) {
-	d.buf = append(d.buf, value)
-	if len(d.buf) >= chunkSize {
-		d.Pack()
-	}
-}
-
-// Pack compresses any buffered data, ensuring that the values will be
-// returned by Chunks().
-func (d *Buffer) Pack() {
-	maxSize := streamvbyte.MaxSize32(len(d.buf))
-	if len(d.chunk) < maxSize {
-		d.chunk = make([]byte, maxSize)
-	}
-	n := streamvbyte.EncodeDeltaInt32(d.chunk, d.buf, 0)
-	// Make a new slice so that the backing array doesn't have excess capacity,
-	// which will take up memory.
-	trimmedChunk := append(make([]byte, 0, n), d.chunk[:n]...)
-	d.chunks = append(d.chunks, Chunk{
-		Length: len(d.buf),
-		Data:   trimmedChunk,
-	})
-	// Reuse the current buffer, setting its length to 0.
-	d.buf = d.buf[:0]
-}
-
-// Chunks returns all compressed chunks.
-// It does not return chunks that have not yet been packed.
-func (d *Buffer) Chunks() []Chunk {
-	return d.chunks
-}
-
-type Chunk struct {
-	// Length is the number of compressed int32 values stored in Data.
-	Length int
-	// Data is the compressed data.
-	Data []byte
-}
-
-// Decompress returns the original timeseries data given compressed chunks.
-func Decompress(chunks ...Chunk) (_ []int32, err error) {
+// Encode packs the given data into a new Timeseries proto.
+func Encode(data []int64) *timeseriespb.Timeseries {
+	hi := make([]uint32, len(data))
+	lo := make([]uint32, len(data))
+	for i, v := range data {
+		hi[i] = uint32((uint64(v) & 0xFFFFFFFF00000000) >> 32)
+		lo[i] = uint32((uint64(v) & 0x00000000FFFFFFFF))
+	}
+
+	hiEnc := make([]byte, streamvbyte.MaxSize32(len(hi)))
+	loEnc := make([]byte, streamvbyte.MaxSize32(len(lo)))
+	n := streamvbyte.EncodeDeltaUint32(hiEnc, hi, 0)
+	hiEnc = hiEnc[:n]
+	n = streamvbyte.EncodeDeltaUint32(loEnc, lo, 0)
+	loEnc = loEnc[:n]
+
+	return &timeseriespb.Timeseries{
+		DataHigh: hiEnc,
+		DataLow:  loEnc,
+		Length:   int64(len(data)),
+	}
+}
+
+// Decode returns the original []int64 data from a Timeseries proto.
+func Decode(ts *timeseriespb.Timeseries) (_ []int64, err error) {
 	// streamvbyte lib panics if data is malformed, so we recover() here and
 	// return an error in this case.
 	defer func() {
@@ -74,15 +41,18 @@ func Decompress(chunks ...Chunk) (_ []int32, err error) {
 		}
 	}()
 
-	var length int
-	for _, c := range chunks {
-		length += c.Length
-	}
-	out := make([]int32, length)
-	off := 0
-	for _, c := range chunks {
-		streamvbyte.DecodeDeltaInt32(out[off:off+c.Length], c.Data, 0)
-		off += c.Length
-	}
+	hi := make([]uint32, ts.Length)
+	lo := make([]uint32, ts.Length)
+	streamvbyte.DecodeDeltaUint32(hi, ts.DataHigh, 0)
+	streamvbyte.DecodeDeltaUint32(lo, ts.DataLow, 0)
+
+	out := make([]int64, ts.Length)
+	for i := range out {
+		value := uint64(0)
+		value |= uint64(hi[i]) << 32
+		value |= uint64(lo[i])
+		out[i] = int64(value)
+	}
 
 	return out, nil
 }
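For reference, a minimal usage sketch of the reworked API, based on the Encode and Decode signatures in this diff; the sample values are invented for illustration.

```go
package main

import (
	"fmt"
	"log"

	"github.com/buildbuddy-io/buildbuddy/server/util/timeseries"
)

func main() {
	// Millisecond-precision timestamps: deltas between consecutive samples
	// are small, which is the case streamvbyte compresses well.
	samples := []int64{1730000000000, 1730000000100, 1730000000250}

	pb := timeseries.Encode(samples)

	decoded, err := timeseries.Decode(pb)
	if err != nil {
		// Decode surfaces malformed data as an error rather than a panic.
		log.Fatal(err)
	}
	fmt.Println(decoded) // [1730000000000 1730000000100 1730000000250]
}
```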
53 changes: 40 additions & 13 deletions server/util/timeseries/timeseries_test.go
@@ -4,32 +4,59 @@ import (
 	"fmt"
 	"testing"
 
+	"github.com/buildbuddy-io/buildbuddy/server/util/proto"
 	"github.com/buildbuddy-io/buildbuddy/server/util/timeseries"
 	"github.com/stretchr/testify/require"
+
+	timeseriespb "github.com/buildbuddy-io/buildbuddy/proto/timeseries"
 )
 
-func TestCompressAndDecompress(t *testing.T) {
-	for _, n := range []int{0, 1, 100, 8192, 8193, 15000, 25000} {
+func TestEncodeAndDecode(t *testing.T) {
+	for _, n := range []int{0, 1, 10, 100, 1000, 10_000} {
 		t.Run(fmt.Sprintf("%d samples", n), func(t *testing.T) {
-			tsb := timeseries.Buffer{}
-			expectedValues := []int32{}
+			expectedValues := []int64{}
+			// Construct a sequence with some positive values, some negative
+			// values, some repeats
 			for i := 0; i < n; i++ {
-				val := int32(i / 2)
-				tsb.Append(val)
+				val := int64(i/2 - n/4)
 				expectedValues = append(expectedValues, val)
 			}
-			tsb.Pack()
-			values, err := timeseries.Decompress(tsb.Chunks()...)
+			pb := timeseries.Encode(expectedValues)
+
+			encodedSize := proto.Size(pb)
+			decodedSize := len(expectedValues) * 8
+			t.Logf("n=%d, compression_ratio=%.3f", n, float64(encodedSize)/float64(decodedSize))
+
+			values, err := timeseries.Decode(pb)
 			require.NoError(t, err)
 			require.Equal(t, expectedValues, values)
 		})
 	}
 }
 
 func TestDecompressMalformedData(t *testing.T) {
-	_, err := timeseries.Decompress(timeseries.Chunk{
-		Length: 100,
-		Data:   []byte{0, 1, 2, 3, 4, 5, 6},
-	})
-	require.Error(t, err)
+	for _, test := range []struct {
+		name  string
+		value *timeseriespb.Timeseries
+	}{
+		{
+			name: "InvalidData",
+			value: &timeseriespb.Timeseries{
+				Length:   100,
+				DataHigh: nil,
+				DataLow:  nil,
+			},
+		},
+		{
+			name: "InvalidDataWithLengthMismatch",
+			value: &timeseriespb.Timeseries{
+				Length:   100,
+				DataHigh: []byte{0, 1, 2, 3},
+				DataLow:  []byte{0, 1},
+			},
+		},
+	} {
+		_, err := timeseries.Decode(test.value)
+		require.Error(t, err)
+	}
 }
