Skip to content

Commit

Permalink
Implemented IIP support trustmaster#15 and JSON graph format support t…
Browse files Browse the repository at this point in the history
  • Loading branch information
trustmaster committed Oct 21, 2013
1 parent f2e2835 commit 2428e5d
Show file tree
Hide file tree
Showing 7 changed files with 563 additions and 81 deletions.
18 changes: 9 additions & 9 deletions component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (d *doubler) OnIn(i int) {
}

// A constructor that can be used by component registry/factory
func newDoubler(iip interface{}) interface{} {
func newDoubler() interface{} {
return new(doubler)
}

Expand Down Expand Up @@ -69,7 +69,7 @@ func newLocker() *locker {
}

// A constructor that can be used by component registry/factory
func newLockerConstructor(iip interface{}) interface{} {
func newLockerConstructor() interface{} {
return newLocker()
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func newSyncLocker() *syncLocker {
}

// A constructor that can be used by component registry/factory
func newSyncLockerConstructor(iip interface{}) interface{} {
func newSyncLockerConstructor() interface{} {
return newSyncLocker()
}

Expand Down Expand Up @@ -223,7 +223,7 @@ func TestInitFinish(t *testing.T) {
i := new(initfin)
i.Net = new(Graph)
i.Net.InitGraphState()
i.Net.WaitGrp.Add(1)
i.Net.waitGrp.Add(1)
in := make(chan int)
out := make(chan int)
i.In = in
Expand All @@ -237,7 +237,7 @@ func TestInitFinish(t *testing.T) {
}
// Shut the component down and wait for Finish() code
close(in)
i.Net.WaitGrp.Wait()
i.Net.waitGrp.Wait()
if testInitFinFlag != 456 {
t.Errorf("%d != %d", testInitFinFlag, 456)
}
Expand All @@ -262,13 +262,13 @@ func TestClose(t *testing.T) {
c := new(closeTest)
c.Net = new(Graph)
c.Net.InitGraphState()
c.Net.WaitGrp.Add(1)
c.Net.waitGrp.Add(1)
in := make(chan int)
c.In = in
RunProc(c)
in <- 1
close(in)
c.Net.WaitGrp.Wait()
c.Net.waitGrp.Wait()
if closeTestFlag != 789 {
t.Errorf("%d != %d", closeTestFlag, 789)
}
Expand Down Expand Up @@ -298,13 +298,13 @@ func TestShutdown(t *testing.T) {
s := new(shutdownTest)
s.Net = new(Graph)
s.Net.InitGraphState()
s.Net.WaitGrp.Add(1)
s.Net.waitGrp.Add(1)
in := make(chan int)
s.In = in
RunProc(s)
in <- 1
close(in)
s.Net.WaitGrp.Wait()
s.Net.waitGrp.Wait()
if shutdownTestFlag != 789 {
t.Errorf("%d != %d", shutdownTestFlag, 789)
}
Expand Down
6 changes: 3 additions & 3 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const DefaultRegistryCapacity = 64
// ComponentConstructor is a function that can be registered in the ComponentRegistry
// so that it is used when creating new processes of a specific component using
// Factory function at run-time.
type ComponentConstructor func(interface{}) interface{}
type ComponentConstructor func() interface{}

// ComponentRegistry is used to register components and spawn processes given just
// a string component name.
Expand Down Expand Up @@ -35,9 +35,9 @@ func Unregister(componentName string) bool {
}

// Factory creates a new instance of a component registered under a specific name.
func Factory(componentName string, constructorArgs interface{}) interface{} {
func Factory(componentName string) interface{} {
if constructor, exists := ComponentRegistry[componentName]; exists {
return constructor(constructorArgs)
return constructor()
} else {
panic("Uknown component name: " + componentName)
}
Expand Down
6 changes: 3 additions & 3 deletions factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// Tests run-time process creating with flow.Factory
func TestFactory(t *testing.T) {
procs := make(map[string]interface{})
procs["e1"] = Factory("echoer", nil)
procs["e1"] = Factory("echoer")
in, out := make(chan int), make(chan int)
e1 := procs["e1"].(*echoer)
e1.In = in
Expand All @@ -33,8 +33,8 @@ func TestFactoryConnection(t *testing.T) {
net := new(dummyNet)
net.InitGraphState()

net.AddNew("echoer", "e1", nil)
net.AddNew("echoer", "e2", nil)
net.AddNew("echoer", "e1")
net.AddNew("echoer", "e2")

net.Connect("e1", "Out", "e2", "In")

Expand Down
107 changes: 107 additions & 0 deletions loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package flow

import (
"encoding/json"
// "fmt"
"io/ioutil"
"reflect"
"strings"
)

// Internal representation of NoFlo JSON format
type graphDescription struct {
Processes map[string]struct {
Component string
Sync bool `json:",omitempty"`
}
Connections []struct {
Data interface{} `json:",omitempty"`
Src struct {
Process string
Port string
} `json:",omitempty"`
Tgt struct {
Process string
Port string
}
Buffer int `json:",omitempty"`
}
Exports []struct {
Private string
Public string
}
}

// ParseJSON converts a JSON network definition string into
// a flow.Graph object that can be run or used in other networks
func ParseJSON(js []byte) *Graph {
// Parse JSON into Go struct
var descr graphDescription
err := json.Unmarshal(js, &descr)
if err != nil {
return nil
}
// fmt.Printf("%+v\n", descr)

// Create a new Graph
net := new(Graph)
net.InitGraphState()

// Add processes to the network
for procName, procValue := range descr.Processes {
net.AddNew(procValue.Component, procName)
// Support for Sync/Async process switch
if procValue.Sync {
proc := net.Get(procName).(*Component)
proc.Mode = ComponentModeSync
}
}

// Add connections
for _, conn := range descr.Connections {
// Check if it is an IIP or actual connection
if conn.Data == nil {
// Add a connection
net.ConnectBuf(conn.Src.Process, conn.Src.Port, conn.Tgt.Process, conn.Tgt.Port, conn.Buffer)
} else {
// Add an IIP
net.AddIIP(conn.Data, conn.Tgt.Process, conn.Tgt.Port)
}
}

// Add port exports
for _, export := range descr.Exports {
// Split private into proc.port
procName := export.Private[:strings.Index(export.Private, ".")]
procPort := export.Private[strings.Index(export.Private, ".")+1:]
// Try to detect port direction using reflection
procType := reflect.TypeOf(net.Get(procName)).Elem()
field, fieldFound := procType.FieldByName(procPort)
if !fieldFound {
panic("Private port '" + export.Private + "' not found")
}
if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.RecvDir) != 0 {
// It's an inport
net.MapInPort(export.Public, procName, procPort)
} else if field.Type.Kind() == reflect.Chan && (field.Type.ChanDir()&reflect.SendDir) != 0 {
// It's an outport
net.MapOutPort(export.Public, procName, procPort)
} else {
// It's not a proper port
panic("Private port '" + export.Private + "' is not a valid channel")
}
// TODO add support for subgraphs
}

return net
}

// LoadJSON loads a JSON graph definition file into
// a flow.Graph object that can be run or used in other networks
func LoadJSON(filename string) *Graph {
js, err := ioutil.ReadFile(filename)
if err != nil {
return nil
}
return ParseJSON(js)
}
Loading

0 comments on commit 2428e5d

Please sign in to comment.