-
Notifications
You must be signed in to change notification settings - Fork 919
GODRIVER-1489 Add ability to stream wire messages #405
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cf4c119
3e5fcf6
3857171
2866bc9
6f60c74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,3 +11,4 @@ driver-test-data.tar.gz | |
| perf | ||
| **mongocryptd.pid | ||
| *.test | ||
| **.md | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,6 +65,21 @@ type Expirable interface { | |
| Alive() bool | ||
| } | ||
|
|
||
| // StreamerConnection represents a Connection that supports streaming wire protocol messages using the moreToCome and | ||
| // exhaustAllowed flags. | ||
| // | ||
| // The SetStreaming and CurrentlyStreaming functions correspond to the moreToCome flag on server responses. If a | ||
| // response has moreToCome set, SetStreaming(true) will be called and CurrentlyStreaming() should return true. | ||
| // | ||
| // CanStream corresponds to the exhaustAllowed flag. The operations layer will set exhaustAllowed on outgoing wire | ||
| // messages to inform the server that the driver supports streaming. | ||
| type StreamerConnection interface { | ||
| Connection | ||
| SetStreaming(bool) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I opted to give these functions names that define the semantics of
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm leaning more towards the more literal names, but I don't feel that strongly about it. |
||
| CurrentlyStreaming() bool | ||
| SupportsStreaming() bool | ||
| } | ||
|
|
||
| // Compressor is an interface used to compress wire messages. If a Connection supports compression | ||
| // it should implement this interface as well. The CompressWireMessage method will be called during | ||
| // the execution of an operation if the wire message is allowed to be compressed. | ||
|
|
@@ -127,20 +142,10 @@ func (ssd SingleConnectionDeployment) SupportsRetryWrites() bool { return false | |
| func (ssd SingleConnectionDeployment) Kind() description.TopologyKind { return description.Single } | ||
|
|
||
| // Connection implements the Server interface. It always returns the embedded connection. | ||
| // | ||
| // This method returns a Connection with a no-op Close method. This ensures that a | ||
| // SingleConnectionDeployment can be used across multiple operation executions. | ||
| func (ssd SingleConnectionDeployment) Connection(context.Context) (Connection, error) { | ||
| return nopCloserConnection{ssd.C}, nil | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this wrapping is necessary. There are two instances where a connection should be used for an operation and then should not be closed or returned to a pool: handshakes and heartbeats. Both of these uses already wrap the connection in Having this wrapping also hurts us because if |
||
| return ssd.C, nil | ||
| } | ||
|
|
||
| // nopCloserConnection is an adapter used in a SingleConnectionDeployment. It passes through all | ||
| // functionality expcect for closing, which is a no-op. This is done so the connection can be used | ||
| // across multiple operations. | ||
| type nopCloserConnection struct{ Connection } | ||
|
|
||
| func (ncc nopCloserConnection) Close() error { return nil } | ||
|
|
||
| // TODO(GODRIVER-617): We can likely use 1 type for both the Type and the RetryMode by using | ||
| // 2 bits for the mode and 1 bit for the type. Although in the practical sense, we might not want to | ||
| // do that since the type of retryability is tied to the operation itself and isn't going change, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -329,7 +329,7 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error { | |
| if len(scratch) > 0 { | ||
| scratch = scratch[:0] | ||
| } | ||
| wm, startedInfo, err := op.createWireMessage(ctx, scratch, desc) | ||
| wm, startedInfo, err := op.createWireMessage(ctx, scratch, desc, conn) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -575,6 +575,12 @@ func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) ( | |
| return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err} | ||
| } | ||
|
|
||
| return op.readWireMessage(ctx, conn, wm) | ||
| } | ||
|
|
||
| func (op Operation) readWireMessage(ctx context.Context, conn Connection, wm []byte) ([]byte, error) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is mostly the same logic that was originally in |
||
| var err error | ||
|
|
||
| wm, err = conn.ReadWireMessage(ctx, wm[:0]) | ||
| if err != nil { | ||
| labels := []string{NetworkError} | ||
|
|
@@ -590,6 +596,12 @@ func (op Operation) roundTrip(ctx context.Context, conn Connection, wm []byte) ( | |
| return nil, Error{Message: err.Error(), Labels: labels, Wrapped: err} | ||
| } | ||
|
|
||
| // If we're using a streamable connection, we set its streaming state based on the moreToCome flag in the server | ||
| // response. | ||
| if streamer, ok := conn.(StreamerConnection); ok { | ||
| streamer.SetStreaming(wiremessage.IsMsgMoreToCome(wm)) | ||
divjotarora marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // decompress wiremessage | ||
| wm, err = op.decompressWireMessage(wm) | ||
| if err != nil { | ||
|
|
@@ -675,12 +687,12 @@ func (Operation) decompressWireMessage(wm []byte) ([]byte, error) { | |
| } | ||
|
|
||
| func (op Operation) createWireMessage(ctx context.Context, dst []byte, | ||
| desc description.SelectedServer) ([]byte, startedInformation, error) { | ||
| desc description.SelectedServer, conn Connection) ([]byte, startedInformation, error) { | ||
|
|
||
| if desc.WireVersion == nil || desc.WireVersion.Max < wiremessage.OpmsgWireVersion { | ||
| return op.createQueryWireMessage(dst, desc) | ||
| } | ||
| return op.createMsgWireMessage(ctx, dst, desc) | ||
| return op.createMsgWireMessage(ctx, dst, desc, conn) | ||
| } | ||
|
|
||
| func (op Operation) addBatchArray(dst []byte) []byte { | ||
|
|
@@ -758,7 +770,9 @@ func (op Operation) createQueryWireMessage(dst []byte, desc description.Selected | |
| return bsoncore.UpdateLength(dst, wmindex, int32(len(dst[wmindex:]))), info, nil | ||
| } | ||
|
|
||
| func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc description.SelectedServer) ([]byte, startedInformation, error) { | ||
| func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc description.SelectedServer, | ||
| conn Connection) ([]byte, startedInformation, error) { | ||
|
|
||
| var info startedInformation | ||
| var flags wiremessage.MsgFlag | ||
| var wmindex int32 | ||
|
|
@@ -767,6 +781,12 @@ func (op Operation) createMsgWireMessage(ctx context.Context, dst []byte, desc d | |
| if op.WriteConcern != nil && !writeconcern.AckWrite(op.WriteConcern) && (op.Batches == nil || len(op.Batches.Documents) == 0) { | ||
| flags = wiremessage.MoreToCome | ||
| } | ||
| // Set the ExhaustAllowed flag if the connection supports streaming. This will tell the server that it can | ||
| // respond with the MoreToCome flag and then stream responses over this connection. | ||
| if streamer, ok := conn.(StreamerConnection); ok && streamer.SupportsStreaming() { | ||
| flags |= wiremessage.ExhaustAllowed | ||
| } | ||
|
|
||
| info.requestID = wiremessage.NextRequestID() | ||
| wmindex, dst = wiremessage.AppendHeaderStart(dst, info.requestID, 0, wiremessage.OpMsg) | ||
| dst = wiremessage.AppendMsgFlags(dst, flags) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| // Copyright (C) MongoDB, Inc. 2017-present. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. You may obtain | ||
| // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| package driver | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
|
|
||
| "go.mongodb.org/mongo-driver/x/mongo/driver/description" | ||
| ) | ||
|
|
||
| // ExecuteExhaust reads a response from the provided StreamerConnection. This will error if the connection's | ||
| // CurrentlyStreaming function returns false. | ||
| func (op Operation) ExecuteExhaust(ctx context.Context, conn StreamerConnection, scratch []byte) error { | ||
| if !conn.CurrentlyStreaming() { | ||
| return errors.New("exhaust read must be done with a connection that is currently streaming") | ||
| } | ||
|
|
||
| scratch = scratch[:0] | ||
| res, err := op.readWireMessage(ctx, conn, scratch) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if op.ProcessResponseFn != nil { | ||
| if err = op.ProcessResponseFn(res, nil, description.Server{}); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I usually keep a
design.mdfile around and it's annoying to have to manually ignore it.