Skip to content

Commit

Permalink
server: fix BulkAdd
Browse files Browse the repository at this point in the history
  • Loading branch information
adamstruck committed Dec 5, 2019
1 parent 0e6805e commit 27765ed
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 11 deletions.
7 changes: 2 additions & 5 deletions gripql/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,16 @@ func TestSchemaScanner(t *testing.T) {
t.Fatal(err)
}
defer func() {
kv.Close()
os.RemoveAll(conf.KVStorePath)
os.RemoveAll(conf.Server.WorkDir)
}()

db := kvgraph.NewKVGraph(kv)

srv, err := server.NewGripServer(db, conf.Server, nil)
if err != nil {
t.Fatal(err)
}
defer func() {
kv.Close()
os.RemoveAll(conf.Server.WorkDir)
}()

queryClient := gripql.NewQueryDirectClient(srv)
editClient := gripql.NewEditDirectClient(srv)
Expand Down
11 changes: 5 additions & 6 deletions server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ func (server *GripServer) addEdge(ctx context.Context, elem *gripql.GraphElement

// BulkAdd a stream of inputs and loads them into the graph
func (server *GripServer) BulkAdd(stream gripql.Edit_BulkAddServer) error {

var elementStream chan *gripql.GraphElement
var graphName string

wg := &sync.WaitGroup{}
var insertCount int32
var errorCount int32

elementStream := make(chan *gripql.GraphElement, 100)
wg := &sync.WaitGroup{}

for {
element, err := stream.Recv()
if err == io.EOF {
Expand All @@ -200,9 +200,8 @@ func (server *GripServer) BulkAdd(stream gripql.Edit_BulkAddServer) error {

// create a BulkAdd stream per graph
// close and switch when a new graph is encountered
if element.Graph != graphName && graphName != "" {
if element.Graph != graphName {
close(elementStream)

graph, err := server.db.Graph(element.Graph)
if err != nil {
log.WithFields(log.Fields{"error": err}).Error("BulkAdd: error")
Expand Down

0 comments on commit 27765ed

Please sign in to comment.