Skip to content

Commit 7bafd81

Browse files
committed
Implement Tables.columnnames and Tables.schema for Arrow.Stream
1 parent 9b36c8b commit 7bafd81

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

src/table.jl

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,25 @@ function Stream(inputs::Vector{ArrowBlob}; convert::Bool=true)
8080
Stream(inputs, inputindex, batchiterator, names, types, schema, dictencodings, dictencoded, convert, compression)
8181
end
8282

83-
Tables.partitions(x::Stream) = x
84-
8583
Stream(input, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
8684
Stream(input::Vector{UInt8}, pos::Integer=1, len=nothing; kw...) = Stream([ArrowBlob(tobytes(input), pos, len)]; kw...)
8785
Stream(inputs::Vector; kw...) = Stream([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
8886

87+
function initialize!(x::Stream)
88+
isempty(getfield(x, :names)) || return
89+
# Initialize member fields using iteration and reset state
90+
lastinputindex = x.inputindex
91+
lastbatchiterator = x.batchiterator
92+
iterate(x)
93+
x.inputindex = lastinputindex
94+
x.batchiterator = lastbatchiterator
95+
nothing
96+
end
97+
98+
Tables.partitions(x::Stream) = x
99+
Tables.columnnames(x::Stream) = initialize!(x); getfield(x, :names)
100+
Tables.schema(x::Stream) = initialize!(x); Tables.Schema(Tables.columnnames(x), getfield(x, :types))
101+
89102
Base.IteratorSize(::Type{Stream}) = Base.SizeUnknown()
90103
Base.eltype(::Type{Stream}) = Table
91104

0 commit comments

Comments
 (0)