-
Notifications
You must be signed in to change notification settings - Fork 897
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GODRIVER-1489 Add ability to stream wire messages #405
GODRIVER-1489 Add ability to stream wire messages #405
Conversation
@@ -11,3 +11,4 @@ driver-test-data.tar.gz | |||
perf | |||
**mongocryptd.pid | |||
*.test | |||
**.md |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I usually keep a design.md
file around and it's annoying to have to manually ignore it.
// messages to inform the server that the driver supports streaming. | ||
type Streamer interface { | ||
Connection | ||
SetStreaming(bool) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opted to give these functions names that define the semantics of exhaustAllowed
/moreToCome
. If this is confusing, I'm open to giving them more literal names like SetMoreToCome
, IsMoreToCome
, and SupportsExhaustAllowed
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm leaning more towards the more literal names, but I don't feel that strongly about it.
func (ssd SingleConnectionDeployment) Connection(context.Context) (Connection, error) { | ||
return nopCloserConnection{ssd.C}, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this wrapping is necessary. There are two instances where a connection should be used for an operation and then should not be closed or returned to a pool: handshakes and heartbeats. Both of these uses already wrap the connection in topology.initConnection
, which serves the same purpose as nopCloserConnection
, so the extra wrapping adds nothing.
Having this wrapping also hurts us because if initConnection
implements Streamer
, wrapping it in nopCloserConnection
"gets rid of" the Streamer
implementation.
return op.readWireMessage(ctx, conn, wm) | ||
} | ||
|
||
func (op Operation) readWireMessage(ctx context.Context, conn Connection, wm []byte) ([]byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is mostly the same logic that was originally in roundTrip
but split off into a separate function so ExecuteExhaust
can call it as well. The only difference is that it calls SetStreaming
if a streaming connection is given and the response has moreToCome
.
x/mongo/driver/operation_exhaust.go
Outdated
|
||
// ExecuteExhaust gets a connection from the provided deployment and reads a response from it. This will error if the | ||
// connection CurrentlyStreaming function returns false. | ||
func (op Operation) ExecuteExhaust(ctx context.Context, conn Streamer, scratch []byte) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I struggled a bit with the parameters for this function. Even though the callers of this method will use something like SingleConnectionDeployment
, I don't think it makes sense to go through server selection and getting a connection like the normal Execute
function does because a response can semantically only be streamed from one connection, so I opted to have this take a Streamer
.
We could add extra validation like confirming that some operation fields like Deployment
, Database
, and CommandFn
aren't set because those are used to get connections or create wire messages, so they aren't relevant here.
// messages to inform the server that the driver supports streaming. | ||
type Streamer interface { | ||
Connection | ||
SetStreaming(bool) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm leaning more towards the more literal names, but I don't feel that strongly about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very cool. few comments.
3e32f7e
to
6f60c74
Compare
This is the first part of streamable isMaster. I added a new
Streamer
interface that connections should implement if they support settingexhaustAllowed
and receivingmoreToCome
messages. The other main addition is theOperation.ExecuteExhaust
function, which takes aStreamer
and streams the next message.