-
Notifications
You must be signed in to change notification settings - Fork 52
/
fallbacks.jl
265 lines (233 loc) · 11.1 KB
/
fallbacks.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
## generic `Tables.rows` and `Tables.columns` fallbacks
## if a table provides Tables.rows or Tables.columns,
## we'll provide a default implementation of the other
# Turn any AbstractColumns into an AbstractRow iterator
# Number of rows in a columns object: the length of its first column,
# or 0 when the object reports no column names at all.
function rowcount(cols)
    names = columnnames(cols)
    if isempty(names)
        return 0
    end
    firstcolumn = getcolumn(cols, names[1])
    return length(firstcolumn)
end
# A lazy view of a single row of an `AbstractColumns`-compatible object: it stores
# the columns object together with a row number instead of copying any values.
struct ColumnsRow{T} <: AbstractRow
    columns::T # an `AbstractColumns`-compatible object
    row::Int # row number
end

# The fields are read with `getfield` rather than dot syntax.
# NOTE(review): presumably because property access on an `AbstractRow` is used for
# column lookup — confirm against the `AbstractRow` definition.
function getcolumns(c::ColumnsRow)
    return getfield(c, :columns)
end

function getrow(c::ColumnsRow)
    return getfield(c, :row)
end
# AbstractRow interface: every accessor first fetches the requested column from the
# wrapped columns object and then indexes that column at this row's number.
Base.@propagate_inbounds function getcolumn(c::ColumnsRow, ::Type{T}, col::Int, nm::Symbol) where {T}
    column = getcolumn(getcolumns(c), T, col, nm)
    return column[getrow(c)]
end

Base.@propagate_inbounds function getcolumn(c::ColumnsRow, i::Int)
    column = getcolumn(getcolumns(c), i)
    return column[getrow(c)]
end

Base.@propagate_inbounds function getcolumn(c::ColumnsRow, nm::Symbol)
    column = getcolumn(getcolumns(c), nm)
    return column[getrow(c)]
end

# a row has the same column names as the columns object it views
function columnnames(c::ColumnsRow)
    return columnnames(getcolumns(c))
end
# Lexicographic `isless` for two rows backed by the same NamedTuple-of-columns type.
# The generated body compares the rows column by column, in the order of `names`:
# the first column that is strictly less decides `true`, the first column that is
# not equal decides `false`, and rows that are equal in every column are not less.
@generated function Base.isless(c::ColumnsRow{T}, d::ColumnsRow{T}) where {T <: NamedTuple{names}} where names
    body = Expr(:block)
    for nm in names
        # `c.nm` and `d.nm` as expressions
        lhs = Expr(:., :c, QuoteNode(nm))
        rhs = Expr(:., :d, QuoteNode(nm))
        comparison = quote
            a, b = $lhs, $rhs
            isless(a, b) && return true
            isequal(a, b) || return false
        end
        push!(body.args, comparison)
    end
    push!(body.args, :(return false))
    return body
end
# `isequal` for two rows backed by the same NamedTuple-of-columns type.
# The generated body checks each column named in `names` in order and returns
# `false` at the first mismatch; only rows equal in every column yield `true`.
@generated function Base.isequal(c::ColumnsRow{T}, d::ColumnsRow{T}) where {T <: NamedTuple{names}} where names
    body = Expr(:block)
    for nm in names
        # `c.nm` and `d.nm` as expressions
        lhs = Expr(:., :c, QuoteNode(nm))
        rhs = Expr(:., :d, QuoteNode(nm))
        push!(body.args, :(isequal($lhs, $rhs) || return false))
    end
    push!(body.args, :(return true))
    return body
end
# RowIterator wraps an AbstractColumns object and provides row iteration via lazy row views
struct RowIterator{T}
    columns::T # the wrapped columns object
    len::Int # number of rows that iteration will produce
end

# `eltype` is defined on the iterator *type*, which is the form Julia's iteration
# interface expects; `eltype(itr)` on an instance keeps working through Base's generic
# `eltype(x) = eltype(typeof(x))` fallback. Defining it only for instances would leave
# `eltype(typeof(itr))` reporting `Any`.
Base.eltype(::Type{RowIterator{T}}) where {T} = ColumnsRow{T}

# the length is stored at construction time; read it with `getfield` directly
Base.length(x::RowIterator) = getfield(x, :len)
# Property access on a RowIterator is forwarded to column lookup, by name or by index.
function Base.getproperty(x::RowIterator, nm::Symbol)
    return getcolumn(x, nm)
end

function Base.getproperty(x::RowIterator, i::Int)
    return getcolumn(x, i)
end

function Base.propertynames(x::RowIterator)
    return columnnames(x)
end

# trait declarations for every RowIterator type
isrowtable(::Type{<:RowIterator}) = true
columnaccess(::Type{<:RowIterator}) = true

# `columns` returns the wrapped object (read via `getfield`, since `getproperty` is
# overloaded above); the remaining methods simply delegate to that object.
function columns(x::RowIterator)
    return getfield(x, :columns)
end

function columnnames(x::RowIterator)
    return columnnames(columns(x))
end

function getcolumn(x::RowIterator, nm::Symbol)
    return getcolumn(columns(x), nm)
end

function getcolumn(x::RowIterator, i::Int)
    return getcolumn(columns(x), i)
end

function materializer(x::RowIterator)
    return materializer(columns(x))
end

function schema(x::RowIterator)
    return schema(columns(x))
end
# Iteration state is the next row number, starting at 1. Each step yields a lazy
# `ColumnsRow` view for that row number and advances the state by one; iteration
# stops once the state exceeds the iterator's length.
@inline function Base.iterate(rows::RowIterator, st=1)
    if st > length(rows)
        return nothing
    else
        rowview = ColumnsRow(columns(rows), st)
        return rowview, st + 1
    end
end
# Generic `Tables.rows` fallback.
# Because this method is being called, we know `x` didn't define its own `Tables.rows`.
# Resolution order:
#   1. `isrowtable(x)`: `x` is returned unchanged
#   2. `columnaccess(T)`: the columns are wrapped in a `RowIterator`
#   3. `x` is at least iterable: it is passed through `nondatavaluerows`, which wraps it
#      in an IteratorWrapper that iterates the input, validating that elements support
#      the AbstractRow interface and unwrapping any DataValues that are encountered
# Anything else raises an `ArgumentError`.
function rows(x::T) where {T}
    isrowtable(x) && return x
    if columnaccess(T)
        cols = columns(x)
        nrows = Int(rowcount(cols))
        return RowIterator(cols, nrows)
    end
    if IteratorInterfaceExtensions.isiterable(x)
        return nondatavaluerows(x)
    end
    throw(ArgumentError("no default `Tables.rows` implementation for type: $T"))
end
# for AbstractRow iterators, we define a "collect"-like routine to build up columns from iterated rows

"""
    Tables.allocatecolumn(::Type{T}, len)

Return a column type (usually `AbstractVector`) with size to hold `len` elements.

Custom column types can override with an appropriate "scalar" element type that should
dispatch to their column allocator. Alternatively, and more generally, custom scalars can
overload `DataAPI.defaultarray` to signal the default array type.
"""
function allocatecolumn(T, len)
    # ask DataAPI which 1-dimensional array type to use for `T`, then allocate it uninitialized
    ArrayType = DataAPI.defaultarray(T, 1)
    return ArrayType(undef, len)
end
# Allocate one column of length `len` per schema field and return them as a NamedTuple
# keyed by the schema's names. In the `@generated` branch the per-column
# `allocatecolumn` calls are spliced into a single tuple expression; the `else` branch
# performs the same allocations at run time.
@inline function _allocatecolumns(::Schema{names, types}, len) where {names, types}
    if @generated
        calls = [:(allocatecolumn($(fieldtype(types, j)), len)) for j in 1:fieldcount(types)]
        return :(NamedTuple{Base.map(Symbol, names)}(($(calls...),)))
    else
        cols = ntuple(j -> allocatecolumn(fieldtype(types, j), len), fieldcount(types))
        return NamedTuple{Base.map(Symbol, names)}(cols)
    end
end
# Allocate a NamedTuple of columns for `sch`, each sized for `len` elements.
# Schemas with at most `SPECIALIZATION_THRESHOLD` columns go through
# `_allocatecolumns`; wider schemas build the tuple of columns directly here.
@inline function allocatecolumns(sch::Schema{names, types}, len) where {names, types}
    ncols = fieldcount(types)
    ncols <= SPECIALIZATION_THRESHOLD && return _allocatecolumns(sch, len)
    cols = Tuple(allocatecolumn(fieldtype(types, j), len) for j = 1:ncols)
    return NamedTuple{Base.map(Symbol, names)}(cols)
end
# add! stores `val` into `dest`: when the row iterator's size trait is `HasLength` or
# `HasShape`, `dest` is written in place at index `row`; for any other trait the value
# is appended with `push!`. `col` and `nm` are accepted but not used here.
@inline function add!(val, col::Int, nm, dest::AbstractArray, ::Union{Base.HasLength, Base.HasShape}, row)
    return setindex!(dest, val, row)
end

@inline function add!(val, col::Int, nm, dest::AbstractArray, L, row)
    return push!(dest, val)
end
# Build columns from a row iterator with a known schema.
# Columns are allocated up front (sized to `length(rowitr)` when the iterator type has
# a length, otherwise empty), then every row is visited once and each of its values is
# stored into the matching column via `add!`.
@inline function buildcolumns(schema, rowitr::T) where {T}
    itersize = Base.IteratorSize(T)
    nrows = if Base.haslength(T)
        length(rowitr)
    else
        0
    end
    cols = allocatecolumns(schema, nrows)
    rownumber = 0
    for row in rowitr
        rownumber += 1
        eachcolumns(add!, schema, row, cols, itersize, rownumber)
    end
    return cols
end
# Destination-first variant of add!: write in place at `row` when the size trait is
# `HasLength`/`HasShape`, otherwise append with `push!`.
@inline function add!(dest::AbstractArray, val, ::Union{Base.HasLength, Base.HasShape}, row)
    return setindex!(dest, val, row)
end

@inline function add!(dest::AbstractArray, val, T, row)
    return push!(dest, val)
end
# Return a copy of tuple `t` in which position `col` holds `x`; every other position
# keeps its element from `t`. If `col` matches no position, the copy equals `t`.
function replacex(t, col::Int, x)
    return ntuple(length(t)) do i
        i == col ? x : t[i]
    end
end
# Store `val` for row `row` in column `dest`, widening the column when needed.
# If `val` already fits the element type `T`, it is added to `dest` directly.
# Otherwise a new column with the promoted element type is allocated, the first
# `row - 1` values of `dest` are copied over, `val` is added to the new column, and
# the tuple held in `updated` gets the new column at position `col`.
# Always returns `nothing`.
@inline function add_or_widen!(val, col::Int, nm, dest::AbstractArray{T}, row, updated, L) where {T}
    if !(val isa T)
        widened = allocatecolumn(promote_type(T, typeof(val)), length(dest))
        if row > 1
            copyto!(widened, 1, dest, 1, row - 1)
        end
        add!(widened, val, L, row)
        updated[] = replacex(updated[], col, widened)
        return
    end
    add!(dest, val, L, row)
    return
end
# Consume the remaining rows of `rowitr` (resuming from state `st`), adding each row's
# values to `columns` and widening column types as needed. Returns the `updated` ref.
function __buildcolumns(rowitr, st, sch, columns, rownbr, updated)
    next = iterate(rowitr, st)
    while next !== nothing
        row, st = next
        rownbr += 1
        eachcolumns(add_or_widen!, sch, row, columns, rownbr, updated, Base.IteratorSize(rowitr))
        # A word on what happens next: add_or_widen! has just run for every column value
        # of this row, and whenever a column had to be widened it stored the new set of
        # columns in `updated`. If `columns` is no longer the same object as `updated[]`,
        # we call __buildcolumns again with `updated[]` as the columns, which lets
        # __buildcolumns specialize (i.e. recompile) on the new column types. So a new
        # __buildcolumns is compiled for each widening event.
        if columns !== updated[]
            return __buildcolumns(rowitr, st, sch, updated[], rownbr, updated)
        end
        next = iterate(rowitr, st)
    end
    return updated
end
# In the schema-less case every column starts out as an `EmptyVector`; `_buildcolumns`
# then performs an initial widening of each column before handing the widened set of
# columns on to `__buildcolumns`.
# An `EmptyVector` only records a length: its element type is `Union{}` and reading
# any index throws `UndefRefError`.
struct EmptyVector <: AbstractVector{Union{}}
    len::Int
end

Base.size(x::EmptyVector) = (getfield(x, :len),)

Base.IndexStyle(::Type{EmptyVector}) = Base.IndexLinear()

function Base.getindex(x::EmptyVector, i::Int)
    throw(UndefRefError())
end
# Process the already-fetched first `row` (as row number 1) with `add_or_widen!`, then
# continue with the rest of the iterator using whatever columns `updated` holds now.
function _buildcolumns(rowitr, row, st, sch, columns, updated)
    itersize = Base.IteratorSize(rowitr)
    eachcolumns(add_or_widen!, sch, row, columns, 1, updated, itersize)
    widened = updated[]
    return __buildcolumns(rowitr, st, sch, widened, 1, updated)
end
# `_fieldtypes(T)` yields the field types of `T`. `fieldtypes` is used when Base defines
# it; otherwise a generator over `fieldtype(T, i)` stands in for it.
# The alias is declared `const` so that calls through `_fieldtypes` resolve to a fixed
# function instead of going through an untyped, reassignable global binding.
if isdefined(Base, :fieldtypes)
    const _fieldtypes = fieldtypes
else
    _fieldtypes(T) = (fieldtype(T, i) for i = 1:fieldcount(T))
end
# when Tables.schema(x) === nothing
# Build columns from a row iterator whose schema is unknown.
# - Empty iterator: if the iterator has an eltype, try to derive an empty, typed set of
#   columns from it (tuples get generated "Column$i" names, other concrete types use
#   their field names); otherwise return an empty NamedTuple.
# - Non-empty iterator: the first row supplies the column names, every column starts as
#   an `EmptyVector`, and `_buildcolumns` widens the columns as rows are consumed.
@inline function buildcolumns(::Nothing, rowitr::T) where {T}
    state = iterate(rowitr)
    if state === nothing
        # empty input iterator; check if it has eltype and maybe we can return a better typed empty NamedTuple
        if Base.IteratorEltype(rowitr) == Base.HasEltype()
            WT = wrappedtype(eltype(rowitr))
            if WT <: Tuple
                return allocatecolumns(Schema((Symbol("Column$i") for i = 1:fieldcount(WT)), _fieldtypes(WT)), 0)
            # `fieldcount` throws an `ArgumentError` for types that don't have a definite
            # number of fields (e.g. abstract types such as `Any`), so only inspect the
            # fields of concrete types; anything else falls through to the empty NamedTuple
            elseif isconcretetype(WT) && fieldcount(WT) > 0
                return allocatecolumns(Schema(fieldnames(WT), _fieldtypes(WT)), 0)
            end
        end
        return NamedTuple()
    end
    row, st = state
    # column names come from the first row
    names = Tuple(columnnames(row))
    len = Base.haslength(T) ? length(rowitr) : 0
    sch = Schema(names, nothing)
    # one placeholder column per name; each is widened to a real column on first use
    columns = Tuple(EmptyVector(len) for _ = 1:length(names))
    # `updated` is a `Ref{Any}` holding the current tuple of columns; unwrap it at the end
    return NamedTuple{Base.map(Symbol, names)}(_buildcolumns(rowitr, row, st, sch, columns, Ref{Any}(columns))[])
end
"""
    Tables.CopiedColumns

Wrapper that marks a set of columns as already copied.

For some sinks, there's a concern about whether they can safely "own" columns from the
input. If mutation will be allowed, to be safe, they should always copy input columns,
to avoid unintended mutation to the original source.

When we've called `buildcolumns`, however, Tables.jl essentially built/owns the columns,
and it's happy to pass ownership to the sink. Thus, any built columns will be wrapped
in a `CopiedColumns` struct to signal to the sink that essentially "a copy has already
been made" and they're safe to assume ownership.
"""
struct CopiedColumns{T} <: AbstractColumns
    x::T # the wrapped columns object
end
# `source` unwraps a CopiedColumns, reading the field with `getfield` directly
function source(x::CopiedColumns)
    return getfield(x, :x)
end

# trait declarations for every CopiedColumns type
istable(::Type{<:CopiedColumns}) = true
columnaccess(::Type{<:CopiedColumns}) = true

# a CopiedColumns is its own columns object
function columns(x::CopiedColumns)
    return x
end

# everything else is forwarded to the wrapped source
function schema(x::CopiedColumns)
    return schema(source(x))
end

function materializer(x::CopiedColumns)
    return materializer(source(x))
end

function getcolumn(x::CopiedColumns, ::Type{T}, col::Int, nm::Symbol) where {T}
    return getcolumn(source(x), T, col, nm)
end

function getcolumn(x::CopiedColumns, i::Int)
    return getcolumn(source(x), i)
end

function getcolumn(x::CopiedColumns, nm::Symbol)
    return getcolumn(source(x), nm)
end

function columnnames(x::CopiedColumns)
    return columnnames(source(x))
end
# Generic `Tables.columns` fallback.
# Because this method is being called, we know `x` didn't define its own
# `Tables.columns` method. Resolution order:
#   1. `rowaccess(T)`: `x` explicitly supports row access, so build up the desired
#      columns from its rows
#   2. though not widely supported, if the source supports the TableTraits column
#      interface, use it
#   3. `x` is at least iterable: wrap it via `nondatavaluerows` (an IteratorWrapper that
#      checks the source correctly iterates valid AbstractRow objects and unwraps
#      DataValues for us) and build columns from that
# Anything else raises an `ArgumentError`. Results are wrapped in `CopiedColumns`.
@inline function columns(x::T) where {T}
    if rowaccess(T)
        rowitr = rows(x)
        return CopiedColumns(buildcolumns(schema(rowitr), rowitr))
    end
    if TableTraits.supports_get_columns_copy_using_missing(x)
        return CopiedColumns(TableTraits.get_columns_copy_using_missing(x))
    end
    if IteratorInterfaceExtensions.isiterable(x)
        wrapped = nondatavaluerows(x)
        return CopiedColumns(buildcolumns(schema(wrapped), wrapped))
    end
    throw(ArgumentError("no default `Tables.columns` implementation for type: $T"))
end