Skip to content

Commit

Permalink
Start work on supporting filtering while parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Jun 27, 2020
1 parent d74c2fc commit 72177e7
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 65 deletions.
198 changes: 133 additions & 65 deletions src/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ function File(source;
tasks::Integer=Threads.nthreads(),
select=nothing,
drop=nothing,
filter=nothing,
# parsing options
missingstrings=String[],
missingstring="",
Expand Down Expand Up @@ -233,14 +234,15 @@ function File(source;
refs = Vector{RefPool}(undef, ncols)
if threaded === true
# multithreaded
finalrows, columns = multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow - 1, pool, refs, ncols, typemap, h.categorical, customtypes, limit, tasks, debug)
finalrows, columns = multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow - 1, pool, refs, ncols, typemap, h.categorical, customtypes, limit, filter, h.names, tasks, debug)
else
if limit < rowsguess
rowsguess = limit
end
columns = allocate(rowsguess, ncols, types, flags)
codes = filter === nothing ? EMPTY_CODES : zeros(Int16, ncols)
t = Base.time()
finalrows, pos = parsefilechunk!(Val(transpose), ncols, typemap, columns, buf, datapos, len, limit, positions, pool, refs, rowsguess, datarow - 1, types, flags, debug, options, coloptions, customtypes)
finalrows, pos = parsefilechunk!(Val(transpose), ncols, typemap, columns, buf, datapos, len, limit, positions, pool, refs, rowsguess, datarow - 1, types, flags, filter, h.names, codes, debug, options, coloptions, customtypes)
debug && println("time for initial parsing: $(Base.time() - t)")
# cleanup our columns if needed
for i = 1:ncols
Expand Down Expand Up @@ -292,6 +294,7 @@ end

const EMPTY_INT_ARRAY = Int64[]
const EMPTY_REFRECODE = UInt32[]
const EMPTY_CODES = Int16[]

# after multithreaded parsing, we need to synchronize pooled refs from different chunks of the file
# we pick one chunk as "source of truth", then adjust other chunks as needed
Expand Down Expand Up @@ -379,12 +382,12 @@ function makeandsetpooled!(columns, i, column, refs, flags, categorical)
return
end

function multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow, pool, refs, ncols, typemap, categorical, customtypes, limit, N, debug)
function multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow, pool, refs, ncols, typemap, categorical, customtypes, limit, filter, names, N, debug)
# when limiting w/ multithreaded parsing, we try to guess about where in the file the limit row # will be
# then adjust our final file len to the end of that row
# we add some cushion so we hopefully get the limit row correctly w/o shooting past too far and needing to resize! down
# but we also don't guarantee limit will be exact w/ multithreaded parsing
if limit < rowsguess
if limit < rowsguess && filter === nothing
newlen = [0, ceil(Int64, (limit / (rowsguess * 0.8)) * len), 0]
findrowstarts!(buf, len, options, newlen, ncols)
len = newlen[2] - 1
Expand All @@ -411,7 +414,8 @@ function multithreadparse(types, flags, buf, datapos, len, options, coloptions,
task_types = copy(types)
task_columns = allocate(rowchunkguess, ncols, task_types, task_flags)
pertaskcolumns[i] = task_columns
task_rows, task_pos = parsefilechunk!(Val(false), ncols, typemap, task_columns, buf, task_pos, task_len, typemax(Int64), EMPTY_INT_ARRAY, pool, task_refs, rowchunkguess, datarow + (rowchunkguess * (i - 1)), task_types, task_flags, debug, options, coloptions, customtypes)
codes = filter === nothing ? EMPTY_CODES : zeros(Int16, ncols)
task_rows, task_pos = parsefilechunk!(Val(false), ncols, typemap, task_columns, buf, task_pos, task_len, typemax(Int64), EMPTY_INT_ARRAY, pool, task_refs, rowchunkguess, datarow + (rowchunkguess * (i - 1)), task_types, task_flags, filter, names, codes, debug, options, coloptions, customtypes)
rows[i] = task_rows
# promote column types/flags across task chunks
for col = 1:ncols
Expand Down Expand Up @@ -509,13 +513,13 @@ end # @static if VERSION >= v"1.3-DEV"
return limit < finalrows ? limit : finalrows, finalcolumns
end

function parsefilechunk!(TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options::Parsers.Options{ignorerepeated}, coloptions, ::Type{customtypes}) where {transpose, ignorerepeated, customtypes}
function parsefilechunk!(TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, flags, filter, names, codes, debug, options, coloptions, ::Type{customtypes}) where {transpose, customtypes}
row = 0
startpos = pos
if pos <= len && len > 0
while row < limit
row += 1
pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, filter, names, codes, debug, options, coloptions, customtypes)
(pos > len || row == limit) && break
# if our initial row estimate was too few, we need to reallocate our columsn to read the rest of the file
if row + 1 > rowsguess
Expand All @@ -537,7 +541,7 @@ function parsefilechunk!(TR::Val{transpose}, ncols, typemap, columns, buf, pos,
return row, pos
end

@noinline function promotetostring!(col, TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, origflags, debug, options::Parsers.Options{ignorerepeated}, coloptions, ::Type{customtypes}) where {transpose, ignorerepeated, customtypes}
@noinline function promotetostring!(col, TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, origflags, debug, options, coloptions, ::Type{customtypes}) where {transpose, customtypes}
flags = copy(origflags)
for i = 1:ncols
if i == col
Expand All @@ -550,10 +554,11 @@ end
end
row = 0
startpos = pos
names = Symbol[]
if pos <= len && len > 0
while row < limit
row += 1
pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, nothing, names, EMPTY_CODES, debug, options, coloptions, customtypes)
pos > len && break
end
end
Expand Down Expand Up @@ -594,70 +599,133 @@ end
end
end

@inline function parserow(row, TR::Val{transpose}, ncols, typemap, columns, startpos, buf, pos::A, len, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options::B, coloptions::C, ::Type{customtypes}) where {transpose, A, B, C, customtypes}
for col = 1:ncols
if transpose
@inbounds pos = positions[col]
end
@inbounds flag = flags[col]
@inbounds column = columns[col]
@inbounds opts = coloptions === nothing ? options : coloptions[col]
# @show typeof(column)
if willdrop(flag) || (user(flag) && column isa MissingVector)
pos, code = parsemissing!(buf, pos, len, opts, row, rowoffset, col)
elseif !typedetected(flag)
pos, code = detect(columns, buf, pos, len, opts, row, rowoffset, col, typemap, pool, refs, debug, types, flags, rowsguess)
elseif column isa SVec{Int64}
pos, code = parseint!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Float64}
pos, code = parsevalue!(Float64, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec2{String}
pos, code = parsestring2!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Date}
pos, code = parsevalue!(Date, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{DateTime}
pos, code = parsevalue!(DateTime, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Time}
pos, code = parsevalue!(Time, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa Vector{Union{Missing, Bool}}
pos, code = parsevalue!(Bool, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa Vector{UInt32}
pos, code = parsepooled!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, rowsguess, pool, refs, types, flags)
elseif column isa Vector{PosLen}
pos, code = parsestring!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif customtypes !== Tuple{}
pos, code = parsecustom!(customtypes, flag, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
else
error("bad array type: $(typeof(column))")
end
if promote_to_string(code)
# debug && println("promoting col = $col to string")
promotetostring!(col, TR, ncols, typemap, columns, buf, startpos, len, row, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
end
if transpose
@inbounds positions[col] = pos
else
if col < ncols
if Parsers.newline(code) || pos > len
options.silencewarnings || notenoughcolumns(col, ncols, rowoffset + row)
for j = (col + 1):ncols
@inbounds flags[j] |= ANYMISSING
@inbounds types[j] = Union{Missing, types[j]}
end
break # from for col = 1:ncols
end
@inline function parserow(row, TR::Val{transpose}, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, filter, names, codes, debug, options, coloptions, ::Type{customtypes}) where {transpose, customtypes}
while pos <= len
origpos = pos
rowcode = None
for col = 1:ncols
if transpose
@inbounds pos = positions[col]
end
@inbounds flag = flags[col]
@inbounds column = columns[col]
@inbounds opts = coloptions === nothing ? options : coloptions[col]
# @show typeof(column)
if willdrop(flag) || (user(flag) && column isa MissingVector)
pos, code = parsemissing!(buf, pos, len, opts, row, rowoffset, col)
elseif !typedetected(flag)
pos, code = detect(columns, buf, pos, len, opts, row, rowoffset, col, typemap, pool, refs, debug, types, flags, rowsguess)
elseif column isa SVec{Int64}
pos, code = parseint!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Float64}
pos, code = parsevalue!(Float64, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec2{String}
pos, code = parsestring2!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Date}
pos, code = parsevalue!(Date, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{DateTime}
pos, code = parsevalue!(DateTime, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa SVec{Time}
pos, code = parsevalue!(Time, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa Vector{Union{Missing, Bool}}
pos, code = parsevalue!(Bool, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif column isa Vector{UInt32}
pos, code = parsepooled!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, rowsguess, pool, refs, types, flags)
elseif column isa Vector{PosLen}
pos, code = parsestring!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
elseif customtypes !== Tuple{}
pos, code = parsecustom!(customtypes, flag, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
else
error("bad array type: $(typeof(column))")
end
if promote_to_string(code)
# debug && println("promoting col = $col to string")
promotetostring!(col, TR, ncols, typemap, columns, buf, startpos, len, row, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
end
if filter !== nothing
@inbounds codes[col] = code
end
if transpose
@inbounds positions[col] = pos
else
if pos <= len && !Parsers.newline(code)
options.silencewarnings || toomanycolumns(ncols, rowoffset + row)
# ignore the rest of the line
pos = skiptorow(buf, pos, len, options.oq, options.e, options.cq, 1, 2)
if col < ncols
if Parsers.newline(code) || pos > len
rowcode = NotEnoughColumns
options.silencewarnings || notenoughcolumns(col, ncols, rowoffset + row)
for j = (col + 1):ncols
@inbounds flags[j] |= ANYMISSING
@inbounds types[j] = Union{Missing, types[j]}
end
break # from for col = 1:ncols
end
else
if pos <= len && !Parsers.newline(code)
rowcode = TooManyColumns
options.silencewarnings || toomanycolumns(ncols, rowoffset + row)
# ignore the rest of the line
pos = skiptorow(buf, pos, len, options.oq, options.e, options.cq, 1, 2)
end
end
end
end
if filter === nothing
break
else
res = filter(ParsingRow(names, types, flags, rowcode, codes, row, columns, buf, origpos, pos - origpos))
@show res
if res
break
end
end
end
return pos
end

struct ParsingRow <: Tables.AbstractRow
names::Vector{Symbol}
types::Vector{Type}
flags::Vector{UInt8}
code::RowErrorCode
codes::Vector{Int16}
row::Int64
columns::Vector{AbstractVector}
buf::AbstractVector{UInt8}
pos::Int64 # starting byte position of row
len::Int64 # # of bytes in row
end

getnames(r::ParsingRow) = getfield(r, :names)
getcolumn(r::ParsingRow, col::Int) = getfield(r, :columns)[col]
getcolumn(r::ParsingRow, col::Symbol) = getfield(r, :columns)[findfirst(==(col), getnames(r))]
gettypes(r::ParsingRow) = getfield(r, :types)
getflags(r::ParsingRow) = getfield(r, :flags)
getcode(r::ParsingRow) = getfield(r, :code)
getcodes(r::ParsingRow) = getfield(r, :codes)
getrow(r::ParsingRow) = getfield(r, :row)
getbuf(r::ParsingRow) = getfield(r, :buf)
getpos(r::ParsingRow) = getfield(r, :pos)
getlen(r::ParsingRow) = getfield(r, :len)

Tables.columnnames(r::ParsingRow) = getnames(r)

@inline function Tables.getcolumn(row::ParsingRow, ::Type{T}, i::Int, nm::Symbol) where {T}
column = getcolumn(row, i)
@inbounds x = column[getrow(row)]
return x
end

@inline function Tables.getcolumn(row::ParsingRow, col::Symbol)
column = getcolumn(row, col)
@inbounds x = column[getrow(row)]
return x
end

@inline function Tables.getcolumn(row::ParsingRow, col::Int)
column = getcolumn(row, col)
@inbounds x = column[getrow(row)]
return x
end

@inline function poslen(code, pos, len)
pos = Core.bitcast(UInt64, pos) << 20
pos |= ifelse(Parsers.sentinel(code), MISSING_BIT, UInt64(0))
Expand Down
2 changes: 2 additions & 0 deletions src/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ flag(T, lazystrings) = (T === Union{} ? 0x00 : ((USER | TYPEDETECTED) | (hasmiss
const PROMOTE_TO_STRING = 0b0100000000000000 % Int16
promote_to_string(code) = code & PROMOTE_TO_STRING > 0

@enum RowErrorCode None NotEnoughColumns TooManyColumns

hasmissingtype(T) = T === Missing || T !== Core.Compiler.typesubtract(T, Missing)

@inline function promote_types(@nospecialize(T), @nospecialize(S))
Expand Down

0 comments on commit 72177e7

Please sign in to comment.