Skip to content

Commit

Permalink
Merge pull request trustmaster#14 from davidkbainbridge/master
Browse files Browse the repository at this point in the history
adds the ability for a component to have multiple clients on an out channel
  • Loading branch information
trustmaster committed Sep 30, 2013
2 parents e219a6c + c3b6dc5 commit 9c3832c
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 14 deletions.
11 changes: 9 additions & 2 deletions component.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,15 @@ func closePorts(c interface{}) {
fv := v.Field(i)
ft := fv.Type()
// Detect and close send-only channels
if fv.IsValid() && fv.Kind() == reflect.Chan && (ft.ChanDir()&reflect.SendDir) != 0 && (ft.ChanDir()&reflect.RecvDir) == 0 {
fv.Close()
if fv.IsValid() {
if fv.Kind() == reflect.Chan && (ft.ChanDir()&reflect.SendDir) != 0 && (ft.ChanDir()&reflect.RecvDir) == 0 {
fv.Close()
} else if fv.Kind() == reflect.Slice && ft.Elem().Kind() == reflect.Chan {
ll := fv.Len()
for i := 0; i < ll; i += 1 {
fv.Index(i).Close()
}
}
}
}
}
Expand Down
35 changes: 23 additions & 12 deletions network.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,31 @@ func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort st

// Validate sender port
stport := sport.Type()
if stport.Kind() != reflect.Chan || stport.ChanDir()&reflect.SendDir == 0 {
panic(senderName + "." + senderPort + " is not a valid output channel")
return false
}
var channel reflect.Value
if stport.Kind() == reflect.Slice {

// Make a channel of an appropriate type
chanType := reflect.ChanOf(reflect.BothDir, stport.Elem())
channel := reflect.MakeChan(chanType, bufferSize)
if sport.Type().Elem().Kind() == reflect.Chan && sport.Type().Elem().ChanDir()&reflect.SendDir != 0 {

// Set the channel
if sport.CanSet() {
sport.Set(channel)
} else {
panic(senderName + "." + senderPort + " is not settable")
// Need to create a new channel and add it to the array
chanType := reflect.ChanOf(reflect.BothDir, sport.Type().Elem().Elem())
channel = reflect.MakeChan(chanType, bufferSize)
sport.Set(reflect.Append(sport, channel))
}
} else if stport.Kind() == reflect.Chan && stport.ChanDir()&reflect.SendDir != 0 {
// Make a channel of an appropriate type
chanType := reflect.ChanOf(reflect.BothDir, stport.Elem())
channel = reflect.MakeChan(chanType, bufferSize)
// Set the channel
if sport.CanSet() {
sport.Set(channel)
} else {
panic(senderName + "." + senderPort + " is not settable")
}
}

if channel.IsNil() {
panic(senderName + "." + senderPort + " is not a valid output channel")
return false
}

// Get the reciever port
Expand Down
84 changes: 84 additions & 0 deletions network_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package flow

import (
"sync"
"testing"
)

Expand Down Expand Up @@ -153,3 +154,86 @@ func TestComposite(t *testing.T) {

close(in)
}

type rr struct {
In <-chan int
Out []chan<- int

StateLock *sync.Mutex

Component
idx int
}

func (r *rr) OnIn(i int) {
pick := r.idx
r.idx = (r.idx + 1) % len(r.Out)

r.Out[pick] <- i
}

/*
* Creates a simple network with a load balancer that round robins to its out
* channels. Then sends to messages in and expects a response, 1 from each
* of the out channels.
*/
func TestMultiOutChannel(t *testing.T) {
n := new(compositeTest)
n.InitGraphState()

r := new(rr)
if !n.Add(r, "lb") {
t.Error("Unable to add load balancer")
}

e1 := new(echoer)
if !n.Add(e1, "e1") {
t.Error("Unable to add second echoer, e1")
}

e2 := new(echoer)
if !n.Add(e2, "e2") {
t.Error("Unable to add second echoer, e2")
}

if !n.Connect("lb", "Out", "e1", "In") {
t.Error("Unable to connect LB to e1")
}

if !n.Connect("lb", "Out", "e2", "In") {
t.Error("Unable to connect LB to e2")
}

if !n.MapInPort("In", "lb", "In") {
t.Error("Unable to map InPort")
}
if !n.MapOutPort("Out1", "e1", "Out") {
t.Error("Unable to mape OutPort 1")
}

if !n.MapOutPort("Out2", "e2", "Out") {
t.Error("Unable to mape OutPort 2")
}

in := make(chan int)
out1 := make(chan int)
out2 := make(chan int)
n.SetInPort("In", in)
n.SetOutPort("Out1", out1)
n.SetOutPort("Out2", out2)
RunNet(n)

in <- 42
i := <-out1
if i != 42 {
t.Errorf("%d != %d", i, 42)
}

in <- 42
i = <-out2
if i != 42 {
t.Errorf("%d != %d", i, 42)
}

close(in)
}

0 comments on commit 9c3832c

Please sign in to comment.