Stream client prototype #2: asyncio-heavy #6145


Closed

Conversation


@verult verult commented Jun 20, 2023

Second prototype for the streaming client with an architecture from conversations with @maffoo . This design is more asyncio heavy; in particular:

  • Upon receiving a run request, the duet thread immediately spawns a separate asyncio task to send a CreateQuantumProgramAndJobRequest. There is one such task per request.
  • A single manager loop in the asyncio thread handles stream (re)creation and demultiplexing responses to the appropriate per-request task.
  • When the stream breaks, an exception is raised from the manager loop to all per-request tasks in flight, which will send a GetQuantumResultRequest.
  • When a user cancels the circuit run, a separate asyncio task is spawned to call the unary CancelQuantumJob() of Quantum Engine.

This design is simpler because:

  • There's no need for a thread-safe request AsyncIterator to be shared between duet and asyncio threads.
  • Every task has a smaller set of responsibilities.
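The demultiplexing piece of this design can be sketched roughly as follows. All names here are hypothetical stand-ins; the PR's actual StreamManager and ResponseDemux interfaces may differ:

```python
import asyncio


class ResponseDemux:
    """Routes each stream response to the request task awaiting it (sketch)."""

    def __init__(self) -> None:
        self._subscribers: dict = {}  # message_id -> asyncio.Future

    def subscribe(self, message_id: str) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._subscribers[message_id] = future
        return future

    def publish(self, message_id: str, response) -> None:
        future = self._subscribers.pop(message_id, None)
        if future is not None and not future.done():
            future.set_result(response)

    def publish_exception(self, exception: BaseException) -> None:
        # On a broken stream, every in-flight request task sees the error
        # and can fall back to sending a GetQuantumResultRequest.
        for future in self._subscribers.values():
            if not future.done():
                future.set_exception(exception)
        self._subscribers.clear()


async def demo() -> str:
    demux = ResponseDemux()
    future = demux.subscribe('0')
    # The manager loop would call publish() as responses arrive on the stream.
    demux.publish('0', 'result-for-0')
    return await future


print(asyncio.run(demo()))
```

Keeping the demux separate from the manager loop is what gives each task the small set of responsibilities noted above.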

@CirqBot CirqBot added the size: M 50< lines changed <250 label Jun 20, 2023

verult commented Jun 20, 2023

cc @maffoo @wcourtney

This PR is a rough draft. I have not tested this, but I'm curious to hear your feedback on the design.

The call to send() doesn't pass in the right info yet, but that should be an easy change.

I'm not sure if this cancellation approach works - would the main task get cancelled once the AwaitableFuture gets a cancellation signal? Probably not?


verult commented Jun 23, 2023

Improved user-triggered cancellation and added the ability to stop the stream manager loop, which cancels all existing requests.

The main pieces are in place. Remaining tasks are captured in TODOs and they should be simple enough to add when I write the real implementation. PTAL @maffoo @wcourtney at the high-level design (this prototype is still untested). Let me know if a GVC going over this PR would help.

@verult verult requested a review from maffoo June 23, 2023 00:17
@verult verult force-pushed the stream-client/prototype-asyncio-heavy branch from 6866e2d to 658f1b3 Compare June 23, 2023 18:39
verult added 6 commits June 23, 2023 18:54
* Message ID should be str in the request.
* _executor and _request_iterator should be properties
* quantum_run_stream() call needs to be awaited.

verult commented Jun 24, 2023

This is now passing the test for the happy path (no breakages) and the cancellation test, after a local installation of the latest duet version which includes the AwaitableFuture cancellation propagation fix: google/duet#77


verult commented Jun 25, 2023

A broken stream test is added and passing.


@wcourtney wcourtney left a comment


Initial comments on the high-level usage and tests. Still digging into the manager and demuxer.

project_id: str,
program_id: str,
code: any_pb2.Any,
job_id: Optional[str],
Collaborator

Can we recover a stream if we don't provide a job_id? Or is that handled between this layer and the API call?

Collaborator Author

Good point - we talked about requiring the job_id before and I missed it here. Thanks!


job_name = _job_name_from_ids(project_id, program_id, job_id) if job_id else ''
job = quantum.QuantumJob(
name=job_name,
Collaborator

Rather than setting the name to '' (an implementation detail of the proto) if not job_id, WDYT of handling this like the other optional fields below and assigning only if it's set?

Collaborator Author

Like you mentioned in the other comment, I also think it's a good idea to require job_id to be set, so I think this comment no longer applies.

elif 'create_quantum_job' in request:
return request.create_quantum_job.quantum_job.name
# 'get_quantum_result' in request
return request.get_quantum_result.parent
Collaborator

This won't be true if a new request type is added later. Prefer to be specific and raise an exception if an unexpected request comes through so that we can detect that it wasn't correctly handled.
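The exhaustive dispatch the reviewer suggests could look like this, sketched with a plain dict standing in for the QuantumRunStreamRequest proto (field names taken from the excerpt above):

```python
def job_path_from_request(request: dict) -> str:
    """Returns the job path for each known request type; raises otherwise (sketch)."""
    if 'create_quantum_program_and_job' in request:
        return request['create_quantum_program_and_job']['quantum_job']['name']
    elif 'create_quantum_job' in request:
        return request['create_quantum_job']['quantum_job']['name']
    elif 'get_quantum_result' in request:
        return request['get_quantum_result']['parent']
    # A request type added later fails loudly instead of being silently misrouted.
    raise ValueError(f'Unrecognized request type: {list(request)}')


print(job_path_from_request({'get_quantum_result': {'parent': 'projects/a/programs/b/jobs/c'}}))
```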

class _FakeQuantumRunStream:
def __init__(self):
self.stream_request_count = 0
self.cancel_requests = []
Collaborator

Add a type annotation.

self.cancel_requests = []
self.responses_and_exceptions: List[engine.QuantumRunStreamResponse | BaseException] = []

def add_responses_and_exceptions(
Collaborator

Is this appending behavior needed, or could we make the list immutable and provide it at construction time?

Collaborator Author

Good point, updated



@mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
def test_run_job_over_stream(client_constructor):
Collaborator

"Test names should summarize the behavior being tested and its expected outcome." go/unit-testing-practices#naming

priority: Optional[int] = None,
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
) -> duet.AwaitableFuture[quantum.QuantumResult]:
Collaborator

Collaborator Author

_manage_stream first receives these responses through the response iterable; they are then routed through the ResponseDemux to the _make_request tasks handling their corresponding jobs. Where response.result is checked inside _make_request, there will be additional clauses for failed jobs and stream errors.

For retryable errors, the appropriate stream request will be set as current_request and the loop continues, so that the request is sent over the stream.

Non-retryable errors are raised, which is propagated to the duet.AwaitableFuture for this task. The end user awaiting on this future will see the exception.

return response.result

except GoogleAPICallError:
# TODO how to distinguish between program not found vs job not found?
Collaborator

These are different Codes in a StreamError.

Collaborator Author

Nice, thanks!

except GoogleAPICallError:
# TODO how to distinguish between program not found vs job not found?
# TODO handle QuantumJob response and retryable StreamError.
# TODO Send a CreateProgramAndJobRequest or CreateJobRequest if either program or
Collaborator

Not sure what this means. e.g., a "job doesn't exist" should only happen during a GetQuantumResultRequest, but that value won't have enough context to create a new job.

Collaborator Author

GetQuantumResultRequest is only sent when a previous CreateProgramAndJobRequest failed for some reason (most likely a stream failure) within this asyncio task, so this task does have the full program and job context to recreate the job.

Getting a "job doesn't exist" means either the "create program" part or the "create job" part failed. We could try recreating just the job first by sending a CreateJobRequest, and if that fails with a "program doesn't exist" error, we can try the full CreateProgramAndJobRequest again.
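The fallback described here can be sketched as follows. The exception classes are placeholders for the corresponding StreamError codes, and `send` stands in for whatever function puts a request on the stream and waits for its response:

```python
class JobNotFound(Exception):
    """Placeholder for the 'job doesn't exist' StreamError code."""

class ProgramNotFound(Exception):
    """Placeholder for the 'program doesn't exist' StreamError code."""


def recover(send, program, job):
    """Retries with progressively larger create requests (sketch)."""
    try:
        return send('get_quantum_result', job)
    except JobNotFound:
        try:
            # The task still holds the full program and job context,
            # so it can try recreating just the job first...
            return send('create_quantum_job', job)
        except ProgramNotFound:
            # ...and fall back to recreating both if the program is gone too.
            return send('create_quantum_program_and_job', (program, job))


# Demo: both the job and the program were lost, so all three requests fire.
calls = []

def fake_send(request_type, payload):
    calls.append(request_type)
    if request_type == 'get_quantum_result':
        raise JobNotFound()
    if request_type == 'create_quantum_job':
        raise ProgramNotFound()
    return 'result'


print(recover(fake_send, 'prog', 'job'), calls)
```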

self._subscribers = {}


class StreamManager:
Collaborator

Add unit tests for the public API of this class.

Collaborator Author

Definitely. For this PR I focused on sketching a prototype to get high-level design review. The real implementations will include detailed unit tests.


@verult verult left a comment


Thanks for the review! I addressed most comments but deferred a few to the real implementation later.

raise response_or_exception
yield response_or_exception

await asyncio.sleep(0.0001)
Collaborator Author

Turns out setting it to 0 also works!

labels = {'hello': 'world'}
client = EngineClient()
expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
mock_responses = [quantum.QuantumRunStreamResponse(message_id='0', result=expected_result)]
Collaborator Author

In the updated implementation, I think leaving it separate makes fake client construction more clear.

Comment on lines 124 to 131
job_path = next(
(
p
for p, (message_id, _) in self._subscribers.items()
if message_id == response.message_id
),
default='',
)
Contributor

Consider storing a second map from message_id to future (or job_path) so that you don't have to do a linear search here.
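The suggested second map replaces the linear scan with an O(1) lookup; a minimal sketch with hypothetical names, keeping the two maps in sync:

```python
class SubscriberIndex:
    """Keeps a message-id index alongside the job-path map (sketch)."""

    def __init__(self) -> None:
        # job_path -> (message_id, future), plus an index from message_id
        # back to job_path so responses can be routed without scanning
        # every subscriber.
        self._subscribers: dict = {}
        self._job_path_by_message_id: dict = {}

    def subscribe(self, job_path: str, message_id: str, future) -> None:
        self._subscribers[job_path] = (message_id, future)
        self._job_path_by_message_id[message_id] = job_path

    def job_path_for(self, message_id: str) -> str:
        # O(1) instead of the linear search over self._subscribers.
        return self._job_path_by_message_id.get(message_id, '')


index = SubscriberIndex()
index.subscribe('projects/p/programs/x/jobs/j', '0', object())
print(index.job_path_for('0'))
```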

"""Assumes the message ID has not been set."""
job_path = _get_job_path_from_stream_request(request)
request.message_id = str(self._next_available_message_id)
response_future: asyncio.Future = asyncio.Future()
Contributor

According to the docs the recommended way to create futures is to call loop.create_future(). It'd probably be good to pass in the event loop to the ResponseDemux constructor and store it as an instance variable so it can be used here.

Collaborator Author

Thanks for pointing this out. I will use asyncio.get_running_loop().
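For illustration, `asyncio.get_running_loop().create_future()` avoids holding a loop reference and still goes through the loop's future factory:

```python
import asyncio


async def make_future() -> asyncio.Future:
    # loop.create_future() is preferred over asyncio.Future() because it
    # lets the running event loop supply its own Future implementation.
    future = asyncio.get_running_loop().create_future()
    future.set_result('response')
    return future


async def demo() -> str:
    future = await make_future()
    return await future


print(asyncio.run(demo()))
```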

Comment on lines 140 to 143
_, future = self._subscribers[job_path]
if not future.done():
future.set_result(response)
del self._subscribers[job_path]
Contributor

nit: can use dict.pop

Suggested change
_, future = self._subscribers[job_path]
if not future.done():
future.set_result(response)
del self._subscribers[job_path]
_, future = self._subscribers.pop(job_path)
if not future.done():
future.set_result(response)

return AsyncioExecutor.instance()

@property
async def _request_iterator(self) -> AsyncIterator[quantum.QuantumRunStreamRequest]:
Contributor

I would suggest not making this a property, since that makes it seem like there is one request iterator, but actually we get a new iterator each time self._request_iterator is accessed. I think it'd be clearer to make this a method and document that we get a new iterator each time. It occurs to me that it might be better to make this a static method or module-level helper function that takes the queue as a parameter, rather than using attributes on self, because the use of this iterator is actually local to one iteration of the loop in _manage_stream.

Collaborator Author

I'm not sure I follow the last part - why would it better to use a static method or module-level function if it's local to one iteration? Isn't a new iterator created for each quantum_run_stream in both cases?

Comment on lines 168 to 169
# TODO might need to keep this long-running in order to not close the stream.
while not self._request_queue.empty():
Contributor

Yes, I definitely think you don't want to terminate this loop when the queue gets empty, since that can happen if there is ever a pause in new requests coming in, but we want the stream to be ready for new requests not have to be reestablished each time there is a gap in requests.

elif 'create_quantum_job' in request:
return request.create_quantum_job.quantum_job.name
elif 'get_quantum_result' in request:
return request.get_quantum_result.parent
Contributor

As I understand the use of "job path", these would have to be unique across requests, since we use the job path as a key to keep track of futures; if there is a collision we could lose a future and the client will hang waiting for it to complete. Is "parent" here unique across requests? What if a user makes two get requests for the same result?

Collaborator Author

"Parent" here is also the full job path.

Thanks for catching the future collision issue. One idea is to reuse the existing future in ResponseDemux if the subscriber already exists, and still increment the message ID.
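The reuse idea could look like this (a sketch with hypothetical names, not the PR's actual ResponseDemux):

```python
import asyncio


class DedupDemux:
    """Reuses the existing future when the same job path subscribes twice (sketch)."""

    def __init__(self) -> None:
        self._subscribers: dict = {}  # job_path -> asyncio.Future

    def subscribe(self, job_path: str) -> asyncio.Future:
        # Two requests for the same job share one future, so neither one
        # hangs waiting for a response that was routed to the other.
        if job_path not in self._subscribers:
            self._subscribers[job_path] = asyncio.get_running_loop().create_future()
        return self._subscribers[job_path]


async def demo() -> bool:
    demux = DedupDemux()
    first = demux.subscribe('projects/p/programs/x/jobs/j')
    second = demux.subscribe('projects/p/programs/x/jobs/j')
    return first is second


print(asyncio.run(demo()))
```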

except BaseException as e:
# TODO Close the request iterator to close the existing stream.
self._response_demux.publish_exception(e) # Raise to all request tasks
self._request_queue = asyncio.Queue() # Clear requests
Contributor

Seems like a bad idea to clear the request queue here. I think we should either leave requests in the queue so they can be sent to the next stream, or else fail them with an explicit error message so the client can retry, but we should not drop any queued messages on the floor.

Collaborator Author

That makes sense. We could close the existing request iterator and replace it with a new one, but reuse the queue under the hood.

Comment on lines 240 to 244
def stop(self) -> None:
"""Stops and resets the stream manager."""
if self._manage_stream_loop_future is not None:
self._manage_stream_loop_future.cancel()
self._manage_stream_loop_future = None
Contributor

I'd suggest returning a future here too, so the caller can await to ensure that cancellation is done:

Suggested change
def stop(self) -> None:
"""Stops and resets the stream manager."""
if self._manage_stream_loop_future is not None:
self._manage_stream_loop_future.cancel()
self._manage_stream_loop_future = None
def stop(self) -> duet.AwaitableFuture[None]:
"""Stops and resets the stream manager."""
if self._manage_stream_loop_future is None:
return duet.completed_future(None)
self._manage_stream_loop_future.cancel()
return self._manage_stream_loop_future

You might need to do some work in _manage_stream because currently it looks like it will run forever even if a cancellation error is raised internally, since it captures BaseException, though it might be that asyncio will just stop running the coroutine when a task is cancelled.

Collaborator Author

From my experiments, cancel() waits for the done callback to finish, but it doesn't wait for the CancelledError handler to finish. Awaiting on the future doesn't wait for CancelledError handler, either.

Let me know if there's a better way to wait for the error handler other than artificially introducing a delay. I'll revert the change for the time being.


return self._stream_manager.send(project_name, program, job)

def stop_stream(self):
Contributor

As noted above, I think this should return a future so callers can wait until cancellation is done if they want to.


maffoo commented Jun 29, 2023

The approach generally looks good to me. Please see comments.


verult commented Jul 7, 2023

Addressed most of Matthew's feedback. This PR is at a stage where I'm comfortable starting the real implementation. Please feel free to either wait for the real implementation to review or continue to review this PR, whichever way you find easier. The real implementation will have docstrings and tests which will hopefully help with your understanding.

The ResponseDemux was simplified to use the message ID as the key instead of job ID. Turns out job ID isn't necessary after all :)

Some of Matthew's feedback which I omitted:

  • Re handling of long stream disconnects, similar to what we've observed in backdraft: let's wait until the implementation is complete to see how often this happens in reality. I imagine this is less critical compared to backdraft because this stream only handles requests from a single user.
  • Re health checks to detect silent stream failures observed in backdraft: similar to above, let's also wait and see.
  • Re sequence diagram for stream requests: Will look into that for the real implementation.
  • Re folding ResponseDemux into StreamManager: I think there's still enough complexity to have a separate lightweight class to unit test.

@verult verult mentioned this pull request Jul 8, 2023

verult commented Oct 20, 2023

The actual implementation has been merged (#6199). Closing

@verult verult closed this Oct 20, 2023