目录 | 作用 |
---|---|
apps | 命令行工具 |
build | make 之后产生的目录 |
nsqd | 节点服务端核心代码 |
nsqadmin | 管理界面核心代码 |
1. 在根目录执行dep ensure
,确保依赖都安装完毕。
2. 执行make
命令,构建出build
目录 ,make命令会逐个从apps
目录下的子目录构建出各个命令行工具
目录 | 作用 |
---|---|
nsq_stat | 统计服务 |
nsq_tail | 类似tail命令,消费指定的topic,将消息输出到stdout |
nsqd | 节点服务 |
nsqlookupd | lookupd服务 |
nsqadmin | 管理界面 |
在main
方法中依赖github.com/judwhite/go-svc/svc
包启动服务,通过实现Init
、Start
、Stop
方法来启动nsq服务。
首先调用Init
方法对win的目录做处理,Stop
方法是在捕获到终端信号时关闭服务调用准备的,关闭时会执行一次messagepump
方法,将内存数据写入磁盘,重点在Start
方法
func (p *program) Start() error {
// 默认配置
opts := nsqd.NewOptions()
// 解析命令行参数替换默认配置
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
rand.Seed(time.Now().UTC().UnixNano())
// 查看version操作
if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqd"))
os.Exit(0)
}
// 解析传入的配置文件
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
}
}
// 校验配置
cfg.Validate()
options.Resolve(opts, flagSet, cfg)
// 调用nsqd目录下的核心代码创建服务端对象
nsqd := nsqd.New(opts)
// 加载options(topic/channel metadata)
err := nsqd.LoadMetadata()
if err != nil {
log.Fatalf("ERROR: %s", err.Error())
}
// options(topic/channel metadata)保存到文件
err = nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
// 启动tcp(4150)和http(4151)两个服务端口
nsqd.Main()
p.nsqd = nsqd
return nil
}
func (n *NSQD) Main() {
var err error
ctx := &context{n}
n.tcpListener, err = net.Listen("tcp", n.getOpts().TCPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().TCPAddress, err)
os.Exit(1)
}
n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
os.Exit(1)
}
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
n.httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
if err != nil {
n.logf(LOG_FATAL, "listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
os.Exit(1)
}
}
// tcp(4150)
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.logf)
})
// http(4151)
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)
})
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)
})
}
n.waitGroup.Wrap(n.queueScanLoop)
// 根据--lookupd-tcp-address参数去请求lookup服务端,如果没有配置该参数,就不会请求
n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
}
当tcpserver接收到protocol_v2的请求时,会启动一个线程
func (p *protocolV2) IOLoop(conn net.Conn) error {
var err error
var line []byte
var zeroTime time.Time
clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
// synchronize the startup of messagePump in order
// to guarantee that it gets a chance to initialize
// goroutine local state derived from client attributes
// and avoid a potential race with IDENTIFY (where a client
// could have changed or disabled said attributes)
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
for {
if client.HeartbeatInterval > 0 {
client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
} else {
client.SetReadDeadline(zeroTime)
}
// ReadSlice does not allocate new space for the data each request
// ie. the returned slice is only valid until the next call to it
line, err = client.Reader.ReadSlice('\n')
if err != nil {
if err == io.EOF {
err = nil
} else {
err = fmt.Errorf("failed to read command - %s", err)
}
break
}
// trim the '\n'
line = line[:len(line)-1]
// optionally trim the '\r'
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
params := bytes.Split(line, separatorBytes)
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
var response []byte
response, err = p.Exec(client, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
ctx = " - " + parentErr.Error()
}
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)
sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
if sendErr != nil {
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
break
}
// errors of type FatalClientErr should forceably close the connection
if _, ok := err.(*protocol.FatalClientErr); ok {
break
}
continue
}
if response != nil {
err = p.Send(client, frameTypeResponse, response)
if err != nil {
err = fmt.Errorf("failed to send response - %s", err)
break
}
}
}
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
conn.Close()
close(client.ExitChan)
if client.Channel != nil {
client.Channel.RemoveClient(client.ID)
}
return err
}
// 接收以下命令
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
if bytes.Equal(params[0], []byte("IDENTIFY")) {
return p.IDENTIFY(client, params)
}
err := enforceTLSPolicy(client, p, params[0])
if err != nil {
return nil, err
}
switch {
case bytes.Equal(params[0], []byte("FIN")):
return p.FIN(client, params)
case bytes.Equal(params[0], []byte("RDY")):
return p.RDY(client, params)
case bytes.Equal(params[0], []byte("REQ")):
return p.REQ(client, params)
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params)
case bytes.Equal(params[0], []byte("MPUB")):
return p.MPUB(client, params)
case bytes.Equal(params[0], []byte("DPUB")):
return p.DPUB(client, params)
case bytes.Equal(params[0], []byte("NOP")):
return p.NOP(client, params)
case bytes.Equal(params[0], []byte("TOUCH")):
return p.TOUCH(client, params)
case bytes.Equal(params[0], []byte("SUB")):
return p.SUB(client, params)
case bytes.Equal(params[0], []byte("CLS")):
return p.CLS(client, params)
case bytes.Equal(params[0], []byte("AUTH")):
return p.AUTH(client, params)
}
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
var err error
var memoryMsgChan chan *Message
var backendMsgChan chan []byte
var subChannel *Channel
// NOTE: `flusherChan` is used to bound message latency for
// the pathological case of a channel on a low volume topic
// with >1 clients having >1 RDY counts
var flusherChan <-chan time.Time
var sampleRate int32
subEventChan := client.SubEventChan
identifyEventChan := client.IdentifyEventChan
outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
heartbeatChan := heartbeatTicker.C
msgTimeout := client.MsgTimeout
// v2 opportunistically buffers data to clients to reduce write system calls
// we force flush in two cases:
// 1. when the client is not ready to receive messages
// 2. we're buffered and the channel has nothing left to send us
// (ie. we would block in this loop anyway)
//
flushed := true
// signal to the goroutine that started the messagePump
// that we've started up
close(startedChan)
for {
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages...
memoryMsgChan = nil
backendMsgChan = nil
flusherChan = nil
// force flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
} else if flushed {
// last iteration we flushed...
// do not select on the flusher ticker channel
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
} else {
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
select {
case <-flusherChan:
// if this case wins, we're either starved
// or we won the race between other channels...
// in either case, force flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
case <-client.ReadyStateChan:
case subChannel = <-subEventChan:
// you can't SUB anymore
subEventChan = nil
case identifyData := <-identifyEventChan:
// you can't IDENTIFY anymore
identifyEventChan = nil
outputBufferTicker.Stop()
if identifyData.OutputBufferTimeout > 0 {
outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
}
heartbeatTicker.Stop()
heartbeatChan = nil
if identifyData.HeartbeatInterval > 0 {
heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
heartbeatChan = heartbeatTicker.C
}
if identifyData.SampleRate > 0 {
sampleRate = identifyData.SampleRate
}
msgTimeout = identifyData.MsgTimeout
case <-heartbeatChan:
err = p.Send(client, frameTypeResponse, heartbeatBytes)
if err != nil {
goto exit
}
case b := <-backendMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg, err := decodeMessage(b)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case msg := <-memoryMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
exit:
p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
heartbeatTicker.Stop()
outputBufferTicker.Stop()
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
}
}
lookupLoop的线程处理
func (n *NSQD) lookupLoop() {
// 存放lookupPeer对象
var lookupPeers []*lookupPeer
// 存放lookupd服务节点列表,用于判断去重
var lookupAddrs []string
connect := true
hostname, err := os.Hostname()
if err != nil {
n.logf(LOG_FATAL, "failed to get hostname - %s", err)
os.Exit(1)
}
// for announcements, lookupd determines the host automatically
ticker := time.Tick(15 * time.Second)
for {
if connect {
for _, host := range n.getOpts().NSQLookupdTCPAddresses {
// 判断是否处理过
if in(host, lookupAddrs) {
continue
}
n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
connectCallback(n, hostname))
// 初始化连接lookup节点,并发送nsq.MagicV1消息,再回调connectCallback的闭包函数
lookupPeer.Command(nil) // start the connection
lookupPeers = append(lookupPeers, lookupPeer)
lookupAddrs = append(lookupAddrs, host)
}
n.lookupPeers.Store(lookupPeers)
connect = false
}
select {
case <-ticker:
// 心跳
for _, lookupPeer := range lookupPeers {
n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer)
cmd := nsq.Ping()
_, err := lookupPeer.Command(cmd)
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
}
}
// topic或channel有变化(注册或退出),会通知消息发送给所有已经连接的lookupd服务
case val := <-n.notifyChan:
var cmd *nsq.Command
var branch string
switch val.(type) {
case *Channel:
// notify all nsqlookupds that a new channel exists, or that it's removed
branch = "channel"
channel := val.(*Channel)
if channel.Exiting() == true {
cmd = nsq.UnRegister(channel.topicName, channel.name)
} else {
cmd = nsq.Register(channel.topicName, channel.name)
}
case *Topic:
// notify all nsqlookupds that a new topic exists, or that it's removed
branch = "topic"
topic := val.(*Topic)
if topic.Exiting() == true {
cmd = nsq.UnRegister(topic.name, "")
} else {
cmd = nsq.Register(topic.name, "")
}
}
for _, lookupPeer := range lookupPeers {
n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
_, err := lookupPeer.Command(cmd)
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
}
}
// 重置更新opts.NSQLookupdTCPAddresses参数重新连接
case <-n.optsNotificationChan:
var tmpPeers []*lookupPeer
var tmpAddrs []string
for _, lp := range lookupPeers {
if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
tmpPeers = append(tmpPeers, lp)
tmpAddrs = append(tmpAddrs, lp.addr)
continue
}
n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp)
lp.Close()
}
lookupPeers = tmpPeers
lookupAddrs = tmpAddrs
connect = true
case <-n.exitChan:
goto exit
}
}
exit:
n.logf(LOG_INFO, "LOOKUP: closing")
}
首先init
方法解析要监听的节点地址参数(http地址)和管理员用户名参数
func main() {
flagSet.Parse(os.Args[1:])
if *showVersion {
fmt.Println(version.String("nsqadmin"))
return
}
// 信号处理
exitChan := make(chan int)
signalChan := make(chan os.Signal, 1)
go func() {
<-signalChan
exitChan <- 1
}()
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
// 解析配置
var cfg map[string]interface{}
if *config != "" {
_, err := toml.DecodeFile(*config, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", *config, err)
}
}
// 创建nsqadmin
opts := nsqadmin.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqadmin := nsqadmin.New(opts)
// 启动服务
nsqadmin.Main()
<-exitChan
nsqadmin.Exit()
}
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
switch params[0] {
case "PING":
return p.PING(client, params)
case "IDENTIFY":
return p.IDENTIFY(client, reader, params[1:])
case "REGISTER":
// topic和channel注册到lookupd的DB对象中,DB对象其实就是个map
return p.REGISTER(client, reader, params[1:])
case "UNREGISTER":
return p.UNREGISTER(client, reader, params[1:])
}
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
先启动nsqlookupd服务,再起动nsqd服务,启动nsqd时带上
--lookupd-tcp-address nsqlookupd-host:nsqlookupd-port
参数,nsqd会在启动tcp和http服务后去连接nsqd的tcp服务,之后nsqlookupd服务会15秒一次与该节点保持心跳,客户端可以通过nsqlookup的http接口获取节点信息。在启动nsqadmin时也可以指定--lookupd-tcp-address
参数,nsqadmin服务会通过http接口从nsqlookupd服务获取所有节点信息,并且在可以通过lookup创建topic和channel(--nsqd-http-address
做不到这点)