Skip to content

Commit 776eaca

Browse files
authored
Merge pull request #591 from matheusd/new-message-api
multi: Make NewMessage() usable for creating messages for reading
2 parents e9d7785 + dcb3395 commit 776eaca

28 files changed

+418
-668
lines changed

answer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestPromiseFulfill(t *testing.T) {
5959
t.Run("Done", func(t *testing.T) {
6060
p := NewPromise(dummyMethod, dummyPipelineCaller{}, nil)
6161
done := p.Answer().Done()
62-
msg, seg, _ := NewMessage(SingleSegment(nil))
62+
msg, seg := NewSingleSegmentMessage(nil)
6363
defer msg.Release()
6464

6565
res, _ := NewStruct(seg, ObjectSize{DataSize: 8})
@@ -75,7 +75,7 @@ func TestPromiseFulfill(t *testing.T) {
7575
p := NewPromise(dummyMethod, dummyPipelineCaller{}, nil)
7676
defer p.ReleaseClients()
7777
ans := p.Answer()
78-
msg, seg, _ := NewMessage(SingleSegment(nil))
78+
msg, seg := NewSingleSegmentMessage(nil)
7979
defer msg.Release()
8080

8181
res, _ := NewStruct(seg, ObjectSize{DataSize: 8})
@@ -99,7 +99,7 @@ func TestPromiseFulfill(t *testing.T) {
9999
h := new(dummyHook)
100100
c := NewClient(h)
101101
defer c.Release()
102-
msg, seg, _ := NewMessage(SingleSegment(nil))
102+
msg, seg := NewSingleSegmentMessage(nil)
103103
defer msg.Release()
104104

105105
res, _ := NewStruct(seg, ObjectSize{PointerCount: 3})

arena.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ func (ssa *SingleSegmentArena) Release() {
159159
type MultiSegmentArena struct {
160160
segs []Segment
161161

162+
// rawData is set when the individual segments were all demuxed from
163+
// the passed raw data slice.
164+
rawData []byte
165+
162166
// bp is the bufferpool assotiated with this arena's segments if it was
163167
// initialized for writing.
164168
bp *bufferpool.Pool
@@ -175,6 +179,7 @@ func MultiSegment(b [][]byte) *MultiSegmentArena {
175179
if b == nil {
176180
msa := multiSegmentPool.Get().(*MultiSegmentArena)
177181
msa.fromPool = true
182+
msa.bp = &bufferpool.Default
178183
return msa
179184
}
180185
return multiSegment(b)
@@ -190,6 +195,14 @@ func MultiSegment(b [][]byte) *MultiSegmentArena {
190195
// Calling Release is optional; if not done the garbage collector
191196
// will release the memory per usual.
192197
func (msa *MultiSegmentArena) Release() {
198+
// When this was demuxed from a single slice, return the entire slice.
199+
if msa.rawData != nil && msa.bp != nil {
200+
zeroSlice(msa.rawData)
201+
msa.bp.Put(msa.rawData)
202+
msa.bp = nil
203+
}
204+
msa.rawData = nil
205+
193206
for i := range msa.segs {
194207
if msa.bp != nil {
195208
zeroSlice(msa.segs[i].data)
@@ -236,7 +249,10 @@ var multiSegmentPool = sync.Pool{
236249

237250
// demuxArena slices data into a multi-segment arena. It assumes that
238251
// len(data) >= hdr.totalSize().
239-
func (msa *MultiSegmentArena) demux(hdr streamHeader, data []byte) error {
252+
//
253+
// bp should point to the bufferpool which will receive back data once the
254+
// arena is released. It may be nil if this should not be returned anywhere.
255+
func (msa *MultiSegmentArena) demux(hdr streamHeader, data []byte, bp *bufferpool.Pool) error {
240256
maxSeg := hdr.maxSegment()
241257
if int64(maxSeg) > int64(maxInt-1) {
242258
return errors.New("number of segments overflows int")
@@ -261,6 +277,8 @@ func (msa *MultiSegmentArena) demux(hdr streamHeader, data []byte) error {
261277
msa.segs[i].id = i
262278
}
263279

280+
msa.rawData = data
281+
msa.bp = bp
264282
return nil
265283
}
266284

canonical.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,15 @@ import (
1010
// for equivalent structs, even as the schema evolves. The blob is
1111
// suitable for hashing or signing.
1212
func Canonicalize(s Struct) ([]byte, error) {
13-
msg, seg, _ := NewMessage(SingleSegment(nil))
13+
msg, seg := NewSingleSegmentMessage(nil)
1414
if !s.IsValid() {
15+
// Ensure compatbility to existing behavior: even if the struct
16+
// is not valid, at least the root pointer is allocated and
17+
// returned as canonical. Without this,
18+
// TestCanonicalize/Struct{} fails.
19+
if _, err := msg.allocRootPointerSpace(); err != nil {
20+
return nil, err
21+
}
1522
return seg.Data(), nil
1623
}
1724
root, err := NewRootStruct(seg, canonicalStructSize(s))

canonical_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,23 @@ func TestCanonicalize(t *testing.T) {
1919
}, {
2020
name: "empty struct",
2121
f: func() Struct {
22-
_, seg, _ := NewMessage(SingleSegment(nil))
22+
_, seg := NewSingleSegmentMessage(nil)
2323
s, _ := NewStruct(seg, ObjectSize{})
2424
return s
2525
},
2626
want: []byte{0xfc, 0xff, 0xff, 0xff, 0, 0, 0, 0},
2727
}, {
2828
name: "zero data, zero pointer struct",
2929
f: func() Struct {
30-
_, seg, _ := NewMessage(SingleSegment(nil))
30+
_, seg := NewSingleSegmentMessage(nil)
3131
s, _ := NewStruct(seg, ObjectSize{DataSize: 8, PointerCount: 1})
3232
return s
3333
},
3434
want: []byte{0xfc, 0xff, 0xff, 0xff, 0, 0, 0, 0},
3535
}, {
3636
name: "one word data struct",
3737
f: func() Struct {
38-
_, seg, _ := NewMessage(SingleSegment(nil))
38+
_, seg := NewSingleSegmentMessage(nil)
3939
s, _ := NewStruct(seg, ObjectSize{DataSize: 8, PointerCount: 1})
4040
s.SetUint16(0, 0xbeef)
4141
return s
@@ -47,7 +47,7 @@ func TestCanonicalize(t *testing.T) {
4747
}, {
4848
name: "two pointers to zero structs",
4949
f: func() Struct {
50-
_, seg, _ := NewMessage(SingleSegment(nil))
50+
_, seg := NewSingleSegmentMessage(nil)
5151
s, _ := NewStruct(seg, ObjectSize{PointerCount: 2})
5252
e1, _ := NewStruct(seg, ObjectSize{DataSize: 8})
5353
e2, _ := NewStruct(seg, ObjectSize{DataSize: 8})
@@ -63,7 +63,7 @@ func TestCanonicalize(t *testing.T) {
6363
}, {
6464
name: "pointer to interface",
6565
f: func() Struct {
66-
_, seg, _ := NewMessage(SingleSegment(nil))
66+
_, seg := NewSingleSegmentMessage(nil)
6767
s, _ := NewStruct(seg, ObjectSize{PointerCount: 2})
6868
iface := NewInterface(seg, 1)
6969
s.SetPtr(0, iface.ToPtr())
@@ -76,7 +76,7 @@ func TestCanonicalize(t *testing.T) {
7676
}, {
7777
name: "int list",
7878
f: func() Struct {
79-
_, seg, _ := NewMessage(SingleSegment(nil))
79+
_, seg := NewSingleSegmentMessage(nil)
8080
s, _ := NewStruct(seg, ObjectSize{PointerCount: 1})
8181
l, _ := NewInt8List(seg, 5)
8282
s.SetPtr(0, l.ToPtr())
@@ -95,7 +95,7 @@ func TestCanonicalize(t *testing.T) {
9595
}, {
9696
name: "zero int list",
9797
f: func() Struct {
98-
_, seg, _ := NewMessage(SingleSegment(nil))
98+
_, seg := NewSingleSegmentMessage(nil)
9999
s, _ := NewStruct(seg, ObjectSize{PointerCount: 1})
100100
l, _ := NewInt8List(seg, 5)
101101
s.SetPtr(0, l.ToPtr())
@@ -110,7 +110,7 @@ func TestCanonicalize(t *testing.T) {
110110
}, {
111111
name: "struct list",
112112
f: func() Struct {
113-
_, seg, _ := NewMessage(SingleSegment(nil))
113+
_, seg := NewSingleSegmentMessage(nil)
114114
s, _ := NewStruct(seg, ObjectSize{PointerCount: 1})
115115
l, _ := NewCompositeList(seg, ObjectSize{DataSize: 8, PointerCount: 1}, 2)
116116
s.SetPtr(0, l.ToPtr())
@@ -133,7 +133,7 @@ func TestCanonicalize(t *testing.T) {
133133
}, {
134134
name: "zero struct list",
135135
f: func() Struct {
136-
_, seg, _ := NewMessage(SingleSegment(nil))
136+
_, seg := NewSingleSegmentMessage(nil)
137137
s, _ := NewStruct(seg, ObjectSize{PointerCount: 1})
138138
l, _ := NewCompositeList(seg, ObjectSize{DataSize: 16, PointerCount: 2}, 3)
139139
s.SetPtr(0, l.ToPtr())
@@ -148,7 +148,7 @@ func TestCanonicalize(t *testing.T) {
148148
}, {
149149
name: "zero-length struct list",
150150
f: func() Struct {
151-
_, seg, _ := NewMessage(SingleSegment(nil))
151+
_, seg := NewSingleSegmentMessage(nil)
152152
s, _ := NewStruct(seg, ObjectSize{PointerCount: 1})
153153
l, _ := NewCompositeList(seg, ObjectSize{DataSize: 16, PointerCount: 2}, 0)
154154
s.SetPtr(0, l.ToPtr())

capability_test.go

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -357,10 +357,8 @@ func (dr *dummyReturner) AllocResults(sz ObjectSize) (Struct, error) {
357357
if dr.s.IsValid() {
358358
return Struct{}, errors.New("AllocResults called multiple times")
359359
}
360-
_, seg, err := NewMessage(SingleSegment(nil))
361-
if err != nil {
362-
return Struct{}, err
363-
}
360+
_, seg := NewSingleSegmentMessage(nil)
361+
var err error
364362
dr.s, err = NewRootStruct(seg, sz)
365363
return dr.s, err
366364
}
@@ -377,10 +375,7 @@ func (dr *dummyReturner) ReleaseResults() {
377375
}
378376

379377
func TestToInterface(t *testing.T) {
380-
_, seg, err := NewMessage(SingleSegment(nil))
381-
if err != nil {
382-
t.Fatal(err)
383-
}
378+
_, seg := NewSingleSegmentMessage(nil)
384379
tests := []struct {
385380
ptr Ptr
386381
in Interface
@@ -399,10 +394,7 @@ func TestToInterface(t *testing.T) {
399394
}
400395

401396
func TestInterface_value(t *testing.T) {
402-
_, seg, err := NewMessage(SingleSegment(nil))
403-
if err != nil {
404-
t.Fatal(err)
405-
}
397+
_, seg := NewSingleSegmentMessage(nil)
406398
tests := []struct {
407399
in Interface
408400
val rawPointer
@@ -421,10 +413,7 @@ func TestInterface_value(t *testing.T) {
421413
}
422414

423415
func TestTransform(t *testing.T) {
424-
_, s, err := NewMessage(SingleSegment(nil))
425-
if err != nil {
426-
t.Fatal(err)
427-
}
416+
_, s := NewSingleSegmentMessage(nil)
428417
root, err := NewStruct(s, ObjectSize{PointerCount: 2})
429418
if err != nil {
430419
t.Fatal(err)
@@ -442,7 +431,7 @@ func TestTransform(t *testing.T) {
442431
b.SetUint64(0, 2)
443432
a.SetPtr(0, b.ToPtr())
444433

445-
dmsg, d, err := NewMessage(SingleSegment(nil))
434+
dmsg, d := NewSingleSegmentMessage(nil)
446435
if err != nil {
447436
t.Fatal(err)
448437
}
@@ -675,20 +664,17 @@ func deepPointerEqual(a, b Ptr) bool {
675664
if !a.IsValid() || !b.IsValid() {
676665
return false
677666
}
678-
msgA, _, _ := NewMessage(SingleSegment(nil))
667+
msgA, _ := NewSingleSegmentMessage(nil)
679668
msgA.SetRoot(a)
680669
abytes, _ := msgA.Marshal()
681-
msgB, _, _ := NewMessage(SingleSegment(nil))
670+
msgB, _ := NewSingleSegmentMessage(nil)
682671
msgB.SetRoot(b)
683672
bbytes, _ := msgB.Marshal()
684673
return bytes.Equal(abytes, bbytes)
685674
}
686675

687676
func newEmptyStruct() Struct {
688-
_, seg, err := NewMessage(SingleSegment(nil))
689-
if err != nil {
690-
panic(err)
691-
}
677+
_, seg := NewSingleSegmentMessage(nil)
692678
s, err := NewRootStruct(seg, ObjectSize{})
693679
if err != nil {
694680
panic(err)

capnpc-go/capnpc-go.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (g *generator) defineSchemaVar() error {
168168
}
169169
sort.Sort(uint64Slice(ids))
170170

171-
msg, seg, _ := capnp.NewMessage(capnp.SingleSegment(nil))
171+
msg, seg := capnp.NewSingleSegmentMessage(nil)
172172
req, _ := schema.NewRootCodeGeneratorRequest(seg)
173173
// TODO(light): find largest object size and use that to allocate list
174174
nodes, _ := req.NewNodes(int32(len(g.nodes)))

capnpc-go/fileparts.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,8 @@ func (sd *staticData) init(fileID uint64) {
3838
}
3939

4040
func (sd *staticData) copyData(obj capnp.Ptr) (staticDataRef, error) {
41-
m, _, err := capnp.NewMessage(capnp.SingleSegment(nil))
42-
if err != nil {
43-
return staticDataRef{}, err
44-
}
45-
err = m.SetRoot(obj)
41+
m, _ := capnp.NewSingleSegmentMessage(nil)
42+
err := m.SetRoot(obj)
4643
if err != nil {
4744
return staticDataRef{}, err
4845
}

codec.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,24 +59,35 @@ func (d *Decoder) Decode() (*Message, error) {
5959
if err != nil {
6060
return nil, exc.WrapError("decode", err)
6161
}
62+
63+
// Special case an empty message to return a new MultiSegment message
64+
// ready for writing. This maintains compatibility to tests and older
65+
// implementation of message and arenas.
66+
if hdr.maxSegment() == 0 && total == 0 {
67+
msg, _ := NewMultiSegmentMessage(nil)
68+
return msg, nil
69+
}
70+
6271
// TODO(someday): if total size is greater than can fit in one buffer,
6372
// attempt to allocate buffer per segment.
6473
if total > maxSize-uint64(len(hdr)) || total > uint64(maxInt) {
6574
return nil, errors.New("decode: message too large")
6675
}
6776

6877
// Read segments.
69-
buf := bufferpool.Default.Get(int(total))
78+
bp := &bufferpool.Default
79+
buf := bp.Get(int(total))
7080
if _, err := io.ReadFull(d.r, buf); err != nil {
7181
return nil, exc.WrapError("decode: read segments", err)
7282
}
7383

7484
arena := MultiSegment(nil)
75-
if err = arena.demux(hdr, buf); err != nil {
85+
if err = arena.demux(hdr, buf, bp); err != nil {
7686
return nil, exc.WrapError("decode", err)
7787
}
7888

79-
return &Message{Arena: arena}, nil
89+
msg, _, err := NewMessage(arena)
90+
return msg, err
8091
}
8192

8293
func (d *Decoder) readHeader(maxSize uint64) (streamHeader, error) {
@@ -162,11 +173,12 @@ func Unmarshal(data []byte) (*Message, error) {
162173
}
163174

164175
arena := MultiSegment(nil)
165-
if err := arena.demux(hdr, data); err != nil {
176+
if err := arena.demux(hdr, data, nil); err != nil {
166177
return nil, exc.WrapError("unmarshal", err)
167178
}
168179

169-
return &Message{Arena: arena}, nil
180+
msg, _, err := NewMessage(arena)
181+
return msg, err
170182
}
171183

172184
// UnmarshalPacked reads a packed serialized stream into a message.

0 commit comments

Comments
 (0)