Skip to content

Commit 3e08427

Browse files
committed
GoCollaborate ver:0.4.2
1 parent ac757c0 commit 3e08427

File tree

7 files changed

+102
-42
lines changed

7 files changed

+102
-42
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
## Updates
33
**(Please note that backward compatibility is not guaranteed before the formal 1.0.0 release.)**
44
### 0.4.x
5+
#### 0.4.2
6+
- Update handler API -> Add `Background` as Job loader
7+
- Support asynchronous loading of source data into a Job
8+
- Update examples and documentation
59
#### 0.4.1
610
- Refactor handler code
711
- Clean up code, remove unnecessary method body

Readme.md

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,27 @@ import (
106106
"github.com/GoCollaborate/src/artifacts/task"
107107
"github.com/GoCollaborate/src/wrappers/taskHelper"
108108
"net/http"
109+
"time"
109110
)
110111

111-
func ExampleJobHandler(w http.ResponseWriter, r *http.Request) *task.Job {
112+
func ExampleJobHandler(w http.ResponseWriter, r *http.Request, bg *task.Background) {
112113
job := task.MakeJob()
113-
job.Tasks(&task.Task{task.SHORT,
114-
task.BASE, "exampleFunc",
115-
task.Collection{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4},
116-
task.Collection{0},
117-
task.NewTaskContext(struct{}{}), 0})
114+
115+
job.Tasks(
116+
&task.Task{
117+
task.SHORT,
118+
task.BASE,
119+
"exampleFunc",
120+
task.Collection{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4},
121+
task.Collection{0},
122+
task.NewTaskContext(struct{}{}),
123+
0,
124+
},
125+
)
126+
118127
job.Stacks("core.ExampleTask.Mapper", "core.ExampleTask.Reducer")
119-
return job
128+
129+
bg.Mount(job)
120130
}
121131

122132
func ExampleFunc(source *task.Collection,
@@ -154,7 +164,6 @@ func (r *SimpleReducer) Reduce(maps map[int]*task.Task) (map[int]*task.Task, err
154164
return maps, nil
155165
}
156166

157-
158167
```
159168
### Run
160169
Here we have created the entry file and a simple implementation of the map-reduce interface; next, we run it with the standard arguments:

artifacts/task/background.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package task
2+
3+
// Background is a channel of *Job values. The methods defined on it in this
// file wrap the three channel operations: Mount sends, Done receives and
// Close closes.
type Background chan *Job
4+
5+
// NewBackground creates an unbuffered Background channel and returns a
// pointer to it.
func NewBackground() *Background {
6+
bg := make(Background)
7+
return &bg
8+
}
9+
10+
// Done blocks until a value can be received from the channel and returns it.
// If the channel has already been closed, the receive yields nil.
func (bg *Background) Done() *Job {
11+
return <-*bg
12+
}
13+
14+
// Mount sends job on the channel from a newly started goroutine, so Mount
// itself returns without waiting for a receiver.
//
// NOTE(review): the channel is unbuffered, so the spawned goroutine stays
// blocked until a receive happens; if the channel is closed before or while
// it is blocked, the send panics. Confirm that callers Mount at most once
// per Background and never after Close.
func (bg *Background) Mount(job *Job) {
15+
go func() {
16+
*bg <- job
17+
}()
18+
}
19+
20+
// Close closes the underlying channel. Closing an already closed channel
// panics, so Close must be called at most once.
func (bg *Background) Close() {
21+
close(*bg)
22+
}

artifacts/task/task.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ func (t *taskPriority) GetPriority() taskPriority {
5555
return *t
5656
}
5757

58+
// NewCollection returns a pointer to a new, empty Collection.
func NewCollection() *Collection {
59+
return &Collection{}
60+
}
61+
5862
func (cg *Collection) Append(cs ...interface{}) *Collection {
5963
*cg = append(*cg, cs...)
6064
return cg

collaborate.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ func Set(key string, val ...interface{}) interface{} {
5454
fs := store.GetInstance()
5555

5656
methods := val[0].([]string)
57-
handlers := make([]func(w http.ResponseWriter, r *http.Request) *task.Job, len(val)-1)
57+
handlers := make([]func(w http.ResponseWriter, r *http.Request, bg *task.Background), len(val)-1)
5858
for i, v := range val[1:] {
59-
handlers[i] = v.(func(w http.ResponseWriter, r *http.Request) *task.Job)
59+
handlers[i] = v.(func(w http.ResponseWriter, r *http.Request, bg *task.Background))
6060
}
6161

6262
// register jobs
@@ -65,9 +65,9 @@ func Set(key string, val ...interface{}) interface{} {
6565
fs := store.GetInstance()
6666

6767
methods := val[0].([]string)
68-
handlers := make([]func(w http.ResponseWriter, r *http.Request) *task.Job, len(val)-1)
68+
handlers := make([]func(w http.ResponseWriter, r *http.Request, bg *task.Background), len(val)-1)
6969
for i, v := range val[1:] {
70-
handlers[i] = v.(func(w http.ResponseWriter, r *http.Request) *task.Job)
70+
handlers[i] = v.(func(w http.ResponseWriter, r *http.Request, bg *task.Background))
7171
}
7272

7373
// register jobs

collaborator/collaborator.go

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,14 @@ func (clbt *Collaborator) Catchup() {
150150

151151
from := c.Local.GetFullExposureCard()
152152
to := e
153-
var in *message.CardMessage = message.NewCardMessageWithOptions(c.GetCluster(), &from, to, dgst.GetCards(), dgst.GetTimeStamp(), message.CardMessage_SYNC)
153+
var in *message.CardMessage = message.NewCardMessageWithOptions(
154+
c.GetCluster(),
155+
&from,
156+
to,
157+
dgst.GetCards(),
158+
dgst.GetTimeStamp(),
159+
message.CardMessage_SYNC,
160+
)
154161
var out *message.CardMessage = message.NewCardMessage()
155162
var out2 *message.CardMessage = message.NewCardMessage()
156163
// first exchange
@@ -385,23 +392,30 @@ func (clbt *Collaborator) HandleLocal(router *mux.Router, jobFunc *store.JobFunc
385392

386393
var (
387394
f = func(w http.ResponseWriter, r *http.Request) {
388-
job := jobFunc.F(w, r)
389395
var (
396+
bg = task.NewBackground()
390397
counter = 0
391398
)
392-
for s := job.Front(); s != nil; s = s.Next() {
393-
exes, err := job.Exes(counter)
394-
if err != nil {
395-
logger.GetLoggerInstance().LogError(err)
396-
break
397-
}
398-
err = clbt.LocalDistribute(&s.TaskSet, exes)
399-
if err != nil {
400-
logger.GetLoggerInstance().LogError(err)
401-
break
399+
400+
jobFunc.F(w, r, bg)
401+
402+
go func() {
403+
job := bg.Done()
404+
defer bg.Close()
405+
for s := job.Front(); s != nil; s = s.Next() {
406+
exes, err := job.Exes(counter)
407+
if err != nil {
408+
logger.GetLoggerInstance().LogError(err)
409+
break
410+
}
411+
err = clbt.LocalDistribute(&s.TaskSet, exes)
412+
if err != nil {
413+
logger.GetLoggerInstance().LogError(err)
414+
break
415+
}
416+
counter++
402417
}
403-
counter++
404-
}
418+
}()
405419
}
406420
fs = store.GetInstance()
407421
)
@@ -423,24 +437,31 @@ func (clbt *Collaborator) HandleShared(router *mux.Router, jobFunc *store.JobFun
423437

424438
var (
425439
f = func(w http.ResponseWriter, r *http.Request) {
426-
job := jobFunc.F(w, r)
427440
var (
441+
bg = task.NewBackground()
428442
counter = 0
429443
)
430-
for s := job.Front(); s != nil; s = s.Next() {
431-
exes, err := job.Exes(counter)
432-
if err != nil {
433-
logger.GetLoggerInstance().LogError(err)
434-
break
435-
}
436444

437-
err = clbt.SharedDistribute(&s.TaskSet, exes)
438-
if err != nil {
439-
logger.GetLoggerInstance().LogError(err)
440-
break
445+
jobFunc.F(w, r, bg)
446+
447+
go func() {
448+
job := bg.Done()
449+
defer bg.Close()
450+
for s := job.Front(); s != nil; s = s.Next() {
451+
exes, err := job.Exes(counter)
452+
if err != nil {
453+
logger.GetLoggerInstance().LogError(err)
454+
break
455+
}
456+
457+
err = clbt.SharedDistribute(&s.TaskSet, exes)
458+
if err != nil {
459+
logger.GetLoggerInstance().LogError(err)
460+
break
461+
}
462+
counter++
441463
}
442-
counter++
443-
}
464+
}()
444465
}
445466
fs = store.GetInstance()
446467
)

store/store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type FS struct {
7575
}
7676

7777
type JobFunc struct {
78-
F func(w http.ResponseWriter, r *http.Request) *task.Job
78+
F func(w http.ResponseWriter, r *http.Request, bg *task.Background)
7979
Methods []string
8080
Signature string
8181
}
@@ -188,14 +188,14 @@ func (fs *FS) GetShared(key string) (*JobFunc, error) {
188188
return new(JobFunc), constants.ErrJobNotExist
189189
}
190190

191-
func (fs *FS) AddLocal(methods []string, jobs ...func(w http.ResponseWriter, r *http.Request) *task.Job) {
191+
func (fs *FS) AddLocal(methods []string, jobs ...func(w http.ResponseWriter, r *http.Request, bg *task.Background)) {
192192
for _, f := range jobs {
193193
signature := utils.StripRouteToAPIRoute(utils.ReflectFuncName(f))
194194
fs.LocalJobs[signature] = &JobFunc{f, methods, signature}
195195
}
196196
}
197197

198-
func (fs *FS) AddShared(methods []string, jobs ...func(w http.ResponseWriter, r *http.Request) *task.Job) {
198+
func (fs *FS) AddShared(methods []string, jobs ...func(w http.ResponseWriter, r *http.Request, bg *task.Background)) {
199199
for _, f := range jobs {
200200
signature := utils.StripRouteToAPIRoute(utils.ReflectFuncName(f))
201201
fs.SharedJobs[signature] = &JobFunc{f, methods, signature}

0 commit comments

Comments
 (0)