Skip to content

Commit f93b208

Browse files
committed
Update coordinator, now get connected to collaborator, refactor HTTP status code
1 parent 7ebfd8b commit f93b208

File tree

14 files changed

+361
-102
lines changed

14 files changed

+361
-102
lines changed

artifacts/service/queryResource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
type QueryResource struct {
8-
Resource *restful.Resource
8+
*restful.Resource
99
Attributes Query `json:"attributes"`
1010
}
1111

artifacts/service/registryResource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
type RegistryResource struct {
8-
Resource *restful.Resource
8+
*restful.Resource
99
Attributes Registry `json:"attributes"`
1010
}
1111

artifacts/service/service.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ import (
55
"github.com/GoCollaborate/artifacts/parameter"
66
"github.com/GoCollaborate/constants"
77
"math/rand"
8+
"time"
89
)
910

1011
type Service struct {
11-
ServiceID string `json:"serviceid"`
12+
ServiceID string `json:"serviceid,omitempty"`
1213
Description string `json:"description"`
1314
Parameters []parameter.Parameter `json:"parameters"`
1415
RegList []card.Card `json:"registers"`
@@ -24,6 +25,19 @@ type Service struct {
2425
LastAssignedTime int64 `json:"last_assigned_time,omitempty"`
2526
}
2627

28+
func NewService() *Service {
29+
return &Service{
30+
Description: "",
31+
Parameters: []parameter.Parameter{},
32+
RegList: []card.Card{},
33+
Heartbeats: map[string]int64{},
34+
SbscrbList: []string{},
35+
Dependencies: []string{},
36+
Version: "1.0",
37+
PlatformVersion: "golang1.8.1",
38+
}
39+
}
40+
2741
func (s *Service) SetMode(m *Mode) Mode {
2842
s.Mode = *m
2943
return *m
@@ -119,3 +133,13 @@ func (s *Service) UnSubscribeAll() error {
119133
s.SbscrbList = y
120134
return nil
121135
}
136+
137+
func (s *Service) Heartbeat(agt *card.Card) {
138+
y := s.RegList
139+
for _, x := range y {
140+
if agt.IsEqualTo(&x) {
141+
s.Heartbeats[x.GetFullEndPoint()] = time.Now().Unix()
142+
return
143+
}
144+
}
145+
}

artifacts/service/serviceResource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
type ServiceResource struct {
8-
Resource *restful.Resource
8+
*restful.Resource
99
Attributes Service `json:"attributes"`
1010
}
1111

artifacts/service/subscriptionResource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
)
66

77
type SubscriptionResource struct {
8-
Resource *restful.Resource
8+
*restful.Resource
99
Attributes Subscription `json:"attributes"`
1010
}
1111

collaborator/collaborator.go

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/GoCollaborate/artifacts/iremote"
88
"github.com/GoCollaborate/artifacts/iworkable"
99
"github.com/GoCollaborate/artifacts/message"
10+
"github.com/GoCollaborate/artifacts/service"
1011
"github.com/GoCollaborate/artifacts/stats"
1112
"github.com/GoCollaborate/artifacts/task"
1213
"github.com/GoCollaborate/cmd"
@@ -17,6 +18,7 @@ import (
1718
"github.com/GoCollaborate/web"
1819
"github.com/GoCollaborate/wrappers/cardHelper"
1920
"github.com/GoCollaborate/wrappers/messageHelper"
21+
"github.com/GoCollaborate/wrappers/serviceHelper"
2022
"github.com/gorilla/mux"
2123
"net"
2224
"net/http"
@@ -93,13 +95,23 @@ func (clbt *Collaborator) Join(wk iworkable.Workable) {
9395
go func() {
9496
for {
9597
clbt.Catchup()
96-
<-time.After(constants.DefaultSynInterval)
98+
<-time.After(constants.DefaultSyncInterval)
9799
// clean collaborators that are no longer alive
98100
clbt.Clean()
99101
// dump data to local store
100102
clbt.CardCase.writeStream()
101103
}
102104
}()
105+
106+
go func() {
107+
// service registry
108+
clbt.RegisterService()
109+
// todo: service mornitoring
110+
// for {
111+
// <-time.After(constants.DefaultHeartbeatInterval)
112+
// clbt.HeartBeat()
113+
// }
114+
}()
103115
}
104116

105117
// Catchup with peer servers.
@@ -170,6 +182,69 @@ func (clbt *Collaborator) Clean() {
170182
cardHelper.RangePrint(cards)
171183
}
172184

185+
func (clbt *Collaborator) RegisterService() {
186+
var (
187+
cdntIP = clbt.CardCase.Reserved.Coordinator.GetFullIP()
188+
clbtIP = clbt.CardCase.Local.GetFullIP()
189+
)
190+
191+
resp, err := http.Get("http://" + cdntIP + "/cluster/" + clbt.CardCase.CaseID + "/services")
192+
if err != nil {
193+
return
194+
}
195+
196+
// if cluster already get created, return to call general service registry api on coordinator
197+
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
198+
return
199+
}
200+
201+
var (
202+
router = store.GetRouter()
203+
services = map[string]*service.Service{}
204+
)
205+
206+
router.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error {
207+
t, err := route.GetPathTemplate()
208+
if err != nil {
209+
return err
210+
}
211+
svr := service.NewService()
212+
svr.Description = route.GetName()
213+
svr.RegList = append(svr.RegList, card.Card{
214+
IP: clbt.CardCase.Local.IP,
215+
Port: clbt.CardCase.Local.Port,
216+
API: t,
217+
Alive: true,
218+
})
219+
services[clbtIP+"/"+t] = svr
220+
return nil
221+
})
222+
223+
reader, err := serviceHelper.MarshalServiceResourceToByteStreamReader(services)
224+
if err != nil {
225+
panic(constants.ErrCoordinatorNotFound)
226+
return
227+
}
228+
229+
// walk through services and get them created on the coordinator
230+
resp2, err := http.Post("http://"+cdntIP+"/services", "application/json", reader)
231+
if err != nil {
232+
panic(err)
233+
}
234+
235+
_, err = http.Post("http://"+cdntIP+"/cluster/"+clbt.CardCase.CaseID+"/services", "application/json", resp2.Body)
236+
if err != nil {
237+
panic(err)
238+
}
239+
return
240+
}
241+
242+
func (clbt *Collaborator) HeartBeat() {
243+
// cdntIP := clbt.CardCase.Reserved.Coordinator.GetFullIP()
244+
// // todo
245+
// http.Get(cdntIP + "")
246+
}
247+
173248
// Start handling server routes.
174249
func (clbt *Collaborator) Handle(router *mux.Router) *mux.Router {
175250

constants/constants.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ const (
4343
DefaultHashLength = 12
4444
)
4545

46+
// time consts
4647
var (
4748
DefaultReadTimeout = 15 * time.Second
4849
DefaultPeriodShort = 500 * time.Millisecond
@@ -54,7 +55,8 @@ var (
5455
DefaultTaskExpireTime = 30 * time.Second
5556
DefaultGCInterval = 30 * time.Second
5657
DefaultMaxMappingTime = 600 * time.Second
57-
DefaultSynInterval = 3 * time.Minute
58+
DefaultSyncInterval = 3 * time.Minute
59+
DefaultHeartbeatInterval = 1 * time.Minute
5860
DefaultJobRequestRefillInterval = 1 * time.Millisecond
5961
DefaultStatFlushInterval = 20 * time.Millisecond
6062
DefaultStatAbstractInterval = 3 * time.Second
@@ -130,27 +132,32 @@ var (
130132
ErrMessageChannelDirty = errors.New("GoCollaborate: message channel has unconsumed message error")
131133
ErrTaskChannelDirty = errors.New("GoCollaborate: task channel has unconsumed task error")
132134
ErrStatTypeNotFound = errors.New("GoCollaborate: stat type not found error")
135+
ErrCoordinatorNotFound = errors.New("GoCollaborate: coordinator not found error")
133136
)
134137

135138
type Header struct {
136139
Key string `json:"key"`
137140
Value string `json:"value"`
138141
}
139142

143+
// HTTP Status
144+
// var (
145+
// Header200OK = Header{"200", "OK"}
146+
// Header201Created = Header{"201", "Created"}
147+
// Header202Accepted = Header{"202", "Accepted"}
148+
// Header204NoContent = Header{"204", "NoContent"}
149+
// Header403Forbidden = Header{"403", "Forbidden"}
150+
// Header404NotFound = Header{"404", "NotFound"}
151+
// Header409Conflict = Header{"409", "Conflict"}
152+
// Header415UnsupportedMediaType = Header{"415", "UnsupportedMediaType"}
153+
// Header422ExceedLimit = Header{"422", "ExceedLimit"}
154+
// )
155+
140156
// HTTP headers
141157
var (
142-
Header200OK = Header{"200", "OK"}
143-
Header201Created = Header{"201", "Created"}
144-
Header202Accepted = Header{"202", "Accepted"}
145-
Header204NoContent = Header{"204", "NoContent"}
146-
Header403Forbidden = Header{"403", "Forbidden"}
147-
Header404NotFound = Header{"404", "NotFound"}
148-
Header409Conflict = Header{"409", "Conflict"}
149-
Header415UnsupportedMediaType = Header{"415", "UnsupportedMediaType"}
150-
Header422ExceedLimit = Header{"422", "ExceedLimit"}
151-
HeaderContentTypeJSON = Header{"Content-Type", "application/json"}
152-
HeaderContentTypeText = Header{"Content-Type", "text/html"}
153-
HeaderCORSEnableAllOrigin = Header{"Access-Control-Allow-Origin", "*"}
158+
HeaderContentTypeJSON = Header{"Content-Type", "application/json"}
159+
HeaderContentTypeText = Header{"Content-Type", "text/html"}
160+
HeaderCORSEnableAllOrigin = Header{"Access-Control-Allow-Origin", "*"}
154161
)
155162

156163
// Gossip Protocol headers

0 commit comments

Comments
 (0)