Skip to content

Commit

Permalink
Used pooled buffering for compression and batch serialization (#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 22, 2020
1 parent 970e269 commit 2d66c7b
Show file tree
Hide file tree
Showing 15 changed files with 212 additions and 86 deletions.
5 changes: 4 additions & 1 deletion pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io/ioutil"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -34,7 +35,8 @@ func TestClient(t *testing.T) {

func TestTLSConnectionCAError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURLTLS,
URL: serviceURLTLS,
OperationTimeout: 5 * time.Second,
})
assert.NoError(t, err)

Expand Down Expand Up @@ -105,6 +107,7 @@ func TestTLSConnectionHostNameVerification(t *testing.T) {
func TestTLSConnectionHostNameVerificationError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar+ssl://127.0.0.1:6651",
OperationTimeout: 5 * time.Second,
TLSTrustCertsFilePath: caCertsPath,
TLSValidateHostname: true,
})
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int
pc.compressionProviders[msgMeta.GetCompression()] = provider
}

uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
if err != nil {
return nil, err
}
Expand Down
25 changes: 19 additions & 6 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const (
DefaultMaxMessagesPerBatch = 1000
)

type ConnectionHolder interface {
GetConnection() Connection
}

// BatchBuilder wraps the objects needed to build a batch.
type BatchBuilder struct {
buffer Buffer
Expand All @@ -58,11 +62,13 @@ type BatchBuilder struct {
callbacks []interface{}

compressionProvider compression.Provider
cnxHolder ConnectionHolder
}

// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level) (*BatchBuilder, error) {
compressionType pb.CompressionType, level compression.Level,
cnxHolder ConnectionHolder) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
Expand All @@ -85,6 +91,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
},
callbacks: []interface{}{},
compressionProvider: getCompressionProvider(compressionType, level),
cnxHolder: cnxHolder,
}

if compressionType != pb.CompressionType_NONE {
Expand Down Expand Up @@ -149,7 +156,7 @@ func (bb *BatchBuilder) reset() {
}

// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks []interface{}) {
func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) {
if bb.numMessages == 0 {
// No-Op for empty batch
return nil, 0, nil
Expand All @@ -160,16 +167,22 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks
bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))

uncompressedSize := bb.buffer.ReadableBytes()
compressed := bb.compressionProvider.Compress(bb.buffer.ReadableSlice())
bb.msgMetadata.UncompressedSize = &uncompressedSize

buffer := NewBuffer(4096)
serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, compressed)
cnx := bb.cnxHolder.GetConnection()
var buffer Buffer
if cnx == nil {
buffer = NewBuffer(int(uncompressedSize))
} else {
buffer = cnx.GetBufferFromPool()
}

serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)

callbacks = bb.callbacks
sequenceID = bb.cmdSend.Send.GetSequenceId()
bb.reset()
return buffer.ReadableSlice(), sequenceID, callbacks
return buffer, sequenceID, callbacks
}

func (bb *BatchBuilder) Close() error {
Expand Down
13 changes: 7 additions & 6 deletions pulsar/internal/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Buffer interface {
PutUint32(n uint32, writerIdx uint32)

Resize(newSize uint32)
ResizeIfNeeded(spaceNeeded uint32)

// Clear will clear the current buffer data.
Clear()
Expand Down Expand Up @@ -154,9 +155,9 @@ func (b *buffer) Resize(newSize uint32) {
b.writerIdx = size
}

func (b *buffer) resizeIfNeeded(spaceNeeded int) {
if b.WritableBytes() < uint32(spaceNeeded) {
capacityNeeded := uint32(cap(b.data) + spaceNeeded)
func (b *buffer) ResizeIfNeeded(spaceNeeded uint32) {
if b.WritableBytes() < spaceNeeded {
capacityNeeded := uint32(cap(b.data)) + spaceNeeded
minCapacityIncrease := uint32(cap(b.data) * 3 / 2)
if capacityNeeded < minCapacityIncrease {
capacityNeeded = minCapacityIncrease
Expand All @@ -174,7 +175,7 @@ func (b *buffer) ReadUint16() uint16 {
}

func (b *buffer) WriteUint32(n uint32) {
b.resizeIfNeeded(4)
b.ResizeIfNeeded(4)
binary.BigEndian.PutUint32(b.WritableSlice(), n)
b.writerIdx += 4
}
Expand All @@ -184,13 +185,13 @@ func (b *buffer) PutUint32(n uint32, idx uint32) {
}

func (b *buffer) WriteUint16(n uint16) {
b.resizeIfNeeded(2)
b.ResizeIfNeeded(2)
binary.BigEndian.PutUint16(b.WritableSlice(), n)
b.writerIdx += 2
}

func (b *buffer) Write(s []byte) {
b.resizeIfNeeded(len(s))
b.ResizeIfNeeded(uint32(len(s)))
copy(b.WritableSlice(), s)
b.writerIdx += uint32(len(s))
}
Expand Down
31 changes: 19 additions & 12 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"fmt"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"

"github.com/golang/protobuf/proto"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -211,19 +213,17 @@ func addSingleMessageToBatch(wb Buffer, smm proto.Message, payload []byte) {
wb.Write(payload)
}

func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message, payload []byte) {
func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
uncompressedPayload Buffer,
compressionProvider compression.Provider) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
cmdSize := proto.Size(cmdSend)
msgMetadataSize := proto.Size(msgMetadata)
payloadSize := len(payload)

magicAndChecksumLength := 2 + 4 /* magic + checksumLength */
headerContentSize := 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize
// cmdLength + cmdSize + magicLength + checksumSize + msgMetadataLength + msgMetadataSize
totalSize := headerContentSize + payloadSize

wb.WriteUint32(uint32(totalSize)) // External frame
frameSizeIdx := wb.WriterIndex()
wb.WriteUint32(0) // Skip frame size until we now the size
frameStartIdx := wb.WriterIndex()

// Write cmd
wb.WriteUint32(uint32(cmdSize))
Expand All @@ -248,13 +248,20 @@ func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
}

wb.Write(serialized)
wb.Write(payload)

// Make sure the buffer has enough space to hold the compressed data
// and perform the compression in-place
maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes())))
wb.ResizeIfNeeded(maxSize)
b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice())
wb.WrittenBytes(uint32(len(b)))

// Write checksum at created checksum-placeholder
endIdx := wb.WriterIndex()
checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, endIdx-metadataStartIdx))
frameEndIdx := wb.WriterIndex()
checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx))

// set computed checksum
// Set Sizes and checksum in the fixed-size header
wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
wb.PutUint32(checksum, checksumIdx)
}

Expand Down
9 changes: 6 additions & 3 deletions pulsar/internal/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ const (

// Provider is a interface of compression providers
type Provider interface {
// Return the max possible size for a compressed buffer given the uncompressed data size
CompressMaxSize(originalSize int) int

// Compress a []byte, the param is a []byte with the uncompressed content.
// The reader/writer indexes will not be modified. The return is a []byte
// with the compressed content.
Compress(data []byte) []byte
Compress(dst, src []byte) []byte

// Decompress a []byte. The buffer needs to have been compressed with the matching Encoder.
// The compressedData is compressed content, originalSize is the size of the original content.
// The src is compressed content. If dst is passed, the decompressed data will be written there
// The return were the result will be passed, if err is nil, the buffer was decompressed, no nil otherwise.
Decompress(compressedData []byte, originalSize int) ([]byte, error)
Decompress(dst, src []byte, originalSize int) ([]byte, error)

// Returns a new instance of the same provider, with the same exact configuration
Clone() Provider
Expand Down
12 changes: 8 additions & 4 deletions pulsar/internal/compression/compression_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ func testCompression(b *testing.B, provider Provider) {
}

dataLen := int64(len(data))
compressed := make([]byte, 1024*1024)

b.ResetTimer()

for i := 0; i < b.N; i++ {
provider.Compress(data)
provider.Compress(compressed[:0], data)
b.SetBytes(dataLen)
}
}
Expand All @@ -49,14 +50,15 @@ func testDecompression(b *testing.B, provider Provider) {
b.Error(err)
}

dataCompressed := provider.Compress(data)
dataCompressed := provider.Compress(nil, data)
dataDecompressed := make([]byte, 1024*1024)

dataLen := int64(len(data))

b.ResetTimer()

for i := 0; i < b.N; i++ {
provider.Decompress(dataCompressed, int(dataLen))
provider.Decompress(dataDecompressed[:0], dataCompressed, int(dataLen))
b.SetBytes(dataLen)
}
}
Expand Down Expand Up @@ -108,8 +110,10 @@ func BenchmarkCompressionParallel(b *testing.B) {
b.Run(p.name, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
localProvider := p.provider.Clone()
compressed := make([]byte, 1024*1024)

for pb.Next() {
localProvider.Compress(data)
localProvider.Compress(compressed[:0], data)
b.SetBytes(dataLen)
}
})
Expand Down
24 changes: 20 additions & 4 deletions pulsar/internal/compression/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,24 @@ func TestCompression(t *testing.T) {
p := provider
t.Run(p.name, func(t *testing.T) {
hello := []byte("test compression data")
compressed := p.provider.Compress(hello)
uncompressed, err := p.provider.Decompress(compressed, len(hello))
compressed := make([]byte, 1024)
compressed = p.provider.Compress(compressed, hello)

uncompressed := make([]byte, 1024)
uncompressed, err := p.provider.Decompress(uncompressed, compressed, len(hello))
assert.Nil(t, err)
assert.ElementsMatch(t, hello, uncompressed)
})
}
}

func TestCompressionNoBuffers(t *testing.T) {
for _, provider := range providers {
p := provider
t.Run(p.name, func(t *testing.T) {
hello := []byte("test compression data")
compressed := p.provider.Compress(nil, hello)
uncompressed, err := p.provider.Decompress(nil, compressed, len(hello))
assert.Nil(t, err)
assert.ElementsMatch(t, hello, uncompressed)
})
Expand All @@ -56,7 +72,7 @@ func TestJavaCompatibility(t *testing.T) {
p := provider
t.Run(p.name, func(t *testing.T) {
hello := []byte("hello")
uncompressed, err := p.provider.Decompress(p.compressedHello, len(hello))
uncompressed, err := p.provider.Decompress(nil, p.compressedHello, len(hello))
assert.Nil(t, err)
assert.ElementsMatch(t, hello, uncompressed)
})
Expand All @@ -67,7 +83,7 @@ func TestDecompressionError(t *testing.T) {
for _, provider := range providers {
p := provider
t.Run(p.name, func(t *testing.T) {
_, err := p.provider.Decompress([]byte{0x05}, 10)
_, err := p.provider.Decompress(nil, []byte{0x05}, 10)
assert.NotNil(t, err)
})
}
Expand Down
43 changes: 32 additions & 11 deletions pulsar/internal/compression/lz4.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/pierrec/lz4"
)

const (
minLz4DestinationBufferSize = 1024 * 1024
)

type lz4Provider struct {
hashTable []int
}
Expand All @@ -34,22 +38,35 @@ func NewLz4Provider() Provider {
}
}

func (l *lz4Provider) Compress(data []byte) []byte {
func (l *lz4Provider) CompressMaxSize(originalSize int) int {
s := lz4.CompressBlockBound(originalSize)
if s < minLz4DestinationBufferSize {
return minLz4DestinationBufferSize
}

return s
}

func (l *lz4Provider) Compress(dst, data []byte) []byte {
maxSize := lz4.CompressBlockBound(len(data))
compressed := make([]byte, maxSize)
size, err := lz4.CompressBlock(data, compressed, l.hashTable)
if cap(dst) >= maxSize {
dst = dst[0:maxSize] // Reuse dst buffer
} else {
dst = make([]byte, maxSize)
}
size, err := lz4.CompressBlock(data, dst, l.hashTable)
if err != nil {
panic("Failed to compress")
}

if size == 0 {
// The data block was not compressed. Just repeat it with
// the block header flag to signal it's not compressed
headerSize := writeSize(len(data), compressed)
copy(compressed[headerSize:], data)
return compressed[:len(data)+headerSize]
headerSize := writeSize(len(data), dst)
copy(dst[headerSize:], data)
return dst[:len(data)+headerSize]
}
return compressed[:size]
return dst[:size]
}

// Write the encoded size for the uncompressed payload
Expand All @@ -69,10 +86,14 @@ func writeSize(size int, dst []byte) int {
return i + 1
}

func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
uncompressed := make([]byte, originalSize)
_, err := lz4.UncompressBlock(compressedData, uncompressed)
return uncompressed, err
func (lz4Provider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
if cap(dst) >= originalSize {
dst = dst[0:originalSize] // Reuse dst buffer
} else {
dst = make([]byte, originalSize)
}
_, err := lz4.UncompressBlock(src, dst)
return dst, err
}

func (lz4Provider) Close() error {
Expand Down
Loading

0 comments on commit 2d66c7b

Please sign in to comment.