Skip to content

Commit cf1f784

Browse files
committed
Added series and waterfall [goroutine for sequential tasks?? needs a solid answer]
1 parent 5027bcc commit cf1f784

File tree

3 files changed

+108
-4
lines changed

3 files changed

+108
-4
lines changed

async.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,14 @@
66
package async
77

88
type Task func() interface{}
9+
type STask func(interface{}) interface{}
910
type Channel chan interface{}
1011
type Result []interface{}
1112

13+
// Reference: https://blog.golang.org/pipelines
14+
// Reference: https://en.wikipedia.org/wiki/Moore%27s_law
15+
// https://en.wikipedia.org/wiki/Amdahl%27s_law
16+
1217
type Promise struct {
1318
//TODO:
1419
//Context - Cancel, Timeout
@@ -17,9 +22,12 @@ type Promise struct {
1722
//Can I use map for tasks? It will be easy to associate results with keys
1823
}
1924

20-
func (p *Promise) All(tasks []Task) Result {
21-
// can not use buffer channels, because not able to maintain sequence
22-
// using channel slice
25+
//
26+
// parallel execution of all tasks
27+
//
28+
func (p *Promise) Parallel(tasks []Task) Result {
29+
// can not use buffer channels, because not able to maintain output sequence
30+
// using slice channels
2331
workers := make([]Channel, 0, len(tasks))
2432
for _, task := range tasks {
2533
workers = append(workers, func() Channel {
@@ -37,5 +45,64 @@ func (p *Promise) All(tasks []Task) Result {
3745
for _, result := range workers {
3846
out = append(out, <-result)
3947
}
48+
4049
return out
4150
}
51+
52+
//
53+
// sequential execution of all tasks
54+
//
55+
func (p *Promise) Series(tasks []Task) (out interface{}, err error) {
56+
// unbuffered channel
57+
worker := make(Channel)
58+
defer close(worker)
59+
60+
for _, task := range tasks {
61+
// need to check the benefits of executing tasks on another go routine
62+
go func(task Task) {
63+
worker <- task()
64+
}(task)
65+
66+
out = <-worker
67+
68+
if err = isError(out); err != nil {
69+
out = nil
70+
break
71+
}
72+
}
73+
74+
return
75+
}
76+
77+
//
78+
// sequential execution of all tasks, but output of all tasks will be input for next task
79+
//
80+
func (p *Promise) Waterfall(tasks []STask) (out interface{}, err error) {
81+
// unbuffered channel
82+
worker := make(Channel)
83+
defer close(worker)
84+
85+
for _, task := range tasks {
86+
87+
go func(task STask) {
88+
worker <- task(out)
89+
}(task)
90+
91+
out = <-worker
92+
93+
if err = isError(out); err != nil {
94+
out = nil
95+
break
96+
}
97+
}
98+
99+
return
100+
}
101+
102+
func isError(val interface{}) (err error) {
103+
switch val.(type) {
104+
case error:
105+
err = val.(error)
106+
}
107+
return
108+
}

examples/main.go renamed to examples/parallel/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func main() {
2727
}
2828

2929
fmt.Println("From: ", time.Now())
30-
result := promise.All(tasks)
30+
result := promise.Parallel(tasks)
3131
fmt.Println("End: ", time.Now())
3232

3333
fmt.Println("Result: ", result)

examples/series/main.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/go-rs/async"
8+
)
9+
10+
func main() {
11+
fmt.Println("start")
12+
var promise async.Promise
13+
14+
tasks := []async.Task{
15+
func() interface{} {
16+
fmt.Println("Running Hello..........")
17+
time.Sleep(2 * time.Second)
18+
fmt.Println("Completed Hello..........")
19+
//return errors.New("innocent")
20+
return "Hello"
21+
},
22+
func() interface{} {
23+
fmt.Println("Running World..........")
24+
time.Sleep(1 * time.Second)
25+
fmt.Println("Completed World..........")
26+
return "World"
27+
},
28+
}
29+
30+
fmt.Println("From: ", time.Now())
31+
result, err := promise.Series(tasks)
32+
fmt.Println("End: ", time.Now())
33+
34+
fmt.Println("Result: ", result, err)
35+
36+
fmt.Println("exit")
37+
}

0 commit comments

Comments
 (0)