Skip to content

Commit f49bc03

Browse files
bryanpaluchBryan Paluch
and
Bryan Paluch
authored
zmq4: implement Topics interface for xpub sockets
Co-authored-by: Bryan Paluch <palub@amazon.com>
1 parent 9a6a79c commit f49bc03

File tree

4 files changed

+33
-20
lines changed

4 files changed

+33
-20
lines changed

pub.go

+1-20
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package zmq4
77
import (
88
"context"
99
"net"
10-
"sort"
1110
"sync"
1211

1312
"golang.org/x/xerrors"
@@ -113,25 +112,7 @@ func (pub *pubSocket) SetOption(name string, value interface{}) error {
113112

114113
// Topics returns the sorted list of topics a socket is subscribed to.
115114
func (pub *pubSocket) Topics() []string {
116-
var (
117-
keys = make(map[string]struct{})
118-
topics []string
119-
)
120-
pub.sck.mu.RLock()
121-
for _, con := range pub.sck.conns {
122-
con.mu.RLock()
123-
for topic := range con.topics {
124-
if _, dup := keys[topic]; dup {
125-
continue
126-
}
127-
keys[topic] = struct{}{}
128-
topics = append(topics, topic)
129-
}
130-
con.mu.RUnlock()
131-
}
132-
pub.sck.mu.RUnlock()
133-
sort.Strings(topics)
134-
return topics
115+
return pub.sck.topics()
135116
}
136117

137118
// pubQReader is a queued-message reader.

socket.go

+23
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"log"
1010
"net"
1111
"os"
12+
"sort"
1213
"strings"
1314
"sync"
1415
"time"
@@ -91,6 +92,28 @@ func newSocket(ctx context.Context, sockType SocketType, opts ...Option) *socket
9192
return sck
9293
}
9394

95+
func (sck *socket) topics() []string {
96+
var (
97+
keys = make(map[string]struct{})
98+
topics []string
99+
)
100+
sck.mu.RLock()
101+
for _, con := range sck.conns {
102+
con.mu.RLock()
103+
for topic := range con.topics {
104+
if _, dup := keys[topic]; dup {
105+
continue
106+
}
107+
keys[topic] = struct{}{}
108+
topics = append(topics, topic)
109+
}
110+
con.mu.RUnlock()
111+
}
112+
sck.mu.RUnlock()
113+
sort.Strings(topics)
114+
return topics
115+
}
116+
94117
// Close closes the open Socket
95118
func (sck *socket) Close() error {
96119
sck.cancel()

xpub.go

+4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ func (xpub *xpubSocket) SetOption(name string, value interface{}) error {
7777
return xpub.sck.SetOption(name, value)
7878
}
7979

80+
func (xpub *xpubSocket) Topics() []string {
81+
return xpub.sck.topics()
82+
}
83+
8084
var (
8185
_ Socket = (*xpubSocket)(nil)
8286
)

zmq4_xpubsub_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@ func TestXPubSub(t *testing.T) {
113113

114114
time.Sleep(1 * time.Second)
115115

116+
gotTopics := tc.xpub.(zmq4.Topics).Topics()
117+
if !reflect.DeepEqual(gotTopics, topics) {
118+
t.Fatalf("Missing or wrong topics.\ngot= %q\nwant=%q", gotTopics, topics)
119+
}
120+
116121
for _, msg := range msgs[0] {
117122
err = tc.xpub.Send(msg)
118123
if err != nil {

0 commit comments

Comments
 (0)