From 36497702c74fe2cc5735dbf30da2ce14d1847371 Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Sun, 28 Apr 2024 19:42:08 +0800 Subject: [PATCH] Fix errors when writing multiple batches Signed-off-by: Ted Xu --- internal/datanode/compactor.go | 4 ++-- internal/storage/serde.go | 27 +++++++++++++++------------ internal/storage/serde_test.go | 7 ++++--- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index f52d57c44317d..e99642316e6f4 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -361,11 +361,11 @@ func (t *compactionTask) merge( } err = writer.Write(v) - numRows++ - unflushedRows++ if err != nil { return nil, nil, 0, err } + numRows++ + unflushedRows++ stats.Update(v.PK) diff --git a/internal/storage/serde.go b/internal/storage/serde.go index bd88b83b87ea0..636e505b83c76 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -28,6 +28,7 @@ import ( "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" "github.com/apache/arrow/go/v12/arrow/memory" + "github.com/apache/arrow/go/v12/parquet" "github.com/apache/arrow/go/v12/parquet/pqarrow" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" @@ -749,21 +750,17 @@ var _ RecordWriter = (*singleFieldRecordWriter)(nil) type singleFieldRecordWriter struct { fw *pqarrow.FileWriter fieldId FieldID + schema *arrow.Schema numRows int - grouped bool } func (sfw *singleFieldRecordWriter) Write(r Record) error { - if !sfw.grouped { - sfw.grouped = true - sfw.fw.NewRowGroup() - } - - sfw.numRows++ - // TODO: adding row group support by calling fw.NewRowGroup() + sfw.numRows += r.Len() a := r.Column(sfw.fieldId) - return sfw.fw.WriteColumnData(a) + rec := array.NewRecord(sfw.schema, []arrow.Array{a}, int64(r.Len())) + defer rec.Release() + return sfw.fw.WriteBuffered(rec) } func (sfw *singleFieldRecordWriter) Close() { @@ -772,13 +769,16 @@ func (sfw 
*singleFieldRecordWriter) Close() { func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Writer) (*singleFieldRecordWriter, error) { schema := arrow.NewSchema([]arrow.Field{field}, nil) - fw, err := pqarrow.NewFileWriter(schema, writer, nil, pqarrow.DefaultWriterProps()) + fw, err := pqarrow.NewFileWriter(schema, writer, + parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)), // No additional grouping for now. + pqarrow.DefaultWriterProps()) if err != nil { return nil, err } return &singleFieldRecordWriter{ fw: fw, fieldId: fieldId, + schema: schema, }, nil } @@ -801,10 +801,10 @@ func (sw *SerializeWriter[T]) Flush() error { if err != nil { return err } + defer r.Release() if err := sw.rw.Write(r); err != nil { return err } - r.Release() sw.pos = 0 sw.writtenMemorySize += size return nil @@ -829,8 +829,11 @@ func (sw *SerializeWriter[T]) WrittenMemorySize() uint64 { } func (sw *SerializeWriter[T]) Close() error { + if err := sw.Flush(); err != nil { + return err + } sw.rw.Close() - return sw.Flush() + return nil } func NewSerializeRecordWriter[T any](rw RecordWriter, serializer Serializer[T], batchSize int) *SerializeWriter[T] { diff --git a/internal/storage/serde_test.go b/internal/storage/serde_test.go index 17a10e3a2104e..21a871cb5e606 100644 --- a/internal/storage/serde_test.go +++ b/internal/storage/serde_test.go @@ -124,7 +124,7 @@ func TestBinlogSerializeWriter(t *testing.T) { }) t.Run("test serialize", func(t *testing.T) { - size := 3 + size := 16 blobs, err := generateTestData(size) assert.NoError(t, err) reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField) @@ -134,7 +134,7 @@ func TestBinlogSerializeWriter(t *testing.T) { schema := generateTestSchema() // Copy write the generated data writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields) - writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 1024) + writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7) 
assert.NoError(t, err) for i := 1; i <= size; i++ { @@ -143,7 +143,8 @@ func TestBinlogSerializeWriter(t *testing.T) { value := reader.Value() assertTestData(t, i, value) - writer.Write(value) + err := writer.Write(value) + assert.NoError(t, err) } err = reader.Next()