Skip to content

Commit

Permalink
move to zap logger, reduced seq size
Browse files Browse the repository at this point in the history
  • Loading branch information
cjimti committed Aug 3, 2018
1 parent 87a1287 commit 890287b
Show file tree
Hide file tree
Showing 266 changed files with 38,389 additions and 10,852 deletions.
92 changes: 64 additions & 28 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 68 additions & 24 deletions rtq/rtq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package rtq

import (
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"time"

"encoding/json"

"net"
"errors"

"github.com/bhoriuchi/go-bunyan/bunyan"
"github.com/coreos/bbolt"
"github.com/satori/go.uuid"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// Message to store and send
Expand All @@ -38,21 +39,21 @@ type Config struct {
Interval time.Duration
Batch int
MaxInQueue int
Logger *bunyan.Logger
Logger *zap.Logger
Receiver string
Path string
}

// rtQ private struct see NewQ
type rtQ struct {
db *bolt.DB // the database
cfg Config // configuration
mq chan Message // message channel
remove chan int // number of records to remove
txSeq []byte // max transmitted sequence
mCount int // last reported message count
status func(args ...interface{}) // status output
statusError func(args ...interface{}) // status error output
db *bolt.DB // the database
cfg Config // configuration
mq chan Message // message channel
remove chan int // number of records to remove
txSeq []byte // max transmitted sequence
mCount int // last reported message count
status func(msg string, fields ...zapcore.Field) // status output
statusError func(msg string, fields ...zapcore.Field) // status error output
}

// NewQ returns a new rtQ
Expand Down Expand Up @@ -102,15 +103,21 @@ func (rt *rtQ) getMessageBatch() MessageBatch {
// get bucket stats
stats := bucket.Stats()

// Encode stats to JSON and print to STDERR.
//json.NewEncoder(os.Stderr).Encode(stats)
rt.status("Records in mq: %d", stats.KeyN)
rt.status("QueueState", zapcore.Field{
Key: "TotalRecords",
Type: zapcore.Int32Type,
Integer: int64(stats.KeyN),
})

ingressOver := 0
if rt.mCount > 0 {
ingressOver = stats.KeyN - rt.mCount
}
rt.status("NEW Records in mq: %d", ingressOver)
rt.status("QueueState", zapcore.Field{
Key: "NewRecords",
Type: zapcore.Int32Type,
Integer: int64(ingressOver),
})

// store the new count
rt.mCount = stats.KeyN
Expand Down Expand Up @@ -166,7 +173,15 @@ func (rt *rtQ) transmit(msgB MessageBatch) error {

defer resp.Body.Close()

rt.status("Tx Status: %s", resp.Status)
if resp.StatusCode != 200 {
return errors.New(resp.Status)
}

rt.status("TransmissionStatus", zapcore.Field{
Key: "Reponse",
Type: zapcore.StringType,
String: resp.Status,
})

return nil
}
Expand All @@ -178,13 +193,28 @@ func (rt *rtQ) tx() {
// get a message batch
mb := rt.getMessageBatch()

rt.status("Txing %d Messages.", mb.Size)
if mb.Size < 1 {
// nothing to send
rt.status("TransmissionSkipEmpty")
rt.waitTx()
return
}

rt.status("TransmissionAttempt", zapcore.Field{
Key: "Messages",
Type: zapcore.Int32Type,
Integer: int64(mb.Size),
})

// try to send
err := rt.transmit(mb)
if err != nil {
// transmission failed
rt.statusError("Tx Error: %s", err.Error())
rt.status("Transmission", zapcore.Field{
Key: "TransmissionError",
Type: zapcore.StringType,
String: err.Error(),
})

// is the queue larger than allowed
if rt.mCount > rt.cfg.MaxInQueue {
Expand All @@ -194,22 +224,36 @@ func (rt *rtQ) tx() {
return
}

rt.status("Transmission complete. Removing %d records.", mb.Size)
rt.status("TransmissionComplete", zapcore.Field{
Key: "RemovingMessages",
Type: zapcore.Int32Type,
Integer: int64(mb.Size),
})
rt.remove <- mb.Size
rt.waitTx()
}

// waitTx sleeps for rt.cfg.Interval * time.Second then performs a tx.
func (rt *rtQ) waitTx() {
rt.status("Tx Waiting %d seconds.", rt.cfg.Interval/time.Second)
rt.status("TransmissionStatus", zapcore.Field{
Key: "WaitSecond",
Type: zapcore.Int32Type,
Integer: int64(rt.cfg.Interval / time.Second),
})

time.Sleep(rt.cfg.Interval)
rt.tx() // recursion
}

// Write to the queue
func (rt *rtQ) QWrite(msg Message) error {

rt.status("Send to channel: %s", msg)
rt.status("ReceiverStatus", zapcore.Field{
Key: "KeyValues",
Type: zapcore.Int32Type,
Integer: int64(len(msg.Payload)),
})

rt.mq <- msg

return nil
Expand All @@ -233,7 +277,7 @@ func messageHandler(db *bolt.DB, mq chan Message, remove chan int) {

df := msg.Time.Format("20060102")

msg.Seq = fmt.Sprintf("%s%012d", df, id)
msg.Seq = fmt.Sprintf("%s%011d", df, id)

buf, err := json.Marshal(msg)
if err != nil {
Expand Down
Loading

0 comments on commit 890287b

Please sign in to comment.