Skip to content

Commit 89a65f4

Browse files
author
Mathieu Lecarme
committed
Cute session test.
1 parent 7e6d319 commit 89a65f4

File tree

1 file changed

+74
-0
lines changed

1 file changed

+74
-0
lines changed

message/session_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package message
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/vmihailenco/msgpack/v5"
12+
)
13+
14+
func mockupServer(session *FluentSession) (net.Addr, error) {
15+
listener, err := net.Listen("tcp", "127.0.0.1:0")
16+
if err != nil {
17+
return nil, err
18+
}
19+
go func() {
20+
err := func() error {
21+
conn, err := listener.Accept()
22+
if err != nil {
23+
return err
24+
}
25+
return session.Loop(conn)
26+
}()
27+
if err != nil {
28+
fmt.Println(err)
29+
}
30+
}()
31+
return listener.Addr(), nil
32+
}
33+
34+
func TestSession(t *testing.T) {
35+
wg := &sync.WaitGroup{}
36+
var myRecord map[string]interface{}
37+
s, err := mockupServer(&FluentSession{
38+
Reader: New(func(tag string, time *time.Time, record map[string]interface{}) error {
39+
fmt.Println("record", record)
40+
wg.Done()
41+
myRecord = record
42+
return nil
43+
}),
44+
})
45+
assert.NoError(t, err)
46+
conn, err := net.Dial("tcp", s.String())
47+
assert.NoError(t, err)
48+
49+
encoder := msgpack.NewEncoder(conn)
50+
decoder := msgpack.NewDecoder(conn)
51+
52+
wg.Add(1)
53+
54+
err = encoder.Encode([]interface{}{
55+
"tag.name",
56+
[]interface{}{
57+
[]interface{}{1441588984, map[string]interface{}{
58+
"message": "foo",
59+
}},
60+
},
61+
map[string]interface{}{
62+
"chunk": "p8n9gmxTQVC8/nh2wlKKeQ==",
63+
"size": 1,
64+
},
65+
})
66+
assert.NoError(t, err)
67+
68+
wg.Wait()
69+
ack, err := decoder.DecodeMap()
70+
assert.NoError(t, err)
71+
assert.Equal(t, "p8n9gmxTQVC8/nh2wlKKeQ==", ack["ack"])
72+
assert.Equal(t, "foo", myRecord["message"])
73+
74+
}

0 commit comments

Comments
 (0)