Skip to content

Commit

Permalink
Fixing errors when write multiple batches
Browse files Browse the repository at this point in the history
Signed-off-by: Ted Xu <ted.xu@zilliz.com>
  • Loading branch information
tedxu committed May 16, 2024
1 parent 675910f commit 3649770
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
4 changes: 2 additions & 2 deletions internal/datanode/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,11 @@ func (t *compactionTask) merge(
}

err = writer.Write(v)
numRows++
unflushedRows++
if err != nil {
return nil, nil, 0, err
}
numRows++
unflushedRows++

stats.Update(v.PK)

Expand Down
27 changes: 15 additions & 12 deletions internal/storage/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -749,21 +750,17 @@ var _ RecordWriter = (*singleFieldRecordWriter)(nil)
type singleFieldRecordWriter struct {
fw *pqarrow.FileWriter
fieldId FieldID
schema *arrow.Schema

numRows int
grouped bool
}

func (sfw *singleFieldRecordWriter) Write(r Record) error {
if !sfw.grouped {
sfw.grouped = true
sfw.fw.NewRowGroup()
}

sfw.numRows++
// TODO: adding row group support by calling fw.NewRowGroup()
sfw.numRows += r.Len()
a := r.Column(sfw.fieldId)
return sfw.fw.WriteColumnData(a)
rec := array.NewRecord(sfw.schema, []arrow.Array{a}, int64(r.Len()))
defer rec.Release()
return sfw.fw.WriteBuffered(rec)
}

func (sfw *singleFieldRecordWriter) Close() {
Expand All @@ -772,13 +769,16 @@ func (sfw *singleFieldRecordWriter) Close() {

func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Writer) (*singleFieldRecordWriter, error) {
schema := arrow.NewSchema([]arrow.Field{field}, nil)
fw, err := pqarrow.NewFileWriter(schema, writer, nil, pqarrow.DefaultWriterProps())
fw, err := pqarrow.NewFileWriter(schema, writer,
parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)), // No additional grouping for now.
pqarrow.DefaultWriterProps())
if err != nil {
return nil, err
}
return &singleFieldRecordWriter{
fw: fw,
fieldId: fieldId,
schema: schema,
}, nil
}

Expand All @@ -801,10 +801,10 @@ func (sw *SerializeWriter[T]) Flush() error {
if err != nil {
return err
}
defer r.Release()
if err := sw.rw.Write(r); err != nil {
return err
}
r.Release()
sw.pos = 0
sw.writtenMemorySize += size
return nil
Expand All @@ -829,8 +829,11 @@ func (sw *SerializeWriter[T]) WrittenMemorySize() uint64 {
}

func (sw *SerializeWriter[T]) Close() error {
if err := sw.Flush(); err != nil {
return err

Check warning on line 833 in internal/storage/serde.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/serde.go#L833

Added line #L833 was not covered by tests
}
sw.rw.Close()
return sw.Flush()
return nil
}

func NewSerializeRecordWriter[T any](rw RecordWriter, serializer Serializer[T], batchSize int) *SerializeWriter[T] {
Expand Down
7 changes: 4 additions & 3 deletions internal/storage/serde_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
})

t.Run("test serialize", func(t *testing.T) {
size := 3
size := 16
blobs, err := generateTestData(size)
assert.NoError(t, err)
reader, err := NewBinlogDeserializeReader(blobs, common.RowIDField)
Expand All @@ -134,7 +134,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
schema := generateTestSchema()
// Copy write the generated data
writers := NewBinlogStreamWriters(0, 0, 0, schema.Fields)
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 1024)
writer, err := NewBinlogSerializeWriter(schema, 0, 0, writers, 7)
assert.NoError(t, err)

for i := 1; i <= size; i++ {
Expand All @@ -143,7 +143,8 @@ func TestBinlogSerializeWriter(t *testing.T) {

value := reader.Value()
assertTestData(t, i, value)
writer.Write(value)
err := writer.Write(value)
assert.NoError(t, err)
}

err = reader.Next()
Expand Down

0 comments on commit 3649770

Please sign in to comment.