Add tests for yielding in SpillManager::read_spill_as_stream#16616
Add tests for yielding in SpillManager::read_spill_as_stream#16616alamb merged 3 commits intoapache:mainfrom
SpillManager::read_spill_as_stream#16616Conversation
| /// If running in a tokio context spawns the execution of `stream` to a separate task | ||
| /// allowing it to execute in parallel with an intermediate buffer of size `buffer` | ||
| pub(crate) fn spawn_buffered( | ||
| pub fn spawn_buffered( |
There was a problem hiding this comment.
I changed the visibility of spawn_buffered to pub to use it in the test.
If this isn't ideal, I'll follow any suggestions. (Perhaps duplicating the code might be better?)
|
Thank you @pepijnve and fyi @zhuqi-lucas |
| let schema = mock_stream.schema(); | ||
|
|
||
| let consumer_stream = futures::stream::poll_fn(move |cx| { | ||
| let mut collected = vec![]; |
There was a problem hiding this comment.
I would think of having some cap of iterations or timelimit instead of infinite loop?
|
@pepijnve I wonder if you have some time to review this PR as well? |
0d64f60 to
6659935
Compare
| // To make sure that inner stream is polled multiple times, loop until the buffer is full | ||
| // Ideally, the stream will yield before the loop ends | ||
| for _ in 0..buffer_capacity { |
There was a problem hiding this comment.
Updated to loop until the buffer_capacity
@alamb IIRC, I provided feedback out of band which was integrated by @ding-young |
comphead
left a comment
There was a problem hiding this comment.
Thanks @ding-young
this is lgtm
If the test hits a cap lets debug_assert! it or output a warning to indicate something is wrong with the test
90af070 to
6c93a0f
Compare
6c93a0f to
77541b5
Compare
| // This should be unreachable since the stream is canceled | ||
| unreachable!("Expected the stream to be canceled, but it continued polling"); | ||
| }); |
There was a problem hiding this comment.
Updated (Clippy told me to use unreachable!), thanks @comphead
|
Thanks again @ding-young and @comphead |
…e#16616) * add a test for mocking spawn_bufferd behavior * Clean up * add comment
Which issue does this PR close?
Rationale for this change
As described in above issue, since we introduced cooperative stream when reading spill file, we need to test that the reader stream properly yields. Since it is not obvious to write a test using SpillManager api, this PR adds a test that manually mocks
SpillManager::read_spill_as_stream. (Note that writing to a spill file does not use the stream API.)What changes are included in this PR?
In the test, the producer/sender stream always returns
Poll::Ready, consumer/receiver stream polls it repeatedly and buffer capacity forspawn_bufferedis set large enough so that yielding behavior comes fromCooperativeStream.Are these changes tested?
Are there any user-facing changes?
The visibility of
spawn_bufferedhas changed