Skip to content

Commit 7a00479

Browse files
author
lonnc
committed
Add Publish/Subscribe redis methods.
1 parent 027eb50 commit 7a00479

File tree

2 files changed

+198
-0
lines changed

2 files changed

+198
-0
lines changed

redis.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ func readBulk(reader *bufio.Reader, head string) ([]byte, os.Error) {
7272
return data, err
7373
}
7474

75+
func writeRequest(writer io.Writer, cmd string, args ...string) os.Error {
76+
b := commandBytes(cmd, args...)
77+
_, err := writer.Write(b)
78+
return err
79+
}
80+
7581
func commandBytes(cmd string, args ...string) []byte {
7682
cmdbuf := bytes.NewBufferString(fmt.Sprintf("*%d\r\n$%d\r\n%s\r\n", len(args)+1, len(cmd), cmd))
7783
for _, s := range args {
@@ -214,6 +220,83 @@ End:
214220
return data, err
215221
}
216222

223+
func (client *Client) sendCommands(cmdArgs <-chan []string, data chan<- interface{}) (err os.Error) {
224+
// grab a connection from the pool
225+
c, err := client.popCon()
226+
227+
if err != nil {
228+
goto End
229+
}
230+
231+
reader := bufio.NewReader(c)
232+
233+
// Ping first to verify connection is open
234+
err = writeRequest(c, "PING")
235+
236+
// On first attempt permit a reconnection attempt
237+
if err == os.EOF {
238+
// Looks like we have to open a new connection
239+
c, err = client.openConnection()
240+
if err != nil {
241+
goto End
242+
}
243+
reader = bufio.NewReader(c)
244+
} else {
245+
// Read Ping response
246+
pong, err := readResponse(reader)
247+
if pong != "PONG" {
248+
return RedisError("Unexpected response to PING.")
249+
}
250+
if err != nil {
251+
goto End
252+
}
253+
}
254+
255+
errs := make(chan os.Error)
256+
257+
go func() {
258+
for cmdArg := range cmdArgs {
259+
err = writeRequest(c, cmdArg[0], cmdArg[1:]...)
260+
if err != nil {
261+
if !closed(errs) {
262+
errs <- err
263+
}
264+
break
265+
}
266+
}
267+
close(errs)
268+
}()
269+
270+
go func() {
271+
for {
272+
response, err := readResponse(reader)
273+
if err != nil {
274+
if !closed(errs) {
275+
errs <- err
276+
}
277+
break
278+
}
279+
data <- response
280+
}
281+
close(errs)
282+
}()
283+
284+
// Block until errs channel closes
285+
for e := range errs {
286+
err = e
287+
}
288+
289+
End:
290+
291+
// Close client and synchronization issues are a nightmare to solve.
292+
c.Close()
293+
294+
// Push nil back onto queue
295+
client.pushCon(nil)
296+
297+
return err
298+
}
299+
217300
func (client *Client) popCon() (*net.TCPConn, os.Error) {
218301
if client.pool == nil {
219302
client.pool = make(chan *net.TCPConn, MaxPoolSize)
@@ -1191,6 +1274,71 @@ func (client *Client) Hgetall(key string, val interface{}) os.Error {
11911274
return nil
11921275
}
11931276

1277+
//Publish/Subscribe
1278+
1279+
type Message struct {
1280+
Channel string
1281+
Message []byte
1282+
}
1283+
1284+
// Subscribe to channels, will block until the subscribe channel is closed.
1285+
func (client *Client) Subscribe(subscribe <-chan string, unsubscribe <-chan string, messages chan<- Message) os.Error {
1286+
cmds := make(chan []string, 0)
1287+
data := make(chan interface{}, 0)
1288+
1289+
go func() {
1290+
CHANNELS:
1291+
for {
1292+
select {
1293+
case channel := <-subscribe:
1294+
if channel == "" {
1295+
break CHANNELS
1296+
} else {
1297+
cmds <- []string{"SUBSCRIBE", channel}
1298+
}
1299+
1300+
case channel := <-unsubscribe:
1301+
if channel == "" {
1302+
break CHANNELS
1303+
} else {
1304+
cmds <- []string{"UNSUBSCRIBE", channel}
1305+
}
1306+
}
1307+
}
1308+
close(cmds)
1309+
close(data)
1310+
}()
1311+
1312+
go func() {
1313+
for response := range data {
1314+
db := response.([][]byte)
1315+
messageType, channel, message := string(db[0]), string(db[1]), db[2]
1316+
switch messageType {
1317+
case "message":
1318+
messages <- Message{string(channel), message}
1319+
case "subscribe":
1320+
// Ignore
1321+
case "unsubscribe":
1322+
// Ignore
1323+
default:
1324+
// log.Printf("Unknown message '%s'", messageType)
1325+
}
1326+
}
1327+
}()
1328+
1329+
err := client.sendCommands(cmds, data)
1330+
1331+
return err
1332+
}
1333+
1334+
func (client *Client) Publish(channel string, val []byte) os.Error {
1335+
_, err := client.sendCommand("PUBLISH", channel, string(val))
1336+
if err != nil {
1337+
return err
1338+
}
1339+
return nil
1340+
}
1341+
11941342
//Server commands
11951343

11961344
func (client *Client) Save() os.Error {

redis_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,56 @@ func TestBlpopTimeout(t *testing.T) {
238238
}
239239
}
240240

241+
func TestSubscribe(t *testing.T) {
242+
subscribe := make(chan string, 0)
243+
unsubscribe := make(chan string, 0)
244+
messages := make(chan Message, 0)
245+
246+
defer func() {
247+
close(subscribe)
248+
close(unsubscribe)
249+
close(messages)
250+
}()
251+
go func() {
252+
if err := client.Subscribe(subscribe, unsubscribe, messages); err != nil {
253+
t.Fatal("Subscribed failed", err.String())
254+
}
255+
}()
256+
subscribe <- "ccc"
257+
258+
data := []byte("foo")
259+
quit := make(chan bool, 0)
260+
defer close(quit)
261+
go func() {
262+
tick := time.Tick(10 * 1000 * 1000) // 10ms
263+
timeout := time.Tick(100 * 1000 * 1000) // 100ms
264+
LOOP:
265+
for {
266+
select {
267+
case <-quit:
268+
break LOOP
269+
case <-timeout:
270+
t.Fatal("TestSubscribe timeout")
271+
break LOOP
272+
case <-tick:
273+
if err := client.Publish("ccc", data); err != nil {
274+
t.Fatal("Pubish failed", err.String())
275+
}
276+
}
277+
}
278+
}()
279+
280+
msg := <-messages
281+
quit <- true
282+
if msg.Channel != "ccc" {
283+
t.Fatal("Unexpected channel name")
284+
}
285+
if string(msg.Message) != string(data) {
286+
t.Fatalf("Expected %s but got %s", string(data), string(msg.Message))
287+
}
288+
close(subscribe)
289+
}
290+
241291
func verifyHash(t *testing.T, key string, expected map[string][]byte) {
242292
//test Hget
243293
m1 := make(map[string][]byte)

0 commit comments

Comments
 (0)