Stream client prototype #2: asyncio-heavy #6145
Conversation
This PR is a rough draft. I have not tested it, but I'm curious to hear your feedback on the design. I'm not sure if this cancellation approach works - would the main task get cancelled once the AwaitableFuture gets a cancellation signal? Probably not?
Improved user-triggered cancellation and added the ability to stop the stream manager loop, which cancels all existing requests. The main pieces are in place. Remaining tasks are captured in TODOs and they should be simple enough to add when I write the real implementation. PTAL @maffoo @wcourtney at the high-level design (this prototype is still untested). Let me know if a GVC going over this PR would help.
* Message ID should be str in the request.
* _executor and _request_iterator should be properties.
* quantum_run_stream() call needs to be awaited.
This is now passing the test for the happy path (no breakages) and the cancellation test, after a local installation of the latest duet version which includes the AwaitableFuture cancellation propagation fix: google/duet#77

A broken stream test is added and passing.
Initial comments on the high-level usage and tests. Still digging into the manager and demuxer.
project_id: str,
program_id: str,
code: any_pb2.Any,
job_id: Optional[str],
Can we recover a stream if we don't provide a job_id? Or is that handled between this layer and the API call?
Good point - we talked about requiring the job_id before and I missed it here. Thanks!
job_name = _job_name_from_ids(project_id, program_id, job_id) if job_id else ''
job = quantum.QuantumJob(
    name=job_name,
Rather than setting the name to '' (an implementation detail of the proto) if not job_id, WDYT of handling this like the other optional fields below and assigning only if it's set?
Like you mentioned in the other comment, I also think it's a good idea to require job_id to be set, so I think this comment no longer applies.
elif 'create_quantum_job' in request:
    return request.create_quantum_job.quantum_job.name
# 'get_quantum_result' in request
return request.get_quantum_result.parent
This won't be true if a new request type is added later. Prefer to be specific and raise an exception if an unexpected request comes through so that we can detect that it wasn't correctly handled.
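For illustration, a minimal sketch of that suggestion, reusing the helper and field names visible in the quoted diff. The import path and the `create_quantum_program_and_job` field name are assumptions, not confirmed by this PR:

```python
from cirq_google.cloud import quantum  # import path assumed


def _get_job_path_from_stream_request(request: quantum.QuantumRunStreamRequest) -> str:
    # Handle each known request type explicitly and fail loudly on anything else,
    # so a newly added request type is detected instead of silently mishandled.
    if 'create_quantum_program_and_job' in request:  # assumed field name
        return request.create_quantum_program_and_job.quantum_job.name
    if 'create_quantum_job' in request:
        return request.create_quantum_job.quantum_job.name
    if 'get_quantum_result' in request:
        return request.get_quantum_result.parent
    raise ValueError(f'Unsupported request type: {request}')
```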
class _FakeQuantumRunStream:
    def __init__(self):
        self.stream_request_count = 0
        self.cancel_requests = []
Add a type annotation.
self.cancel_requests = []
self.responses_and_exceptions: List[engine.QuantumRunStreamResponse | BaseException] = []

def add_responses_and_exceptions(
Is this appending behavior needed, or could we make the list immutable and provide it at construction time?
Good point, updated
@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
def test_run_job_over_stream(client_constructor):
"Test names should summarize the behavior being tested and its expected outcome." go/unit-testing-practices#naming
priority: Optional[int] = None,
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
) -> duet.AwaitableFuture[quantum.QuantumResult]:
How are failed jobs or errors conveyed through this interface? https://source.corp.google.com/piper///depot/google3/google/cloud/quantum/v1alpha1/engine.proto;rcl=389782031;l=1207
_manage_stream first receives these responses through the response iterable; they are then sent through the ResponseDemux to the _make_request task handling their corresponding jobs. Where response.result is checked inside _make_request, there will be additional clauses for failed jobs and stream errors.

For retryable errors, the appropriate stream request will be set as current_request and the loop continues, so that the request is sent over the stream. Non-retryable errors are raised, which propagates to the duet.AwaitableFuture for this task. The end user awaiting on this future will see the exception.
return response.result

except GoogleAPICallError:
    # TODO how to distinguish between program not found vs job not found?
These are different Codes in a StreamError.
Nice, thanks!
except GoogleAPICallError:
    # TODO how to distinguish between program not found vs job not found?
    # TODO handle QuantumJob response and retryable StreamError.
    # TODO Send a CreateProgramAndJobRequest or CreateJobRequest if either program or
Not sure what this means. e.g., a "job doesn't exist" should only happen during a GetQuantumResultRequest, but that value won't have enough context to create a new job.
GetQuantumResultRequest is only sent when a previous CreateProgramAndJobRequest failed for some reason (most likely a stream failure) within this asyncio task, so this task does have the full program and job context to recreate the job.

Getting a "job doesn't exist" means either the "create program" part or the "create job" part failed. We could try recreating just the job first by sending a CreateJobRequest, and if that fails with a "program doesn't exist" error, we can try the full CreateProgramAndJobRequest again.
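A rough, purely illustrative sketch of that retry cascade (the exception classes, the send callable, and the request arguments here are placeholders, not names from this PR):

```python
from cirq_google.cloud import quantum  # import path assumed


class JobNotFoundError(Exception):
    """Placeholder for a 'job doesn't exist' stream error."""


class ProgramNotFoundError(Exception):
    """Placeholder for a 'program doesn't exist' stream error."""


async def recover_result(
    send,  # placeholder: sends one request over the stream and awaits its response
    get_result_request,
    create_job_request,
    create_program_and_job_request,
) -> quantum.QuantumResult:
    try:
        # Assume only the stream failed and the job still exists.
        return await send(get_result_request)
    except JobNotFoundError:
        try:
            # Either program or job creation failed; retry just the job first.
            return await send(create_job_request)
        except ProgramNotFoundError:
            # The program itself is missing; recreate both program and job.
            return await send(create_program_and_job_request)
```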
self._subscribers = {}


class StreamManager:
Add unit tests for the public API of this class.
Definitely. For this PR I focused on sketching a prototype to get high-level design review. The real implementations will include detailed unit tests.
Thanks for the review! I addressed most comments but deferred a few to the real implementation later.
raise response_or_exception
yield response_or_exception

await asyncio.sleep(0.0001)
Turns out setting it to 0 also works!
labels = {'hello': 'world'}
client = EngineClient()
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
mock_responses = [quantum.QuantumRunStreamResponse(message_id='0', result=expected_result)]
In the updated implementation, I think leaving it separate makes fake client construction more clear.
job_path = next(
    (
        p
        for p, (message_id, _) in self._subscribers.items()
        if message_id == response.message_id
    ),
    default='',
)
Consider storing a second map from message_id to future (or job_path) so that you don't have to do a linear search here.
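A sketch of the suggested secondary index, assuming (as in the quoted code) that subscribers are keyed by job path and each entry holds a (message_id, future) pair; the class and attribute names here are hypothetical:

```python
import asyncio
from typing import Dict, Optional, Tuple


class _ResponseDemuxSketch:
    def __init__(self) -> None:
        # Primary map, as in the PR: job path -> (message_id, future).
        self._subscribers: Dict[str, Tuple[str, asyncio.Future]] = {}
        # Secondary index so a response can be routed without a linear scan.
        self._job_path_by_message_id: Dict[str, str] = {}

    def _job_path_for(self, message_id: str) -> Optional[str]:
        return self._job_path_by_message_id.get(message_id)
```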
"""Assumes the message ID has not been set.""" | ||
job_path = _get_job_path_from_stream_request(request) | ||
request.message_id = str(self._next_available_message_id) | ||
response_future: asyncio.Future = asyncio.Future() |
According to the docs the recommended way to create futures is to call loop.create_future(). It'd probably be good to pass in the event loop to the ResponseDemux constructor and store it as an instance variable so it can be used here.
Thanks for pointing this out. I will use asyncio.get_running_loop().
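For reference, a minimal sketch of the agreed change (the helper name is hypothetical; it must be called from code running on the event loop):

```python
import asyncio


def _new_response_future() -> asyncio.Future:
    # Create the future on the currently running loop, per the asyncio docs,
    # instead of instantiating asyncio.Future() directly.
    return asyncio.get_running_loop().create_future()
```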
_, future = self._subscribers[job_path]
if not future.done():
    future.set_result(response)
del self._subscribers[job_path]
nit: can use dict.pop
Suggested change:

    _, future = self._subscribers.pop(job_path)
    if not future.done():
        future.set_result(response)
return AsyncioExecutor.instance()

@property
async def _request_iterator(self) -> AsyncIterator[quantum.QuantumRunStreamRequest]:
I would suggest not making this a property, since that makes it seem like there is one request iterator, but actually we get a new iterator each time self._request_iterator is accessed. I think it'd be clearer to make this a method and document that we get a new iterator each time. It occurs to me that it might be better to make this a static method or module-level helper function and have it take the queue as a parameter, rather than using attributes on self, because the use of this iterator is actually local to one iteration of the loop in _manage_stream.
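A hypothetical sketch of that module-level variant: the helper takes the queue explicitly, so each call yields an independent iterator, and because it blocks on queue.get() it also stays open across gaps between requests (relevant to the comment further below about keeping the loop long-running):

```python
import asyncio
from typing import AsyncIterator

from cirq_google.cloud import quantum  # import path assumed


async def _request_iterator(
    request_queue: asyncio.Queue,
) -> AsyncIterator[quantum.QuantumRunStreamRequest]:
    # Each call creates a fresh iterator over the same queue; waiting on get()
    # keeps the stream's request side open even when no requests are pending.
    while True:
        yield await request_queue.get()
```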
I'm not sure I follow the last part - why would it be better to use a static method or module-level function if it's local to one iteration? Isn't a new iterator created for each quantum_run_stream call in both cases?
# TODO might need to keep this long-running in order to not close the stream.
while not self._request_queue.empty():
Yes, I definitely think you don't want to terminate this loop when the queue gets empty, since that can happen whenever there is a pause in new requests coming in; we want the stream to stay ready for new requests rather than having to be reestablished each time there is a gap in requests.
elif 'create_quantum_job' in request:
    return request.create_quantum_job.quantum_job.name
elif 'get_quantum_result' in request:
    return request.get_quantum_result.parent
As I understand the use of "job path", these would have to be unique across requests, since we use the job path as a key to keep track of futures, so if there is a collision we could lose a future and the client will hang waiting for it to complete. Is "parent" here unique across requests? What if a user makes two get requests for the same result?
"Parent" here is also the full job path.
Thanks for catching the future collision issue. One idea is to reuse the existing future in ResponseDemux
if the subscriber already exists, and still increment the message ID.
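A hypothetical sketch of that idea (not the PR's code): if a subscriber already exists for the job path, keep its future and only record the new message ID.

```python
import asyncio
from typing import Dict, Tuple

from cirq_google.cloud import quantum  # import path assumed


class _DemuxCollisionSketch:
    def __init__(self) -> None:
        self._subscribers: Dict[str, Tuple[str, asyncio.Future]] = {}
        self._next_available_message_id = 0

    def subscribe(self, request: quantum.QuantumRunStreamRequest) -> asyncio.Future:
        job_path = _get_job_path_from_stream_request(request)  # helper from the PR
        request.message_id = str(self._next_available_message_id)
        self._next_available_message_id += 1
        if job_path in self._subscribers:
            # Reuse the existing future so an earlier caller is not orphaned.
            _, future = self._subscribers[job_path]
        else:
            future = asyncio.get_running_loop().create_future()
        self._subscribers[job_path] = (request.message_id, future)
        return future
```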
except BaseException as e:
    # TODO Close the request iterator to close the existing stream.
    self._response_demux.publish_exception(e)  # Raise to all request tasks
    self._request_queue = asyncio.Queue()  # Clear requests
Seems like a bad idea to clear the request queue here. I think we should either leave requests in the queue so they can be sent to the next stream, or else fail them with an explicit error message so the client can retry, but we should not drop any queued messages on the floor.
That makes sense. We could close the existing request iterator and replace it with a new one, but reuse the queue under the hood.
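Roughly, that recovery path might look like the sketch below (names like _open_stream are placeholders, not from this PR; the key point is that the queue itself is never replaced):

```python
async def _manage_stream_sketch(self) -> None:
    """Illustrative only: reopen the stream on failure without dropping requests."""
    while True:
        try:
            # A fresh request iterator (and hence a fresh stream) per attempt,
            # reading from the same long-lived queue.
            await self._open_stream(_request_iterator(self._request_queue))
        except BaseException as e:
            # Notify tasks awaiting responses on the broken stream.
            self._response_demux.publish_exception(e)
            # Intentionally do NOT replace self._request_queue: queued requests
            # will be sent over the next stream created by this loop.
```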
def stop(self) -> None:
    """Stops and resets the stream manager."""
    if self._manage_stream_loop_future is not None:
        self._manage_stream_loop_future.cancel()
        self._manage_stream_loop_future = None
I'd suggest returning a future here too, so the caller can await to ensure that cancellation is done:
Suggested change:

    def stop(self) -> duet.AwaitableFuture[None]:
        """Stops and resets the stream manager."""
        if self._manage_stream_loop_future is None:
            return duet.completed_future(None)
        self._manage_stream_loop_future.cancel()
        return self._manage_stream_loop_future
You might need to do some work in _manage_future because currently it looks like it will run forever even if a cancellation error is raised internally, since it captures BaseException, though it might be that asyncio will just stop running the coroutine when a task is cancelled.
From my experiments, cancel() waits for the done callback to finish, but it doesn't wait for the CancelledError handler to finish. Awaiting on the future doesn't wait for the CancelledError handler, either.

Let me know if there's a better way to wait for the error handler other than artificially introducing a delay. I'll revert the change for the time being.
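For what it's worth, in plain asyncio one standard way to wait until a cancelled task has finished running its CancelledError handlers is to await the task itself after cancelling it (sketch below; whether this fits the duet/executor setup here is a separate question):

```python
import asyncio
import contextlib


async def cancel_and_wait(task: asyncio.Task) -> None:
    # cancel() only requests cancellation; awaiting the task waits until it has
    # actually finished, including CancelledError/finally cleanup handlers.
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
```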
return self._stream_manager.send(project_name, program, job)

def stop_stream(self):
As noted above, I think this should return a future so callers can wait until cancellation is done if they want to.
The approach generally looks good to me. Please see comments.
Addressed most of Matthew's feedback. This PR is at a stage where I'm comfortable starting the real implementation. Please feel free to either wait for the real implementation to review or continue to review this PR, whichever way you find easier. The real implementation will have docstrings and tests which will hopefully help with your understanding.

The ResponseDemux was simplified to use the message ID as the key instead of job ID. Turns out job ID isn't necessary after all :)

Some of Matthew's feedback which I omitted:

The actual implementation has been merged (#6199). Closing.
Second prototype for the streaming client with an architecture from conversations with @maffoo. This design is more asyncio-heavy; in particular:

* CreateQuantumProgramAndJobRequest. There is one such task per request.
* GetQuantumResultRequest.
* CancelQuantumJob() of Quantum Engine.

This design is simpler because:

* AsyncIterator to be shared between duet and asyncio threads.