Pubsub is prove of concept implement for Redis "Pub/Sub" messaging management feature. SUBSCRIBE, UNSUBSCRIBE and PUBLISH implement the Publish/Subscribe messaging paradigm where (citing Wikipedia) senders (publishers) are not programmed to send their messages to specific receivers (subscribers). (sited from here)
Disk Queue is a data structure which come from NSQ. It is message queue data structure using disk file as storage medium.
This is no API change from Pub/Sub mechanism, but change it basic concurrency process.
Topic
as a another object to handle Topic related info with Disk Queue. Each Topic
contains two kind information:
Data Queue
: Which is all publish data send to this topic.Channel List
: Which is who subscribe this topic, we need notify.
In this modification, it gain follow benefits:
- Infinite Topic Queue Size (depends on storage size)
- Buffering Publish to improve performance.
go get github.com/kkdai/pd
package main
import (
"fmt"
"time"
. "github.com/kkdai/pubsub"
)
func main() {
ser := NewPubsub(1)
c1 := ser.Subscribe("topic1")
c2 := ser.Subscribe("topic2")
ser.Publish("test1", "topic1")
ser.Publish("test2", "topic2")
fmt.Println(<-c1)
//Got "test1"
fmt.Println(<-c2)
//Got "test2"
// Add subscription "topic2" for c1.
ser.AddSubscription(c1, "topic2")
// Publish new content in topic2
ser.Publish("test3", "topic2")
fmt.Println(<-c1)
//Got "test3"
fmt.Println(<-c2)
//Got "test3"
// Remove subscription "topic2" in c1
ser.RemoveSubscription(c1, "topic2")
// Publish new content in topic2
ser.Publish("test4", "topic2")
select {
case val := <-c1:
fmt.Errorf("Should not get %v notify on remove topic", val)
break
case <-time.After(time.Second):
//Will go here, because we remove subscription topic2 in c1.
fmt.Println("Not receive any msg from topic2, timeout.")
break
}
}
Benchmark include memory usage. (original memory)
BenchmarkAddSub-4 500 2906467 ns/op
BenchmarkRemoveSub-4 10000 232910 ns/op
BenchmarkBasicFunction-4 5000000 232 ns/op
Benchmark include memory usage. (Using Disk Queue)
BenchmarkAddSub-4 300000 125628 ns/op
BenchmarkRemoveSub-4 200000 144854 ns/op
BenchmarkBasicFunction-4 2000 906076 ns/op
It is one of my project 52.
This package is licensed under MIT license. See LICENSE for details.