Skip to content

Commit 8cc92f4

Browse files
sbinetwesm
authored andcommitted
ARROW-3626: [Go] implement CSV reader
needs #2871 Author: Sebastien Binet <binet@cern.ch> Closes #2872 from sbinet/issue-3626 and squashes the following commits: 72d9d9d <Sebastien Binet> ARROW-3626: implement CSV reader 9dfda70 <Sebastien Binet> ARROW-3625: add examples for Record and Table f2bf762 <Sebastien Binet> ARROW-3627: implement Record builder 8367d64 <Sebastien Binet> make sure Boolean implements array.Interface c055186 <Sebastien Binet> add Release, Reserve and Resize to array.Builder interface 4aaff04 <Sebastien Binet> test allocator interface
1 parent eeaf121 commit 8cc92f4

File tree

14 files changed

+896
-1
lines changed

14 files changed

+896
-1
lines changed

go/arrow/array/boolean.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,7 @@ func (a *Boolean) setData(data *Data) {
7272
a.values = vals.Bytes()
7373
}
7474
}
75+
76+
var (
77+
_ Interface = (*Boolean)(nil)
78+
)

go/arrow/array/builder.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ const (
3131

3232
// Builder provides an interface to build arrow arrays.
3333
type Builder interface {
34+
// Retain increases the reference count by 1.
35+
// Retain may be called simultaneously from multiple goroutines.
36+
Retain()
37+
3438
// Release decreases the reference count by 1.
3539
Release()
3640

@@ -47,6 +51,14 @@ type Builder interface {
4751
// AppendNull adds a new null value to the array being built.
4852
AppendNull()
4953

54+
// Reserve ensures there is enough space for appending n elements
55+
// by checking the capacity and calling Resize if necessary.
56+
Reserve(n int)
57+
58+
// Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(),
59+
// additional memory will be allocated. If n is smaller, the allocated memory may reduced.
60+
Resize(n int)
61+
5062
// NewArray creates a new array from the memory buffers used
5163
// by the builder and resets the Builder so it can be used to build
5264
// a new array.

go/arrow/array/null.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,13 @@ func (b *NullBuilder) AppendNull() {
8686
b.builder.nulls++
8787
}
8888

89+
func (*NullBuilder) Reserve(size int) {}
90+
func (*NullBuilder) Resize(size int) {}
91+
8992
func (*NullBuilder) init(cap int) {}
9093
func (*NullBuilder) resize(newBits int, init func(int)) {}
9194

92-
// NewArray creates a List array from the memory buffers used by the builder and resets the NullBuilder
95+
// NewArray creates a Null array from the memory buffers used by the builder and resets the NullBuilder
9396
// so it can be used to build a new array.
9497
func (b *NullBuilder) NewArray() Interface {
9598
return b.NewNullArray()

go/arrow/array/record.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/apache/arrow/go/arrow"
2525
"github.com/apache/arrow/go/arrow/internal/debug"
26+
"github.com/apache/arrow/go/arrow/memory"
2627
)
2728

2829
// RecordReader reads a stream of records.
@@ -115,6 +116,8 @@ type Record interface {
115116

116117
NumRows() int64
117118
NumCols() int64
119+
120+
Columns() []Interface
118121
Column(i int) Interface
119122
ColumnName(i int) string
120123

@@ -217,6 +220,7 @@ func (rec *simpleRecord) Release() {
217220
func (rec *simpleRecord) Schema() *arrow.Schema { return rec.schema }
218221
func (rec *simpleRecord) NumRows() int64 { return rec.rows }
219222
func (rec *simpleRecord) NumCols() int64 { return int64(len(rec.arrs)) }
223+
func (rec *simpleRecord) Columns() []Interface { return rec.arrs }
220224
func (rec *simpleRecord) Column(i int) Interface { return rec.arrs[i] }
221225
func (rec *simpleRecord) ColumnName(i int) string { return rec.schema.Field(i).Name }
222226

@@ -239,6 +243,91 @@ func (rec *simpleRecord) NewSlice(i, j int64) Record {
239243
return NewRecord(rec.schema, arrs, j-i)
240244
}
241245

246+
// RecordBuilder eases the process of building a Record, iteratively, from
247+
// a known Schema.
248+
type RecordBuilder struct {
249+
refCount int64
250+
mem memory.Allocator
251+
schema *arrow.Schema
252+
fields []Builder
253+
}
254+
255+
// NewRecordBuilder returns a builder, using the provided memory allocator and a schema.
256+
func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder {
257+
b := &RecordBuilder{
258+
refCount: 1,
259+
mem: mem,
260+
schema: schema,
261+
fields: make([]Builder, len(schema.Fields())),
262+
}
263+
264+
for i, f := range schema.Fields() {
265+
b.fields[i] = newBuilder(b.mem, f.Type)
266+
}
267+
268+
return b
269+
}
270+
271+
// Retain increases the reference count by 1.
272+
// Retain may be called simultaneously from multiple goroutines.
273+
func (b *RecordBuilder) Retain() {
274+
atomic.AddInt64(&b.refCount, 1)
275+
}
276+
277+
// Release decreases the reference count by 1.
278+
func (b *RecordBuilder) Release() {
279+
debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
280+
281+
for _, f := range b.fields {
282+
f.Release()
283+
}
284+
285+
if atomic.AddInt64(&b.refCount, -1) == 0 {
286+
b.fields = nil
287+
}
288+
}
289+
290+
func (b *RecordBuilder) Schema() *arrow.Schema { return b.schema }
291+
func (b *RecordBuilder) Fields() []Builder { return b.fields }
292+
func (b *RecordBuilder) Field(i int) Builder { return b.fields[i] }
293+
294+
func (b *RecordBuilder) Reserve(size int) {
295+
for _, f := range b.fields {
296+
f.Reserve(size)
297+
}
298+
}
299+
300+
// NewRecord creates a new record from the memory buffers and resets the
301+
// RecordBuilder so it can be used to build a new record.
302+
//
303+
// The returned Record must be Release()'d after use.
304+
//
305+
// NewRecord panics if the fields' builder do not have the same length.
306+
func (b *RecordBuilder) NewRecord() Record {
307+
cols := make([]Interface, len(b.fields))
308+
rows := int64(0)
309+
310+
defer func(cols []Interface) {
311+
for _, col := range cols {
312+
if col == nil {
313+
continue
314+
}
315+
col.Release()
316+
}
317+
}(cols)
318+
319+
for i, f := range b.fields {
320+
cols[i] = f.NewArray()
321+
irow := int64(cols[i].Len())
322+
if i > 0 && irow != rows {
323+
panic(fmt.Errorf("arrow/array: field %d has %d rows. want=%d", i, irow, rows))
324+
}
325+
rows = irow
326+
}
327+
328+
return NewRecord(b.schema, cols, rows)
329+
}
330+
242331
var (
243332
_ Record = (*simpleRecord)(nil)
244333
_ RecordReader = (*simpleRecords)(nil)

go/arrow/array/record_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ func TestRecord(t *testing.T) {
7272
if got, want := rec.NumCols(), int64(2); got != want {
7373
t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
7474
}
75+
if got, want := rec.Columns()[0], cols[0]; got != want {
76+
t.Fatalf("invalid column: got=%q, want=%q", got, want)
77+
}
7578
if got, want := rec.Column(0), cols[0]; got != want {
7679
t.Fatalf("invalid column: got=%q, want=%q", got, want)
7780
}
@@ -345,3 +348,43 @@ func TestRecordReader(t *testing.T) {
345348
})
346349
}
347350
}
351+
352+
func TestRecordBuilder(t *testing.T) {
353+
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
354+
defer mem.AssertSize(t, 0)
355+
356+
schema := arrow.NewSchema(
357+
[]arrow.Field{
358+
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
359+
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
360+
},
361+
nil,
362+
)
363+
364+
b := array.NewRecordBuilder(mem, schema)
365+
defer b.Release()
366+
367+
b.Retain()
368+
b.Release()
369+
370+
b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6}, nil)
371+
b.Field(0).(*array.Int32Builder).AppendValues([]int32{7, 8, 9, 10}, nil)
372+
b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
373+
374+
rec := b.NewRecord()
375+
defer rec.Release()
376+
377+
if got, want := rec.Schema(), schema; !got.Equal(want) {
378+
t.Fatalf("invalid schema: got=%#v, want=%#v", got, want)
379+
}
380+
381+
if got, want := rec.NumRows(), int64(10); got != want {
382+
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
383+
}
384+
if got, want := rec.NumCols(), int64(2); got != want {
385+
t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
386+
}
387+
if got, want := rec.ColumnName(0), schema.Field(0).Name; got != want {
388+
t.Fatalf("invalid column name: got=%q, want=%q", got, want)
389+
}
390+
}

go/arrow/array/table.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,32 @@ func NewTable(schema *arrow.Schema, cols []Column, rows int64) *simpleTable {
246246
return &tbl
247247
}
248248

249+
// NewTableFromRecords returns a new basic, non-lazy in-memory table.
250+
//
251+
// NewTableFromRecords panics if the records and schema are inconsistent.
252+
func NewTableFromRecords(schema *arrow.Schema, recs []Record) *simpleTable {
253+
arrs := make([]Interface, len(recs))
254+
cols := make([]Column, len(schema.Fields()))
255+
256+
defer func(cols []Column) {
257+
for i := range cols {
258+
cols[i].Release()
259+
}
260+
}(cols)
261+
262+
for i := range cols {
263+
field := schema.Field(i)
264+
for j, rec := range recs {
265+
arrs[j] = rec.Column(i)
266+
}
267+
chunk := NewChunked(field.Type, arrs)
268+
cols[i] = *NewColumn(field, chunk)
269+
chunk.Release()
270+
}
271+
272+
return NewTable(schema, cols, -1)
273+
}
274+
249275
func (tbl *simpleTable) Schema() *arrow.Schema { return tbl.schema }
250276
func (tbl *simpleTable) NumRows() int64 { return tbl.rows }
251277
func (tbl *simpleTable) NumCols() int64 { return int64(len(tbl.cols)) }

go/arrow/array/table_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,52 @@ func TestTable(t *testing.T) {
556556
}
557557
}
558558

559+
func TestTableFromRecords(t *testing.T) {
560+
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
561+
defer mem.AssertSize(t, 0)
562+
563+
schema := arrow.NewSchema(
564+
[]arrow.Field{
565+
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
566+
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
567+
},
568+
nil,
569+
)
570+
571+
b := array.NewRecordBuilder(mem, schema)
572+
defer b.Release()
573+
574+
b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6}, nil)
575+
b.Field(0).(*array.Int32Builder).AppendValues([]int32{7, 8, 9, 10}, []bool{true, true, false, true})
576+
b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
577+
578+
rec1 := b.NewRecord()
579+
defer rec1.Release()
580+
581+
b.Field(0).(*array.Int32Builder).AppendValues([]int32{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil)
582+
b.Field(1).(*array.Float64Builder).AppendValues([]float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil)
583+
584+
rec2 := b.NewRecord()
585+
defer rec2.Release()
586+
587+
tbl := array.NewTableFromRecords(schema, []array.Record{rec1, rec2})
588+
defer tbl.Release()
589+
590+
if got, want := tbl.Schema(), schema; !got.Equal(want) {
591+
t.Fatalf("invalid schema: got=%#v, want=%#v", got, want)
592+
}
593+
594+
if got, want := tbl.NumRows(), int64(20); got != want {
595+
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
596+
}
597+
if got, want := tbl.NumCols(), int64(2); got != want {
598+
t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
599+
}
600+
if got, want := tbl.Column(0).Name(), schema.Field(0).Name; got != want {
601+
t.Fatalf("invalid column: got=%q, want=%q", got, want)
602+
}
603+
}
604+
559605
func TestTableReader(t *testing.T) {
560606
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
561607
defer mem.AssertSize(t, 0)

0 commit comments

Comments
 (0)