Skip to content

Commit

Permalink
nsq_tail: an example reader to make it really easy to 'tail' a topic …
Browse files Browse the repository at this point in the history
…with an ephemeral channel
  • Loading branch information
mreiferson committed Feb 21, 2013
1 parent 62eeb9d commit 23b58e2
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ examples/bench_writer/bench_writer
examples/nsq_to_file/nsq_to_file
examples/nsq_pubsub/nsq_pubsub
examples/nsq_to_http/nsq_to_http
examples/nsq_tail/nsq_tail
dist

*.dat
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ NSQADMIN_SRCS = $(wildcard nsqadmin/*.go util/*.go)
NSQ_PUBSUB_SRCS = $(wildcard examples/nsq_pubsub/*.go nsq/*.go util/*.go)
NSQ_TO_FILE_SRCS = $(wildcard examples/nsq_to_file/*.go nsq/*.go util/*.go)
NSQ_TO_HTTP_SRCS = $(wildcard examples/nsq_to_http/*.go nsq/*.go util/*.go)
NSQ_TAIL_SRCS = $(wildcard examples/nsq_tail/*.go nsq/*.go util/*.go)

BINARIES = nsqd nsqlookupd nsqadmin
EXAMPLES = nsq_pubsub nsq_to_file nsq_to_http
EXAMPLES = nsq_pubsub nsq_to_file nsq_to_http nsq_tail
BLDDIR = build

all: $(BINARIES) $(EXAMPLES)
Expand All @@ -30,6 +31,7 @@ $(BLDDIR)/nsqadmin: $(NSQADMIN_SRCS)
$(BLDDIR)/examples/nsq_pubsub: $(NSQ_PUBSUB_SRCS)
$(BLDDIR)/examples/nsq_to_file: $(NSQ_TO_FILE_SRCS)
$(BLDDIR)/examples/nsq_to_http: $(NSQ_TO_HTTP_SRCS)
$(BLDDIR)/examples/nsq_tail: $(NSQ_TAIL_SRCS)

clean:
rm -fr $(BLDDIR)
Expand All @@ -49,6 +51,7 @@ install: $(BINARIES) $(EXAMPLES)
install -m 755 $(BLDDIR)/examples/nsq_pubsub ${DESTDIR}${BINDIR}/nsq_pubsub
install -m 755 $(BLDDIR)/examples/nsq_to_file ${DESTDIR}${BINDIR}/nsq_to_file
install -m 755 $(BLDDIR)/examples/nsq_to_http ${DESTDIR}${BINDIR}/nsq_to_http
install -m 755 $(BLDDIR)/examples/nsq_tail ${DESTDIR}${BINDIR}/nsq_tail
install -m 755 -d ${DESTDIR}${DATADIR}
install -d ${DESTDIR}${DATADIR}/nsqadmin
cp -r nsqadmin/templates ${DESTDIR}${DATADIR}/nsqadmin
10 changes: 5 additions & 5 deletions examples/bench_reader/bench_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func main() {
wg.Wait()
end := time.Now()
duration := end.Sub(start)
log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
duration,
float64(*num * 200) / duration.Seconds() / 1024 / 1024,
float64(*num) / duration.Seconds(),
float64(duration / time.Microsecond) / float64(*num))
log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
duration,
float64(*num*200)/duration.Seconds()/1024/1024,
float64(*num)/duration.Seconds(),
float64(duration/time.Microsecond)/float64(*num))
}

func subWorker(n int, workers int, tcpAddr string, topic string, channel string, rdyChan chan int, goChan chan int, id int) {
Expand Down
10 changes: 5 additions & 5 deletions examples/bench_writer/bench_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func main() {
wg.Wait()
end := time.Now()
duration := end.Sub(start)
log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
duration,
float64(*num * 200) / duration.Seconds() / 1024 / 1024,
float64(*num) / duration.Seconds(),
float64(duration / time.Microsecond) / float64(*num))
log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
duration,
float64(*num*200)/duration.Seconds()/1024/1024,
float64(*num)/duration.Seconds(),
float64(duration/time.Microsecond)/float64(*num))
}

func pubWorker(n int, tcpAddr string, batchSize int, batch [][]byte, topic string) {
Expand Down
95 changes: 95 additions & 0 deletions examples/nsq_tail/nsq_tail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import (
"../../nsq"
"../../util"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
)

var (
showVersion = flag.Bool("version", false, "print version string")
topic = flag.String("topic", "", "nsq topic")
channel = flag.String("channel", "nsq_tail#ephemeral", "nsq channel")
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
nsqdTCPAddrs = util.StringArray{}
lookupdHTTPAddrs = util.StringArray{}
)

func init() {
flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
}

type TailHandler struct{}

func (th *TailHandler) HandleMessage(m *nsq.Message) error {
_, err := os.Stdout.Write(m.Body)
if err != nil {
log.Fatalf("ERROR: failed to write to os.Stdout - %s", err.Error())
}
_, err = os.Stdout.WriteString("\n")
if err != nil {
log.Fatalf("ERROR: failed to write to os.Stdout - %s", err.Error())
}
return nil
}

func main() {
flag.Parse()

if *showVersion {
fmt.Printf("nsq_tail v%s\n", util.BINARY_VERSION)
return
}

if *topic == "" || *channel == "" {
log.Fatalf("--topic and --channel are required")
}

if *maxInFlight < 0 {
log.Fatalf("--max-in-flight must be > 0")
}

if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
log.Fatalf("--nsqd-tcp-address or --lookupd-http-address required")
}
if len(nsqdTCPAddrs) > 0 && len(lookupdHTTPAddrs) > 0 {
log.Fatalf("use --nsqd-tcp-address or --lookupd-http-address not both")
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

r, err := nsq.NewReader(*topic, *channel)
if err != nil {
log.Fatalf(err.Error())
}
r.SetMaxInFlight(*maxInFlight)
r.AddHandler(&TailHandler{})

for _, addrString := range nsqdTCPAddrs {
err := r.ConnectToNSQ(addrString)
if err != nil {
log.Fatalf(err.Error())
}
}

for _, addrString := range lookupdHTTPAddrs {
log.Printf("lookupd addr %s", addrString)
err := r.ConnectToLookupd(addrString)
if err != nil {
log.Fatalf(err.Error())
}
}

select {
case <-r.ExitChan:
case <-sigChan:
r.Stop()
}
}
2 changes: 1 addition & 1 deletion fmt.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
for d in nsq nsqd nsqlookupd nsqadmin util util/pqueue examples/nsq_to_file examples/nsq_pubsub examples/nsq_to_http; do
for d in nsq nsqd nsqlookupd nsqadmin util util/pqueue examples/*; do
pushd $d
go fmt
popd
Expand Down

0 comments on commit 23b58e2

Please sign in to comment.