Description
Feature request
I'd like for a new extension to exist with this signature:
public static IAsyncEnumerable<TSource> Publish<TSource>(this IAsyncEnumerable<TSource> source)
That would be the async counterpart of the existing IEnumerable.Publish extension.
Which subcomponent library (Ix, Async.Ix)?
Async.Ix.
Which next library version (i.e., patch, minor or major)?
Minor (feature).
What are the platform(s), environment(s) and related component version(s)?
How commonly is this feature needed (one project, several projects, company-wide, global)?
It is fairly specialized in our case, but at the same time fairly broadly applicable.
Please describe the feature.
We recently had a requirement that would be solved cleanly and efficiently by being able to grab "the next N elements" from an IAsyncEnumerable multiple times, driven by a parent IAsyncEnumerable.
I learned that the Publish extension on IEnumerable allows something like that by keeping track of the last enumerated index of the sequence. That lets you, say, call Take(10) on the sequence, do something with those 10 elements, then call Take(10) again and grab the next 10, without enumerating the original sequence multiple times (and hopefully without much memory overhead).
We would want that same capability for IAsyncEnumerable.
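To make the desired behavior concrete, here is a minimal sketch of "resume where the last enumeration stopped" for an IAsyncEnumerable. SharedCursor, Demo.Numbers and Demo.TakeNextAsync are hypothetical names made up for this illustration; this is not how Ix implements Publish, it has no buffering, and it assumes only one consumer enumerates at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the requested operator (NOT the Ix implementation).
// It holds a single enumerator over the source, so every new enumeration
// continues from the element after the last one that was handed out.
public sealed class SharedCursor<T> : IAsyncEnumerable<T>, IAsyncDisposable
{
    private readonly IAsyncEnumerator<T> _inner;

    public SharedCursor(IAsyncEnumerable<T> source) =>
        _inner = source.GetAsyncEnumerator();

    public async IAsyncEnumerator<T> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        // Disposing this iterator (e.g. on 'break') does not dispose _inner.
        while (await _inner.MoveNextAsync())
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return _inner.Current;
        }
    }

    public ValueTask DisposeAsync() => _inner.DisposeAsync();
}

public static class Demo
{
    // Sample source: yields 1..count asynchronously.
    public static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Reads at most 'count' further elements, stopping without reading ahead.
    public static async Task<List<T>> TakeNextAsync<T>(IAsyncEnumerable<T> source, int count)
    {
        var batch = new List<T>();
        if (count <= 0) return batch;
        await foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == count) break;
        }
        return batch;
    }
}
```

Under these assumptions, calling Demo.TakeNextAsync(cursor, 10) twice on the same SharedCursor returns elements 1 to 10 and then 11 to 20, with the source enumerated only once.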
We have a scenario where a small database has been serialized as CSV files: each CSV file is an entity, and relationships between entities (1:1 or 1:N) are controlled by columns in those CSVs that work like foreign keys. In addition, every file in this multi-CSV structure has a column that keeps track of which entity we are dealing with. For example, if the set of files describes 5 main entities, this column will go from 1 through 5, in that order.
For example:
vehicle.csv (main entity)
entity_num | vehicle_id | vin | ... |
---|---|---|---|
1 | 4512 | WP0CA29972S650104 | ... |
2 | 9102 | 1GCDC14H5DS161081 | ... |
engine.csv (N:1 with vehicle)
entity_num | vehicle_id | engine_type | fuel_type | ... |
---|---|---|---|---|
1 | 4512 | 4 Cylinder Engine | Gasoline Fuel | ... |
1 | 4512 | 4 Cylinder Engine | Flex Fuel Capability | ... |
2 | 9102 | V6 Cylinder Engine | Gasoline Fuel | ... |
We need to parse this structure as cleanly and as efficiently as possible. Assuming we have an IAsyncEnumerable for each of these CSV files (using something like CsvHelper), Publish would allow us to do something like:
await foreach (var vehicleRow in vehicleCsv)
{
    Vehicle vehicle = mapper.Map<Vehicle>(vehicleRow);
    // Read only the engine rows that belong to the current vehicle.
    await foreach (var engineRow in engineCsv.TakeWhile(e => e.EntityNum == vehicleRow.EntityNum))
    {
        vehicle.Engines.Add(mapper.Map<Engine>(engineRow));
    }
    // ... several other await foreach loops for different child CSVs here
    yield return vehicle;
}
The outer foreach is straightforward since that is the 1:1 parent entity, but the child entities can have multiple rows per parent, so we use TakeWhile to read the rows related to the current entity based on the running EntityNum value.
Without Publish, this of course doesn't work for anything past the first parent, since the TakeWhile call would always start at the beginning of the sequence, and adding a SkipWhile (or even a Skip) there would mean needless multiple enumeration too.
The only other option I can think of for this problem would be to drop to the enumerator level and call MoveNextAsync manually, but that makes the code substantially less readable.
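For comparison, this is roughly what the enumerator-level version looks like for a single child file. It is only a sketch: VehicleRow, EngineRow, ManualEnumeration and Samples are hypothetical stand-ins for the real mapped types and CSV readers, and it assumes both files are ordered by EntityNum. Note the extra bookkeeping needed to hold on to the row that ended the previous group.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical row types standing in for the mapped CSV records.
public record VehicleRow(int EntityNum, int VehicleId);
public record EngineRow(int EntityNum, string EngineType);

public static class ManualEnumeration
{
    // Pairs each vehicle with its engine rows while reading engineCsv only once.
    public static async IAsyncEnumerable<(VehicleRow Vehicle, List<EngineRow> Engines)> JoinAsync(
        IAsyncEnumerable<VehicleRow> vehicleCsv,
        IAsyncEnumerable<EngineRow> engineCsv)
    {
        await using var engines = engineCsv.GetAsyncEnumerator();

        // hasEngine is true while engines.Current holds a row not yet consumed.
        var hasEngine = await engines.MoveNextAsync();

        await foreach (var vehicle in vehicleCsv)
        {
            var matched = new List<EngineRow>();
            while (hasEngine && engines.Current.EntityNum == vehicle.EntityNum)
            {
                matched.Add(engines.Current);
                // The row that fails the check stays in Current for the next vehicle.
                hasEngine = await engines.MoveNextAsync();
            }
            yield return (vehicle, matched);
        }
    }
}

public static class Samples
{
    // Demo helper: exposes an in-memory collection as an async sequence.
    public static async IAsyncEnumerable<T> ToAsync<T>(IEnumerable<T> items)
    {
        foreach (var item in items)
        {
            await Task.Yield();
            yield return item;
        }
    }

    // Demo helper: drains an async sequence into a list.
    public static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> source)
    {
        var result = new List<T>();
        await foreach (var item in source) result.Add(item);
        return result;
    }
}
```

Every additional child CSV needs its own enumerator and its own "has current row" flag, which is the readability cost that a Publish-style operator would hide.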