Skip to content

Commit aaa4cf8

Browse files
authored
Implement Tables.columnnames and Tables.schema for Arrow.Stream (#395)
1 parent 9b36c8b commit aaa4cf8

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

src/table.jl

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,33 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
8080
Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression)
8181
end
8282

83-
Tables.partitions(x::Stream) = x
84-
8583
Stream(input, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
8684
Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
8785
Stream(inputs::Vector; kw...) = Stream([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
8886

87+
function initialize!(x::Stream)
88+
isempty(getfield(x, :names)) || return
89+
# Initialize member fields using iteration and reset state
90+
lastinputindex = x.inputindex
91+
lastbatchiterator = x.batchiterator
92+
iterate(x)
93+
x.inputindex = lastinputindex
94+
x.batchiterator = lastbatchiterator
95+
nothing
96+
end
97+
98+
Tables.partitions(x::Stream) = x
99+
100+
function Tables.columnnames(x::Stream)
101+
initialize!(x)
102+
getfield(x, :names)
103+
end
104+
105+
function Tables.schema(x::Stream)
106+
initialize!(x)
107+
Tables.Schema(Tables.columnnames(x), getfield(x, :types))
108+
end
109+
89110
Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
90111
Base.eltype(::Type{Stream}) = Table
91112

0 commit comments

Comments
 (0)