Skip to content

Commit 65dbc2d

Browse files
author
Olivier Poitrey
committed
Extracted from rest-layer
0 parents  commit 65dbc2d

File tree

4 files changed

+300
-0
lines changed

4 files changed

+300
-0
lines changed

.travis.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
language: go
2+
go:
3+
- 1.6
4+
- tip
5+
env:
6+
allow_failures:
7+
- go: tip

README.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# REST Layer Hystrix storage handler wrapper
2+
3+
[![godoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/rs/rest-layer-hystrix) [![license](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](https://raw.githubusercontent.com/rs/rest-layer-hystrix/master/LICENSE) [![build](https://img.shields.io/travis/rs/rest-layer-hystrix.svg?style=flat)](https://travis-ci.org/rs/rest-layer-hystrix)
4+
5+
This [REST Layer](https://github.com/rs/rest-layer) resource storage wrapper uses [hystrix-go](github.com/afex/hystrix-go) to add curuit breaker support to any REST Layer resource storage handler.
6+
7+
## Usage
8+
9+
```go
10+
import "github.com/rs/rest-layer-hystrix"
11+
```
12+
13+
Wrap existing storage handler with a name that will be used to construct hystrix commands:
14+
15+
```go
16+
s := restrix.Wrap("myResource", mem.NewHandler())
17+
```
18+
19+
Use this handler with a resource:
20+
21+
```go
22+
index.Bind("foo", foo, s, resource.DefaultConf)
23+
```
24+
25+
Customize the hystrix commands:
26+
27+
```go
28+
// Configure hystrix commands
29+
hystrix.Configure(map[string]hystrix.CommandConfig{
30+
"posts.Find": {
31+
Timeout: 1000,
32+
MaxConcurrentRequests: 100,
33+
ErrorPercentThreshold: 25,
34+
},
35+
"posts.Insert": {
36+
Timeout: 1000,
37+
MaxConcurrentRequests: 50,
38+
ErrorPercentThreshold: 25,
39+
},
40+
...
41+
})
42+
```
43+
44+
Start the metrics stream handler:
45+
46+
```go
47+
hystrixStreamHandler := hystrix.NewStreamHandler()
48+
hystrixStreamHandler.Start()
49+
log.Print("Serving Hystrix metrics on http://localhost:8081")
50+
go http.ListenAndServe(net.JoinHostPort("", "8081"), hystrixStreamHandler)
51+
```

examples/hystrix/main.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"net"
6+
"net/http"
7+
"net/http/httptest"
8+
"strings"
9+
10+
"github.com/afex/hystrix-go/hystrix"
11+
"github.com/rs/rest-layer-hystrix"
12+
"github.com/rs/rest-layer-mem"
13+
"github.com/rs/rest-layer/resource"
14+
"github.com/rs/rest-layer/rest"
15+
"github.com/rs/rest-layer/schema"
16+
"github.com/rs/xaccess"
17+
"github.com/rs/xhandler"
18+
"github.com/rs/xlog"
19+
)
20+
21+
var (
22+
post = schema.Schema{
23+
Fields: schema.Fields{
24+
"id": schema.IDField,
25+
"created": schema.CreatedField,
26+
"updated": schema.UpdatedField,
27+
"title": {},
28+
"body": {},
29+
},
30+
}
31+
)
32+
33+
func main() {
34+
index := resource.NewIndex()
35+
36+
index.Bind("posts", post, restrix.Wrap("posts", mem.NewHandler()), resource.Conf{
37+
AllowedModes: resource.ReadWrite,
38+
})
39+
40+
// Create API HTTP handler for the resource graph
41+
api, err := rest.NewHandler(index)
42+
if err != nil {
43+
log.Fatalf("Invalid API configuration: %s", err)
44+
}
45+
46+
// Setup logger
47+
c := xhandler.Chain{}
48+
c.UseC(xlog.NewHandler(xlog.Config{}))
49+
c.UseC(xaccess.NewHandler())
50+
51+
// Bind the API under the root path
52+
http.Handle("/", c.Handler(api))
53+
54+
// Configure hystrix commands
55+
hystrix.Configure(map[string]hystrix.CommandConfig{
56+
"posts.MultiGet": {
57+
Timeout: 500,
58+
MaxConcurrentRequests: 200,
59+
ErrorPercentThreshold: 25,
60+
},
61+
"posts.Find": {
62+
Timeout: 1000,
63+
MaxConcurrentRequests: 100,
64+
ErrorPercentThreshold: 25,
65+
},
66+
"posts.Insert": {
67+
Timeout: 1000,
68+
MaxConcurrentRequests: 50,
69+
ErrorPercentThreshold: 25,
70+
},
71+
"posts.Update": {
72+
Timeout: 1000,
73+
MaxConcurrentRequests: 50,
74+
ErrorPercentThreshold: 25,
75+
},
76+
"posts.Delete": {
77+
Timeout: 1000,
78+
MaxConcurrentRequests: 10,
79+
ErrorPercentThreshold: 10,
80+
},
81+
"posts.Clear": {
82+
Timeout: 10000,
83+
MaxConcurrentRequests: 5,
84+
ErrorPercentThreshold: 10,
85+
},
86+
})
87+
88+
// Start the metrics stream handler
89+
hystrixStreamHandler := hystrix.NewStreamHandler()
90+
hystrixStreamHandler.Start()
91+
log.Print("Serving Hystrix metrics on http://localhost:8081")
92+
go http.ListenAndServe(net.JoinHostPort("", "8081"), hystrixStreamHandler)
93+
94+
// Inject some fixtures
95+
fixtures := [][]string{
96+
[]string{"POST", "/posts", `{"title": "First Post", "body": "This is my first post"}`},
97+
[]string{"POST", "/posts", `{"title": "Second Post", "body": "This is my second post"}`},
98+
[]string{"POST", "/posts", `{"title": "Third Post", "body": "This is my third post"}`},
99+
}
100+
for _, fixture := range fixtures {
101+
req, err := http.NewRequest(fixture[0], fixture[1], strings.NewReader(fixture[2]))
102+
if err != nil {
103+
log.Fatal(err)
104+
}
105+
w := httptest.NewRecorder()
106+
api.ServeHTTP(w, req)
107+
if w.Code >= 400 {
108+
log.Fatalf("Error returned for `%s %s`: %v", fixture[0], fixture[1], w)
109+
}
110+
}
111+
112+
// Serve it
113+
log.Print("Serving API on http://localhost:8080")
114+
if err := http.ListenAndServe(":8080", nil); err != nil {
115+
log.Fatal(err)
116+
}
117+
}

hystrix.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Package restrix is a REST Layer resource storage wrapper to add hystrix support
2+
// to the underlaying storage handler.
3+
package restrix
4+
5+
import (
6+
"fmt"
7+
8+
"github.com/afex/hystrix-go/hystrix"
9+
"github.com/rs/rest-layer/resource"
10+
"golang.org/x/net/context"
11+
)
12+
13+
type wrapper struct {
14+
resource.Storer
15+
getCmd string
16+
findCmd string
17+
insertCmd string
18+
updateCmd string
19+
deleteCmd string
20+
clearCmd string
21+
}
22+
23+
type mgetWrapper struct {
24+
wrapper
25+
multiGetCmd string
26+
}
27+
28+
// Wrap wraps a REST Layer storage handler to add hystrix support to all
29+
// handler's methods.
30+
//
31+
// Hystrix wraps each storage handlers into an hystrix command. One hystrix
32+
// command is created per backend actions with the format <name>.<Action>.
33+
//
34+
// Actions are Find, Insert, Update, Delete, Clear and MultiGet for handlers
35+
// implementing MultiGetter interface.
36+
//
37+
// You must configure hystrix for each command you want to control and start the
38+
// stream handler.
39+
// See https://godoc.org/github.com/afex/hystrix-go/hystrix for more info and
40+
// examples/hystrix/main.go for a usage example.
41+
func Wrap(name string, h resource.Storer) resource.Storer {
42+
w := wrapper{
43+
Storer: h,
44+
getCmd: fmt.Sprintf("%s.Get", name),
45+
findCmd: fmt.Sprintf("%s.Find", name),
46+
insertCmd: fmt.Sprintf("%s.Insert", name),
47+
updateCmd: fmt.Sprintf("%s.Update", name),
48+
deleteCmd: fmt.Sprintf("%s.Delete", name),
49+
clearCmd: fmt.Sprintf("%s.Clear", name),
50+
}
51+
if _, ok := h.(resource.MultiGetter); ok {
52+
return mgetWrapper{
53+
wrapper: w,
54+
multiGetCmd: fmt.Sprintf("%s.MultiGet", name),
55+
}
56+
}
57+
return w
58+
}
59+
60+
func (w wrapper) Insert(ctx context.Context, items []*resource.Item) error {
61+
return hystrix.Do(w.insertCmd, func() error {
62+
return w.Storer.Insert(ctx, items)
63+
}, nil)
64+
}
65+
66+
func (w wrapper) Update(ctx context.Context, item *resource.Item, original *resource.Item) error {
67+
return hystrix.Do(w.updateCmd, func() error {
68+
return w.Storer.Update(ctx, item, original)
69+
}, nil)
70+
}
71+
72+
func (w wrapper) Delete(ctx context.Context, item *resource.Item) error {
73+
return hystrix.Do(w.deleteCmd, func() error {
74+
return w.Storer.Delete(ctx, item)
75+
}, nil)
76+
}
77+
78+
func (w wrapper) Clear(ctx context.Context, lookup *resource.Lookup) (deleted int, err error) {
79+
out := make(chan int, 1)
80+
errs := hystrix.Go(w.clearCmd, func() error {
81+
deleted, err := w.Storer.Clear(ctx, lookup)
82+
if err == nil {
83+
out <- deleted
84+
}
85+
return err
86+
}, nil)
87+
select {
88+
case deleted = <-out:
89+
case err = <-errs:
90+
}
91+
return
92+
}
93+
94+
func (w wrapper) Find(ctx context.Context, lookup *resource.Lookup, page, perPage int) (list *resource.ItemList, err error) {
95+
out := make(chan *resource.ItemList, 1)
96+
errs := hystrix.Go(w.findCmd, func() error {
97+
list, err := w.Storer.Find(ctx, lookup, page, perPage)
98+
if err == nil {
99+
out <- list
100+
}
101+
return err
102+
}, nil)
103+
select {
104+
case list = <-out:
105+
case err = <-errs:
106+
}
107+
return
108+
}
109+
110+
func (w mgetWrapper) MultiGet(ctx context.Context, ids []interface{}) (items []*resource.Item, err error) {
111+
out := make(chan []*resource.Item, 1)
112+
errs := hystrix.Go(w.multiGetCmd, func() error {
113+
mg := w.wrapper.Storer.(resource.MultiGetter)
114+
items, err := mg.MultiGet(ctx, ids)
115+
if err == nil {
116+
out <- items
117+
}
118+
return err
119+
}, nil)
120+
select {
121+
case items = <-out:
122+
case err = <-errs:
123+
}
124+
return
125+
}

0 commit comments

Comments
 (0)