Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions src/file.jl
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ function File(h::Header;
if limit < rowsguess
rowsguess = limit
end
columns = allocate(rowsguess, ncols, types, flags)
columns = allocate(rowsguess, ncols, types, flags, refs)
t = Base.time()
finalrows, pos = parsefilechunk!(Val(transpose), ncols, typemap, columns, buf, datapos, len, limit, positions, pool, refs, rowsguess, datarow - 1, types, flags, debug, options, coloptions, customtypes)
debug && println("time for initial parsing: $(Base.time() - t)")
Expand Down Expand Up @@ -429,7 +429,7 @@ function multithreadparse(types, flags, buf, datapos, len, options, coloptions,
task_len = ranges[i + 1] - (i != N)
task_pos = ranges[i]
task_types = copy(types)
task_columns = allocate(rowchunkguess, ncols, task_types, task_flags)
task_columns = allocate(rowchunkguess, ncols, task_types, task_flags, task_refs)
pertaskcolumns[i] = task_columns
task_rows, task_pos = parsefilechunk!(Val(false), ncols, typemap, task_columns, buf, task_pos, task_len, typemax(Int64), EMPTY_INT_ARRAY, pool, task_refs, rowchunkguess, datarow + (rowchunkguess * (i - 1)), task_types, task_flags, debug, options, coloptions, customtypes)
rows[i] = task_rows
Expand Down Expand Up @@ -472,8 +472,16 @@ end # @static if VERSION >= v"1.3-DEV"
task_columns[col] = convert(SentinelVector{Float64}, task_columns[col])
elseif T !== Union{} && T !== Missing && task_columns[col] isa MissingVector
# one chunk parsed all missing values, but another chunk had a typed value, promote to that
debug && println("multithreaded promoting column $col from missing")
debug && println("multithreaded promoting column $col from missing on task $i")
task_columns[col] = allocate(T, task_rows)
if T == Union{PooledString, Missing}
colrefs = refs[col]
ref = getref!(colrefs, missing, Int16(0), options)
column = task_columns[col]
for j = 1:task_rows
@inbounds column[j] = ref
end
end
end
end
@inbounds column = pertaskcolumns[1][col]
Expand Down Expand Up @@ -769,13 +777,18 @@ function detect(columns, buf, pos, len, options, row, rowoffset, col, typemap, p
@goto done
end
_, code, vpos, vlen, tlen = Parsers.xparse(String, buf, pos, len, options)
if pool > 0.0
if pool > 0.0 && (row / POOLSAMPLESIZE < pool)
r = RefPool()
@inbounds refs[col] = r
column = allocate(PooledString, rowsguess)
column = allocate(PooledString, pool < 1.0 ? min(rowsguess, POOLSAMPLESIZE) : rowsguess)
if pool < 1.0
flags[col] |= MAYBEPOOLED
end
if anymissing(flags[col])
ref = getref!(r, missing, code, options)
fill!(column, ref)
for i = 1:(row - 1)
@inbounds column[i] = ref
end
end
ref = getref!(r, PointerString(pointer(buf, vpos), vlen), code, options)
@inbounds column[row] = ref
Expand Down Expand Up @@ -944,23 +957,26 @@ function parsepooled!(flag, column, columns, buf, pos, len, options, row, rowoff
# this usually means parsing is borked because of an invalidly quoted field, hard error
fatalerror(buf, pos, tlen, code, rowoffset + row, col)
end
if !isassigned(refs, col)
r = RefPool()
@inbounds refs[col] = r
else
@inbounds r = refs[col]
end
@inbounds colrefs = refs[col]
if Parsers.sentinel(code)
@inbounds flags[col] = flag | ANYMISSING
@inbounds types[col] = Union{PooledString, Missing}
ref = getref!(r, missing, code, options)
ref = getref!(colrefs, missing, code, options)
else
ref = getref!(r, PointerString(pointer(buf, vpos), vlen), code, options)
ref = getref!(colrefs, PointerString(pointer(buf, vpos), vlen), code, options)
end
if !user(flag) && ((length(r.refs) - anymissing(flags[col])) / rowsguess) > pool
code |= PROMOTE_TO_STRING
else
@inbounds column[row] = ref
@inbounds column[row] = ref
if !user(flag) && maybepooled(flag)
if rowsguess <= POOLSAMPLESIZE && ((length(colrefs.refs) - anymissing(flags[col])) / rowsguess) > pool
code |= PROMOTE_TO_STRING
elseif row == POOLSAMPLESIZE
if ((length(colrefs.refs) - anymissing(flags[col])) / min(rowsguess, POOLSAMPLESIZE)) > pool
code |= PROMOTE_TO_STRING
else
resize!(column, rowsguess)
end
flags[col] &= ~MAYBEPOOLED
end
end
return pos + tlen, code
end
2 changes: 1 addition & 1 deletion src/rows.jl
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ function Rows(source;
kw...)

h = Header(source, header, normalizenames, datarow, skipto, footerskip, transpose, comment, use_mmap, ignoreemptylines, select, drop, missingstrings, missingstring, delim, ignorerepeated, quotechar, openquotechar, closequotechar, escapechar, dateformat, dateformats, decimal, truestrings, falsestrings, type, types, categorical, pool, lazystrings, strict, silencewarnings, debug, parsingdebug, true)
columns = allocate(1, h.cols, h.types, h.flags)
columns = allocate(1, h.cols, h.types, h.flags, nothing)
values = all(x->x == Union{String, Missing}, h.types) && lazystrings ? Vector{PosLen}(undef, h.cols) : Vector{Any}(undef, h.cols)
finaltypes = copy(h.types)
columnmap = [i for i = 1:h.cols]
Expand Down
24 changes: 18 additions & 6 deletions src/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ willdrop(flag) = flag & WILLDROP > 0
# Column flag bit: when set on a `String` / `Union{String, Missing}` column, the full-file
# `allocate` stores that column as `PosLen` instead of the declared type.
const LAZYSTRINGS = 0b00010000
# Return `true` when the LAZYSTRINGS bit is set in `flag`.
lazystrings(flag) = flag & LAZYSTRINGS > 0

# Column flag bit: the column is provisionally pooled and the decision to keep pooling or
# promote to string is still pending. `detect` sets it when `pool < 1.0`; `parsepooled!`
# clears it once row `POOLSAMPLESIZE` has been examined.
const MAYBEPOOLED = 0b00100000
# Return `true` when the MAYBEPOOLED bit is set in `flag`.
maybepooled(flag) = flag & MAYBEPOOLED > 0
# Number of leading rows sampled before committing a column to pooling.
# ~95% z-score, 10% MoE
# NOTE(review): presumably from n = z^2 * p * (1 - p) / e^2 with z = 1.96, p = 0.5,
# e = 0.1 (about 96), rounded up to 100 -- confirm.
const POOLSAMPLESIZE = 100

# Build the initial flag byte for a column whose declared type is `T`.
#
# * `T === Union{}` contributes no type bits (presumably "no type supplied" -- confirm).
# * Any other `T` sets `USER | TYPEDETECTED`, plus `ANYMISSING` when `hasmissingtype(T)`.
# * `lazystrings == true` additionally sets the `LAZYSTRINGS` bit.
function flag(T, lazystrings)
    typebits = if T === Union{}
        0x00
    else
        missingbit = hasmissingtype(T) ? ANYMISSING : 0x00
        (USER | TYPEDETECTED) | missingbit
    end
    lazybit = lazystrings ? LAZYSTRINGS : 0x00
    return typebits | lazybit
end

# we define our own bit flag on a Parsers.ReturnCode to signal if a column needs to promote to string
Expand Down Expand Up @@ -163,8 +168,15 @@ end
# Integer types narrower than 64 bits. `allocate` gives columns of these types a plain
# `Vector{Union{Missing, T}}`.
const SmallIntegers = Union{Int8, UInt8, Int16, UInt16, Int32, UInt32}

# allocate columns for a full file
function allocate(rowsguess, ncols, types, flags)
return AbstractVector[allocate(lazystrings(flags[i]) && (types[i] === String || types[i] === Union{String, Missing}) ? PosLen : types[i], rowsguess) for i = 1:ncols]
# Allocate one storage vector per column for a full file.
#
# Every one of the `ncols` columns gets `rowsguess` slots via the per-type
# `allocate(T, len)` method for `types[i]`. A `String` / `Union{String, Missing}` column
# whose flag has the lazy-strings bit set is stored as `PosLen` instead. For each pooled
# or categorical string column a fresh `RefPool` is written to `refs[i]`.
#
# Returns a `Vector{AbstractVector}` of length `ncols`.
function allocate(rowsguess, ncols, types, flags, refs)
    columns = Vector{AbstractVector}(undef, ncols)
    for col = 1:ncols
        T = types[col]
        @inbounds lazy = lazystrings(flags[col])
        isstringtype = T === String || T === Union{String, Missing}
        storagetype = (lazy && isstringtype) ? PosLen : T
        @inbounds columns[col] = allocate(storagetype, rowsguess)
        needspool = T === PooledString ||
            T === Union{PooledString, Missing} ||
            T === CategoricalValue{String, UInt32} ||
            T === Union{CategoricalValue{String, UInt32}, Missing}
        if needspool
            # NOTE(review): `Rows` calls this with `refs = nothing`; that only works if no
            # pooled/categorical column type can reach this branch on that path -- confirm.
            refs[col] = RefPool()
        end
    end
    return columns
end

# MissingVector is an efficient representation in SentinelArrays.jl package
Expand All @@ -177,10 +189,10 @@ function allocate(::Type{PosLen}, len)
end
# String columns, with or without `Missing` in the declared type, share one storage type:
# a `SentinelVector{String}` of length `len`, constructed with `undef`.
allocate(::Type{String}, len) = SentinelVector{String}(undef, len)
allocate(::Type{Union{String, Missing}}, len) = SentinelVector{String}(undef, len)
allocate(::Type{PooledString}, len) = fill(UInt32(0), len)
allocate(::Type{Union{PooledString, Missing}}, len) = fill(UInt32(0), len)
allocate(::Type{CategoricalValue{String, UInt32}}, len) = fill(UInt32(0), len)
allocate(::Type{Union{CategoricalValue{String, UInt32}, Missing}}, len) = fill(UInt32(0), len)
# Pooled and categorical string columns (with or without `Missing`) are all stored as a
# `Vector{UInt32}` of length `len`. The vector is created with `undef`, so its contents
# are arbitrary until each slot is assigned.
allocate(::Type{PooledString}, len) = Vector{UInt32}(undef, len)
allocate(::Type{Union{PooledString, Missing}}, len) = Vector{UInt32}(undef, len)
allocate(::Type{CategoricalValue{String, UInt32}}, len) = Vector{UInt32}(undef, len)
allocate(::Type{Union{CategoricalValue{String, UInt32}, Missing}}, len) = Vector{UInt32}(undef, len)
# `Bool` columns are stored as `Vector{Union{Missing, Bool}}` whether or not `Missing` is
# part of the declared type.
allocate(::Type{Bool}, len) = Vector{Union{Missing, Bool}}(undef, len)
allocate(::Type{Union{Missing, Bool}}, len) = Vector{Union{Missing, Bool}}(undef, len)
# Each type `T` in `SmallIntegers` is stored as `Vector{Union{Missing, T}}`.
allocate(::Type{T}, len) where {T <: SmallIntegers} = Vector{Union{Missing, T}}(undef, len)
Expand Down
2 changes: 1 addition & 1 deletion test/basics.jl
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ f = CSV.File(IOBuffer("col1\nhey\nthere\nsailor"); lazystrings=true)
@test f.col1 isa CSV.LazyStringVector
@test Tables.columnnames(f) == [:col1]
@test propertynames(f) == [:col1]
@test CSV.getname(f) == "<Base.GenericIOBuffer{Array{UInt8,1}}>"
@test occursin("IOBuffer", CSV.getname(f))
@test CSV.getcols(f) == 1
@test Base.IndexStyle(f) == Base.IndexLinear()
@test f.col1 === Tables.getcolumn(f, 1)
Expand Down