-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathprotobuf.go
82 lines (72 loc) · 2.63 KB
/
protobuf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package protobuf
import (
"fmt"
"github.com/Masterminds/semver"
"github.com/cloudchacho/hedwig-go"
hedwigProtobuf "github.com/cloudchacho/hedwig-go/protobuf"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)
type FirehoseEncoderDecoder struct {
*hedwigProtobuf.EncoderDecoder
typeUrls map[hedwig.MessageTypeMajorVersion]string
}
func (fcd FirehoseEncoderDecoder) DecodeData(messageType string, version *semver.Version, data interface{}) (interface{}, error) {
return data, nil
}
// EncodeData encodes the message with appropriate format for firehose storage files
// Type of data must be proto.Message
func (fcd FirehoseEncoderDecoder) EncodeData(data interface{}, useMessageTransport bool, metaAttrs hedwig.MetaAttributes) ([]byte, error) {
if useMessageTransport {
panic("Message Transport should not be used for firehose encoding")
}
dst := &anypb.Any{}
dataBytes, ok := data.([]byte)
if ok {
// during initial read from pubsub data will be of type []byte
dst.Value = dataBytes
msgType, ver, _ := fcd.DecodeMessageType(metaAttrs.Schema)
dst.TypeUrl = fcd.typeUrls[hedwig.MessageTypeMajorVersion{
MessageType: msgType,
MajorVersion: uint(ver.Major()),
}]
} else {
// during reading of staging firehose files and rewriting to final bucket data will be of type anypb.Any
dst = data.(*anypb.Any)
}
container := &hedwigProtobuf.PayloadV1{
FormatVersion: fmt.Sprintf("%d.%d", metaAttrs.FormatVersion.Major(), metaAttrs.FormatVersion.Minor()),
Id: metaAttrs.ID,
Metadata: &hedwigProtobuf.MetadataV1{
Publisher: metaAttrs.Publisher,
Timestamp: timestamppb.New(metaAttrs.Timestamp),
Headers: metaAttrs.Headers,
},
Schema: metaAttrs.Schema,
Data: dst,
}
payload, err := proto.Marshal(container)
if err != nil {
// Unable to convert to bytes
return nil, err
}
return payload, nil
}
// VerifyKnownMinorVersion checks that message version is known to us
func (fcd FirehoseEncoderDecoder) VerifyKnownMinorVersion(messageType string, version *semver.Version) error {
// no minor verification
return nil
}
// EncodeMessageType encodes the message type with appropriate format for firehose storage files
func (fcd FirehoseEncoderDecoder) EncodeMessageType(messageType string, version *semver.Version) string {
return fmt.Sprintf("%s/%d.%d", messageType, version.Major(), version.Minor())
}
func (fcd FirehoseEncoderDecoder) IsBinary() bool {
return true
}
func NewFirehoseEncodeDecoder(typeUrls map[hedwig.MessageTypeMajorVersion]string) *FirehoseEncoderDecoder {
return &FirehoseEncoderDecoder{
typeUrls: typeUrls,
}
}