Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pgzip in the compactor. #3672

Merged
merged 1 commit into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible
github.com/json-iterator/go v1.1.10
github.com/klauspost/compress v1.11.3
github.com/klauspost/pgzip v1.2.5
github.com/mitchellh/mapstructure v1.4.1
github.com/modern-go/reflect2 v1.0.1
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,8 @@ github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/knq/sysutil v0.0.0-20191005231841-15668db23d08/go.mod h1:dFWs1zEqDjFtnBXsd1vPOZaLsESovai349994nHx3e0=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
54 changes: 48 additions & 6 deletions pkg/storage/stores/shipper/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,60 @@ import (
"os"
"runtime/debug"
"strings"
"sync"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
gzip "github.com/klauspost/pgzip"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/chunkenc"
)

const delimiter = "/"

var (
gzipReader = sync.Pool{}
gzipWriter = sync.Pool{}
)

// getGzipReader gets or creates a new CompressionReader and reset it to read from src
func getGzipReader(src io.Reader) io.Reader {
if r := gzipReader.Get(); r != nil {
reader := r.(*gzip.Reader)
err := reader.Reset(src)
if err != nil {
panic(err)
}
return reader
}
reader, err := gzip.NewReader(src)
if err != nil {
panic(err)
}
return reader
}

// putGzipReader places back in the pool a CompressionReader
func putGzipReader(reader io.Reader) {
gzipReader.Put(reader)
}

// getGzipWriter gets or creates a new CompressionWriter and reset it to write to dst
func getGzipWriter(dst io.Writer) io.WriteCloser {
if w := gzipWriter.Get(); w != nil {
writer := w.(*gzip.Writer)
writer.Reset(dst)
return writer
}
return gzip.NewWriter(dst)
}

// PutWriter places back in the pool a CompressionWriter
func putGzipWriter(writer io.WriteCloser) {
gzipWriter.Put(writer)
}

type StorageClient interface {
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)
}
Expand All @@ -44,8 +86,8 @@ func GetFileFromStorage(ctx context.Context, storageClient StorageClient, object

var objectReader io.Reader = readCloser
if strings.HasSuffix(objectKey, ".gz") {
decompressedReader := chunkenc.Gzip.GetReader(readCloser)
defer chunkenc.Gzip.PutReader(decompressedReader)
decompressedReader := getGzipReader(readCloser)
defer putGzipReader(decompressedReader)

objectReader = decompressedReader
}
Expand Down Expand Up @@ -108,8 +150,8 @@ func CompressFile(src, dest string) error {
}
}()

compressedWriter := chunkenc.Gzip.GetWriter(compressedFile)
defer chunkenc.Gzip.PutWriter(compressedWriter)
compressedWriter := getGzipWriter(compressedFile)
defer putGzipWriter(compressedWriter)

_, err = io.Copy(compressedWriter, uncompressedFile)
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions vendor/github.com/klauspost/pgzip/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions vendor/github.com/klauspost/pgzip/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions vendor/github.com/klauspost/pgzip/GO_LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/klauspost/pgzip/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 135 additions & 0 deletions vendor/github.com/klauspost/pgzip/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading