Skip to content

Commit

Permalink
reverse order of return variables for protocol; remove knowledge of c…
Browse files Browse the repository at this point in the history
…lient in channel
  • Loading branch information
mreiferson committed May 27, 2012
1 parent 4249e40 commit 7ae6823
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 90 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
nsq
nsq_server/nsq_server
nsq_lookup/nsq_lookup
nsq_admin/nsq_admin

# Go.gitignore

Expand Down
9 changes: 9 additions & 0 deletions INSTALLING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

Install instructions

cd nsq_server
go build

cd ../nsq_admin
go build

55 changes: 5 additions & 50 deletions message/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ import (
"errors"
"log"
"time"
// "../server"
)

type Channel struct {
name string
addClientChan chan util.ChanReq
removeClientChan chan util.ChanReq
// clients []*Client
name string
addClientChan chan util.ChanReq
removeClientChan chan util.ChanReq
backend queue.BackendQueue
incomingMessageChan chan *Message
msgChan chan *Message
Expand All @@ -27,9 +25,8 @@ type Channel struct {
// Channel constructor
func NewChannel(channelName string, inMemSize int) *Channel {
channel := &Channel{name: channelName,
addClientChan: make(chan util.ChanReq),
removeClientChan: make(chan util.ChanReq),
// clients: make([]*Client, 0, 5),
addClientChan: make(chan util.ChanReq),
removeClientChan: make(chan util.ChanReq),
backend: queue.NewDiskQueue(channelName),
incomingMessageChan: make(chan *Message, 5),
msgChan: make(chan *Message, inMemSize),
Expand All @@ -42,23 +39,6 @@ func NewChannel(channelName string, inMemSize int) *Channel {
return channel
}

// // AddClient performs a thread safe operation
// // to add a Client to a Channel
// // see: Channel.Router()
// func (c *Channel) AddClient(client *Client) {
// log.Printf("CHANNEL(%s): adding client...", c.name)
// doneChan := make(chan interface{})
// c.addClientChan <- ChanReq{client, doneChan}
// <-doneChan
// }
//
// func (c *Channel) RemoveClient(client *Client) {
// log.Printf("CHANNEL(%s): removing client...", c.name)
// doneChan := make(chan interface{})
// c.removeClientChan <- ChanReq{client, doneChan}
// <-doneChan
// }

// PutMessage writes to the appropriate incoming
// message channel
func (c *Channel) PutMessage(msg *Message) {
Expand All @@ -80,8 +60,6 @@ func (c *Channel) RequeueMessage(uuidStr string) error {
// Router handles the muxing of Channel messages including
// the addition of a Client to the Channel
func (c *Channel) Router() {
// var clientReq util.ChanReq

helperCloseChan := make(chan int)

go func() {
Expand Down Expand Up @@ -128,27 +106,6 @@ func (c *Channel) Router() {

for {
select {
// case clientReq = <-c.addClientChan:
// client := clientReq.variable.(*Client)
// c.clients = append(c.clients, client)
// log.Printf("CHANNEL(%s): added client(%#v)", c.name, client)
// clientReq.retChan <- 1
// case clientReq = <-c.removeClientChan:
// indexToRemove := -1
// client := clientReq.variable.(*Client)
// for k, v := range c.clients {
// if v == client {
// indexToRemove = k
// break
// }
// }
// if indexToRemove == -1 {
// log.Printf("ERROR: could not find client(%#v) in clients(%#v)", client, c.clients)
// } else {
// c.clients = append(c.clients[:indexToRemove], c.clients[indexToRemove+1:]...)
// log.Printf("CHANNEL(%s): removed client(%#v)", c.name, client)
// }
// clientReq.retChan <- 1
case msg := <-c.incomingMessageChan:
select {
case c.msgChan <- msg:
Expand Down Expand Up @@ -212,8 +169,6 @@ func (c *Channel) Close() error {

c.exitChan <- 1

// TODO: close (and wait to flush?) all clients

err = c.backend.Close()
if err != nil {
return err
Expand Down
36 changes: 36 additions & 0 deletions nsq_admin/nsq_admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"flag"
"log"
"io"
"strconv"
"net/http"
"time"
)

var bindAddress = flag.String("address", "", "address to bind to")
var webPort = flag.Int("web-port", 5152, "port to listen on for HTTP connections")
var debugMode = flag.Bool("debug", false, "enable debug mode")

func pingHandler(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Length", "2")
io.WriteString(w, "OK")
}

func main() {
flag.Parse()
fqAddress := *bindAddress + ":" + strconv.Itoa(*webPort)
log.Printf("listening for http requests on %s", fqAddress)
s := &http.Server{
Addr: fqAddress,
Handler: http.DefaultServeMux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
http.HandleFunc("/ping", pingHandler)
http.Handle("/static", http.FileServer(http.Dir("static")))
log.Fatal(s.ListenAndServe())

}
6 changes: 3 additions & 3 deletions nsq.go → nsq_server/nsq_server.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"./message"
"./server"
"./util"
"../message"
"../server"
"../util"
"flag"
"log"
"os"
Expand Down
2 changes: 1 addition & 1 deletion protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Client interface {

type Protocol interface {
IOLoop(client Client) error
Execute(client Client, params ...string) (error, []byte)
Execute(client Client, params ...string) ([]byte, error)
}

var Protocols = map[int32]Protocol{}
Expand Down
72 changes: 38 additions & 34 deletions protocol/protocol_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (p *ProtocolV1) IOLoop(client Client) error {

log.Printf("PROTOCOL: %#v", params)

err, response := p.Execute(client, params...)
response, err := p.Execute(client, params...)
if err != nil {
err = client.WriteError(err)
if err != nil {
Expand All @@ -77,7 +77,7 @@ func (p *ProtocolV1) IOLoop(client Client) error {
return err
}

func (p *ProtocolV1) Execute(client Client, params ...string) (error, []byte) {
func (p *ProtocolV1) Execute(client Client, params ...string) ([]byte, error) {
var err error
var response []byte

Expand All @@ -93,38 +93,38 @@ func (p *ProtocolV1) Execute(client Client, params ...string) (error, []byte) {
if method, ok := typ.MethodByName(cmd); ok {
args[2] = reflect.ValueOf(params)
returnValues := method.Func.Call(args)
err = nil
response = nil
if !returnValues[0].IsNil() {
err = returnValues[0].Interface().(error)
response = returnValues[0].Interface().([]byte)
}
response = nil
err = nil
if !returnValues[1].IsNil() {
response = returnValues[1].Interface().([]byte)
err = returnValues[1].Interface().(error)
}

return err, response
return response, err
}

return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

func (p *ProtocolV1) SUB(client Client, params []string) (error, []byte) {
func (p *ProtocolV1) SUB(client Client, params []string) ([]byte, error) {
if client.GetState() != ClientInit {
return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

if len(params) < 3 {
return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

topicName := params[1]
if len(topicName) == 0 {
return ClientErrBadTopic, nil
return nil, ClientErrBadTopic
}

channelName := params[2]
if len(channelName) == 0 {
return ClientErrBadChannel, nil
return nil, ClientErrBadChannel
}

client.SetState(ClientWaitGet)
Expand All @@ -136,18 +136,18 @@ func (p *ProtocolV1) SUB(client Client, params []string) (error, []byte) {
return nil, nil
}

func (p *ProtocolV1) GET(client Client, params []string) (error, []byte) {
func (p *ProtocolV1) GET(client Client, params []string) ([]byte, error) {
var err error

if client.GetState() != ClientWaitGet {
return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

// this blocks until a message is ready
msg := p.channel.GetMessage()
if msg == nil {
log.Printf("ERROR: msg == nil")
return ClientErrBadMessage, nil
return nil, ClientErrBadMessage
}

uuidStr := util.UuidToStr(msg.Uuid())
Expand All @@ -157,87 +157,91 @@ func (p *ProtocolV1) GET(client Client, params []string) (error, []byte) {
buf := bytes.NewBuffer([]byte(uuidStr))
_, err = buf.Write(msg.Body())
if err != nil {
return err, nil
return nil, err
}

client.SetState(ClientWaitAck)

return nil, buf.Bytes()
return buf.Bytes(), nil
}

func (p *ProtocolV1) ACK(client Client, params []string) (error, []byte) {
func (p *ProtocolV1) ACK(client Client, params []string) ([]byte, error) {
if client.GetState() != ClientWaitAck {
return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

client.SetState(ClientWaitResponse)

return nil, nil
}

func (p *ProtocolV1) FIN(client Client, params []string) (error, []byte) {
func (p *ProtocolV1) FIN(client Client, params []string) ([]byte, error) {
if client.GetState() != ClientWaitResponse {
return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

if len(params) < 2 {
return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

uuidStr := params[1]
err := p.channel.FinishMessage(uuidStr)
if err != nil {
return err, nil
return nil, err
}

client.SetState(ClientWaitGet)

return nil, nil
}

func (p *ProtocolV1) REQ(client Client, params []string) (error, []byte) {
func (p *ProtocolV1) REQ(client Client, params []string) ([]byte, error) {
if client.GetState() != ClientWaitResponse {
return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

if len(params) < 2 {
return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

uuidStr := params[1]
err := p.channel.RequeueMessage(uuidStr)
if err != nil {
return err, nil
return nil, err
}

client.SetState(ClientWaitGet)

return nil, nil
}

func (p *ProtocolV1) PUB(client Client, params []string) (error, []byte) {
func (p *ProtocolV1) PUB(client Client, params []string) ([]byte, error) {
var buf bytes.Buffer
var err error

if client.GetState() != ClientInit {
return ClientErrInvalid, nil
// log.Printf("PROTOCOL: PUB %s", params)

// pub's are fake clients. They don't get to ClientInit
if client.GetState() != -1 {
log.Printf("PROTOCOL: INVALID STATE %d", client.GetState())
return nil, ClientErrInvalid
}

if len(params) < 3 {
return ClientErrInvalid, nil
return nil, ClientErrInvalid
}

topicName := params[1]
body := []byte(params[2])

_, err = buf.Write(<-util.UuidChan)
if err != nil {
return err, nil
return nil, err
}

_, err = buf.Write(body)
if err != nil {
return err, nil
return nil, err
}

topic := message.GetTopic(topicName)
Expand Down
3 changes: 3 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ func NewClient(conn net.Conn) *Client {
}

func (c *Client) String() string {
if c.conn == nil {
return "<nil>"
}
return c.conn.RemoteAddr().String()
}

Expand Down
Loading

0 comments on commit 7ae6823

Please sign in to comment.