GRIPPER plugin driver #231

Merged on Dec 15, 2020 (89 commits)
Commits
9442de6
Starting work on a prototype
kellrott Nov 25, 2019
e125230
Working initial prototype of tab file index and retrieve
kellrott Nov 26, 2019
7ba889f
Changing tabular init to use bulkwrite
kellrott Nov 26, 2019
b73f330
Checking to see if a file is already indexed
kellrott Nov 26, 2019
50e1987
Prototyped vertex listing
kellrott Nov 26, 2019
52f8d77
Connecting shim tabular driver to query engine
kellrott Nov 27, 2019
bc7c2ac
Loading graph based on config, adding test graph
kellrott Nov 27, 2019
e756ab6
Adding more graph access functions to tabular driver
kellrott Nov 27, 2019
5cad85a
Adding label based vertex listing
kellrott Dec 1, 2019
a5295ed
Adding column indexing to tabular driver
kellrott Dec 3, 2019
d65e3eb
Working on tabular edge construction. Still not working
kellrott Dec 7, 2019
64b242e
Some minor tweaks. But the way edges are stored still needs to be ref…
kellrott Dec 9, 2019
40d13dc
Merge remote-tracking branch 'origin/master' into tabular-driver
kellrott Jan 25, 2020
11c6173
Reworking organization of the code. It's broken right now.
kellrott Jan 27, 2020
d618815
Tabular vertex listing working again
kellrott Jan 27, 2020
16507ba
Starting to rework the config file. Should make data structures more …
kellrott Jan 27, 2020
8e827cd
Refining prototype of tabular graph config
kellrott Feb 4, 2020
862a7c9
Continuing to change the tabular driver config as I work out how edge…
kellrott Feb 5, 2020
3f93dee
Working on edge definitions
kellrott Feb 5, 2020
cb48562
Cleaning up the index interface a little, so we can extend the row in…
kellrott Feb 5, 2020
3a59e80
Indexing fields in tabular files now
kellrott Feb 6, 2020
3617463
Working out edge command
kellrott Feb 6, 2020
5db3b62
Working on prototype on web based tabular driver
kellrott Feb 7, 2020
cb0cf96
Adding another endpoint to the web based demo graph
kellrott Feb 7, 2020
70dfe68
Changing query compiler so that optimizer steps can be customized
kellrott Feb 7, 2020
c6bf0ab
Custom label based table scanning optimization for tabular graph driver
kellrott Feb 7, 2020
3a975ac
Prototyping a templated get query config
kellrott Feb 7, 2020
2076cd2
Implementing single row get from web driver
kellrott Feb 7, 2020
35418e0
Working web join
kellrott Feb 7, 2020
c8b5557
Adding tabular single query command to base grip program
kellrott Feb 7, 2020
9f8769d
Removing extra print statements
kellrott Feb 7, 2020
4e5fd52
Working on new cross site example
kellrott Feb 7, 2020
493fef4
Switching prefixes to be more URL like
kellrott Feb 8, 2020
1b85428
Adding a context to all streaming graph requests.
kellrott Feb 8, 2020
8037017
Tabular driver now plugs into the main server
kellrott Feb 8, 2020
a32e5b4
More work on example queries
kellrott Feb 9, 2020
0a65830
Refining the cache interface inside of the tabular driver
kellrott Feb 10, 2020
e30e2c6
Adding caching for things stored as lists
kellrott Feb 10, 2020
cdba691
Now getting rows by ids, on web tables only accessible via listing, w…
kellrott Feb 10, 2020
d651572
Removing caching from the GDC table right now
kellrott Feb 11, 2020
48d9165
Making sure the cache is only updated if needed
kellrott Feb 11, 2020
9ae185c
Working on queries
kellrott Feb 11, 2020
89e6d4b
Now multiple edge drivers can connect to the same vertex
kellrott Feb 11, 2020
d9a2280
Merge remote-tracking branch 'origin/master' into multi-driver
kellrott Mar 24, 2020
3829bb9
Renaming driver to something more descriptive
kellrott Mar 24, 2020
0c9bad0
Fixing some printf formatting that caused issues on unit testing
kellrott Mar 24, 2020
172bb71
Doing formatting cleanup
kellrott Mar 24, 2020
76dd21e
Working on protobuf based plugin driver for graph multidriver
kellrott Mar 28, 2020
b5217a3
Working on building multi test graph based on conformance test swapi
kellrott Mar 28, 2020
257e184
Refactoring to the DIG driver
kellrott Mar 29, 2020
7531098
Working edge table connector for DIG
kellrott Mar 30, 2020
69111f7
Adding edge queries for Dig driver
kellrott Mar 30, 2020
00811e9
Working on more test graphs
kellrott Mar 31, 2020
efc1162
Starting to convert some of the GDC scraper code to the DIG system
kellrott Mar 31, 2020
0ae05c1
Adding FieldToField edge connections to out().
kellrott Mar 31, 2020
7194eba
Debugging GDC proxy
kellrott Apr 1, 2020
a53ca9d
Updating test queries
kellrott Apr 1, 2020
b4afbfe
Adding more edges to GDC test, getting FieldToField incoming edges to…
kellrott Apr 1, 2020
e8b1d80
Updating queries and fixing javascript client
kellrott Apr 1, 2020
547cf30
Setting up more of the swapi graph so the dig driver can be run throu…
kellrott Apr 2, 2020
d18c2a5
Adding some error checking for the commit file
kellrott Apr 11, 2020
04d4236
Adding README to describe plugin drivers
kellrott Jul 8, 2020
0e8ad28
Renaming plugin system to be GRIPPER
kellrott Jul 8, 2020
41b3ba6
Adding gripper tests into Travis config
kellrott Jul 8, 2020
089666c
Fixing issues found by linter
kellrott Jul 9, 2020
edeffe9
Fixing lint issues
kellrott Jul 9, 2020
8c0aa2c
Fixing linting issues
kellrott Jul 9, 2020
f5e8f50
Adding missing import from table gripper
kellrott Jul 9, 2020
63ae8c9
Working on out by EdgeTable
kellrott Jul 9, 2020
bea0869
Fixing local variable issue in GetVertexChannel
kellrott Jul 9, 2020
b7be3c0
Working on in/out edges
kellrott Jul 10, 2020
f67cc0c
Upping the number of thread workers
kellrott Jul 10, 2020
f8d5cf9
Working on aggregation testing
kellrott Aug 27, 2020
09c37cf
Adding another edge type to the swapi table view
kellrott Aug 27, 2020
2dcf1b8
Improving gripper conformance graph
kellrott Aug 27, 2020
c506f0b
Fixing a few more issues with table graph build
kellrott Aug 27, 2020
4b4aadc
Starting to add code that will be used to manage the multiple connect…
kellrott Aug 28, 2020
e2639ee
Working on making sure outputs are in a consistent order
kellrott Sep 1, 2020
fcdb355
Working on normalizing edge id generation
kellrott Sep 1, 2020
0018b46
Working on edge lookups
kellrott Sep 1, 2020
90e133c
Fixing more edge reading and improving conformance
kellrott Sep 1, 2020
0a1d92a
Refactoring conformance test so that it isn't continually writing the…
kellrott Sep 2, 2020
d9b3ece
Fixing issues on gripper edges
kellrott Sep 2, 2020
f3fa310
Gripper plugin system now passing conformance tests
kellrott Sep 2, 2020
e15d30b
Fixing linting issues
kellrott Sep 4, 2020
5b9e8da
Adding missing figure to docs
kellrott Oct 5, 2020
4221ee0
Update README.md
kellrott Nov 9, 2020
4d36180
Fixing the GDC gripper example
kellrott Nov 10, 2020
343496d
Updating readme
kellrott Nov 10, 2020
Adding caching for things stored as lists
kellrott committed Feb 10, 2020
commit e30e2c6a38c0d6eb8f4c99449567b468062ffc21
1 change: 1 addition & 0 deletions tabular/gdc-graph/config.yaml
@@ -8,6 +8,7 @@ tables:
elementList: "$.data.hits"
params:
size: "100"
cache: true
pager:
code: >
def pager(url, result):
21 changes: 21 additions & 0 deletions tabular/kvcache/index_ops.go
@@ -58,6 +58,27 @@ func (t *KVCache) NewLineIndex( path string ) (tabular.LineIndex, error) {
return &LineIndex{t,num}, nil
}

func (t *KVCache) GetRowStorage(path string) (tabular.RowStorage, error) {
pk := PathKey(path)
v, err := t.KV.Get(pk)
if err != nil {
return nil, err
}
o, _ := binary.Uvarint(v)
return &KVRowStorage{t, o}, nil
}

func (t *KVCache) NewRowStorage( path string ) (tabular.RowStorage, error) {
ok := PathNumKey()
num := uint64(0)
if v, err := t.KV.Get(ok); err == nil {
num, _ = binary.Uvarint(v)
}
b := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(b, num+1)
t.KV.Set(ok, b) //Make part of same transaction as Get above?
return &KVRowStorage{t,num}, nil
}
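
The comment in NewRowStorage asks whether the counter read and write should share a transaction. The visible hunk also does not show NewRowStorage writing a PathKey entry, which GetRowStorage later reads. A minimal sketch of one way to address both, assuming a single-process cache: `memKV`, `allocator`, `idFor`, and the key names are hypothetical stand-ins, not part of the PR, and the mutex is a substitute for whatever transaction support the real KV interface may offer.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sync"
)

// memKV is a hypothetical in-memory stand-in for the cache's key-value store.
type memKV struct {
	data map[string][]byte
}

func (m *memKV) Get(key []byte) ([]byte, bool) {
	v, ok := m.data[string(key)]
	return v, ok
}

func (m *memKV) Set(key, value []byte) {
	m.data[string(key)] = value
}

// allocator hands out path IDs. The mutex makes the read-increment-write
// sequence on the counter atomic with respect to other callers in this process.
type allocator struct {
	mu sync.Mutex
	kv *memKV
}

var counterKey = []byte("pathCount") // hypothetical key name

// encode returns n as an unsigned varint.
func encode(n uint64) []byte {
	b := make([]byte, binary.MaxVarintLen64)
	l := binary.PutUvarint(b, n)
	return b[:l]
}

// idFor returns the ID already recorded for path, or allocates the next one
// and records the path-to-ID mapping so that a later lookup finds it.
func (a *allocator) idFor(path string) uint64 {
	a.mu.Lock()
	defer a.mu.Unlock()
	pathKey := []byte("p:" + path) // hypothetical key layout
	if v, ok := a.kv.Get(pathKey); ok {
		id, _ := binary.Uvarint(v)
		return id
	}
	next := uint64(0)
	if v, ok := a.kv.Get(counterKey); ok {
		next, _ = binary.Uvarint(v)
	}
	a.kv.Set(counterKey, encode(next+1))
	a.kv.Set(pathKey, encode(next))
	return next
}

func main() {
	a := &allocator{kv: &memKV{data: map[string][]byte{}}}
	// The same path maps to the same ID on repeated calls.
	fmt.Println(a.idFor("https://example.org/a"), a.idFor("https://example.org/b"), a.idFor("https://example.org/a"))
}
```

A mutex only serializes callers inside one process; if several processes share the store, the counter update would need a transaction in the store itself.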


func (t *LineIndex) AddIndexedField(colName string) {
30 changes: 30 additions & 0 deletions tabular/kvcache/keys.go
@@ -12,6 +12,8 @@ var idPrefix = []byte("g")
var countPrefix = []byte("n")
var idxPrefix = []byte("x")

var rowPrefix = []byte("r")

func PathNumKey() []byte {
return pathCount
}
@@ -72,3 +74,31 @@ func LineCountKey(pathID uint64) []byte {
binary.PutUvarint(out[1:binary.MaxVarintLen64+1], pathID)
return out
}

func RowPrefix(pathID uint64) []byte {
out := make([]byte, 1 + binary.MaxVarintLen64)
out[0] = rowPrefix[0]
binary.PutUvarint(out[1:binary.MaxVarintLen64+1], pathID)
return out
}

func RowKey(pathID uint64, id string) []byte {
p := []byte(id)
out := make([]byte, 1 + binary.MaxVarintLen64 + len(p))
out[0] = rowPrefix[0]
binary.PutUvarint(out[1:binary.MaxVarintLen64+1], pathID)
for i := 0; i < len(p); i++ {
out[i+1+binary.MaxVarintLen64] = p[i]
}
return out
}

func RowKeyParse(key []byte) (uint64, []byte) {
pathID, _ := binary.Uvarint(key[1:binary.MaxVarintLen64+1])
sLen := len(key) - (binary.MaxVarintLen64+1)
id := make([]byte, sLen)
for i := 0; i < sLen; i++ {
id[i] = key[i+1+binary.MaxVarintLen64]
}
return pathID, id
}
57 changes: 57 additions & 0 deletions tabular/kvcache/row_storage.go
@@ -0,0 +1,57 @@
package kvcache

import (
"log"
"context"
"bytes"
"encoding/json"
"github.com/bmeg/grip/tabular"
"github.com/bmeg/grip/kvi"
)

type KVRowStorage struct {
cache *KVCache
pathID uint64
}


func (r *KVRowStorage) Write(row *tabular.TableRow) error {
key := RowKey(r.pathID, row.Key)
buf := bytes.Buffer{}
enc := json.NewEncoder(&buf)
if err := enc.Encode(row.Values); err != nil {
return err
}
return r.cache.KV.Set(key, buf.Bytes())
}


func (r *KVRowStorage) GetRowsByField(ctx context.Context, field string, value string) chan *tabular.TableRow {
out := make(chan *tabular.TableRow, 10)
go func() {
defer close(out)
prefix := RowPrefix(r.pathID)
r.cache.KV.View(func(it kvi.KVIterator) error {
for it.Seek(prefix); it.Valid() && bytes.HasPrefix(it.Key(), prefix); it.Next() {
select {
case <-ctx.Done():
return nil
default:
}
_, id := RowKeyParse(it.Key())
data := map[string]interface{}{}
v, _ := it.Value()
buf := bytes.NewBuffer(v)
dec := json.NewDecoder(buf)
if err := dec.Decode(&data); err != nil {
log.Printf("Decode Error: %s", err)
continue
}
if tabular.FieldFilter(field, value, data) {
out <- &tabular.TableRow{Key:string(id), Values:data}
}
}
return nil
})
}()
return out
}
34 changes: 34 additions & 0 deletions tabular/tabular.go
@@ -3,6 +3,8 @@ package tabular
import (
"fmt"
"context"
"strings"
"github.com/oliveagle/jsonpath"
)


@@ -42,10 +44,18 @@ type LineIndexWriter interface {
IndexRow( line uint64, row map[string]interface{}) error
}

type RowStorage interface {
Write(row *TableRow) error
GetRowsByField(ctx context.Context, field string, value string) chan *TableRow
}

// Cache creates and retrieves per-path line indexes and row storage.
type Cache interface {
NewLineIndex(path string) (LineIndex, error)
GetLineIndex(path string) (LineIndex, error)

NewRowStorage(path string) (RowStorage, error)
GetRowStorage(path string) (RowStorage, error)
}

type Options struct {
@@ -84,3 +94,27 @@ func (t *TableManager) NewDriver(name string, url string, opts Options) (Driver,
func NewCache(path string) (Cache, error) {
return cacheMap["kv"](path)
}


func pathFix(p string) string {
if !strings.HasPrefix(p, "$.") {
return "$." + p
}
return p
}

func FieldFilter(field string, value string, data map[string]interface{}) bool {
if field == "" {
return true
}
v, err := jsonpath.JsonPathLookup(data, pathFix(field) )
if err != nil {
return false
}
if valStr, ok := v.(string); ok {
if valStr == value {
return true
}
}
return false
}
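
FieldFilter treats an empty field as "match everything" and otherwise compares only string values, so numbers decoded from JSON (as `float64`) never match. A sketch of the same behaviour using only the standard library: `lookup` is a simplified dotted-path walker swapped in for the `github.com/oliveagle/jsonpath` call, and it handles object keys only, not array indexes or wildcards.

```go
package main

import (
	"fmt"
	"strings"
)

// lookup walks a dotted path such as "$.project.id" through nested maps.
// It is a simplified stand-in for a JSONPath library.
func lookup(data map[string]interface{}, path string) (interface{}, bool) {
	path = strings.TrimPrefix(path, "$.")
	var cur interface{} = data
	for _, part := range strings.Split(path, ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		if cur, ok = m[part]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// fieldFilter mirrors the diff's FieldFilter: an empty field matches every
// row, and only string values are compared.
func fieldFilter(field, value string, data map[string]interface{}) bool {
	if field == "" {
		return true
	}
	v, ok := lookup(data, field)
	if !ok {
		return false
	}
	s, isString := v.(string)
	return isString && s == value
}

func main() {
	data := map[string]interface{}{
		"project": map[string]interface{}{"id": "TCGA-BRCA"},
		"size":    float64(100),
	}
	fmt.Println(fieldFilter("project.id", "TCGA-BRCA", data))
	fmt.Println(fieldFilter("size", "100", data))
	fmt.Println(fieldFilter("", "", data))
}
```

The empty-field case is what lets the web driver call `GetRowsByField(ctx, "", "")` to list every cached row.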
30 changes: 27 additions & 3 deletions tabular/web/driver.go
@@ -18,6 +18,8 @@ import (
type Driver struct {
conf Config
opts tabular.Options
cache tabular.Cache
rowStorage tabular.RowStorage
}

type QueryConfig struct {
@@ -26,6 +28,7 @@ type QueryConfig struct {
Element string `json:"element"`
Params map[string]string `json:"params"`
Headers []string `json:"headers"`
Cache bool `json:"cache"`
}

type Config struct {
@@ -34,7 +37,7 @@ type Config struct {
}

func TSVDriverBuilder(url string, manager tabular.Cache, opts tabular.Options) (tabular.Driver, error) {
- o := Driver{opts:opts}
+ o := Driver{opts:opts, cache:manager}
conf := Config{}
err := mapstructure.Decode(opts.Config, &conf)
if err != nil {
@@ -60,7 +63,21 @@ func pathFix(p string) string {
return p
}

- func (d *Driver) GetRows(ctx context.Context) chan *tabular.TableRow {
+ func (d *Driver) buildCache() {
if d.conf.List != nil && d.conf.List.Cache {
url := d.conf.List.URL //BUG: need to make sure URL is unique in config file
var err error
if d.rowStorage, err = d.cache.GetRowStorage(url); err != nil {
d.rowStorage, _ = d.cache.NewRowStorage(url)
}
log.Printf("Caching %s", d.conf.List.URL)
for row := range d.fetchRows(context.TODO()) {
d.rowStorage.Write(row)
}
}
}

func (d *Driver) fetchRows(ctx context.Context) chan *tabular.TableRow {
out := make(chan *tabular.TableRow, 10)
go func() {
defer close(out)
@@ -110,10 +127,17 @@ func (d *Driver) GetRows(ctx context.Context) chan *tabular.TableRow {
}
}
}()

return out
}

func (d *Driver) GetRows(ctx context.Context) chan *tabular.TableRow {
d.buildCache()
if d.rowStorage != nil {
return d.rowStorage.GetRowsByField(ctx, "", "")
}
return d.fetchRows(ctx)
}

func (d *Driver) GetRowByID(id string) (*tabular.TableRow, error) {
log.Printf("Getting row: %s", id)
if tableGet, ok := d.conf.Get[d.opts.PrimaryKey]; ok {