Publish overload for IAsyncEnumerable #2159

Open
@julealgon

Description

Feature request

I'd like for a new extension to exist with this signature:

public static IAsyncEnumerable<TSource> Publish<TSource>(this IAsyncEnumerable<TSource> source)

That would be the async counterpart of the existing IEnumerable.Publish extension.

Which subcomponent library (Ix, Async.Ix)?

Async.Ix.

Which next library version (i.e., patch, minor or major)?

Minor (feature).

What are the platform(s), environment(s) and related component version(s)?

How commonly is this feature needed (one project, several projects, company-wide, global)?

It is fairly specialized in our case, but at the same time fairly broadly applicable.

Please describe the feature.

We recently had a requirement where grabbing "the next N elements" from an IAsyncEnumerable multiple times, driven by a parent IAsyncEnumerable, would solve the problem cleanly and efficiently.

I learned that the Publish extension on IEnumerable allows something like that by tracking the last enumerated index of the sequence. That lets you, say, call Take(10) on the sequence, do something with those 10 elements, then call Take(10) again and grab the next 10, without enumerating the original multiple times (and hopefully without much memory overhead).

We would want that same capability but for IAsyncEnumerable.
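To make the requested behavior concrete, here is a minimal sketch of a wrapper that hands every consumer the same underlying enumerator, so each new enumeration continues where the previous one stopped. `SharedAsyncSequence<T>`, `Demo.Range` and `Demo.TakeAsync` are hypothetical names invented for this illustration; none of them exist in Ix or Async.Ix. It assumes consumers run one after another, never concurrently, and it does no buffering, so it is a simplification rather than a faithful port of the IEnumerable version.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not part of Ix or Async.Ix.
// Every GetAsyncEnumerator call returns a view over ONE shared enumerator.
public sealed class SharedAsyncSequence<T> : IAsyncEnumerable<T>, IAsyncDisposable
{
    private readonly IAsyncEnumerator<T> _inner;

    public SharedAsyncSequence(IAsyncEnumerable<T> source)
        => _inner = source.GetAsyncEnumerator();

    // The per-call token is ignored here because the source enumerator
    // was already created in the constructor.
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => new View(_inner);

    // Disposing the wrapper is what finally releases the source.
    public ValueTask DisposeAsync() => _inner.DisposeAsync();

    private sealed class View : IAsyncEnumerator<T>
    {
        private readonly IAsyncEnumerator<T> _shared;

        public View(IAsyncEnumerator<T> shared) => _shared = shared;

        public T Current => _shared.Current;

        public ValueTask<bool> MoveNextAsync() => _shared.MoveNextAsync();

        // Ending one await foreach must not dispose the shared enumerator.
        public ValueTask DisposeAsync() => default;
    }
}

public static class Demo
{
    // Simple async source used only for the demonstration.
    public static async IAsyncEnumerable<int> Range(int start, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return start + i;
        }
    }

    // Hand-rolled "take n" so the sketch needs no extra packages.
    public static async Task<List<int>> TakeAsync(IAsyncEnumerable<int> source, int n)
    {
        var result = new List<int>();
        if (n <= 0) return result;

        await foreach (var item in source)
        {
            result.Add(item);
            if (result.Count == n) break; // stop without reading ahead
        }
        return result;
    }
}
```

With this sketch, calling `Demo.TakeAsync(shared, 10)` twice on a `SharedAsyncSequence<int>` wrapping `Demo.Range(1, 25)` yields the first ten values and then the next ten, while the source is enumerated only once.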

We have a scenario where a small database has been serialized as CSV files: each CSV file is an entity, and relationships between entities (1:1 or 1:N) are controlled by columns in those CSVs that work like foreign keys. In addition, every file in this multi-CSV structure has a column that keeps track of "what entity we are dealing with". For example, if the set of files describes 5 main entities, this column's values will go from 1 through 5, in that order.

For example:

vehicle.csv (main entity)

entity_num  vehicle_id  vin                ...
1           4512        WP0CA29972S650104  ...
2           9102        1GCDC14H5DS161081  ...

engine.csv (N:1 with vehicle)

entity_num  vehicle_id  engine_type         fuel_type             ...
1           4512        4 Cylinder Engine   Gasoline Fuel         ...
1           4512        4 Cylinder Engine   Flex Fuel Capability  ...
2           9102        V6 Cylinder Engine  Gasoline Fuel         ...

We need to parse this structure as cleanly and as efficiently as possible. Assuming we have an IAsyncEnumerable for each of these CSV files (using something like CsvHelper), Publish would allow us to do something like:

await foreach (var vehicleRow in vehicleCsv)
{
    Vehicle vehicle = mapper.Map<Vehicle>(vehicleRow);

    await foreach (var engineRow in engineCsv.TakeWhile(e => e.EntityNum == vehicleRow.EntityNum))
    {
        vehicle.Engines.Add(mapper.Map<Engine>(engineRow));
    }

    ... // several other await foreach loops for different child CSVs here

    yield return vehicle;
}

The outside foreach is straightforward as that's the 1:1 parent entity, but the child entities can have multiple rows that correspond to that parent, so we use TakeWhile to read those that are related to this entity based on the running EntityNum value.

Without Publish, this of course doesn't work for anything past the first parent, since each TakeWhile call would start again at the beginning of the sequence, and adding a SkipWhile (or even a Skip) there would mean unneeded multiple enumeration too.

The only other equivalent I can think of for this problem would be to drop to the enumerator level and call MoveNextAsync manually, but that makes the code substantially less readable.
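For comparison, the enumerator-level alternative might look roughly like the sketch below. `VehicleRow`, `EngineRow`, `ManualGrouping.JoinAsync` and `ToAsync` are hypothetical names made up for this example (the real row types would come from the CSV reader), and the code assumes C# 9 or later for records. It keeps a single child enumerator open and holds one row of lookahead in `Current` between parents.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualGrouping
{
    // Hypothetical row shapes, reduced to the columns the sketch needs.
    public sealed record VehicleRow(int EntityNum, int VehicleId);
    public sealed record EngineRow(int EntityNum, string EngineType);

    // Pairs each vehicle with its engine rows, reading both inputs exactly once.
    // Assumes both inputs are ordered by EntityNum, as in the CSV example.
    public static async IAsyncEnumerable<(VehicleRow Vehicle, List<EngineRow> Engines)> JoinAsync(
        IAsyncEnumerable<VehicleRow> vehicles,
        IAsyncEnumerable<EngineRow> engines)
    {
        await using var engine = engines.GetAsyncEnumerator();
        var hasEngine = await engine.MoveNextAsync();

        await foreach (var vehicle in vehicles)
        {
            var matched = new List<EngineRow>();

            // engine.Current is the lookahead row; it is only consumed when it matches.
            while (hasEngine && engine.Current.EntityNum == vehicle.EntityNum)
            {
                matched.Add(engine.Current);
                hasEngine = await engine.MoveNextAsync();
            }

            yield return (vehicle, matched);
        }
    }

    // Small helper to turn in-memory rows into an async sequence for trying it out.
    public static async IAsyncEnumerable<T> ToAsync<T>(IEnumerable<T> items)
    {
        foreach (var item in items)
        {
            await Task.Yield();
            yield return item;
        }
    }
}
```

The bookkeeping around `hasEngine` and `Current` is the readability cost described above, and it has to be repeated for every child CSV. One detail worth checking in any Publish-style design: TakeWhile has to read the first non-matching row before it can stop, so over a shared enumerator that row may already be consumed when the next parent is processed, whereas the manual loop keeps it as lookahead.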
