From 2428e5d2ad0a04b23e4c8c127e066ca08956ca65 Mon Sep 17 00:00:00 2001 From: Vladimir Sibirov Date: Mon, 21 Oct 2013 21:13:49 +0400 Subject: [PATCH] Implemented IIP support #15 and JSON graph format support #7 --- component_test.go | 18 +-- factory.go | 6 +- factory_test.go | 6 +- loader.go | 107 ++++++++++++++++++ network.go | 241 ++++++++++++++++++++++++++++++++-------- network_test.go | 92 +++++++++++---- runtime_network_test.go | 174 +++++++++++++++++++++++++++++ 7 files changed, 563 insertions(+), 81 deletions(-) create mode 100644 loader.go create mode 100644 runtime_network_test.go diff --git a/component_test.go b/component_test.go index 725d7d6..0e2da18 100644 --- a/component_test.go +++ b/component_test.go @@ -19,7 +19,7 @@ func (d *doubler) OnIn(i int) { } // A constructor that can be used by component registry/factory -func newDoubler(iip interface{}) interface{} { +func newDoubler() interface{} { return new(doubler) } @@ -69,7 +69,7 @@ func newLocker() *locker { } // A constructor that can be used by component registry/factory -func newLockerConstructor(iip interface{}) interface{} { +func newLockerConstructor() interface{} { return newLocker() } @@ -138,7 +138,7 @@ func newSyncLocker() *syncLocker { } // A constructor that can be used by component registry/factory -func newSyncLockerConstructor(iip interface{}) interface{} { +func newSyncLockerConstructor() interface{} { return newSyncLocker() } @@ -223,7 +223,7 @@ func TestInitFinish(t *testing.T) { i := new(initfin) i.Net = new(Graph) i.Net.InitGraphState() - i.Net.WaitGrp.Add(1) + i.Net.waitGrp.Add(1) in := make(chan int) out := make(chan int) i.In = in @@ -237,7 +237,7 @@ func TestInitFinish(t *testing.T) { } // Shut the component down and wait for Finish() code close(in) - i.Net.WaitGrp.Wait() + i.Net.waitGrp.Wait() if testInitFinFlag != 456 { t.Errorf("%d != %d", testInitFinFlag, 456) } @@ -262,13 +262,13 @@ func TestClose(t *testing.T) { c := new(closeTest) c.Net = new(Graph) c.Net.InitGraphState() - c.Net.WaitGrp.Add(1) + c.Net.waitGrp.Add(1) in := make(chan int) c.In = in RunProc(c) in <- 1 close(in) - c.Net.WaitGrp.Wait() + c.Net.waitGrp.Wait() if closeTestFlag != 789 { t.Errorf("%d != %d", closeTestFlag, 789) } @@ -298,13 +298,13 @@ func TestShutdown(t *testing.T) { s := new(shutdownTest) s.Net = new(Graph) s.Net.InitGraphState() - s.Net.WaitGrp.Add(1) + s.Net.waitGrp.Add(1) in := make(chan int) s.In = in RunProc(s) in <- 1 close(in) - s.Net.WaitGrp.Wait() + s.Net.waitGrp.Wait() if shutdownTestFlag != 789 { t.Errorf("%d != %d", shutdownTestFlag, 789) } diff --git a/factory.go b/factory.go index 6d1c6c9..e89d066 100644 --- a/factory.go +++ b/factory.go @@ -6,7 +6,7 @@ const DefaultRegistryCapacity = 64 // ComponentConstructor is a function that can be registered in the ComponentRegistry // so that it is used when creating new processes of a specific component using // Factory function at run-time. -type ComponentConstructor func(interface{}) interface{} +type ComponentConstructor func() interface{} // ComponentRegistry is used to register components and spawn processes given just // a string component name. @@ -35,9 +35,9 @@ func Unregister(componentName string) bool { } // Factory creates a new instance of a component registered under a specific name. -func Factory(componentName string, constructorArgs interface{}) interface{} { +func Factory(componentName string) interface{} { if constructor, exists := ComponentRegistry[componentName]; exists { - return constructor(constructorArgs) + return constructor() } else { panic("Uknown component name: " + componentName) } diff --git a/factory_test.go b/factory_test.go index 384d604..efd6194 100644 --- a/factory_test.go +++ b/factory_test.go @@ -7,7 +7,7 @@ import ( // Tests run-time process creating with flow.Factory func TestFactory(t *testing.T) { procs := make(map[string]interface{}) - procs["e1"] = Factory("echoer", nil) + procs["e1"] = Factory("echoer") in, out := make(chan int), make(chan int) e1 := procs["e1"].(*echoer) e1.In = in @@ -33,8 +33,8 @@ func TestFactoryConnection(t *testing.T) { net := new(dummyNet) net.InitGraphState() - net.AddNew("echoer", "e1", nil) - net.AddNew("echoer", "e2", nil) + net.AddNew("echoer", "e1") + net.AddNew("echoer", "e2") net.Connect("e1", "Out", "e2", "In") diff --git a/loader.go b/loader.go new file mode 100644 index 0000000..34ee46a --- /dev/null +++ b/loader.go @@ -0,0 +1,107 @@ +package flow + +import ( + "encoding/json" + // "fmt" + "io/ioutil" + "reflect" + "strings" +) + +// Internal representation of NoFlo JSON format +type graphDescription struct { + Processes map[string]struct { + Component string + Sync bool `json:",omitempty"` + } + Connections []struct { + Data interface{} `json:",omitempty"` + Src struct { + Process string + Port string + } `json:",omitempty"` + Tgt struct { + Process string + Port string + } + Buffer int `json:",omitempty"` + } + Exports []struct { + Private string + Public string + } +} + +// ParseJSON converts a JSON network definition string into +// a flow.Graph object that can be run or used in other networks +func ParseJSON(js []byte) *Graph { + // Parse JSON into Go struct + var descr graphDescription + err := json.Unmarshal(js, &descr) + if err != nil { + return nil + } + // fmt.Printf("%+v\n", descr) + + // Create a new Graph + net := new(Graph) + net.InitGraphState() + + // Add processes to the network + for procName, procValue := range descr.Processes { + net.AddNew(procValue.Component, procName) + // Support for Sync/Async process switch + if procValue.Sync { + proc := net.Get(procName).(*Component) + proc.Mode = ComponentModeSync + } + } + + // Add connections + for _, conn := range descr.Connections { + // Check if it is an IIP or actual connection + if conn.Data == nil { + // Add a connection + net.ConnectBuf(conn.Src.Process, conn.Src.Port, conn.Tgt.Process, conn.Tgt.Port, conn.Buffer) + } else { + // Add an IIP + net.AddIIP(conn.Data, conn.Tgt.Process, conn.Tgt.Port) + } + } + + // Add port exports + for _, export := range descr.Exports { + // Split private into proc.port + procName := export.Private[:strings.Index(export.Private, ".")] + procPort := export.Private[strings.Index(export.Private, ".")+1:] + // Try to detect port direction using reflection + procType := reflect.TypeOf(net.Get(procName)).Elem() + field, fieldFound := procType.FieldByName(procPort) + if !fieldFound { + panic("Private port '" + export.Private + "' not found") + } + if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.RecvDir) != 0 { + // It's an inport + net.MapInPort(export.Public, procName, procPort) + } else if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.SendDir) != 0 { + // It's an outport + net.MapOutPort(export.Public, procName, procPort) + } else { + // It's not a proper port + panic("Private port '" + export.Private + "' is not a valid channel") + } + // TODO add support for subgraphs + } + + return net +} + +// LoadJSON loads a JSON graph definition file into +// a flow.Graph object that can be run or used in other networks +func LoadJSON(filename string) *Graph { + js, err := ioutil.ReadFile(filename) + if err != nil { + return nil + } + return ParseJSON(js) +} diff --git a/network.go b/network.go index 2bbce94..8a987f1 100644 --- a/network.go +++ b/network.go @@ -14,10 +14,38 @@ var DefaultNetworkCapacity = 32 // Default network output or input ports number var DefaultNetworkPortsNum = 16 +// port stores full port information within the network. +type port struct { + // Process name in the network + proc string + // Port name of the process + port string + // Actual channel attached + channel reflect.Value +} + // portName stores full port name within the network. type portName struct { - proc string // Process name in the network - port string // Port name of the process + // Process name in the network + proc string + // Port name of the process + port string +} + +// connection stores information about a connection within the net. +type connection struct { + src portName + tgt portName + channel reflect.Value + buffer int +} + +// iip stands for Initial Information Packet representation +// within the network. +type iip struct { + data interface{} + proc string // Target process name + port string // Target port name } // portMapper interface is used to obtain subnet's ports. @@ -39,26 +67,35 @@ type netController interface { // Graph represents a graph of processes connected with packet channels. type Graph struct { // Wait is used for graceful network termination. - WaitGrp *sync.WaitGroup + waitGrp *sync.WaitGroup // Net is a pointer to parent network. Net *Graph // procs contains the processes of the network. procs map[string]interface{} // inPorts maps network incoming ports to component ports. - inPorts map[string]portName + inPorts map[string]port // outPorts maps network outgoing ports to component ports. - outPorts map[string]portName + outPorts map[string]port + // connections contains graph edges and channels. + connections []connection + // iips contains initial IPs attached to the network + iips []iip // done is used to let the outside world know when the net has finished its job done chan struct{} + // ready is used to let the outside world know when the net is ready to accept input + ready chan struct{} } // InitGraphState method initializes graph fields and allocates memory. func (n *Graph) InitGraphState() { - n.WaitGrp = new(sync.WaitGroup) + n.waitGrp = new(sync.WaitGroup) n.procs = make(map[string]interface{}, DefaultNetworkCapacity) - n.inPorts = make(map[string]portName, DefaultNetworkPortsNum) - n.outPorts = make(map[string]portName, DefaultNetworkPortsNum) + n.inPorts = make(map[string]port, DefaultNetworkPortsNum) + n.outPorts = make(map[string]port, DefaultNetworkPortsNum) + n.connections = make([]connection, 0, DefaultNetworkCapacity) + n.iips = make([]iip, 0, DefaultNetworkPortsNum) n.done = make(chan struct{}) + n.ready = make(chan struct{}) } // Add adds a new process with a given name to the network. @@ -95,11 +132,20 @@ func (n *Graph) Add(c interface{}, name string) bool { } // AddNew creates a new process instance using component factory and adds it to the network. -func (n *Graph) AddNew(componentName string, processName string, initialPacket interface{}) bool { - proc := Factory(componentName, initialPacket) +func (n *Graph) AddNew(componentName string, processName string) bool { + proc := Factory(componentName) return n.Add(proc, processName) } +// AddIIP adds an Initial Information packet to the network +func (n *Graph) AddIIP(data interface{}, processName, portName string) bool { + if _, exists := n.procs[processName]; exists { + n.iips = append(n.iips, iip{data: data, proc: processName, port: portName}) + return true + } + return false +} + // Connect connects a sender to a receiver and creates a channel between them using DefaultBufferSize. // Normally such a connection is unbuffered but you can change by setting flow.DefaultBufferSize > 0 or // by using ConnectBuf() function instead. @@ -143,7 +189,13 @@ func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort st // Get the actual ports and link them to the channel // Check if sender is a net - if snet := sv.FieldByName("Graph"); snet.IsValid() { + var snet reflect.Value + if sv.Type().Name() == "Graph" { + snet = sv + } else { + snet = sv.FieldByName("Graph") + } + if snet.IsValid() { // Sender is a net if pm, isPm := snet.Addr().Interface().(portMapper); isPm { sport = pm.getOutPort(senderPort) @@ -186,7 +238,13 @@ func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort st var rport reflect.Value // Check if receiver is a net - if rnet := rv.FieldByName("Graph"); rnet.IsValid() { + var rnet reflect.Value + if rv.Type().Name() == "Graph" { + rnet = rv + } else { + rnet = rv.FieldByName("Graph") + } + if rnet.IsValid() { if pm, isPm := rnet.Addr().Interface().(portMapper); isPm { rport = pm.getInPort(receiverPort) } @@ -198,7 +256,7 @@ func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort st // Validate receiver port rtport := rport.Type() if rtport.Kind() != reflect.Chan || rtport.ChanDir()&reflect.RecvDir == 0 { - panic(receiverName + "." + receiverPort + " is not a valid output channel") + panic(receiverName + "." + receiverPort + " is not a valid input channel") return false } @@ -209,6 +267,15 @@ func (n *Graph) ConnectBuf(senderName, senderPort, receiverName, receiverPort st panic(receiverName + "." + receiverPort + " is not settable") } + // Add connection info + n.connections = append(n.connections, connection{ + src: portName{proc: senderName, + port: senderPort}, + tgt: portName{proc: receiverName, + port: receiverPort}, + channel: channel, + buffer: bufferSize}) + return true } @@ -227,20 +294,7 @@ func (n *Graph) getInPort(name string) reflect.Value { if !ok { panic("flow.Graph.getInPort(): Invalid inport name: " + name) } - // We assume that pName contains a valid reference to a proc/port - p := reflect.ValueOf(n.procs[pName.proc]) - var ret reflect.Value - if p.Elem().FieldByName("Graph").IsValid() { - // Is a subnet - if sub, ok2 := p.Elem().FieldByName("Graph").Addr().Interface().(portMapper); ok2 { - ret = sub.getInPort(pName.port) - } else { - panic("flow.Graph.getInPort(): Couldn't get portMapper") - } - } else { - ret = p.Elem().FieldByName(pName.port) - } - return ret + return pName.channel } // getOutPort returns the outport with given name as reflect.Value channel. @@ -249,25 +303,12 @@ func (n *Graph) getOutPort(name string) reflect.Value { if !ok { panic("flow.Graph.getOutPort(): Invalid outport name: " + name) } - // We assume that pName contains a valid reference to a proc/port - p := reflect.ValueOf(n.procs[pName.proc]) - var ret reflect.Value - if p.Elem().FieldByName("Graph").IsValid() { - // Is a subnet - if sub, ok2 := p.Elem().FieldByName("Graph").Addr().Interface().(portMapper); ok2 { - ret = sub.getOutPort(pName.port) - } else { - panic("flow.Graph.getOutPort(): Couldn't get portMapper") - } - } else { - ret = p.Elem().FieldByName(pName.port) - } - return ret + return pName.channel } // getWait returns net's wait group. func (n *Graph) getWait() *sync.WaitGroup { - return n.WaitGrp + return n.waitGrp } // hasInPort checks if the net has an inport with given name. @@ -287,14 +328,17 @@ func (n *Graph) hasOutPort(name string) bool { func (n *Graph) MapInPort(name, procName, procPort string) bool { ret := false // Check if target component and port exists + var channel reflect.Value if p, procFound := n.procs[procName]; procFound { if i, isNet := p.(portMapper); isNet { // Is a subnet ret = i.hasInPort(procPort) + channel = i.getInPort(procPort) } else { // Is a proc f := reflect.ValueOf(p).Elem().FieldByName(procPort) ret = f.IsValid() && f.Kind() == reflect.Chan && (f.Type().ChanDir()&reflect.RecvDir) != 0 + channel = f } if !ret { panic("flow.Graph.MapInPort(): No such inport: " + procName + "." + procPort) @@ -303,7 +347,7 @@ func (n *Graph) MapInPort(name, procName, procPort string) bool { panic("flow.Graph.MapInPort(): No such process: " + procName) } if ret { - n.inPorts[name] = portName{proc: procName, port: procPort} + n.inPorts[name] = port{proc: procName, port: procPort, channel: channel} } return ret } @@ -313,14 +357,17 @@ func (n *Graph) MapInPort(name, procName, procPort string) bool { func (n *Graph) MapOutPort(name, procName, procPort string) bool { ret := false // Check if target component and port exists + var channel reflect.Value if p, procFound := n.procs[procName]; procFound { if i, isNet := p.(portMapper); isNet { // Is a subnet ret = i.hasOutPort(procPort) + channel = i.getOutPort(procPort) } else { // Is a proc f := reflect.ValueOf(p).Elem().FieldByName(procPort) ret = f.IsValid() && f.Kind() == reflect.Chan && (f.Type().ChanDir()&reflect.SendDir) != 0 + channel = f } if !ret { panic("flow.Graph.MapOutPort(): No such outport: " + procName + "." + procPort) @@ -329,7 +376,7 @@ func (n *Graph) MapOutPort(name, procName, procPort string) bool { panic("flow.Graph.MapOutPort(): No such process: " + procName) } if ret { - n.outPorts[name] = portName{proc: procName, port: procPort} + n.outPorts[name] = port{proc: procName, port: procPort, channel: channel} } return ret } @@ -337,7 +384,7 @@ func (n *Graph) MapOutPort(name, procName, procPort string) bool { // run runs the network and waits for all processes to finish. func (n *Graph) run() { // Add processes to the waitgroup before starting them - n.WaitGrp.Add(len(n.procs)) + n.waitGrp.Add(len(n.procs)) for _, v := range n.procs { // Check if it is a net or proc r := reflect.ValueOf(v).Elem() @@ -347,15 +394,105 @@ func (n *Graph) run() { RunProc(v) } } + + // Send initial IPs + for _, ip := range n.iips { + // Get the reciever port + var rport reflect.Value + found := false + + // Try to find it among network inports + for _, inPort := range n.inPorts { + if inPort.proc == ip.proc && inPort.port == ip.port { + rport = inPort.channel + found = true + break + } + } + + if !found { + // Try to find among connections + for _, conn := range n.connections { + if conn.tgt.proc == ip.proc && conn.tgt.port == ip.port { + rport = conn.channel + found = true + break + } + } + } + + if !found { + // Try to find a proc and attach a new channel to it + for procName, proc := range n.procs { + if procName == ip.proc { + // Check if receiver is a net + rv := reflect.ValueOf(proc).Elem() + var rnet reflect.Value + if rv.Type().Name() == "Graph" { + rnet = rv + } else { + rnet = rv.FieldByName("Graph") + } + if rnet.IsValid() { + if pm, isPm := rnet.Addr().Interface().(portMapper); isPm { + rport = pm.getInPort(ip.port) + } + } else { + // Receiver is a proc + rport = rv.FieldByName(ip.port) + } + + // Validate receiver port + rtport := rport.Type() + if rtport.Kind() != reflect.Chan || rtport.ChanDir()&reflect.RecvDir == 0 { + panic(ip.proc + "." + ip.port + " is not a valid input channel") + } + var channel reflect.Value + + // Make a channel of an appropriate type + chanType := reflect.ChanOf(reflect.BothDir, rtport.Elem()) + channel = reflect.MakeChan(chanType, DefaultBufferSize) + // Set the channel + if rport.CanSet() { + rport.Set(channel) + } else { + panic(ip.proc + "." + ip.port + " is not settable") + } + + // Use the new channel to send the IIP + rport = channel + found = true + break + } + } + } + + if found { + // Send data to the port + rport.Send(reflect.ValueOf(ip.data)) + } else { + panic("IIP target not found: " + ip.proc + "." + ip.port) + } + } + + // Let the outside world know that the network is ready + close(n.ready) + // Wait for all processes to terminate - n.WaitGrp.Wait() + n.waitGrp.Wait() // Check if there is a parent net if n.Net != nil { // Notify parent of finish - n.Net.WaitGrp.Done() + n.Net.waitGrp.Done() } } +// Ready returns a channel that can be used to suspend the caller +// goroutine until the network is ready to accept input packets +func (n *Graph) Ready() <-chan struct{} { + return n.ready +} + // Wait returns a channel that can be used to suspend the caller // goroutine until the network finishes its job func (n *Graph) Wait() <-chan struct{} { @@ -373,6 +510,11 @@ func (n *Graph) SetInPort(name string, channel interface{}) bool { p.Set(reflect.ValueOf(channel)) res = true } + // Save it in inPorts to be used with IIPs if needed + if p, ok := n.inPorts[name]; ok { + p.channel = reflect.ValueOf(channel) + n.inPorts[name] = p + } return res } @@ -387,6 +529,11 @@ func (n *Graph) SetOutPort(name string, channel interface{}) bool { p.Set(reflect.ValueOf(channel)) res = true } + // Save it in outPorts to be used later + if p, ok := n.outPorts[name]; ok { + p.channel = reflect.ValueOf(channel) + n.outPorts[name] = p + } return res } diff --git a/network_test.go b/network_test.go index a5c2fa8..0fb545b 100644 --- a/network_test.go +++ b/network_test.go @@ -18,7 +18,7 @@ func (e *echoer) OnIn(i int) { } // A constructor that can be used by component registry/factory -func newEchoer(iip interface{}) interface{} { +func newEchoer() interface{} { return new(echoer) } @@ -57,15 +57,6 @@ func newTestNet(t *testing.T) *testNet { return n } -// A constructor that can be used by component registry/factory -func newTestNetConstructor(iip interface{}) interface{} { - return newTestNet(iip.(*struct{ t *testing.T }).t) -} - -func init() { - Register("testNet", newTestNetConstructor) -} - // Test for a network initializer func (n *testNet) Init() { initTestFlag = 123 @@ -151,15 +142,6 @@ func newCompositeTest(t *testing.T) *compositeTest { return n } -// A constructor that can be used by component registry/factory -func newCompositeTestConstructor(iip interface{}) interface{} { - return newCompositeTest(iip.(*struct{ t *testing.T }).t) -} - -func init() { - Register("compositeTest", newCompositeTestConstructor) -} - // Tests a composite with processes and subnets func TestComposite(t *testing.T) { // Make the network @@ -180,6 +162,7 @@ func TestComposite(t *testing.T) { } close(in) + <-net.Wait() } type rr struct { @@ -264,3 +247,74 @@ func TestMultiOutChannel(t *testing.T) { close(in) } + +// A struct to test IIPs support +type iipNet struct { + Graph +} + +// Creates a new test network with an IIP +func newIipNet() *iipNet { + n := new(iipNet) + n.InitGraphState() + + n.Add(new(echoer), "e1") + + n.AddIIP(interface{}(123), "e1", "In") + + n.MapInPort("In", "e1", "In") + n.MapOutPort("Out", "e1", "Out") + + return n +} + +// Tests IIP support in network +func TestIIP(t *testing.T) { + net := newIipNet() + in := make(chan int) + out := make(chan int) + net.SetInPort("In", in) + net.SetOutPort("Out", out) + + RunNet(net) + + h := <-out + if h != 123 { + t.Errorf("%d != 123", h) + } + + close(in) + <-net.Wait() +} + +// A simple syncrhonous summator for 2 arguments +type sum2 struct { + Arg1 <-chan int + Arg2 <-chan int + Sum chan<- int + StateLock *sync.Mutex + buf1 []int + buf2 []int +} + +// If available, pops arguments from the stack +// and sends the sum to the output +func (s *sum2) trySum() { + if len(s.buf1) > 0 && len(s.buf2) > 0 { + a1 := s.buf1[0] + s.buf1 = s.buf1[1:] + a2 := s.buf2[0] + s.buf2 = s.buf2[1:] + s.Sum <- (a1 + a2) + } +} + +func (s *sum2) OnArg1(a int) { + s.buf1 = append(s.buf1, a) + s.trySum() +} + +func (s *sum2) OnArg2(a int) { + s.buf2 = append(s.buf2, a) + s.trySum() +} diff --git a/runtime_network_test.go b/runtime_network_test.go new file mode 100644 index 0000000..0e4ad66 --- /dev/null +++ b/runtime_network_test.go @@ -0,0 +1,174 @@ +package flow + +import ( + "sync" + "testing" +) + +// Starter component fires the network +// by sending a number given in its constructor +// to its output port. +type starter struct { + Component + Start <-chan float64 + Out chan<- int +} + +func (s *starter) OnStart(num float64) { + s.Out <- int(num) +} + +func newStarter() interface{} { + s := new(starter) + return s +} + +func init() { + Register("starter", newStarter) +} + +// SequenceGenerator generates a sequence of integers +// from 0 to a number passed to its input. +type sequenceGenerator struct { + Component + Num <-chan int + Sequence chan<- int +} + +func (s *sequenceGenerator) OnNum(n int) { + for i := 1; i <= n; i++ { + s.Sequence <- i + } +} + +func newSequenceGenerator() interface{} { + return new(sequenceGenerator) +} + +func init() { + Register("sequenceGenerator", newSequenceGenerator) +} + +// Summarizer component sums all its input packets and +// produces a sum output just before shutdown +type summarizer struct { + Component + In <-chan int + // Flush <-chan bool + Sum chan<- int + StateLock *sync.Mutex + + current int +} + +func newSummarizer() interface{} { + s := new(summarizer) + s.Component.Mode = ComponentModeSync + return s +} + +func init() { + Register("summarizer", newSummarizer) +} + +func (s *summarizer) OnIn(i int) { + s.current += i +} + +func (s *summarizer) Finish() { + s.Sum <- s.current +} + +var runtimeNetworkJSON = `{ + "processes": { + "starter": { + "component": "starter" + }, + "generator": { + "component": "sequenceGenerator" + }, + "doubler": { + "component": "doubler" + }, + "sum": { + "component": "summarizer" + } + }, + "connections": [ + { + "data": 10, + "tgt": { + "process": "starter", + "port": "Start" + } + }, + { + "src": { + "process": "starter", + "port": "Out" + }, + "tgt": { + "process": "generator", + "port": "Num" + } + }, + { + "src": { + "process": "generator", + "port": "Sequence" + }, + "tgt": { + "process": "doubler", + "port": "In" + } + }, + { + "src": { + "process": "doubler", + "port": "Out" + }, + "tgt": { + "process": "sum", + "port": "In" + } + } + ], + "exports": [ + { + "private": "starter.Start", + "public": "Start" + }, + { + "private": "sum.Sum", + "public": "Out" + } + ] +}` + +func TestRuntimeNetwork(t *testing.T) { + net := ParseJSON([]byte(runtimeNetworkJSON)) + if net == nil { + t.Error("Could not load JSON") + } + + start := make(chan float64) + out := make(chan int) + + net.SetInPort("Start", start) + net.SetOutPort("Out", out) + + RunNet(net) + + // Wait for the network setup + <-net.Ready() + + // Close start to halt it normally + close(start) + + i := <-out + if i != 110 { + t.Errorf("Wrong result: %d != 110", i) + } + + <-net.Wait() +}