From 72177e727983cbde839c4bbe6cc13d2b490dbb43 Mon Sep 17 00:00:00 2001
From: Jacob Quinn
Date: Thu, 25 Jun 2020 10:58:42 -0600
Subject: [PATCH] Start work on supporting filtering while parsing

---
Review notes (text between `---` and the diffstat is not part of the commit
message):

* Removed a leftover `@show res` from the filter branch of `parserow`; it
  printed the filter result after every filtered row. The hunk header
  (`+599,133` -> `+599,132`) and the diffstat were adjusted to match. The
  blob ids on the `index` lines are unchanged from the original patch, so the
  one for src/file.jl no longer describes the post-image.
* Known gap: when `filter` rejects a row and `pos > len` afterwards,
  `parserow` leaves its `while pos <= len` loop and returns; `parsefilechunk!`
  has already done `row += 1`, so the rejected row appears to still be counted
  in the returned `row`.
* `promotetostring!` calls `parserow` with `filter = nothing` and an empty
  `names` vector, i.e. its re-parse does not apply the filter.
* NOTE(review): line breaks and indentation were rebuilt from a
  whitespace-collapsed copy assuming 4-space indentation; confirm the context
  lines against the tree before applying.

 src/file.jl  | 197 ++++++++++++++++++++++++++++++++++-----------------
 src/utils.jl |   2 +
 2 files changed, 134 insertions(+), 65 deletions(-)

diff --git a/src/file.jl b/src/file.jl
index ee2a6d93..28758918 100644
--- a/src/file.jl
+++ b/src/file.jl
@@ -188,6 +188,7 @@ function File(source;
     tasks::Integer=Threads.nthreads(),
     select=nothing,
     drop=nothing,
+    filter=nothing,
     # parsing options
     missingstrings=String[],
     missingstring="",
@@ -233,14 +234,15 @@ function File(source;
     refs = Vector{RefPool}(undef, ncols)
     if threaded === true
         # multithreaded
-        finalrows, columns = multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow - 1, pool, refs, ncols, typemap, h.categorical, customtypes, limit, tasks, debug)
+        finalrows, columns = multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow - 1, pool, refs, ncols, typemap, h.categorical, customtypes, limit, filter, h.names, tasks, debug)
     else
         if limit < rowsguess
             rowsguess = limit
         end
         columns = allocate(rowsguess, ncols, types, flags)
+        codes = filter === nothing ? EMPTY_CODES : zeros(Int16, ncols)
         t = Base.time()
-        finalrows, pos = parsefilechunk!(Val(transpose), ncols, typemap, columns, buf, datapos, len, limit, positions, pool, refs, rowsguess, datarow - 1, types, flags, debug, options, coloptions, customtypes)
+        finalrows, pos = parsefilechunk!(Val(transpose), ncols, typemap, columns, buf, datapos, len, limit, positions, pool, refs, rowsguess, datarow - 1, types, flags, filter, h.names, codes, debug, options, coloptions, customtypes)
         debug && println("time for initial parsing: $(Base.time() - t)")
         # cleanup our columns if needed
         for i = 1:ncols
@@ -292,6 +294,7 @@ end
 
 const EMPTY_INT_ARRAY = Int64[]
 const EMPTY_REFRECODE = UInt32[]
+const EMPTY_CODES = Int16[]
 
 # after multithreaded parsing, we need to synchronize pooled refs from different chunks of the file
 # we pick one chunk as "source of truth", then adjust other chunks as needed
@@ -379,12 +382,12 @@ function makeandsetpooled!(columns, i, column, refs, flags, categorical)
     return
 end
 
-function multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow, pool, refs, ncols, typemap, categorical, customtypes, limit, N, debug)
+function multithreadparse(types, flags, buf, datapos, len, options, coloptions, rowsguess, datarow, pool, refs, ncols, typemap, categorical, customtypes, limit, filter, names, N, debug)
     # when limiting w/ multithreaded parsing, we try to guess about where in the file the limit row # will be
     # then adjust our final file len to the end of that row
     # we add some cushion so we hopefully get the limit row correctly w/o shooting past too far and needing to resize! down
     # but we also don't guarantee limit will be exact w/ multithreaded parsing
-    if limit < rowsguess
+    if limit < rowsguess && filter === nothing
         newlen = [0, ceil(Int64, (limit / (rowsguess * 0.8)) * len), 0]
         findrowstarts!(buf, len, options, newlen, ncols)
         len = newlen[2] - 1
@@ -411,7 +414,8 @@ function multithreadparse(types, flags, buf, datapos, len, options, coloptions,
             task_types = copy(types)
             task_columns = allocate(rowchunkguess, ncols, task_types, task_flags)
             pertaskcolumns[i] = task_columns
-            task_rows, task_pos = parsefilechunk!(Val(false), ncols, typemap, task_columns, buf, task_pos, task_len, typemax(Int64), EMPTY_INT_ARRAY, pool, task_refs, rowchunkguess, datarow + (rowchunkguess * (i - 1)), task_types, task_flags, debug, options, coloptions, customtypes)
+            codes = filter === nothing ? EMPTY_CODES : zeros(Int16, ncols)
+            task_rows, task_pos = parsefilechunk!(Val(false), ncols, typemap, task_columns, buf, task_pos, task_len, typemax(Int64), EMPTY_INT_ARRAY, pool, task_refs, rowchunkguess, datarow + (rowchunkguess * (i - 1)), task_types, task_flags, filter, names, codes, debug, options, coloptions, customtypes)
             rows[i] = task_rows
             # promote column types/flags across task chunks
             for col = 1:ncols
@@ -509,13 +513,13 @@ end # @static if VERSION >= v"1.3-DEV"
     return limit < finalrows ? limit : finalrows, finalcolumns
 end
 
-function parsefilechunk!(TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options::Parsers.Options{ignorerepeated}, coloptions, ::Type{customtypes}) where {transpose, ignorerepeated, customtypes}
+function parsefilechunk!(TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, flags, filter, names, codes, debug, options, coloptions, ::Type{customtypes}) where {transpose, customtypes}
     row = 0
     startpos = pos
     if pos <= len && len > 0
         while row < limit
             row += 1
-            pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
+            pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, filter, names, codes, debug, options, coloptions, customtypes)
             (pos > len || row == limit) && break
             # if our initial row estimate was too few, we need to reallocate our columsn to read the rest of the file
             if row + 1 > rowsguess
@@ -537,7 +541,7 @@ function parsefilechunk!(TR::Val{transpose}, ncols, typemap, columns, buf, pos,
     return row, pos
 end
 
-@noinline function promotetostring!(col, TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, origflags, debug, options::Parsers.Options{ignorerepeated}, coloptions, ::Type{customtypes}) where {transpose, ignorerepeated, customtypes}
+@noinline function promotetostring!(col, TR::Val{transpose}, ncols, typemap, columns, buf, pos, len, limit, positions, pool, refs, rowsguess, rowoffset, types, origflags, debug, options, coloptions, ::Type{customtypes}) where {transpose, customtypes}
     flags = copy(origflags)
     for i = 1:ncols
         if i == col
@@ -550,10 +554,11 @@ end
     end
     row = 0
     startpos = pos
+    names = Symbol[]
     if pos <= len && len > 0
         while row < limit
             row += 1
-            pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
+            pos = parserow(row, TR, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, nothing, names, EMPTY_CODES, debug, options, coloptions, customtypes)
             pos > len && break
         end
     end
@@ -594,70 +599,132 @@ end
     end
 end
 
-@inline function parserow(row, TR::Val{transpose}, ncols, typemap, columns, startpos, buf, pos::A, len, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options::B, coloptions::C, ::Type{customtypes}) where {transpose, A, B, C, customtypes}
-    for col = 1:ncols
-        if transpose
-            @inbounds pos = positions[col]
-        end
-        @inbounds flag = flags[col]
-        @inbounds column = columns[col]
-        @inbounds opts = coloptions === nothing ? options : coloptions[col]
-        # @show typeof(column)
-        if willdrop(flag) || (user(flag) && column isa MissingVector)
-            pos, code = parsemissing!(buf, pos, len, opts, row, rowoffset, col)
-        elseif !typedetected(flag)
-            pos, code = detect(columns, buf, pos, len, opts, row, rowoffset, col, typemap, pool, refs, debug, types, flags, rowsguess)
-        elseif column isa SVec{Int64}
-            pos, code = parseint!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
-        elseif column isa SVec{Float64}
-            pos, code = parsevalue!(Float64, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
-        elseif column isa SVec2{String}
-            pos, code = parsestring2!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
-        elseif column isa SVec{Date}
-            pos, code = parsevalue!(Date, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
-        elseif column isa SVec{DateTime}
-            pos, code = parsevalue!(DateTime, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
-        elseif column isa SVec{Time}
-            pos, code = parsevalue!(Time, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
-        elseif column isa Vector{Union{Missing, Bool}}
-            pos, code = parsevalue!(Bool, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
-        elseif column isa Vector{UInt32}
-            pos, code = parsepooled!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, rowsguess, pool, refs, types, flags)
-        elseif column isa Vector{PosLen}
-            pos, code = parsestring!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
-        elseif customtypes !== Tuple{}
-            pos, code = parsecustom!(customtypes, flag, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
-        else
-            error("bad array type: $(typeof(column))")
-        end
-        if promote_to_string(code)
-            # debug && println("promoting col = $col to string")
-            promotetostring!(col, TR, ncols, typemap, columns, buf, startpos, len, row, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
-        end
-        if transpose
-            @inbounds positions[col] = pos
-        else
-            if col < ncols
-                if Parsers.newline(code) || pos > len
-                    options.silencewarnings || notenoughcolumns(col, ncols, rowoffset + row)
-                    for j = (col + 1):ncols
-                        @inbounds flags[j] |= ANYMISSING
-                        @inbounds types[j] = Union{Missing, types[j]}
-                    end
-                    break # from for col = 1:ncols
-                end
+@inline function parserow(row, TR::Val{transpose}, ncols, typemap, columns, startpos, buf, pos, len, positions, pool, refs, rowsguess, rowoffset, types, flags, filter, names, codes, debug, options, coloptions, ::Type{customtypes}) where {transpose, customtypes}
+    while pos <= len
+        origpos = pos
+        rowcode = None
+        for col = 1:ncols
+            if transpose
+                @inbounds pos = positions[col]
+            end
+            @inbounds flag = flags[col]
+            @inbounds column = columns[col]
+            @inbounds opts = coloptions === nothing ? options : coloptions[col]
+            # @show typeof(column)
+            if willdrop(flag) || (user(flag) && column isa MissingVector)
+                pos, code = parsemissing!(buf, pos, len, opts, row, rowoffset, col)
+            elseif !typedetected(flag)
+                pos, code = detect(columns, buf, pos, len, opts, row, rowoffset, col, typemap, pool, refs, debug, types, flags, rowsguess)
+            elseif column isa SVec{Int64}
+                pos, code = parseint!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
+            elseif column isa SVec{Float64}
+                pos, code = parsevalue!(Float64, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
+            elseif column isa SVec2{String}
+                pos, code = parsestring2!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
+            elseif column isa SVec{Date}
+                pos, code = parsevalue!(Date, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
+            elseif column isa SVec{DateTime}
+                pos, code = parsevalue!(DateTime, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
+            elseif column isa SVec{Time}
+                pos, code = parsevalue!(Time, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
+            elseif column isa Vector{Union{Missing, Bool}}
+                pos, code = parsevalue!(Bool, flag, column, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
+            elseif column isa Vector{UInt32}
+                pos, code = parsepooled!(flag, column, columns, buf, pos, len, opts, row, rowoffset, col, rowsguess, pool, refs, types, flags)
+            elseif column isa Vector{PosLen}
+                pos, code = parsestring!(flag, column, buf, pos, len, opts, row, rowoffset, col, types, flags)
+            elseif customtypes !== Tuple{}
+                pos, code = parsecustom!(customtypes, flag, columns, buf, pos, len, opts, row, rowoffset, col, types, flags)
+            else
+                error("bad array type: $(typeof(column))")
+            end
+            if promote_to_string(code)
+                # debug && println("promoting col = $col to string")
+                promotetostring!(col, TR, ncols, typemap, columns, buf, startpos, len, row, positions, pool, refs, rowsguess, rowoffset, types, flags, debug, options, coloptions, customtypes)
+            end
+            if filter !== nothing
+                @inbounds codes[col] = code
+            end
+            if transpose
+                @inbounds positions[col] = pos
             else
-                if pos <= len && !Parsers.newline(code)
-                    options.silencewarnings || toomanycolumns(ncols, rowoffset + row)
-                    # ignore the rest of the line
-                    pos = skiptorow(buf, pos, len, options.oq, options.e, options.cq, 1, 2)
+                if col < ncols
+                    if Parsers.newline(code) || pos > len
+                        rowcode = NotEnoughColumns
+                        options.silencewarnings || notenoughcolumns(col, ncols, rowoffset + row)
+                        for j = (col + 1):ncols
+                            @inbounds flags[j] |= ANYMISSING
+                            @inbounds types[j] = Union{Missing, types[j]}
+                        end
+                        break # from for col = 1:ncols
+                    end
+                else
+                    if pos <= len && !Parsers.newline(code)
+                        rowcode = TooManyColumns
+                        options.silencewarnings || toomanycolumns(ncols, rowoffset + row)
+                        # ignore the rest of the line
+                        pos = skiptorow(buf, pos, len, options.oq, options.e, options.cq, 1, 2)
+                    end
                 end
             end
         end
+        if filter === nothing
+            break
+        else
+            res = filter(ParsingRow(names, types, flags, rowcode, codes, row, columns, buf, origpos, pos - origpos))
+            if res
+                break
+            end
+        end
     end
     return pos
 end
 
+struct ParsingRow <: Tables.AbstractRow
+    names::Vector{Symbol}
+    types::Vector{Type}
+    flags::Vector{UInt8}
+    code::RowErrorCode
+    codes::Vector{Int16}
+    row::Int64
+    columns::Vector{AbstractVector}
+    buf::AbstractVector{UInt8}
+    pos::Int64 # starting byte position of row
+    len::Int64 # # of bytes in row
+end
+
+getnames(r::ParsingRow) = getfield(r, :names)
+getcolumn(r::ParsingRow, col::Int) = getfield(r, :columns)[col]
+getcolumn(r::ParsingRow, col::Symbol) = getfield(r, :columns)[findfirst(==(col), getnames(r))]
+gettypes(r::ParsingRow) = getfield(r, :types)
+getflags(r::ParsingRow) = getfield(r, :flags)
+getcode(r::ParsingRow) = getfield(r, :code)
+getcodes(r::ParsingRow) = getfield(r, :codes)
+getrow(r::ParsingRow) = getfield(r, :row)
+getbuf(r::ParsingRow) = getfield(r, :buf)
+getpos(r::ParsingRow) = getfield(r, :pos)
+getlen(r::ParsingRow) = getfield(r, :len)
+
+Tables.columnnames(r::ParsingRow) = getnames(r)
+
+@inline function Tables.getcolumn(row::ParsingRow, ::Type{T}, i::Int, nm::Symbol) where {T}
+    column = getcolumn(row, i)
+    @inbounds x = column[getrow(row)]
+    return x
+end
+
+@inline function Tables.getcolumn(row::ParsingRow, col::Symbol)
+    column = getcolumn(row, col)
+    @inbounds x = column[getrow(row)]
+    return x
+end
+
+@inline function Tables.getcolumn(row::ParsingRow, col::Int)
+    column = getcolumn(row, col)
+    @inbounds x = column[getrow(row)]
+    return x
+end
+
 @inline function poslen(code, pos, len)
     pos = Core.bitcast(UInt64, pos) << 20
     pos |= ifelse(Parsers.sentinel(code), MISSING_BIT, UInt64(0))
diff --git a/src/utils.jl b/src/utils.jl
index 391efb8f..8b739ab4 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -67,6 +67,8 @@ flag(T, lazystrings) = (T === Union{} ? 0x00 : ((USER | TYPEDETECTED) | (hasmiss
 const PROMOTE_TO_STRING = 0b0100000000000000 % Int16
 promote_to_string(code) = code & PROMOTE_TO_STRING > 0
 
+@enum RowErrorCode None NotEnoughColumns TooManyColumns
+
 hasmissingtype(T) = T === Missing || T !== Core.Compiler.typesubtract(T, Missing)
 
 @inline function promote_types(@nospecialize(T), @nospecialize(S))