Add possibility to iterate over table data (streaming) #440

@hagenw

To support loading large tables that might not fit into memory, it would be a good idea to add an option to Table.get() (or another method?) to read the data piecewise.

Let us first create an example table and store it as CSV and PARQUET:

import audformat

entries = 100
db = audformat.Database("mydb")
db.schemes["int"] = audformat.Scheme("int")
index = audformat.filewise_index([f"file{n}.wav" for n in range(entries)])
db["files"] = audformat.Table(index)
db["files"]["int"] = audformat.Column(scheme_id="int")
db["files"]["int"].set([int(n) for n in range(entries)])

db["files"].save("files", storage_format="csv")
db["files"].save("files", storage_format="parquet", update_other_formats=False)

Stream PARQUET tables

The table stored in PARQUET can be iterated with:

import pyarrow.parquet as parquet

stream = parquet.ParquetFile("files.parquet")
nrows = 10
for batch in stream.iter_batches(batch_size=nrows):
    print(batch.to_pandas())
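
Each batch arrives as a plain pandas.DataFrame, with the index stored as a regular file column. If a filewise index is needed, it could be restored per batch, e.g. (just a sketch, not something audformat provides for batches yet):

for batch in stream.iter_batches(batch_size=nrows):
    # Restore a filewise index from the "file" column
    df = batch.to_pandas().set_index("file")
    print(df)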

Stream CSV tables

Streaming a CSV file with pyarrow seems to be more complicated, as we cannot directly pass the number of rows we want per batch, but only the size of a batch in bytes. The problem is that the number of bytes per line can vary:

with open("files.csv") as file:
    for line in file:
        bytes_per_line = len(line) + 1
        print(bytes_per_line)

returns

10
13
13
13
13
13
13
13
13
13
13
15
...
15

If we find a way to calculate the correct block_size value, we could do:

import pyarrow.csv as csv

block_size = 140  # returns same result as streaming from PARQUET
read_options = csv.ReadOptions(column_names=["file", "int"], skip_rows=1, block_size=block_size)
convert_options = csv.ConvertOptions(column_types=db["files"]._pyarrow_csv_schema())
stream = csv.open_csv("files.csv", read_options=read_options, convert_options=convert_options)
for batch in stream:
    print(batch.to_pandas())
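
One rough way to derive block_size would be to sum the byte lengths of the header and of the first batch of data lines. The helper estimate_block_size below is only a hypothetical sketch; it assumes that later lines are not much longer than the first ones, otherwise subsequent batches will not contain exactly the requested number of rows:

import itertools

import pyarrow.csv as csv


def estimate_block_size(path, nrows):
    # Hypothetical helper: sum the bytes of the header
    # and the first nrows data lines
    with open(path, "rb") as file:
        lines = itertools.islice(file, nrows + 1)
        return sum(len(line) for line in lines)


block_size = estimate_block_size("files.csv", 10)
read_options = csv.ReadOptions(
    column_names=["file", "int"],
    skip_rows=1,
    block_size=block_size,
)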

Fallback solution using pandas (as far as I understand, chunked reading is not supported by the pyarrow engine of pd.read_csv, so it has to fall back to one of the slower built-in parsers):

import pandas as pd

for batch in pd.read_csv("files.csv", chunksize=nrows):
    print(batch)

Argument name

The most straightforward implementation seems to me to be adding a single argument to audformat.Table.get() that specifies the number of rows we want to read per batch. This could be named nrows, n_rows, chunksize, batch_size, or similar:

for batch in db["files"].get(batch_size=10):
    print(batch)

We might want to add a second argument to specify an offset for the first row to read. This way, we would be able to read a particular part of the table, e.g.

next(db["files"].get(batch_size=5, offset=5)

We might also want to consider integration with audb. There, we might want the option to stream the data directly from the backend instead of from the cache, i.e. load the requested part of the table file from the backend together with the corresponding media files.

/cc @ChristianGeng
