Parallel computations #723

Open
@koperagen

Description

Object properties can be heavy or lazily computed, so converting even a moderately large list can take minutes.
Fairly simple code like this can speed up the conversion:

val df = runBlocking {
    list
        .chunked(workload)
        .map { async(Dispatchers.IO) { it.toDataFrame() } }
        .awaitAll()
        .concat()
}

Although the code is simple, it seems hard to make this parallelism a proper part of the toDataFrame implementation.
Only list.toDataFrame(maxDepth = int) and list.toDataFrame { properties(maxDepth = int) { } } are side-effect free, so for them it is (mostly) safe to split the list into chunks, run the conversion in parallel, and concat the results. But even computing the properties may not be parallel-friendly. Then there is the question of how the workload should be split, and so on.
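To make the "how is the workload split" question concrete, here is a minimal sketch of a chunk-and-run-in-parallel helper built only on kotlinx.coroutines. The names mapChunksParallel and defaultChunkSize are hypothetical and not part of the DataFrame API, and the "one chunk per available core" heuristic is just one possible assumption, not a recommendation from the library.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical helper: splits the list into chunks of chunkSize, transforms
// every chunk concurrently on Dispatchers.IO, and returns the per-chunk
// results in the original chunk order (awaitAll preserves order).
suspend fun <T, R> List<T>.mapChunksParallel(
    chunkSize: Int,
    transform: (List<T>) -> R,
): List<R> = coroutineScope {
    require(chunkSize > 0) { "chunkSize must be positive" }
    chunked(chunkSize)
        .map { chunk -> async(Dispatchers.IO) { transform(chunk) } }
        .awaitAll()
}

// Hypothetical heuristic (an assumption): roughly one chunk per worker,
// rounding up so that no element is left out.
fun defaultChunkSize(
    size: Int,
    parallelism: Int = Runtime.getRuntime().availableProcessors(),
): Int = maxOf(1, (size + parallelism - 1) / parallelism)

fun main(): Unit = runBlocking {
    val items = (1..10).toList()
    // Each chunk is reduced independently; here the "conversion" is just a sum.
    val sums = items.mapChunksParallel(chunkSize = 4) { chunk -> chunk.sum() }
    println(sums) // [10, 26, 19]
}
```

With DataFrame, the same helper could presumably be used as list.mapChunksParallel(workload) { it.toDataFrame() }.concat(), under the same side-effect caveats described above.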

add and convert can be heavy and involve IO too. For these I have something like this in mind:

fun DataFrame<*>.awaitAll(selector: ColumnSelector<*, Deferred<*>>) = runBlocking {
    val column = getColumn(selector)
    val values = column.toList().awaitAll()
    replace(selector).with(values.toColumn(column.name(), infer = Infer.Type))
}

Usage:

val df = runBlocking {
    otherDf.add("col") {
        async(Dispatchers.IO) {
            heavyCompute()
        }
    }.awaitAll { "col"() }
}
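The pattern behind this usage can be illustrated without DataFrame at all: first start one computation per row so that every "cell" holds a Deferred, then resolve all cells in one step. The sketch below uses plain lists and kotlinx.coroutines only; heavyCompute and computeColumn are hypothetical stand-ins, and the sleep merely simulates blocking IO.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for an expensive, IO-bound per-row computation.
fun heavyCompute(id: Int): String {
    Thread.sleep(50) // simulate blocking IO
    return "value-$id"
}

// Hypothetical helper mirroring the two steps of the usage above.
suspend fun computeColumn(ids: List<Int>): List<String> = coroutineScope {
    // Step 1: start every computation; each element is a Deferred "cell".
    val pending: List<Deferred<String>> =
        ids.map { id -> async(Dispatchers.IO) { heavyCompute(id) } }
    // Step 2: suspend until all cells are resolved; order matches ids.
    pending.awaitAll()
}

fun main(): Unit = runBlocking {
    println(computeColumn(listOf(1, 2, 3))) // [value-1, value-2, value-3]
}
```

Because all computations are started before any of them is awaited, they can overlap on the IO dispatcher instead of running one after another.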

These two approaches can speed up dataframe code significantly in certain scenarios, so we can give them some visibility in the documentation.


Labels: help wanted, research
