Skip to content

Commit

Permalink
fix: Add record enc/dec to destv1 sourcev2 (#33)
Browse files Browse the repository at this point in the history
* fix: Add record enc/dec to destv1 sourcev2

* lint fix
  • Loading branch information
yevgenypats committed Jun 24, 2023
1 parent 8dc47ce commit 40797be
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pb/destination/v1/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,16 @@ func NewSchemasFromBytes(b [][]byte) ([]*arrow.Schema, error) {
}
return ret, nil
}

func NewRecordFromBytes(b []byte) (arrow.Record, error) {
rdr, err := ipc.NewReader(bytes.NewReader(b))
if err != nil {
return nil, err
}
for rdr.Next() {
rec := rdr.Record()
rec.Retain()
return rec, nil
}
return nil, nil
}
12 changes: 12 additions & 0 deletions pb/source/v2/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,15 @@ func SchemasToBytes(schemas []*arrow.Schema) ([][]byte, error) {
}
return ret, nil
}

func RecordToBytes(record arrow.Record) ([]byte, error) {
var buf bytes.Buffer
wr := ipc.NewWriter(&buf, ipc.WithSchema(record.Schema()))
if err := wr.Write(record); err != nil {
return nil, err
}
if err := wr.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

0 comments on commit 40797be

Please sign in to comment.