From 40797be0bb62422984845597fbd984b877c76032 Mon Sep 17 00:00:00 2001 From: Yevgeny Pats Date: Sat, 24 Jun 2023 12:56:58 +0300 Subject: [PATCH] fix: Add record enc/dec to destv1 sourcev2 (#33) * fix: Add record enc/dec to destv1 sourcev2 * lint fix --- pb/destination/v1/arrow.go | 13 +++++++++++++ pb/source/v2/arrow.go | 12 ++++++++++++ 2 files changed, 25 insertions(+) diff --git a/pb/destination/v1/arrow.go b/pb/destination/v1/arrow.go index bb77542..0c2f177 100644 --- a/pb/destination/v1/arrow.go +++ b/pb/destination/v1/arrow.go @@ -18,3 +18,16 @@ func NewSchemasFromBytes(b [][]byte) ([]*arrow.Schema, error) { } return ret, nil } + +func NewRecordFromBytes(b []byte) (arrow.Record, error) { + rdr, err := ipc.NewReader(bytes.NewReader(b)) + if err != nil { + return nil, err + } + for rdr.Next() { + rec := rdr.Record() + rec.Retain() + return rec, nil + } + return nil, nil +} diff --git a/pb/source/v2/arrow.go b/pb/source/v2/arrow.go index b63b5de..19152d5 100644 --- a/pb/source/v2/arrow.go +++ b/pb/source/v2/arrow.go @@ -19,3 +19,15 @@ func SchemasToBytes(schemas []*arrow.Schema) ([][]byte, error) { } return ret, nil } + +func RecordToBytes(record arrow.Record) ([]byte, error) { + var buf bytes.Buffer + wr := ipc.NewWriter(&buf, ipc.WithSchema(record.Schema())) + if err := wr.Write(record); err != nil { + return nil, err + } + if err := wr.Close(); err != nil { + return nil, err + } + return buf.Bytes(), nil +}