Skip to content

Commit

Permalink
[Spanner Graph] Get node edges. (#1446)
Browse files Browse the repository at this point in the history
  • Loading branch information
keyurva authored Nov 6, 2024
1 parent bf25724 commit c163577
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 102 deletions.
2 changes: 1 addition & 1 deletion deploy/storage/spanner_graph_info.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
project: datcom-store
instance: dc-kg-test
database: dc_graph
database: dc_graph_2
111 changes: 111 additions & 0 deletions internal/server/spanner/golden/query/get_node_edges_by_id.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
{
"Aadhaar": [
{
"SubjectID": "Aadhaar",
"Predicate": "extendedName",
"ObjectID": "Aadhaar",
"ObjectValue": "Aadhaar",
"Provenance": "dc/base/BaseSchema"
},
{
"SubjectID": "Aadhaar",
"Predicate": "extendedName",
"ObjectID": "Aadhaar",
"ObjectValue": "Aadhaar",
"Provenance": "dc/base/BaseSchema"
},
{
"SubjectID": "Aadhaar",
"Predicate": "name",
"ObjectID": "Aadhaar",
"ObjectValue": "Aadhaar",
"Provenance": "dc/base/BaseSchema"
},
{
"SubjectID": "Aadhaar",
"Predicate": "typeOf",
"ObjectID": "GovernmentIdEnum",
"ObjectValue": "",
"Provenance": "dc/base/BaseSchema"
},
{
"SubjectID": "Aadhaar",
"Predicate": "typeOf",
"ObjectID": "GovernmentIdenitifierEnum",
"ObjectValue": "",
"Provenance": "dc/base/BaseSchema"
},
{
"SubjectID": "Aadhaar",
"Predicate": "name",
"ObjectID": "Aadhaar",
"ObjectValue": "Aadhaar",
"Provenance": "dc/base/BaseSchema"
}
],
"Monthly_Average_RetailPrice_Electricity_Residential": [
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"Predicate": "name",
"ObjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"ObjectValue": "Average retail price of electricity, residential, monthly",
"Provenance": "dc/base/HumanReadableStatVars"
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"Predicate": "extendedName",
"ObjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"ObjectValue": "Average retail price of electricity, residential, monthly",
"Provenance": "dc/base/HumanReadableStatVars"
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"Predicate": "measurementQualifier",
"ObjectID": "Monthly",
"ObjectValue": "",
"Provenance": "dc/base/HumanReadableStatVars"
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"Predicate": "statType",
"ObjectID": "meanValue",
"ObjectValue": "",
"Provenance": "dc/base/HumanReadableStatVars"
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"Predicate": "typeOf",
"ObjectID": "StatisticalVariable",
"ObjectValue": "",
"Provenance": "dc/base/HumanReadableStatVars"
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"Predicate": "measuredProp",
"ObjectID": "retailPrice",
"ObjectValue": "",
"Provenance": "dc/base/HumanReadableStatVars"
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"Predicate": "name",
"ObjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"ObjectValue": "Average retail price of electricity, residential, monthly",
"Provenance": "dc/base/HumanReadableStatVars"
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"Predicate": "populationType",
"ObjectID": "Electricity",
"ObjectValue": "",
"Provenance": "dc/base/HumanReadableStatVars"
},
{
"SubjectID": "Monthly_Average_RetailPrice_Electricity_Residential",
"Predicate": "consumingSector",
"ObjectID": "Residential",
"ObjectValue": "",
"Provenance": "dc/base/HumanReadableStatVars"
}
]
}
40 changes: 0 additions & 40 deletions internal/server/spanner/golden/query/get_nodes_by_id.json

This file was deleted.

21 changes: 6 additions & 15 deletions internal/server/spanner/golden/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"runtime"
"testing"

"github.com/datacommonsorg/mixer/internal/server/spanner"
"github.com/datacommonsorg/mixer/test"
"github.com/google/go-cmp/cmp"
)

func TestGetNodesByID(t *testing.T) {
func TestGetNodeEdgesByID(t *testing.T) {
client := test.NewSpannerClient()
if client == nil {
return
Expand All @@ -35,24 +34,16 @@ func TestGetNodesByID(t *testing.T) {
ctx := context.Background()
_, filename, _, _ := runtime.Caller(0)
goldenDir := path.Join(path.Dir(filename), "query")
goldenFile := "get_nodes_by_id.json"
goldenFile := "get_node_edges_by_id.json"

ids := []string{"StatisticalVariable", "USD"}
ids := []string{"Aadhaar", "Monthly_Average_RetailPrice_Electricity_Residential"}

actual, err := client.GetNodesByID(ctx, ids)
actual, err := client.GetNodeEdgesByID(ctx, ids)
if err != nil {
t.Fatalf("GetNodesByID error (%v): %v", goldenFile, err)
t.Fatalf("GetNodeEdgesByID error (%v): %v", goldenFile, err)
}

// Use ordered list of nodes so that the golden file is deterministic.
var ordered []*spanner.Node
for _, id := range ids {
if node, ok := actual[id]; ok {
ordered = append(ordered, node)
}
}

got, err := test.StructToJSON(ordered)
got, err := test.StructToJSON(actual)
if err != nil {
t.Fatalf("StructToJSON error (%v): %v", goldenFile, err)
}
Expand Down
37 changes: 7 additions & 30 deletions internal/server/spanner/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,13 @@
// Model objects related to the spanner graph database.
package spanner

import (
"encoding/json"
"fmt"
)

// Node struct representing a single row in the Node table.
type Node struct {
ID string `spanner:"id"`
TypeOf string `spanner:"typeOf"`
Name string `spanner:"name"`
Properties JSONMap `spanner:"properties"`
Provenances JSONMap `spanner:"provenances"`
// Edge struct represents a single row in the Edge table.
type Edge struct {
SubjectID string `spanner:"subject_id"`
Predicate string `spanner:"predicate"`
ObjectID string `spanner:"object_id"`
ObjectValue string `spanner:"object_value"`
Provenance string `spanner:"provenance"`
}

// SpannerConfig struct to hold the YAML configuration to a spanner database.
Expand All @@ -35,21 +30,3 @@ type SpannerConfig struct {
Instance string `yaml:"instance"`
Database string `yaml:"database"`
}

// JSONMap struct represents spanner JSON fields as golang maps.
type JSONMap struct {
Map map[string]string
}

// Convert a JSON field to a JSONMap value.
// Note that the undecoded value happens to be a string.
func (m *JSONMap) DecodeSpanner(val interface{}) (err error) {
strVal, ok := val.(string)
if !ok {
return fmt.Errorf("failed to decode JSONMap: %v", val)
}
if err := json.Unmarshal([]byte(strVal), &m.Map); err != nil {
return fmt.Errorf("failed to unmarshal JSON to map: %w", err)
}
return nil
}
62 changes: 46 additions & 16 deletions internal/server/spanner/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,57 @@ import (

// SQL / GQL statements executed by the SpannerClient
var statements = struct {
getNodesByID string
getEdgesBySubjectID string
}{
getNodesByID: `
SELECT id, typeOf, name, properties, provenances
FROM Node
WHERE id IN UNNEST(@ids)
getEdgesBySubjectID: `
SELECT
subject_id,
predicate,
COALESCE(object_id, '') AS object_id,
COALESCE(object_value, '') AS object_value,
COALESCE(provenance, '') AS provenance
FROM Edge
WHERE subject_id IN UNNEST(@ids)
`,
}

// GetNodesByID retrieves nodes from Spanner given a list of IDs and returns a map.
func (sc *SpannerClient) GetNodesByID(ctx context.Context, ids []string) (map[string]*Node, error) {
nodes := make(map[string]*Node)
// GetNodeEdgesByID retrieves node edges from Spanner given a list of IDs and returns a map.
func (sc *SpannerClient) GetNodeEdgesByID(ctx context.Context, ids []string) (map[string][]*Edge, error) {
edges := make(map[string][]*Edge)
if len(ids) == 0 {
return nodes, nil
return edges, nil
}

stmt := spanner.Statement{
SQL: statements.getNodesByID,
SQL: statements.getEdgesBySubjectID,
Params: map[string]interface{}{"ids": ids},
}

err := sc.queryAndCollect(
ctx,
stmt,
func() interface{} {
return &Edge{}
},
func(rowStruct interface{}) {
edge := rowStruct.(*Edge)
subjectID := edge.SubjectID
edges[subjectID] = append(edges[subjectID], edge)
},
)
if err != nil {
return edges, err
}

return edges, nil
}

func (sc *SpannerClient) queryAndCollect(
ctx context.Context,
stmt spanner.Statement,
newStruct func() interface{},
withStruct func(interface{}),
) error {
iter := sc.client.Single().Query(ctx, stmt)
defer iter.Stop()

Expand All @@ -55,15 +85,15 @@ func (sc *SpannerClient) GetNodesByID(ctx context.Context, ids []string) (map[st
break
}
if err != nil {
return nil, fmt.Errorf("failed to fetch row: %w", err)
return fmt.Errorf("failed to fetch row: %w", err)
}

var node Node
if err := row.ToStructLenient(&node); err != nil {
return nil, fmt.Errorf("failed to parse row: %w", err)
rowStruct := newStruct()
if err := row.ToStructLenient(rowStruct); err != nil {
return fmt.Errorf("failed to parse row: %w", err)
}
nodes[node.ID] = &node
withStruct(rowStruct)
}

return nodes, nil
return nil
}

0 comments on commit c163577

Please sign in to comment.