Skip to content

Commit caf331a

Browse files
committed
GoCollaborate ver:0.3.2
1 parent 79f699c commit caf331a

File tree

7 files changed

+133
-56
lines changed

7 files changed

+133
-56
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
### 0.3.x
55
#### 0.3.2
66
- Add versioning for coordinator
7+
- Support more load-balancing algorithms
8+
- Optimize error responses
79
#### 0.3.1
810
- Add "methods" field to service response
911
- Change rate limiting response header: 422 -> 429 Too Many Requests

artifacts/card/card.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ type Card struct {
1414
Seed bool `json:"seed,omitempty"`
1515
}
1616

17+
func (c *Card) IsInitialized() bool {
18+
if len(c.IP) > 0 {
19+
return true
20+
}
21+
return false
22+
}
23+
1724
// GetFullIP returns the card's IP and Port joined by a colon.
func (c *Card) GetFullIP() string {
	port := strconv.Itoa(c.Port)
	return c.IP + ":" + port
}

artifacts/restful/error.go

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,73 @@
11
package restful
22

33
// ErrorPayload is the top-level envelope of an error response; it
// serializes as {"errors": [...]}.
type ErrorPayload struct {
	Errors []Error `json:"errors"`
}

// Error describes a single failure carried inside an ErrorPayload.
type Error struct {
	Status int    `json:"status"` // HTTP status code
	Title  string `json:"title"`  // short reason phrase for the status
	Detail string `json:"detail"` // longer human-readable explanation
	Source string `json:"source"` // left empty by the constructors below
}

// newErrorPayload wraps a single Error with the given status, title and
// detail (and an empty Source) in an ErrorPayload.
func newErrorPayload(status int, title, detail string) ErrorPayload {
	return ErrorPayload{
		Errors: []Error{
			{Status: status, Title: title, Detail: detail},
		},
	}
}

// Error401Unauthorized returns the payload for HTTP 401 Unauthorized.
func Error401Unauthorized() ErrorPayload {
	return newErrorPayload(401, "Unauthorized",
		"Although the HTTP standard specifies 'unauthorized', semantically this response means 'unauthenticated'.")
}

// Error403Forbidden returns the payload for HTTP 403 Forbidden.
func Error403Forbidden() ErrorPayload {
	return newErrorPayload(403, "Forbidden",
		"The client does not have access rights to the content.")
}

// Error404NotFound returns the payload for HTTP 404 Not Found.
func Error404NotFound() ErrorPayload {
	return newErrorPayload(404, "Not Found",
		"The server can not find requested resource.")
}

// Error405MethodNotAllowed returns the payload for HTTP 405 Method Not
// Allowed. The status is 405; it was previously emitted as 404 by mistake.
func Error405MethodNotAllowed() ErrorPayload {
	return newErrorPayload(405, "Method Not Allowed",
		"The request method is known by the server but has been disabled and cannot be used.")
}

// Error408RequestTimeout returns the payload for HTTP 408 Request Timeout.
// The title matches the status; it previously read "Method Not Allowed".
func Error408RequestTimeout() ErrorPayload {
	return newErrorPayload(408, "Request Timeout",
		"This response is sent on an idle connection by some servers, even without any previous request by the client.")
}

// Error409Conflict returns the payload for HTTP 409 Conflict.
func Error409Conflict() ErrorPayload {
	return newErrorPayload(409, "Conflict",
		"This response is sent when a request conflicts with the current state of the server.")
}

// Error415UnsupportedMediaType returns the payload for HTTP 415 Unsupported
// Media Type. Title and detail describe 415; they previously carried the
// wording of 400 Bad Request.
func Error415UnsupportedMediaType() ErrorPayload {
	return newErrorPayload(415, "Unsupported Media Type",
		"The media format of the requested data is not supported by the server, so the server is rejecting the request.")
}

// Error500InternalServerError returns the payload for HTTP 500 Internal
// Server Error.
func Error500InternalServerError() ErrorPayload {
	return newErrorPayload(500, "Internal Server Error",
		"The server has encountered a situation it doesn't know how to handle.")
}

// Error502BadGateway returns the payload for HTTP 502 Bad Gateway.
func Error502BadGateway() ErrorPayload {
	return newErrorPayload(502, "Bad Gateway",
		"This error response means that the server, while working as a gateway to get a response needed to handle the request, got an invalid response.")
}

// Error503ServiceUnavailable returns the payload for HTTP 503 Service
// Unavailable.
func Error503ServiceUnavailable() ErrorPayload {
	return newErrorPayload(503, "Service Unavailable",
		"The server is not ready to handle the request.")
}

artifacts/service/service.go

Lines changed: 65 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/GoCollaborate/src/artifacts/card"
55
"github.com/GoCollaborate/src/artifacts/parameter"
66
"github.com/GoCollaborate/src/constants"
7+
"hash/fnv"
78
"math/rand"
89
"sync"
910
"time"
@@ -16,45 +17,52 @@ type Service struct {
1617
Parameters []parameter.Parameter `json:"parameters"`
1718
RegList []card.Card `json:"registers"`
1819
// a map of last heartbeat timestamps, each key corresponds to one particular card full endpoint
19-
Heartbeats map[string]int64 `json:"heartbeats,omitempty"`
20-
SbscrbList []string `json:"subscribers"` // subscriber tokens
21-
Dependencies []string `json:"dependencies"` // dependent ServiceIDs
22-
Mode Mode `json:"mode,omitempty"`
23-
LoadBalanceMode Mode `json:"load_balance_mode,omitemtpy"`
24-
Version string `json:"version"`
25-
PlatformVersion string `json:"platform_version"`
26-
LastAssignedTo card.Card `json:"last_assigned_to,omitempty"`
27-
LastAssignedTime int64 `json:"last_assigned_time,omitempty"`
28-
serviceLock sync.Mutex `json:"-"`
20+
Heartbeats map[string]int64 `json:"heartbeats,omitempty"`
21+
SbscrbList []string `json:"subscribers"` // subscriber tokens
22+
Dependencies []string `json:"dependencies"` // dependent ServiceIDs
23+
Mode Mode `json:"mode,omitempty"`
24+
LoadBalanceMode Mode `json:"load_balance_mode,omitemtpy"`
25+
Version string `json:"version"`
26+
PlatformVersion string `json:"platform_version"`
27+
LastAssignedTo card.Card `json:"last_assigned_to,omitempty"`
28+
LastAssignedTime int64 `json:"last_assigned_time,omitempty"`
29+
LastAssignedIndex uint32 `json:"-"`
30+
serviceLock sync.Mutex `json:"-"`
2931
}
3032

3133
func NewService() *Service {
3234
return &Service{
33-
Description: "",
34-
Methods: []string{},
35-
Parameters: []parameter.Parameter{},
36-
RegList: []card.Card{},
37-
Heartbeats: map[string]int64{},
38-
SbscrbList: []string{},
39-
Dependencies: []string{},
40-
Version: "1.0",
41-
PlatformVersion: "golang1.8.1",
42-
serviceLock: sync.Mutex{},
35+
Description: "",
36+
Methods: []string{},
37+
Parameters: []parameter.Parameter{},
38+
RegList: []card.Card{},
39+
Heartbeats: map[string]int64{},
40+
SbscrbList: []string{},
41+
Dependencies: []string{},
42+
Mode: ClbtModeNormal,
43+
LoadBalanceMode: LBModeRandom,
44+
Version: "1.0",
45+
PlatformVersion: "golang1.8.1",
46+
LastAssignedTime: int64(0),
47+
serviceLock: sync.Mutex{},
4348
}
4449
}
4550

4651
func NewServiceFrom(from *Service) *Service {
4752
return &Service{
48-
Description: from.Description,
49-
Methods: from.Methods,
50-
Parameters: from.Parameters,
51-
RegList: from.RegList,
52-
Heartbeats: map[string]int64{},
53-
SbscrbList: from.SbscrbList,
54-
Dependencies: from.Dependencies,
55-
Version: from.Version,
56-
PlatformVersion: from.PlatformVersion,
57-
serviceLock: sync.Mutex{},
53+
Description: from.Description,
54+
Methods: from.Methods,
55+
Parameters: from.Parameters,
56+
RegList: from.RegList,
57+
Heartbeats: map[string]int64{},
58+
SbscrbList: from.SbscrbList,
59+
Dependencies: from.Dependencies,
60+
Mode: from.Mode,
61+
LoadBalanceMode: from.LoadBalanceMode,
62+
Version: from.Version,
63+
PlatformVersion: from.PlatformVersion,
64+
LastAssignedTime: int64(0),
65+
serviceLock: sync.Mutex{},
5866
}
5967
}
6068

@@ -143,19 +151,41 @@ func (s *Service) UnSubscribe(token string) error {
143151
}
144152

145153
func (s *Service) Query(token string) (*card.Card, error) {
154+
if len(s.RegList) < 1 {
155+
return nil, constants.ErrNoRegister
156+
}
157+
146158
for _, x := range s.SbscrbList {
147159
if x == token {
148-
return s.loadBalanceRoll(), nil
160+
return s.loadBalanceRoll(token)
149161
}
150162
}
151-
return nil, constants.ErrNoRegister
163+
164+
return nil, constants.ErrNoSubscriber
152165
}
153166

154-
func (s *Service) loadBalanceRoll() *card.Card {
167+
func (s *Service) loadBalanceRoll(token string) (*card.Card, error) {
168+
var last_assigned_to card.Card
169+
var last_assigned_index = s.LastAssignedIndex
170+
var length = len(s.RegList)
171+
155172
switch s.LoadBalanceMode {
156-
default:
157-
return &s.RegList[rand.Intn(len(s.RegList))]
173+
case LBModeRoundRobin:
174+
last_assigned_index = (last_assigned_index + 1) % uint32(length)
175+
case LBModeTokenHash:
176+
h := fnv.New32a()
177+
h.Write([]byte(token))
178+
last_assigned_index = h.Sum32() % uint32(length)
158179
}
180+
181+
if !s.LastAssignedTo.IsInitialized() {
182+
last_assigned_index = rand.Uint32() % uint32(length)
183+
}
184+
185+
last_assigned_to = s.RegList[last_assigned_index]
186+
s.LastAssignedTo = last_assigned_to
187+
s.LastAssignedIndex = last_assigned_index
188+
return &last_assigned_to, nil
159189
}
160190

161191
func (s *Service) UnSubscribeAll() error {

artifacts/service/serviceMode.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const (
88
LBModeRandom // assign tasks as per weighted probability
99
LBModeLeastActive // assign tasks to active responders
1010
LBModeRoundRobin // assign tasks sequentially based on the order of collaborator
11-
LBModeIPHash // assign tasks based on the hash value of subscriber IP
11+
LBModeTokenHash // assign tasks based on the hash value of subscriber token
1212
)
1313

1414
type Mode int

coordinator/coordinator.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ func (cdnt *Coordinator) CreateService(w http.ResponseWriter, bytes []byte) erro
114114
id := utils.RandStringBytesMaskImprSrc(ServiceLength)
115115
svr := service.NewServiceFrom(&dat.Attributes)
116116
svr.ServiceID = id
117-
svr.LastAssignedTime = int64(0)
118117
cdnt.Services[id] = svr
119118
dat.ID = id
120119
dat.Attributes = *svr

wrappers/serviceHelper/serviceHelper.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,19 @@ import (
1212
"net/http"
1313
)
1414

15-
func UnmarshalMode(original interface{}) service.Mode {
15+
func ModeInterpret(original interface{}) service.Mode {
1616
if original == nil {
1717
return service.ClbtModeNormal
1818
}
1919
var m service.Mode
2020
omode := original.(string)
2121
switch omode {
22-
case "LBModeIPHash":
23-
m = service.LBModeIPHash
2422
case "ClbtModeOnlyRegister":
2523
m = service.ClbtModeOnlyRegister
2624
case "ClbtModeOnlySubscribe":
2725
m = service.ClbtModeOnlySubscribe
26+
case "LBModeTokenHash":
27+
m = service.LBModeTokenHash
2828
case "LBModeRandom":
2929
m = service.LBModeRandom
3030
case "LBModeLeastActive":

0 commit comments

Comments
 (0)