Skip to content

Commit 4a1f315

Browse files
authored
fix: Process nulls for tested types, too (maps, lists, structs) (#1125)
1 parent b2e6331 commit 4a1f315

File tree

2 files changed

+140
-45
lines changed

2 files changed

+140
-45
lines changed

plugin/nulls.go

Lines changed: 78 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,75 +6,108 @@ import (
66
"github.com/apache/arrow/go/v13/arrow/memory"
77
)
88

9-
func stripNullsFromLists(record arrow.Record) arrow.Record {
10-
cols := record.Columns()
11-
for c, col := range cols {
12-
list, ok := col.(array.ListLike)
13-
if !ok {
14-
continue
15-
}
16-
if _, ok := list.(*array.Map); ok {
17-
// maps also correspond to array.ListLike
9+
func stripNullsFromLists(list array.ListLike) array.ListLike {
10+
// TODO: handle Arrow maps separately if required
11+
12+
if list.NullN() == 0 {
13+
return list
14+
}
15+
16+
bldr := array.NewBuilder(memory.DefaultAllocator, list.DataType()).(array.ListLikeBuilder)
17+
for j := 0; j < list.Len(); j++ {
18+
if list.IsNull(j) {
19+
bldr.AppendNull()
1820
continue
1921
}
20-
21-
bldr := array.NewListBuilder(memory.DefaultAllocator, list.DataType().(arrow.ListLikeType).Elem())
22-
for j := 0; j < list.Len(); j++ {
23-
if list.IsNull(j) {
24-
bldr.AppendNull()
22+
bldr.Append(true)
23+
vBldr := bldr.ValueBuilder()
24+
from, to := list.ValueOffsets(j)
25+
slc := array.NewSlice(list.ListValues(), from, to)
26+
for k := 0; k < int(to-from); k++ {
27+
if slc.IsNull(k) {
2528
continue
2629
}
27-
bldr.Append(true)
28-
vBldr := bldr.ValueBuilder()
29-
from, to := list.ValueOffsets(j)
30-
slc := array.NewSlice(list.ListValues(), from, to)
31-
for k := 0; k < int(to-from); k++ {
32-
if slc.IsNull(k) {
33-
continue
34-
}
35-
err := vBldr.AppendValueFromString(slc.ValueStr(k))
36-
if err != nil {
37-
panic(err)
38-
}
30+
err := vBldr.AppendValueFromString(slc.ValueStr(k))
31+
if err != nil {
32+
panic(err)
3933
}
4034
}
41-
cols[c] = bldr.NewArray()
4235
}
43-
return array.NewRecord(record.Schema(), cols, record.NumRows())
36+
37+
return bldr.NewArray().(array.ListLike)
4438
}
4539

4640
type AllowNullFunc func(arrow.DataType) bool
4741

48-
func (s *WriterTestSuite) replaceNullsByEmpty(record arrow.Record) arrow.Record {
42+
func (s *WriterTestSuite) replaceNullsByEmpty(arr arrow.Array) arrow.Array {
4943
if s.allowNull == nil {
50-
return record
44+
return arr
5145
}
5246

53-
cols := record.Columns()
54-
for c, col := range cols {
55-
if col.NullN() == 0 || s.allowNull(col.DataType()) {
56-
continue
57-
}
58-
59-
builder := array.NewBuilder(memory.DefaultAllocator, col.DataType())
60-
for j := 0; j < col.Len(); j++ {
61-
if col.IsNull(j) {
47+
if !s.allowNull(arr.DataType()) && arr.NullN() > 0 {
48+
builder := array.NewBuilder(memory.DefaultAllocator, arr.DataType())
49+
for j := 0; j < arr.Len(); j++ {
50+
if arr.IsNull(j) {
6251
builder.AppendEmptyValue()
6352
continue
6453
}
6554

66-
if err := builder.AppendValueFromString(col.ValueStr(j)); err != nil {
55+
if err := builder.AppendValueFromString(arr.ValueStr(j)); err != nil {
6756
panic(err)
6857
}
6958
}
70-
cols[c] = builder.NewArray()
59+
60+
arr = builder.NewArray()
61+
}
62+
63+
// we need to process the nested arrays, too
64+
return s.replaceNullsByEmptyNestedArray(arr)
65+
}
66+
67+
func (s *WriterTestSuite) replaceNullsByEmptyNestedArray(arr arrow.Array) arrow.Array {
68+
if s.allowNull == nil {
69+
return arr
70+
}
71+
72+
switch arr := arr.(type) {
73+
case array.ListLike: // TODO: handle Arrow maps separately if required
74+
values := s.handleNullsArray(arr.ListValues())
75+
return array.MakeFromData(
76+
array.NewData(arr.DataType(), arr.Len(),
77+
arr.Data().Buffers(),
78+
[]arrow.ArrayData{values.Data()},
79+
arr.NullN(), arr.Data().Offset(),
80+
),
81+
)
82+
case *array.Struct:
83+
children := make([]arrow.ArrayData, arr.NumField())
84+
for i := 0; i < arr.NumField(); i++ {
85+
children[i] = s.handleNullsArray(arr.Field(i)).Data()
86+
}
87+
return array.MakeFromData(
88+
array.NewData(arr.DataType(), arr.Len(),
89+
arr.Data().Buffers(),
90+
children,
91+
arr.NullN(), arr.Data().Offset(),
92+
),
93+
)
94+
default:
95+
return arr
7196
}
72-
return array.NewRecord(record.Schema(), cols, record.NumRows())
7397
}
7498

7599
func (s *WriterTestSuite) handleNulls(record arrow.Record) arrow.Record {
76-
if s.ignoreNullsInLists {
77-
record = stripNullsFromLists(record)
100+
cols := record.Columns()
101+
for c, col := range cols {
102+
cols[c] = s.handleNullsArray(col)
103+
}
104+
return array.NewRecord(record.Schema(), cols, record.NumRows())
105+
}
106+
107+
func (s *WriterTestSuite) handleNullsArray(arr arrow.Array) arrow.Array {
108+
if list, ok := arr.(array.ListLike); ok && s.ignoreNullsInLists {
109+
arr = stripNullsFromLists(list) // TODO: handle Arrow maps separately if required
78110
}
79-
return s.replaceNullsByEmpty(record)
111+
112+
return s.replaceNullsByEmpty(arr)
80113
}

plugin/nulls_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package plugin
2+
3+
import (
4+
"github.com/apache/arrow/go/v13/arrow"
5+
"github.com/apache/arrow/go/v13/arrow/array"
6+
"github.com/cloudquery/plugin-sdk/v4/schema"
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
"testing"
10+
"time"
11+
)
12+
13+
func TestWithTestSourceAllowNull(t *testing.T) {
14+
s := &WriterTestSuite{allowNull: func(dt arrow.DataType) bool {
15+
switch dt.(type) {
16+
case *arrow.StructType, arrow.ListLikeType:
17+
return false
18+
default:
19+
return true
20+
}
21+
}}
22+
23+
tg := schema.NewTestDataGenerator()
24+
source := schema.TestTable("allow_null", schema.TestSourceOptions{})
25+
resource := s.handleNulls(tg.Generate(source, schema.GenTestDataOptions{
26+
SourceName: "allow_null",
27+
SyncTime: time.Now(),
28+
MaxRows: 100,
29+
NullRows: false,
30+
})[0])
31+
for _, c := range resource.Columns() {
32+
assertNoNulls(t, s.allowNull, c)
33+
}
34+
35+
resource = s.handleNulls(tg.Generate(source, schema.GenTestDataOptions{
36+
SourceName: "allow_null",
37+
SyncTime: time.Now(),
38+
MaxRows: 100,
39+
NullRows: true,
40+
})[0])
41+
for _, c := range resource.Columns() {
42+
assertNoNulls(t, s.allowNull, c)
43+
}
44+
}
45+
46+
func assertNoNulls(t *testing.T, allowNull AllowNullFunc, arr arrow.Array) {
47+
require.NotNil(t, allowNull)
48+
49+
if !allowNull(arr.DataType()) {
50+
assert.Zero(t, arr.NullN())
51+
}
52+
53+
// traverse
54+
switch arr := arr.(type) {
55+
case array.ListLike:
56+
assertNoNulls(t, allowNull, arr.ListValues())
57+
case *array.Struct:
58+
for i := 0; i < arr.NumField(); i++ {
59+
assertNoNulls(t, allowNull, arr.Field(i))
60+
}
61+
}
62+
}

0 commit comments

Comments
 (0)