Skip to content

Add OperationContext to sync path #1097

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
import com.mongodb.internal.connection.Server;
import com.mongodb.internal.selector.ReadPreferenceServerSelector;
Expand All @@ -51,6 +52,7 @@ public class AsyncClusterBinding extends AbstractReferenceCounted implements Asy
@Nullable
private final ServerApi serverApi;
private final RequestContext requestContext;
private final OperationContext operationContext;

/**
* Creates an instance.
Expand All @@ -69,6 +71,7 @@ public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPrefe
this.readConcern = (notNull("readConcern", readConcern));
this.serverApi = serverApi;
this.requestContext = notNull("requestContext", requestContext);
operationContext = new OperationContext();
}

@Override
Expand Down Expand Up @@ -103,6 +106,11 @@ public RequestContext getRequestContext() {
return requestContext;
}

@Override
public OperationContext getOperationContext() {
return operationContext;
}

@Override
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference), callback);
Expand All @@ -117,7 +125,7 @@ public void getReadConnectionSource(final int minWireVersion, final ReadPreferen
} else {
ReadPreferenceWithFallbackServerSelector readPreferenceWithFallbackServerSelector
= new ReadPreferenceWithFallbackServerSelector(readPreference, minWireVersion, fallbackReadPreference);
cluster.selectServerAsync(readPreferenceWithFallbackServerSelector, (result, t) -> {
cluster.selectServerAsync(readPreferenceWithFallbackServerSelector, operationContext, (result, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
Expand All @@ -140,7 +148,7 @@ public void getConnectionSource(final ServerAddress serverAddress, final SingleR

private void getAsyncClusterBindingConnectionSource(final ServerSelector serverSelector,
final SingleResultCallback<AsyncConnectionSource> callback) {
cluster.selectServerAsync(serverSelector, (result, t) -> {
cluster.selectServerAsync(serverSelector, operationContext, (result, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
Expand Down Expand Up @@ -184,14 +192,19 @@ public RequestContext getRequestContext() {
return requestContext;
}

@Override
public OperationContext getOperationContext() {
return operationContext;
}

@Override
public ReadPreference getReadPreference() {
return appliedReadPreference;
}

@Override
public void getConnection(final SingleResultCallback<AsyncConnection> callback) {
server.getConnectionAsync(callback);
server.getConnectionAsync(operationContext, callback);
}

public AsyncConnectionSource retain() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@
package com.mongodb.internal.binding;

import com.mongodb.ReadPreference;
import com.mongodb.RequestContext;
import com.mongodb.ServerApi;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;

/**
* A source of connections to a single MongoDB server.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncConnectionSource extends ReferenceCounted {
public interface AsyncConnectionSource extends BindingContext, ReferenceCounted {

/**
* Gets the current description of this source.
Expand All @@ -39,20 +35,6 @@ public interface AsyncConnectionSource extends ReferenceCounted {
*/
ServerDescription getServerDescription();

/**
* Gets the session context for this source
*
* @return the session context, which may not be null
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
SessionContext getSessionContext();

@Nullable
ServerApi getServerApi();

RequestContext getRequestContext();

/**
* Gets the read preference that was applied when selecting this source.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,20 @@
package com.mongodb.internal.binding;

import com.mongodb.ReadPreference;
import com.mongodb.RequestContext;
import com.mongodb.ServerApi;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;

/**
* An asynchronous factory of connection sources to servers that can be read from and that satisfy the specified read preference.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncReadBinding extends ReferenceCounted {
public interface AsyncReadBinding extends BindingContext, ReferenceCounted {
/**
* The read preference that all connection sources returned by this instance will satisfy.
* @return the non-null read preference
*/
ReadPreference getReadPreference();

/**
* Gets the session context for this binding.
*
* @return the session context, which may not be null
*
*/
SessionContext getSessionContext();

@Nullable
ServerApi getServerApi();

RequestContext getRequestContext();

/**
* Returns a connection source to a server that satisfies the read preference with which this instance is configured.
* @param callback the to be passed the connection source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@

package com.mongodb.internal.binding;

import com.mongodb.RequestContext;
import com.mongodb.ServerApi;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;

/**
* An asynchronous factory of connection sources to servers that can be written to, e.g, a standalone, a mongos, or a replica set primary.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncWriteBinding extends ReferenceCounted {
public interface AsyncWriteBinding extends BindingContext, ReferenceCounted {

/**
* Supply a connection source to a server that can be written to
Expand All @@ -36,19 +32,6 @@ public interface AsyncWriteBinding extends ReferenceCounted {
*/
void getWriteConnectionSource(SingleResultCallback<AsyncConnectionSource> callback);

/**
* Gets the session context for this binding.
*
* @return the session context, which may not be null
*
*/
SessionContext getSessionContext();

@Nullable
ServerApi getServerApi();

RequestContext getRequestContext();

@Override
AsyncWriteBinding retain();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.binding;

import com.mongodb.RequestContext;
import com.mongodb.ServerApi;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;


/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface BindingContext {
SessionContext getSessionContext();

@Nullable
ServerApi getServerApi();

RequestContext getRequestContext();

OperationContext getOperationContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.mongodb.ServerAddress;
import com.mongodb.ServerApi;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.Connection;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class ClusterBinding extends AbstractReferenceCounted implements ClusterA
@Nullable
private final ServerApi serverApi;
private final RequestContext requestContext;
private final OperationContext operationContext;

/**
* Creates an instance.
Expand All @@ -66,6 +68,7 @@ public ClusterBinding(final Cluster cluster, final ReadPreference readPreference
this.readConcern = notNull("readConcern", readConcern);
this.serverApi = serverApi;
this.requestContext = notNull("requestContext", requestContext);
operationContext = new OperationContext();
}

/**
Expand Down Expand Up @@ -103,9 +106,14 @@ public RequestContext getRequestContext() {
return requestContext;
}

@Override
public OperationContext getOperationContext() {
return operationContext;
}

@Override
public ConnectionSource getReadConnectionSource() {
return new ClusterBindingConnectionSource(cluster.selectServer(new ReadPreferenceServerSelector(readPreference)), readPreference);
return new ClusterBindingConnectionSource(cluster.selectServer(new ReadPreferenceServerSelector(readPreference), operationContext), readPreference);
}

@Override
Expand All @@ -116,19 +124,19 @@ public ConnectionSource getReadConnectionSource(final int minWireVersion, final
} else {
ReadPreferenceWithFallbackServerSelector readPreferenceWithFallbackServerSelector
= new ReadPreferenceWithFallbackServerSelector(readPreference, minWireVersion, fallbackReadPreference);
ServerTuple serverTuple = cluster.selectServer(readPreferenceWithFallbackServerSelector);
ServerTuple serverTuple = cluster.selectServer(readPreferenceWithFallbackServerSelector, operationContext);
return new ClusterBindingConnectionSource(serverTuple, readPreferenceWithFallbackServerSelector.getAppliedReadPreference());
}
}

@Override
public ConnectionSource getWriteConnectionSource() {
return new ClusterBindingConnectionSource(cluster.selectServer(new WritableServerSelector()), readPreference);
return new ClusterBindingConnectionSource(cluster.selectServer(new WritableServerSelector(), operationContext), readPreference);
}

@Override
public ConnectionSource getConnectionSource(final ServerAddress serverAddress) {
return new ClusterBindingConnectionSource(cluster.selectServer(new ServerAddressSelector(serverAddress)), readPreference);
return new ClusterBindingConnectionSource(cluster.selectServer(new ServerAddressSelector(serverAddress), operationContext), readPreference);
}

private final class ClusterBindingConnectionSource extends AbstractReferenceCounted implements ConnectionSource {
Expand All @@ -153,6 +161,11 @@ public SessionContext getSessionContext() {
return new ReadConcernAwareNoOpSessionContext(readConcern);
}

@Override
public OperationContext getOperationContext() {
return operationContext;
}

@Override
public ServerApi getServerApi() {
return serverApi;
Expand All @@ -170,7 +183,7 @@ public ReadPreference getReadPreference() {

@Override
public Connection getConnection() {
return server.getConnection();
return server.getConnection(operationContext);
}

public ConnectionSource retain() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,18 @@
package com.mongodb.internal.binding;

import com.mongodb.ReadPreference;
import com.mongodb.RequestContext;
import com.mongodb.ServerApi;
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.connection.Connection;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;

/**
* A source of connections to a single MongoDB server.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface ConnectionSource extends ReferenceCounted {
public interface ConnectionSource extends BindingContext, ReferenceCounted {

ServerDescription getServerDescription();

SessionContext getSessionContext();

@Nullable
ServerApi getServerApi();

RequestContext getRequestContext();

ReadPreference getReadPreference();

Connection getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@
package com.mongodb.internal.binding;

import com.mongodb.ReadPreference;
import com.mongodb.RequestContext;
import com.mongodb.ServerApi;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;

/**
* A factory of connection sources to servers that can be read from and that satisfy the specified read preference.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface ReadBinding extends ReferenceCounted {
public interface ReadBinding extends BindingContext, ReferenceCounted {
ReadPreference getReadPreference();

/**
Expand All @@ -48,13 +44,6 @@ public interface ReadBinding extends ReferenceCounted {
*/
ConnectionSource getReadConnectionSource(int minWireVersion, ReadPreference fallbackReadPreference);

SessionContext getSessionContext();

@Nullable
ServerApi getServerApi();

RequestContext getRequestContext();

@Override
ReadBinding retain();
}
Loading