Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-1489 Add ability to stream wire messages #405

Merged

Conversation

divjotarora
Copy link
Contributor

@divjotarora divjotarora commented May 12, 2020

This is the first part of streamable isMaster. I added a new Streamer interface that connections should implement if they support setting exhaustAllowed and receiving moreToCome messages. The other main addition is the Operation.ExecuteExhaust function, which takes a Streamer and streams the next message.

@divjotarora divjotarora requested review from iwysiu and jyemin May 12, 2020 21:59
@@ -11,3 +11,4 @@ driver-test-data.tar.gz
perf
**mongocryptd.pid
*.test
**.md
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually keep a design.md file around and it's annoying to have to manually ignore it.

// messages to inform the server that the driver supports streaming.
type Streamer interface {
Connection
SetStreaming(bool)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted to give these functions names that define the semantics of exhaustAllowed/moreToCome. If this is confusing, I'm open to giving them more literal names like SetMoreToCome, IsMoreToCome, and SupportsExhaustAllowed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm leaning more towards the more literal names, but I don't feel that strongly about it.

func (ssd SingleConnectionDeployment) Connection(context.Context) (Connection, error) {
return nopCloserConnection{ssd.C}, nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this wrapping is necessary. There are two instances where a connection should be used for an operation and then should not be closed or returned to a pool: handshakes and heartbeats. Both of these uses already wrap the connection in topology.initConnection, which serves the same purpose as nopCloserConnection, so the extra wrapping adds nothing.

Having this wrapping also hurts us because if initConnection implements Streamer, wrapping it in nopCloserConnection "gets rid of" the Streamer implementation.

return op.readWireMessage(ctx, conn, wm)
}

func (op Operation) readWireMessage(ctx context.Context, conn Connection, wm []byte) ([]byte, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly the same logic that was originally in roundTrip but split off into a separate function so ExecuteExhaust can call it as well. The only difference is that it calls SetStreaming if a streaming connection is given and the response has moreToCome.


// ExecuteExhaust gets a connection from the provided deployment and reads a response from it. This will error if the
// connection CurrentlyStreaming function returns false.
func (op Operation) ExecuteExhaust(ctx context.Context, conn Streamer, scratch []byte) error {
Copy link
Contributor Author

@divjotarora divjotarora May 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I struggled a bit with the parameters for this function. Even though the callers of this method will use something like SingleConnectionDeployment, I don't think it makes sense to go through server selection and getting a connection like the normal Execute function does because a response can semantically only be streamed from one connection, so I opted to have this take a Streamer.

We could add extra validation like confirming that some operation fields like Deployment, Database, and CommandFn aren't set because those are used to get connections or create wire messages, so they aren't relevant here.

@divjotarora divjotarora requested a review from craiggwilson May 12, 2020 22:13
// messages to inform the server that the driver supports streaming.
type Streamer interface {
Connection
SetStreaming(bool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm leaning more towards the more literal names, but I don't feel that strongly about it.

Copy link
Collaborator

@craiggwilson craiggwilson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool. few comments.

x/mongo/driver/operation.go Show resolved Hide resolved
x/mongo/driver/topology/connection.go Show resolved Hide resolved
@divjotarora divjotarora requested a review from craiggwilson May 29, 2020 14:56
@jyemin jyemin removed their request for review May 29, 2020 15:27
@divjotarora divjotarora force-pushed the godriver1489-streamable-ismaster branch from 3e32f7e to 6f60c74 Compare May 29, 2020 17:51
@divjotarora divjotarora merged commit cce6057 into mongodb:master May 29, 2020
@divjotarora divjotarora deleted the godriver1489-streamable-ismaster branch May 29, 2020 17:52
divjotarora pushed a commit to divjotarora/mongo-go-driver that referenced this pull request Jun 2, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants