Skip to content

Commit

Permalink
processing review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Jolanrensen committed Nov 15, 2024
1 parent 0bc0c0b commit 5bd7567
Show file tree
Hide file tree
Showing 15 changed files with 185 additions and 167 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.jetbrains.kotlinx.dataframe.impl.io

import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVRecord
import org.jetbrains.kotlinx.dataframe.AnyFrame
import org.jetbrains.kotlinx.dataframe.DataColumn
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.ParserOptions
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
import org.jetbrains.kotlinx.dataframe.api.tryParse
import org.jetbrains.kotlinx.dataframe.impl.ColumnNameGenerator
import org.jetbrains.kotlinx.dataframe.io.ColType
import org.jetbrains.kotlinx.dataframe.io.toKType
import java.io.BufferedReader
import java.io.Reader
import kotlin.reflect.full.withNullability
import kotlin.reflect.typeOf

/**
 * Reads delimiter-separated data from [reader] into an [AnyFrame] using Apache Commons CSV.
 *
 * @param reader the character stream to read from; consumed by this call.
 * @param format the Commons CSV format (delimiter, quoting, header handling, …).
 * @param colTypes desired [ColType] per column name; the special key `".default"`
 *   provides a fallback type for columns not listed explicitly.
 * @param skipLines number of physical lines to skip before parsing begins.
 * @param readLines maximum number of records to read, or `null` to read all of them.
 *   Must not be negative.
 * @param parserOptions options forwarded to the value parser, or `null` for defaults.
 * @throws IllegalArgumentException if [readLines] is negative.
 */
internal fun DataFrame.Companion.readDelimImpl(
    reader: Reader,
    format: CSVFormat,
    colTypes: Map<String, ColType>,
    skipLines: Int,
    readLines: Int?,
    parserOptions: ParserOptions?,
): AnyFrame {
    // Skip the requested number of physical lines before handing the reader to the CSV parser.
    // (Avoids shadowing the `reader` parameter with a `var` of the same name.)
    val actualReader = if (skipLines > 0) {
        BufferedReader(reader).also { buffered -> repeat(skipLines) { buffered.readLine() } }
    } else {
        reader
    }

    val csvParser = format.parse(actualReader)

    // Materialize either all records, or at most `readLines` of them.
    val records: List<CSVRecord> = if (readLines == null) {
        csvParser.records
    } else {
        require(readLines >= 0) { "`readLines` must not be negative" }
        buildList(readLines) {
            val iter = csvParser.iterator()
            var remaining = readLines // smart-cast non-null here; no elvis needed
            while (iter.hasNext() && remaining-- > 0) {
                add(iter.next())
            }
        }
    }

    // Use the parsed header when present; otherwise generate X1..Xn from the first record's width.
    val columnNames = csvParser.headerNames.takeIf { it.isNotEmpty() }
        ?: (1..(records.firstOrNull()?.count() ?: 0)).map { index -> "X$index" }

    // De-duplicate repeated column names so each column gets a unique accessor name.
    val generator = ColumnNameGenerator()
    val uniqueNames = columnNames.map { generator.addUnique(it) }

    // Loop-invariant: look up the fallback column type once, not per column.
    val defaultColType = colTypes[".default"]

    val cols = uniqueNames.mapIndexed { colIndex, colName ->
        val colType = colTypes[colName] ?: defaultColType
        var hasNulls = false
        val values = records.map { record ->
            if (record.isSet(colIndex)) {
                // Empty cells are treated as null.
                record[colIndex].ifEmpty {
                    hasNulls = true
                    null
                }
            } else {
                // Short records are simply missing their trailing columns.
                hasNulls = true
                null
            }
        }
        val column = DataColumn.createValueColumn(colName, values, typeOf<String>().withNullability(hasNulls))

        val skipTypes = when {
            // A requested column type means: skip parsing to every type except that one.
            colType != null -> ParserOptions.allTypesExcept(colType.toKType())

            // Otherwise respect the caller-provided parser options, if any.
            else -> parserOptions?.skipTypes ?: emptySet()
        }
        val adjustedParserOptions = (parserOptions ?: ParserOptions())
            .copy(skipTypes = skipTypes)

        column.tryParse(adjustedParserOptions)
    }
    return cols.toDataFrame()
}
77 changes: 11 additions & 66 deletions core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/csv.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,20 @@ import kotlinx.datetime.LocalDate
import kotlinx.datetime.LocalDateTime
import kotlinx.datetime.LocalTime
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVRecord
import org.apache.commons.io.input.BOMInputStream
import org.jetbrains.kotlinx.dataframe.AnyFrame
import org.jetbrains.kotlinx.dataframe.AnyRow
import org.jetbrains.kotlinx.dataframe.DataColumn
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.DataRow
import org.jetbrains.kotlinx.dataframe.annotations.Interpretable
import org.jetbrains.kotlinx.dataframe.annotations.OptInRefine
import org.jetbrains.kotlinx.dataframe.annotations.Refine
import org.jetbrains.kotlinx.dataframe.api.ParserOptions
import org.jetbrains.kotlinx.dataframe.api.forEach
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
import org.jetbrains.kotlinx.dataframe.api.tryParse
import org.jetbrains.kotlinx.dataframe.codeGen.DefaultReadCsvMethod
import org.jetbrains.kotlinx.dataframe.codeGen.DefaultReadDfMethod
import org.jetbrains.kotlinx.dataframe.impl.ColumnNameGenerator
import org.jetbrains.kotlinx.dataframe.impl.api.parse
import org.jetbrains.kotlinx.dataframe.impl.io.readDelimImpl
import org.jetbrains.kotlinx.dataframe.util.AS_URL
import org.jetbrains.kotlinx.dataframe.util.AS_URL_IMPORT
import org.jetbrains.kotlinx.dataframe.util.AS_URL_REPLACE
Expand All @@ -46,7 +42,6 @@ import java.nio.charset.Charset
import java.util.zip.GZIPInputStream
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.full.withNullability
import kotlin.reflect.typeOf
import kotlin.time.Duration

Expand Down Expand Up @@ -354,74 +349,24 @@ public fun DataFrame.Companion.readDelim(
skipLines: Int = 0,
readLines: Int? = null,
parserOptions: ParserOptions? = null,
): AnyFrame {
): AnyFrame =
try {
var reader = reader
if (skipLines > 0) {
reader = BufferedReader(reader)
repeat(skipLines) { reader.readLine() }
}

val csvParser = format.parse(reader)
val records = if (readLines == null) {
csvParser.records
} else {
require(readLines >= 0) { "`readLines` must not be negative" }
val records = ArrayList<CSVRecord>(readLines)
val iter = csvParser.iterator()
var count = readLines ?: 0
while (iter.hasNext() && 0 < count--) {
records.add(iter.next())
}
records
}

val columnNames = csvParser.headerNames.takeIf { it.isNotEmpty() }
?: (1..(records.firstOrNull()?.count() ?: 0)).map { index -> "X$index" }

val generator = ColumnNameGenerator()
val uniqueNames = columnNames.map { generator.addUnique(it) }

val cols = uniqueNames.mapIndexed { colIndex, colName ->
val defaultColType = colTypes[".default"]
val colType = colTypes[colName] ?: defaultColType
var hasNulls = false
val values = records.map {
if (it.isSet(colIndex)) {
it[colIndex].ifEmpty {
hasNulls = true
null
}
} else {
hasNulls = true
null
}
}
val column = DataColumn.createValueColumn(colName, values, typeOf<String>().withNullability(hasNulls))
val skipTypes = when {
colType != null ->
// skip all types except the desired type
ParserOptions.allTypesExcept(colType.toKType())

else ->
// respect the provided parser options
parserOptions?.skipTypes ?: emptySet()
}
val adjustsedParserOptions = (parserOptions ?: ParserOptions())
.copy(skipTypes = skipTypes)

return@mapIndexed column.tryParse(adjustsedParserOptions)
}
return cols.toDataFrame()
} catch (e: OutOfMemoryError) {
readDelimImpl(
reader = reader,
format = format,
colTypes = colTypes,
skipLines = skipLines,
readLines = readLines,
parserOptions = parserOptions,
)
} catch (_: OutOfMemoryError) {
throw OutOfMemoryError(
"Ran out of memory reading this CSV-like file. " +
"You can try our new experimental CSV reader by adding the dependency " +
"\"org.jetbrains.kotlinx:dataframe-csv:{VERSION}\" and using `DataFrame.readCsv()` instead of " +
"`DataFrame.readCSV()`.",
)
}
}

public fun AnyFrame.writeCSV(file: File, format: CSVFormat = CSVFormat.DEFAULT): Unit =
writeCSV(FileWriter(file), format)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.kotest.matchers.nulls.shouldNotBeNull
import io.kotest.matchers.shouldBe
import kotlinx.datetime.LocalDateTime
import org.apache.commons.csv.CSVFormat
import org.intellij.lang.annotations.Language
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.ParserOptions
import org.jetbrains.kotlinx.dataframe.api.allNulls
Expand Down Expand Up @@ -177,7 +178,15 @@ class CsvTests {

@Test
fun `if string starts with a number, it should be parsed as a string anyway`() {
val df = DataFrame.readCSV(durationCsv)
@Language("CSV")
val df = DataFrame.readDelimStr(
"""
duration,floatDuration
12 min,1.0
15,12.98 sec
1 Season,0.9 parsec
""".trimIndent(),
)
df["duration"].type() shouldBe typeOf<String>()
df["floatDuration"].type() shouldBe typeOf<String>()
}
Expand Down Expand Up @@ -321,7 +330,6 @@ class CsvTests {
private val simpleCsv = testCsv("testCSV")
private val csvWithFrenchLocale = testCsv("testCSVwithFrenchLocale")
private val wineCsv = testCsv("wine")
private val durationCsv = testCsv("duration")
private val withBomCsv = testCsv("with-bom")
}
}
4 changes: 0 additions & 4 deletions core/src/test/resources/duration.csv

This file was deleted.

4 changes: 0 additions & 4 deletions dataframe-csv/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,4 @@ kotlinPublications {

kotlin {
explicitApi()
sourceSets.all {
languageSettings {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ internal object DelimParams {
/**
* @param parseParallel Whether to parse the data in parallel. Default: `true`.
*
* If `true`, the data will be parsed in parallel.
* This is usually faster, but can be turned off for debugging.
* If `true`, the data will be read and parsed in parallel by the Deephaven parser.
* This is usually faster but can be turned off for debugging.
*/
const val PARSE_PARALLEL: Boolean = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ internal class DataFrameCustomDoubleParser(parserOptions: ParserOptions) : Custo
override fun parse(bs: ByteSlice): Double =
try {
fastDoubleParser.parseOrNull(bs.data(), bs.begin(), bs.size())
} catch (e: Exception) {
} catch (_: Exception) {
null
} ?: throw NumberFormatException("Failed to parse double")
} ?: throw NumberFormatException()

override fun parse(cs: CharSequence): Double =
fastDoubleParser.parseOrNull(cs.toString())
?: throw NumberFormatException("Failed to parse double")
?: throw NumberFormatException()
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ internal class ListSink(val columnIndex: Int, val dataType: DataType) : SinkSour
}
}

// Deephaven's fast path for numeric type inference supports only byte, short, int, and long
// so this should never be reached
else -> error("unsupported sink state")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ internal fun readDelimImpl(
) {
return@readDelimImpl DataFrame.empty()
}
throw IllegalStateException("Could not read delimiter-separated data. ${e.message}", e)
throw IllegalStateException(
"Could not read delimiter-separated data: CsvReaderException: ${e.message}: ${e.cause?.message ?: ""}",
e,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ import java.util.zip.ZipInputStream
*/
public sealed class Compression<I : InputStream>(public open val wrapStream: (InputStream) -> I) {

public companion object {
public fun of(fileOrUrl: String): Compression<*> =
when (fileOrUrl.split(".").last()) {
"gz" -> Gzip
"zip" -> Zip
else -> None
}

public fun of(file: File): Compression<*> = of(file.name)

public fun of(path: Path): Compression<*> = of(path.fileName?.toString() ?: "")

public fun of(url: URL): Compression<*> = of(url.path)
}

/** Can be overridden to perform some actions before reading from the input stream. */
public open fun doFirst(inputStream: I) {}

Expand Down Expand Up @@ -90,16 +105,3 @@ public inline fun <T, I : InputStream> InputStream.useDecompressed(
compression.doFinally(wrappedStream)
}
}

public fun compressionStateOf(fileOrUrl: String): Compression<*> =
when (fileOrUrl.split(".").last()) {
"gz" -> Compression.Gzip
"zip" -> Compression.Zip
else -> Compression.None
}

public fun compressionStateOf(file: File): Compression<*> = compressionStateOf(file.name)

public fun compressionStateOf(path: Path): Compression<*> = compressionStateOf(path.fileName?.toString() ?: "")

public fun compressionStateOf(url: URL): Compression<*> = compressionStateOf(url.path)
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.jetbrains.kotlinx.dataframe.documentation.DelimParams.READ_LINES
import org.jetbrains.kotlinx.dataframe.documentation.DelimParams.SKIP_LINES
import org.jetbrains.kotlinx.dataframe.documentation.DelimParams.TRIM_INSIDE_QUOTED
import org.jetbrains.kotlinx.dataframe.impl.io.readDelimImpl
import org.jetbrains.kotlinx.dataframe.io.Compression.Companion
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
Expand All @@ -50,7 +51,7 @@ public fun DataFrame.Companion.readCsv(
header: List<String> = HEADER,
hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
compression: Compression<*> = compressionStateOf(path),
compression: Compression<*> = Compression.of(path),
colTypes: Map<String, ColType> = COL_TYPES,
skipLines: Long = SKIP_LINES,
readLines: Long? = READ_LINES,
Expand Down Expand Up @@ -101,7 +102,7 @@ public fun DataFrame.Companion.readCsv(
header: List<String> = HEADER,
hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
compression: Compression<*> = compressionStateOf(file),
compression: Compression<*> = Compression.of(file),
colTypes: Map<String, ColType> = COL_TYPES,
skipLines: Long = SKIP_LINES,
readLines: Long? = READ_LINES,
Expand Down Expand Up @@ -152,7 +153,7 @@ public fun DataFrame.Companion.readCsv(
header: List<String> = HEADER,
hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
compression: Compression<*> = compressionStateOf(url),
compression: Compression<*> = Compression.of(url),
colTypes: Map<String, ColType> = COL_TYPES,
skipLines: Long = SKIP_LINES,
readLines: Long? = READ_LINES,
Expand Down Expand Up @@ -203,7 +204,7 @@ public fun DataFrame.Companion.readCsv(
header: List<String> = HEADER,
hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
compression: Compression<*> = compressionStateOf(fileOrUrl),
compression: Compression<*> = Compression.of(fileOrUrl),
colTypes: Map<String, ColType> = COL_TYPES,
skipLines: Long = SKIP_LINES,
readLines: Long? = READ_LINES,
Expand Down
Loading

0 comments on commit 5bd7567

Please sign in to comment.