Skip to content

Commit

Permalink
Initial revision
Browse files Browse the repository at this point in the history
  • Loading branch information
rs committed Mar 9, 2018
0 parents commit 0fc07e9
Show file tree
Hide file tree
Showing 16 changed files with 680 additions and 0 deletions.
11 changes: 11 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
language: go
go:
- "1.8"
- "1.9"
- "1.10"
- master
matrix:
allow_failures:
- go: master
script:
go test -v -race -cpu=1,2,4 -bench . -benchmem ./...
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2018 Olivier Poitrey

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
60 changes: 60 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# jaggr: JSON Aggregation CLI
[![license](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](https://raw.githubusercontent.com/rs/jaggr/master/LICENSE) [![Build Status](https://travis-ci.org/rs/jaggr.svg?branch=master)](https://travis-ci.org/rs/jaggr)

Jaggr is a command line tool that aggregates a series of JSON logs in real time. The main goal of this tool is to prepare data for plotting with [jplot](https://github.com/rs/jplot).

## Install

```
go get -u github.com/rs/jaggr
```

## Usage

Given the input below, generate one line per second with mean, min, max:

```
{"code": 200, "latency": 4788000, "error": ""}
{"code": 200, "latency": 5785000, "error": ""}
{"code": 200, "latency": 4162000, "error": ""}
{"code": 502, "latency": 4461000, "error": "i/o error"}
{"code": 200, "latency": 5884000, "error": ""}
{"code": 200, "latency": 4702000, "error": ""}
...
```

```
tail -f log.json | jaggr @count=rps hist[200,502,*]:code min,max,mean:latency
```

Output will be one line per second, as follows:

```
{"rps": 123, "code": {"hist": {"200": 100, "502": 13, "*": 10}}, "latency": {"min": 4461000, "max": 5884000, "mean": 4483000}}
```

So here we give a stream of real-time requests to jaggr's standard input and request the aggregation of the `code` and `latency` fields. For the `code` field we request a histogram with some known error codes and an "other" bucket defined by `*`. The `latency` field is aggregated using minimum, maximum and mean. In addition, `@count` adds an extra field indicating the total number of lines aggregated. The `=` sign can be used on any field to rename it; here we use it to say that the count is an `rps`, as we are using the default aggregation time of 1 second.

Note that any field not specified in the argument list is removed from the output (here, the `error` field).

## Recipes

### Vegeta

Jaggr can be used to integrate [vegeta](https://github.com/tsenart/vegeta) with [jplot](https://github.com/rs/jplot) as follows:

```
echo 'GET http://localhost' | \
vegeta attack -rate 5000 -workers 100 -duration 10m | vegeta dump | \
jaggr @count=rps \
hist[100,200,300,400,500]:code \
p25,p50,p95:latency \
p25,p50,p95:bytes_in \
p25,p50,p95:bytes_out | \
jplot rps+code.hist.100+code.hist.200+code.hist.300+code.hist.400+code.hist.500 \
latency.p95+latency.p50+latency.p25 \
bytes_in.p95+bytes_in.p50+bytes_in.p25 \
bytes_out.p95+bytes_out.p50+bytes_out.p25
```

![](doc/vegeta.gif)
45 changes: 45 additions & 0 deletions aggr/aggr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package aggr

import (
"errors"
"fmt"
"strconv"
)

// rAggrs is the registry of aggregator constructors, keyed by the name given
// to Register. Each constructor receives the aggregator's string arguments.
var rAggrs = map[string]func(args []string) (Aggregator, error){}

func init() {
Register("min", newMin)
Register("max", newMax)
Register("mean", newMean)
Register("median", newMedian)
Register("hist", newHist)
Register("cat", newCat)
for i := 1; i < 100; i++ {
p := float64(i)
Register(fmt.Sprintf("p%d", i), func(args []string) (Aggregator, error) {
return &percentile{p: p}, nil
})
}
}

// Aggregator accumulates data using Push, then aggregates (and resets) the
// accumulated data on Aggr. The result is expected to be used as a JSON value.
// Numbers are expressed as float64.
type Aggregator interface {
// Push adds one value to the accumulated data.
Push(interface{}) error
// Aggr returns the aggregate of the values pushed so far.
Aggr() interface{}
}

// Register adds a custom aggregator constructor under the given name. It
// returns an error, leaving the existing entry untouched, when the name is
// already taken.
func Register(name string, aggr func(args []string) (Aggregator, error)) error {
	_, taken := rAggrs[name]
	if !taken {
		rAggrs[name] = aggr
		return nil
	}
	return errors.New("already exists")
}

// parseFloat converts an arbitrary value to a float64.
//
// A float64 (the type aggregators are documented to exchange numbers as) is
// returned as-is and a string is parsed directly; both avoid the formatting
// round trip and yield exactly what the generic path would. Any other value is
// rendered with fmt.Sprint and parsed, so integers work and non-numeric values
// such as nil return the strconv.ParseFloat error.
func parseFloat(v interface{}) (float64, error) {
	switch x := v.(type) {
	case float64:
		return x, nil
	case string:
		return strconv.ParseFloat(x, 64)
	}
	return strconv.ParseFloat(fmt.Sprint(v), 64)
}
34 changes: 34 additions & 0 deletions aggr/cat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package aggr

import (
"fmt"
)

// cat is a category counter: keys are bucket names and values the number of
// matching values pushed since the counters were last reset.
type cat map[string]uint32

// newCat builds a cat aggregator holding one zeroed counter per bucket name
// listed in args. It never returns an error.
func newCat(args []string) (Aggregator, error) {
	counters := make(cat, len(args))
	for i := range args {
		counters[args[i]] = 0
	}
	return counters, nil
}

// Push increments the counter of the bucket named after the string form of v.
// A value with no matching bucket is counted under "*" when that bucket was
// declared and is dropped otherwise. Push always returns nil.
func (a cat) Push(v interface{}) error {
	key := fmt.Sprint(v)
	if _, known := a[key]; !known {
		if _, hasOther := a["*"]; !hasOther {
			return nil
		}
		key = "*"
	}
	a[key]++
	return nil
}

// Aggr returns a copy of the per-bucket counters as a map[string]uint32 and
// resets every counter to zero, keeping the buckets themselves.
func (a cat) Aggr() interface{} {
	snapshot := make(map[string]uint32, len(a))
	for name := range a {
		snapshot[name], a[name] = a[name], 0
	}
	return snapshot
}
16 changes: 16 additions & 0 deletions aggr/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package aggr

// count is an aggregator reporting how many values were pushed since the
// previous call to Aggr.
type count struct {
	c uint32
}

// Push records one more value; the value itself is ignored. It always returns
// nil.
func (a *count) Push(v interface{}) error {
	a.c += 1
	return nil
}

// Aggr returns the current tally as a uint32 and resets it to zero.
func (a *count) Aggr() interface{} {
	var total uint32
	total, a.c = a.c, 0
	return total
}
115 changes: 115 additions & 0 deletions aggr/field.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package aggr

import (
"errors"
"fmt"
"strings"
"sync"

"github.com/elgs/gojq"
)

// Field describes an aggregation on a field.
type Field struct {
// Path is the query evaluated against each pushed document to extract the
// value to aggregate.
Path string
// Name is the key under which the field's result is reported.
Name string
// Aggrs holds the aggregators fed with the field's value, keyed by the name
// used for each of them in the result.
Aggrs map[string]Aggregator
}

// Fields is a list of Field whose Push and Aggr methods are serialized by a
// mutex.
type Fields struct {
f []Field
mu sync.Mutex
}

// NewFields parses each definition in defs with NewField and returns the
// resulting fields in the same order. On the first definition that fails to
// parse it returns a nil *Fields and an error prefixed with that definition.
func NewFields(defs []string) (*Fields, error) {
	parsed := make([]Field, len(defs))
	for i, def := range defs {
		field, err := NewField(def)
		if err != nil {
			return nil, fmt.Errorf("%s: %v", def, err)
		}
		parsed[i] = field
	}
	return &Fields{f: parsed}, nil
}

// Push feeds the document wrapped by jq to every field in turn while holding
// the mutex. It stops at the first field that fails and returns its error;
// fields pushed before the failure keep the value they received.
func (fs *Fields) Push(jq *gojq.JQ) error {
	fs.mu.Lock()
	defer fs.mu.Unlock()
	for i := range fs.f {
		err := fs.f[i].Push(jq)
		if err != nil {
			return err
		}
	}
	return nil
}

// Aggr aggregates every field while holding the mutex and returns the results
// keyed by field name. When several fields share a name, all of them are
// aggregated and the last one wins.
func (fs *Fields) Aggr() map[string]interface{} {
	fs.mu.Lock()
	defer fs.mu.Unlock()
	out := make(map[string]interface{}, len(fs.f))
	for i := range fs.f {
		field := &fs.f[i]
		out[field.Name] = field.Aggr()
	}
	return out
}

// NewField parses a field definition.
//
// Two forms are recognized:
//
//   - "@count" or "@count=NAME": the special counter field. Its Path is ".",
//     it holds a single count aggregator under the empty key, and it is named
//     NAME (the text after the last '=') or the definition itself when no
//     name is given.
//   - "AGGRS:PATH" or "AGGRS:PATH=NAME": everything before the last ':' is the
//     aggregation expression, handed to aggrsParser; the rest is the path,
//     optionally followed by '=' and the name to report the field under. The
//     name defaults to the path.
//
// A definition without ':' that is not the counter field is rejected with
// "missing aggregation definition"; errors from the aggregation parser are
// returned unchanged.
func NewField(def string) (Field, error) {
	// Match the counter keyword exactly: a bare prefix test would silently
	// accept typos such as "@counts" and report them as a counter.
	if def == "@count" || strings.HasPrefix(def, "@count=") {
		name := def
		if idx := strings.LastIndexByte(name, '='); idx != -1 {
			name = name[idx+1:]
		}
		return Field{
			Path:  ".",
			Name:  name,
			Aggrs: map[string]Aggregator{"": &count{}},
		}, nil
	}
	idx := strings.LastIndexByte(def, ':')
	if idx == -1 {
		return Field{}, errors.New("missing aggregation definition")
	}
	path := def[idx+1:]
	p := &aggrsParser{exp: def[:idx]}
	aggrs, err := p.parse()
	if err != nil {
		return Field{}, err
	}
	f := Field{
		Path:  path,
		Name:  path,
		Aggrs: aggrs,
	}
	// An optional "=NAME" suffix on the path renames the field in the output.
	if idx = strings.LastIndexByte(f.Path, '='); idx != -1 {
		f.Name = f.Path[idx+1:]
		f.Path = f.Path[:idx]
	}
	return f, nil
}

// Push queries the document wrapped by jq at f.Path and hands the resulting
// value to each of the field's aggregators. A query error is returned as-is;
// an aggregator error is returned prefixed with the aggregator's name, and
// stops the iteration.
func (f *Field) Push(jq *gojq.JQ) error {
	value, err := jq.Query(f.Path)
	if err != nil {
		return err
	}
	for label, agg := range f.Aggrs {
		if pushErr := agg.Push(value); pushErr != nil {
			return fmt.Errorf("%s: %v", label, pushErr)
		}
	}
	return nil
}

// Aggr returns the field's aggregated value. The special count field (Path
// "." with an aggregator stored under the empty key) yields that aggregator's
// result directly; any other field yields a map[string]interface{} with one
// entry per aggregator, keyed by aggregator name.
func (f *Field) Aggr() interface{} {
	if f.Path == "." {
		if counter := f.Aggrs[""]; counter != nil {
			return counter.Aggr()
		}
	}
	results := make(map[string]interface{}, len(f.Aggrs))
	for label, agg := range f.Aggrs {
		results[label] = agg.Aggr()
	}
	return results
}
61 changes: 61 additions & 0 deletions aggr/hist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package aggr

import (
"sort"
)

// bucket is one histogram bucket: name is its label and val the numeric value
// used to order buckets and match pushed values.
type bucket struct {
	name string
	val  float64
}

// buckets is a list of bucket that can be sorted by ascending val with the
// sort package.
type buckets []bucket

// Len returns the number of buckets.
func (b buckets) Len() int {
	return len(b)
}

// Swap exchanges the buckets at positions i and j.
func (b buckets) Swap(i, j int) {
	b[j], b[i] = b[i], b[j]
}

// Less reports whether the bucket at i has a smaller val than the one at j.
func (b buckets) Less(i, j int) bool {
	return b[j].val > b[i].val
}

// hist is a histogram aggregator. buckets holds the bucket definitions and
// counts the number of values recorded per bucket: counts[i] belongs to
// buckets[i].
type hist struct {
buckets buckets
counts []uint32
}

// newHist builds a histogram aggregator with one bucket per argument. Each
// argument must parse as a number, otherwise the parse error is returned;
// buckets are kept sorted by ascending value, each with a zeroed counter.
func newHist(args []string) (Aggregator, error) {
	bounds := make(buckets, len(args))
	for i, label := range args {
		val, err := parseFloat(label)
		if err != nil {
			return nil, err
		}
		bounds[i] = bucket{name: label, val: val}
	}
	sort.Sort(bounds)
	return hist{
		buckets: bounds,
		counts:  make([]uint32, len(args)),
	}, nil
}

// Push records v in the histogram. v is converted with parseFloat, whose error
// is returned when the value is not numeric.
//
// Bucket values act as inclusive lower bounds: v is counted in the last bucket
// whose value is less than or equal to it, so the highest bucket also takes
// everything above it. A value below the lowest bucket belongs to no bucket and
// is dropped rather than being counted in the highest one; a histogram without
// buckets drops every value instead of panicking.
func (a hist) Push(v interface{}) error {
	f, err := parseFloat(v)
	if err != nil {
		return err
	}
	// buckets is sorted by ascending value, so scanning can stop at the first
	// bucket whose lower bound exceeds f.
	idx := -1
	for i := range a.buckets {
		if f < a.buckets[i].val {
			break
		}
		idx = i
	}
	if idx == -1 {
		// Below every bucket, or no bucket at all.
		return nil
	}
	a.counts[idx]++
	return nil
}

// Aggr returns the number of values recorded per bucket as a
// map[string]uint32 keyed by bucket name, and resets every counter to zero.
func (a hist) Aggr() interface{} {
	out := make(map[string]uint32, len(a.buckets))
	for i := range a.buckets {
		out[a.buckets[i].name], a.counts[i] = a.counts[i], 0
	}
	return out
}
Loading

0 comments on commit 0fc07e9

Please sign in to comment.