Skip to content

Commit

Permalink
multi frame
Browse files Browse the repository at this point in the history
  • Loading branch information
scottlepp committed Feb 22, 2024
1 parent 4590955 commit b422bec
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 58 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
var values = []string{"test"}
frame := data.NewFrame("foo", data.NewField("value", nil, values))
frame.RefID = "foo"
frames := []*data.Frame{frame}
res, err := db.QueryFrame("foo", "select * from foo", frame)
res, err := db.QueryFrames("foo", "select * from foo", frames)
```
112 changes: 69 additions & 43 deletions pkg/data/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,70 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data"
)

func ToParquet(frame *data.Frame, chunk int) (string, string, error) {
schema, err := MarshalArrow(frame)
if err != nil {
return "", "", err
}
func ToParquet(frames []*data.Frame, chunk int) (map[string]string, error) {
dirs := map[string]string{}
frameIndex := framesByRef(frames)
for _, frameList := range frameIndex {

dir, err := os.MkdirTemp("", "duck")
if err != nil {
return nil, err
}

data := frameData(frame)
for _, frame := range frameList {
dirs[frame.RefID] = dir

schema, err := MarshalArrow(frame)
if err != nil {
return nil, err
}

data := frameData(frame)

if chunk > 0 {
var wg sync.WaitGroup
errCh := make(chan error, 10)
// write files in chunks
chunks := makeChunks(data, chunk)
for i, chunk := range chunks {
wg.Add(1)

go func(chunk FrameData, idx int) error {
defer wg.Done()
raw, err := json.Marshal(chunk)
if err != nil {
return err
}
name := fmt.Sprintf("%s%d", frame.RefID, idx)
_, _, err = write(dir, name, schema, raw)
return err
}(chunk, i)
}

dir, err := os.MkdirTemp("", "duck")
if err != nil {
return "", "", err
}
go func() {
wg.Wait()
close(errCh)
}()

if chunk > 0 {
var wg sync.WaitGroup
errCh := make(chan error, 10)
// write files in chunks
chunks := makeChunks(data, chunk)
for i, chunk := range chunks {
wg.Add(1)

go func(chunk FrameData, idx int) error {
defer wg.Done()
raw, err := json.Marshal(chunk)
if err != nil {
return err
for err := range errCh {
return nil, err
}
name := fmt.Sprintf("%s%d", frame.RefID, idx)
_, _, err = write(dir, name, schema, raw)
return err
}(chunk, i)
}

go func() {
wg.Wait()
close(errCh)
}()

for err := range errCh {
return "", "", err
}
continue
}

return dir, "", nil
}
raw, err := json.Marshal(data)
if err != nil {
return nil, err
}

raw, err := json.Marshal(data)
if err != nil {
return "", "", err
_, _, err = write(dir, frame.RefID, schema, raw)
if err != nil {
return nil, err
}
}
}

return write(dir, frame.RefID, schema, raw)
return dirs, nil
}

func frameData(frame *data.Frame) FrameData {
Expand Down Expand Up @@ -131,3 +144,16 @@ func makeChunks(xs FrameData, chunkSize int) []FrameData {
divided[i] = xs[prev:]
return divided
}

// framesByRef groups the frames by their RefID. Within each group the frames
// keep the order in which they appear in the input slice.
func framesByRef(frames []*data.Frame) map[string][]*data.Frame {
	grouped := make(map[string][]*data.Frame)
	for _, frame := range frames {
		// A missing key yields a nil slice, which append grows like any other.
		grouped[frame.RefID] = append(grouped[frame.RefID], frame)
	}
	return grouped
}
5 changes: 3 additions & 2 deletions pkg/data/parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ func TestWrite(t *testing.T) {
var values = []string{"test"}
frame := data.NewFrame("foo", data.NewField("value", nil, values))
frame.RefID = "foo"
dir, file, err := ToParquet(frame, 0)
frames := []*data.Frame{frame}

dir, err := ToParquet(frames, 0)
if err != nil {
fmt.Println(err.Error())
t.Fail()
}
fmt.Println(dir)
fmt.Println(file)
}

func TestRead(t *testing.T) {
Expand Down
61 changes: 52 additions & 9 deletions pkg/duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/exec"

sdk "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/data/framestruct"
"github.com/scottlepp/go-duck/pkg/data"
)

Expand Down Expand Up @@ -93,34 +94,66 @@ func (d *DuckDB) Query(query string) (string, error) {
}

// QueryFrame will load a dataframe into a view named RefID, and run the query against that view
func (d *DuckDB) QueryFrame(name string, query string, frame *sdk.Frame) (string, error) {
dir, _, err := data.ToParquet(frame, d.Chunk)
func (d *DuckDB) QueryFrames(name string, query string, frames []*sdk.Frame) (string, error) {
dirs, err := data.ToParquet(frames, d.Chunk)
if err != nil {
return "", err
}

defer func() {
err := os.RemoveAll(dir)
if err != nil {
fmt.Println("failed to remove parquet files")
for _, dir := range dirs {
err := os.RemoveAll(dir)
if err != nil {
fmt.Println("failed to remove parquet files")
}
}
}()

cmd := fmt.Sprintf("CREATE VIEW %s AS (SELECT * from '%s/*.parquet');", frame.RefID, dir)
commands := []string{cmd, query}
commands := []string{}
created := map[string]bool{}
for _, frame := range frames {
if created[frame.RefID] {
continue
}
cmd := fmt.Sprintf("CREATE VIEW %s AS (SELECT * from '%s/*.parquet');", frame.RefID, dirs[frame.RefID])
commands = append(commands, cmd)
created[frame.RefID] = true
}

commands = append(commands, query)
res, err := d.RunCommands(commands)
if err != nil {
return "", err
}
return res, nil
}

func (d *DuckDB) QueryFrameInto(name string, query string, frame *sdk.Frame, v any) (any, error) {
res, err := d.QueryFrame(name, query, frame)
func (d *DuckDB) QueryFramesInto(name string, query string, frames []*sdk.Frame, v any) (any, error) {
res, err := d.QueryFrames(name, query, frames)
if err != nil {
return "", err
}

// if v is a frame then return a new frame with the results
if f := isFrame(v); f != nil {
var data []map[string]any
err := json.Unmarshal([]byte(res), &data)
if err != nil {
return nil, err
}
resultsFrame, err := framestruct.ToDataFrame(name, data)
if err != nil {
return nil, err
}

f.Fields = resultsFrame.Fields
f.Name = resultsFrame.Name
f.Meta = resultsFrame.Meta
f.RefID = resultsFrame.RefID

return resultsFrame, nil
}

err = json.Unmarshal([]byte(res), v)
if err != nil {
return "", err
Expand Down Expand Up @@ -149,3 +182,13 @@ func defaultInt(val int, dflt int) int {
}
return val
}

// isFrame reports whether v holds a frame. It returns v itself when v is a
// *sdk.Frame, a pointer to a copy when v is an sdk.Frame value, and nil for
// any other type.
func isFrame(v any) *sdk.Frame {
	switch frame := v.(type) {
	case *sdk.Frame:
		return frame
	case sdk.Frame:
		// frame is a copy of the value stored in v; changes made through the
		// returned pointer are not visible through v.
		return &frame
	default:
		return nil
	}
}
29 changes: 26 additions & 3 deletions pkg/duckdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ func TestQueryFrame(t *testing.T) {
var values = []string{"test"}
frame := data.NewFrame("foo", data.NewField("value", nil, values))
frame.RefID = "foo"
frames := []*data.Frame{frame}

res, err := db.QueryFrame("foo", "select * from foo", frame)
res, err := db.QueryFrames("foo", "select * from foo", frames)
assert.Nil(t, err)

assert.Contains(t, res, `[{"value":"test"}]`)
Expand All @@ -64,8 +65,9 @@ func TestQueryFrameChunks(t *testing.T) {
var values = []string{"test", "test", "test", "test", "test", "test2"}
frame := data.NewFrame("foo", data.NewField("value", nil, values))
frame.RefID = "foo"
frames := []*data.Frame{frame}

res, err := db.QueryFrame("foo", "select * from foo", frame)
res, err := db.QueryFrames("foo", "select * from foo", frames)
assert.Nil(t, err)

assert.Contains(t, res, `test2`)
Expand All @@ -77,12 +79,33 @@ func TestQueryFrameInto(t *testing.T) {
var values = []string{"test"}
frame := data.NewFrame("foo", data.NewField("value", nil, values))
frame.RefID = "foo"
frames := []*data.Frame{frame}

model := []map[string]any{}
_, err := db.QueryFrameInto("foo", "select * from foo", frame, &model)
_, err := db.QueryFramesInto("foo", "select * from foo", frames, &model)
assert.Nil(t, err)

assert.Equal(t, 1, len(model))
raw := fmt.Sprintf("%s", model)
assert.Contains(t, raw, "test")
}

// TestQueryFrameIntoFrame passes two frames that share the RefID "foo" to
// QueryFramesInto with a *data.Frame as the destination and expects the
// destination frame to end up with exactly one row.
func TestQueryFrameIntoFrame(t *testing.T) {
	db := NewInMemoryDB()

	first := data.NewFrame("foo", data.NewField("value", nil, []string{"test"}))
	first.RefID = "foo"

	second := data.NewFrame("foo", data.NewField("value", nil, []string{"foo"}))
	second.RefID = "foo"

	// The destination is a frame pointer, so the result is written into it.
	result := &data.Frame{}
	_, err := db.QueryFramesInto("foo", "select * from foo", []*data.Frame{first, second}, result)
	assert.Nil(t, err)

	assert.Equal(t, 1, result.Rows())
}

0 comments on commit b422bec

Please sign in to comment.