Skip to content

Commit

Permalink
Create proof of concept Activity Monitor
Browse files Browse the repository at this point in the history
- Forgot to bind all queues to the exchange in previous commit.
- Added basic AM standalone program
- TCP/IP JSON-formatted system logger appropriate for transmitting to LogStash
  • Loading branch information
jcjones committed Feb 17, 2015
1 parent c61a582 commit 3387a77
Show file tree
Hide file tree
Showing 4 changed files with 330 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ _testmain.go
*.exe
*.test
*.prof

boulder-start/boulder-start
activity-monitor/activity-monitor
166 changes: 166 additions & 0 deletions activity-monitor/activity-monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2014 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"github.com/codegangsta/cli"
"github.com/letsencrypt/boulder"
"github.com/streadway/amqp"
"log"
"net/url"
"os"
)

const (
QueueName = "Monitor"
AmqpExchange = "boulder"
AmqpExchangeType = "topic"
AmqpInternal = false
AmqpDurable = false
AmqpDeleteUnused = false
AmqpExclusive = false
AmqpNoWait = false
AmqpNoLocal = false
AmqpAutoAck = true
AmqpMandatory = false
AmqpImmediate = false
)


func startMonitor(AmqpUrl string, logger *boulder.JsonLogger) {

conn, err := amqp.Dial(AmqpUrl)
if err != nil {
log.Fatalf("Could not connect to AMQP server: %s", err)
return
}

rpcCh, err := conn.Channel()
if err != nil {
log.Fatalf("Could not start channel: %s", err)
return
}

err = rpcCh.ExchangeDeclare(
AmqpExchange,
AmqpExchangeType,
AmqpDurable,
AmqpDeleteUnused,
AmqpInternal,
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not declare exchange: %s", err)
return
}

_, err = rpcCh.QueueDeclare(
QueueName,
AmqpDurable,
AmqpDeleteUnused,
AmqpExclusive,
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not declare queue: %s", err)
return
}

err = rpcCh.QueueBind(
QueueName,
"#", //wildcard
AmqpExchange,
false,
nil)
if err != nil {
log.Fatalf("Could not bind queue: %s", err)
return
}

delveries, err := rpcCh.Consume(
QueueName,
"",
AmqpAutoAck,
AmqpExclusive,
AmqpNoLocal,
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not subscribe to queue: %s", err)
return
}

// Run forever.
handle(delveries, logger)
}

func handle(deliveries <-chan amqp.Delivery, jsonLogger *boulder.JsonLogger) {
for d := range deliveries {

// Send the entire message contents to the syslog server for debugging.
// TODO: Track state,
jsonLogger.Debug("Message contents", d)
}
}

func main() {
app := cli.NewApp()
app.Name = "activity-monitor"
app.Usage = "Monitor Boulder's communications."
app.Version = "0.0.0"

// Specify AMQP Server
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "amqp",
Value: "amqp://guest:guest@localhost:5672",
Usage: "AMQP Broker String",
},
cli.StringFlag{
Name: "syslog",
Value: "tcp://localhost:514",
Usage: "Syslog server and port",
},
cli.BoolFlag{
Name: "stdout",
Usage: "Enable debug logging to stdout",
},
cli.IntFlag{
Name: "level",
Value: 4,
Usage: "Minimum Level to log (0-7), 7=Debug",
},
}

app.Action = func(c *cli.Context) {
// Parse SysLog URL
syslogU, err := url.Parse(c.GlobalString("syslog"))
if err != nil {
log.Fatalf("Could not parse Syslog URL: %s", err)
return
}

logger := &boulder.JsonLogger{}
logger.SetEndpoint(syslogU.Scheme, syslogU.Host)
err = logger.Connect()
if err != nil {
log.Fatalf("Could not open remote syslog: %s", err)
return
}

logger.SetDebugToStdOut(c.GlobalBool("stdout"))

logger.SetLevel(c.GlobalInt("level"))

startMonitor( c.GlobalString("amqp"), logger )
}

err := app.Run(os.Args)
if err != nil {
log.Fatalf("Could not start: %s", err)
return
}
}
18 changes: 18 additions & 0 deletions amqp-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func amqpSubscribe(ch *amqp.Channel, name string) (msgs <-chan amqp.Delivery, er
nil)
if err != nil {
log.Fatalf("Could not declare exchange: %s", err)
return
}

q, err := ch.QueueDeclare(
Expand All @@ -70,6 +71,18 @@ func amqpSubscribe(ch *amqp.Channel, name string) (msgs <-chan amqp.Delivery, er
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not declare queue: %s", err)
return
}

err = ch.QueueBind(
name,
name,
AmqpExchange,
false,
nil)
if err != nil {
log.Fatalf("Could not bind queue: %s", err)
return
}

Expand All @@ -81,6 +94,11 @@ func amqpSubscribe(ch *amqp.Channel, name string) (msgs <-chan amqp.Delivery, er
AmqpNoLocal,
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not subscribe to queue: %s", err)
return
}

return
}

Expand Down
143 changes: 143 additions & 0 deletions json_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2014 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package boulder

import (
"encoding/json"
"fmt"
"log"
"net"
"sync"
)

const (
EMERGENCY = 0
ALERT = 1
CRITICAL = 2
ERROR = 3
WARNING = 4
NOTICE = 5
INFO = 6
DEBUG = 7
)

type LogMessage struct {
Message string `json:"message"`
Payload interface{} `json:"payload"`
Program string `json:"program"`
Severity int `json:"severity"`
}

type JsonLogger struct {
debug bool
scheme string
host string
level int
conn net.Conn
mu sync.Mutex // guards conn
program string // Defines the 'program' field in JSON
}

func (jl *JsonLogger) SetDebugToStdOut(debug bool) {
jl.debug = debug
}

func (jl *JsonLogger) SetLevel(level int) {
jl.level = level
}

func (jl *JsonLogger) SetEndpoint(scheme string, host string) {
jl.scheme = scheme
jl.host = host
}

func (jl *JsonLogger) Connect() (error) {
conn, err := net.Dial(jl.scheme, jl.host)
if err == nil {
jl.conn = conn
}
return err
}

func (jl *JsonLogger) Critical(messageStr string, payloadObj interface{}) {
jl.Write(CRITICAL, messageStr, payloadObj)
}

func (jl *JsonLogger) Alert(messageStr string, payloadObj interface{}) {
jl.Write(ALERT, messageStr, payloadObj)
}

func (jl *JsonLogger) Emergency(messageStr string, payloadObj interface{}) {
jl.Write(EMERGENCY, messageStr, payloadObj)
}

func (jl *JsonLogger) Error(messageStr string, payloadObj interface{}) {
jl.Write(ERROR, messageStr, payloadObj)
}

func (jl *JsonLogger) Warning(messageStr string, payloadObj interface{}) {
jl.Write(WARNING, messageStr, payloadObj)
}

func (jl *JsonLogger) Notice(messageStr string, payloadObj interface{}) {
jl.Write(NOTICE, messageStr, payloadObj)
}

func (jl *JsonLogger) Info(messageStr string, payloadObj interface{}) {
jl.Write(INFO, messageStr, payloadObj)
}

func (jl *JsonLogger) Debug(messageStr string, payloadObj interface{}) {
jl.Write(DEBUG, messageStr, payloadObj)
}

func (jl *JsonLogger) Write(severity int, messageStr string, payloadObj interface{}) {
if severity > jl.level {
return
}

data := LogMessage{
Program: "am",
Payload: payloadObj,
Message: messageStr,
Severity: severity}

encoded, err := json.Marshal(data)

// s, err := json.Marshal(lm)
if err != nil {
log.Fatalf("Could not marshal log message: %s", err)
return
}

if jl.debug {
log.Println(fmt.Sprintf("<%d> %s", severity, string(encoded)))
}

_, err = jl.WriteAndRetry(string(encoded))
if err != nil {
log.Fatalf("Failed to send log message, even with retry: %s", encoded)
return
}
}

func (jl *JsonLogger) Transmit(s string) (int, error) {
return fmt.Fprintln(jl.conn, s)
}

func (jl *JsonLogger) WriteAndRetry(s string) (int, error) {
jl.mu.Lock()
defer jl.mu.Unlock()

if jl.conn != nil {
if n, err := jl.Transmit(s); err == nil {
return n, err
}
}
if err := jl.Connect(); err != nil {
return 0, err
}
return jl.Transmit(s)
}

0 comments on commit 3387a77

Please sign in to comment.