Skip to content

Commit

Permalink
Use gogofast to have in-place protobuf serialization (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 23, 2020
1 parent 2d66c7b commit 549874b
Show file tree
Hide file tree
Showing 21 changed files with 26,875 additions and 6,237 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/golang/protobuf v1.3.1
github.com/gogo/protobuf v1.3.1
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
github.com/pierrec/lz4 v2.0.5+incompatible
Expand Down
18 changes: 16 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.5 h1:7q6vHIqubShURwQz8cQK6yIe/xC3IF0Vm7TGfqjewrc=
github.com/klauspost/compress v1.10.5/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.10.8 h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY=
Expand Down Expand Up @@ -68,8 +73,17 @@ golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/license_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var otherCheck = regexp.MustCompile(`#
`)

var skip = map[string]bool{
"../pulsar/internal/pb/PulsarApi.pb.go": true,
"../pulsar/internal/pulsar_proto/PulsarApi.pb.go": true,
}

func TestLicense(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"net/url"
"time"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

var ErrConsumerClosed = errors.New("consumer closed")
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

type consumerState int
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"

"github.com/stretchr/testify/assert"

Expand Down
4 changes: 2 additions & 2 deletions pulsar/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (

pkgerrors "github.com/pkg/errors"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

// NewUnexpectedErrMsg instantiates an ErrUnexpectedMsg error.
Expand Down
4 changes: 2 additions & 2 deletions pulsar/impl_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

type messageID struct {
Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
"github.com/golang/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"
)
Expand Down
39 changes: 22 additions & 17 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (

"github.com/apache/pulsar-client-go/pulsar/internal/compression"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

const (
Expand Down Expand Up @@ -202,37 +202,42 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
return cmd
}

func addSingleMessageToBatch(wb Buffer, smm proto.Message, payload []byte) {
serialized, err := proto.Marshal(smm)
func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
metadataSize := uint32(smm.Size())
wb.WriteUint32(metadataSize)

wb.ResizeIfNeeded(metadataSize)
_, err := smm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
if err != nil {
log.WithError(err).Fatal("Protobuf serialization error")
}

wb.WriteUint32(uint32(len(serialized)))
wb.Write(serialized)
wb.WrittenBytes(metadataSize)
wb.Write(payload)
}

func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
func serializeBatch(wb Buffer,
cmdSend *pb.BaseCommand,
msgMetadata *pb.MessageMetadata,
uncompressedPayload Buffer,
compressionProvider compression.Provider) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
cmdSize := proto.Size(cmdSend)
msgMetadataSize := proto.Size(msgMetadata)
cmdSize := uint32(proto.Size(cmdSend))
msgMetadataSize := uint32(proto.Size(msgMetadata))

frameSizeIdx := wb.WriterIndex()
wb.WriteUint32(0) // Skip frame size until we now the size
frameStartIdx := wb.WriterIndex()

// Write cmd
wb.WriteUint32(uint32(cmdSize))
serialized, err := proto.Marshal(cmdSend)
wb.WriteUint32(cmdSize)
wb.ResizeIfNeeded(cmdSize)
_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
if err != nil {
log.WithError(err).Fatal("Protobuf error when serializing cmdSend")
}

wb.Write(serialized)
wb.WrittenBytes(cmdSize)

// Create checksum placeholder
wb.WriteUint16(magicCrc32c)
Expand All @@ -241,13 +246,13 @@ func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,

// Write metadata
metadataStartIdx := wb.WriterIndex()
wb.WriteUint32(uint32(msgMetadataSize))
serialized, err = proto.Marshal(msgMetadata)
wb.WriteUint32(msgMetadataSize)
wb.ResizeIfNeeded(msgMetadataSize)
_, err = msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize])
if err != nil {
log.WithError(err).Fatal("Protobuf error when serializing msgMetadata")
}

wb.Write(serialized)
wb.WrittenBytes(msgMetadataSize)

// Make sure the buffer has enough space to hold the compressed data
// and perform the compression in-place
Expand Down
13 changes: 7 additions & 6 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
"sync/atomic"
"time"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal/auth"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

const (
Expand Down Expand Up @@ -390,24 +390,25 @@ func (c *connection) internalWriteData(data Buffer) {
}
}

func (c *connection) writeCommand(cmd proto.Message) {
func (c *connection) writeCommand(cmd *pb.BaseCommand) {
// Wire format
// [FRAME_SIZE] [CMD_SIZE][CMD]
cmdSize := uint32(proto.Size(cmd))
cmdSize := uint32(cmd.Size())
frameSize := cmdSize + 4

c.writeBufferLock.Lock()
defer c.writeBufferLock.Unlock()

c.writeBuffer.Clear()
c.writeBuffer.WriteUint32(frameSize)

c.writeBuffer.WriteUint32(cmdSize)
serialized, err := proto.Marshal(cmd)
_, err := cmd.MarshalToSizedBuffer(c.writeBuffer.WritableSlice()[:cmdSize])
if err != nil {
c.log.WithError(err).Fatal("Protobuf serialization error")
}

c.writeBuffer.Write(serialized)
c.writeBuffer.WrittenBytes(cmdSize)
c.internalWriteData(c.writeBuffer)
}

Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/connection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"bufio"
"io"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
"github.com/golang/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)

Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"fmt"
"net/url"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
"github.com/golang/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"
)
Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"net/url"
"testing"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
"github.com/golang/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
)

Expand Down
Loading

0 comments on commit 549874b

Please sign in to comment.