-
Notifications
You must be signed in to change notification settings - Fork 23
Typed Mongo API transaction support #417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
a1a29d7
mongo transaction support, typed wrappers for MongoClient, MongoDatab…
c5362c8
added some nicer utilities for transactions + some tests
91e45e7
fixed translation of Publisher<Void> to Task[Unit]
df04e31
scaladocs and cosmetics
a8262fe
updated mongodb-github-actions, mongodb and enabled replica set
1618094
TypedMongoCollection auxiliary bincompat constructor
c1838ad
Merge branch 'master' into typedmongo-transactions
9b097cd
review fixes in TypedMongoCollection
33d20b5
trailing commas
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
103 changes: 103 additions & 0 deletions
103
commons-mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedClientSession.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package com.avsystem.commons | ||
package mongo.typed | ||
|
||
import cats.effect.ExitCase | ||
import com.mongodb.reactivestreams.client.ClientSession | ||
import com.mongodb.session.ServerSession | ||
import com.mongodb.{ClientSessionOptions, ServerAddress, TransactionOptions} | ||
import monix.eval.Task | ||
import org.bson.{BsonDocument, BsonTimestamp} | ||
|
||
import java.io.Closeable | ||
|
||
/** | ||
* Better typed wrapper over [[ClientSession]]. | ||
*/ | ||
class TypedClientSession(val nativeSession: ClientSession) | ||
extends Closeable with TypedMongoUtils { | ||
|
||
def hasActiveTransaction: Boolean = | ||
nativeSession.hasActiveTransaction | ||
|
||
def transactionOptions: TransactionOptions = | ||
nativeSession.getTransactionOptions | ||
|
||
def startTransaction( | ||
transactionOptions: TransactionOptions = TransactionOptions.builder().build(), | ||
): Unit = | ||
nativeSession.startTransaction(transactionOptions) | ||
|
||
def commitTransaction: Task[Unit] = | ||
empty(nativeSession.commitTransaction()) | ||
|
||
def abortTransaction: Task[Unit] = | ||
empty(nativeSession.abortTransaction()) | ||
|
||
/** | ||
* Executes a MongoDB transaction - whose contents are expressed as Monix [[Task]]. | ||
* If the task succeeds, the transaction is committed. If the task fails, the transaction is aborted and the | ||
* error is propagated. The transaction is also aborted upon cancellation. | ||
*/ | ||
def inTransaction[T]( | ||
transactionOptions: TransactionOptions = TransactionOptions.builder().build(), | ||
)( | ||
task: Task[T], | ||
): Task[T] = Task.defer { | ||
startTransaction(transactionOptions) | ||
task.guaranteeCase { | ||
case ExitCase.Completed => commitTransaction | ||
case _ => abortTransaction | ||
} | ||
} | ||
|
||
def pinnedServerAddress: Option[ServerAddress] = | ||
Option(nativeSession.getPinnedServerAddress) | ||
|
||
def transactionContext(): Option[AnyRef] = | ||
Option(nativeSession.getTransactionContext) | ||
|
||
def setTransactionContext(address: ServerAddress, transactionContext: Any): Unit = | ||
nativeSession.setTransactionContext(address, transactionContext) | ||
|
||
def clearTransactionContext(): Unit = | ||
nativeSession.clearTransactionContext() | ||
|
||
def recoveryToken(): Option[BsonDocument] = | ||
Option(nativeSession.getRecoveryToken) | ||
|
||
def setRecoveryToken(recoverToken: BsonDocument): Unit = | ||
nativeSession.setRecoveryToken(recoverToken) | ||
|
||
def options: ClientSessionOptions = | ||
nativeSession.getOptions | ||
|
||
def casuallyConsistent: Boolean = | ||
nativeSession.isCausallyConsistent | ||
|
||
def originator: AnyRef = | ||
nativeSession.getOriginator | ||
|
||
def serverSession: ServerSession = | ||
nativeSession.getServerSession | ||
|
||
def operationTime: BsonTimestamp = | ||
nativeSession.getOperationTime | ||
|
||
def advanceOperationTime(operationTime: BsonTimestamp): Unit = | ||
nativeSession.advanceOperationTime(operationTime) | ||
|
||
def advanceClusterTime(clusterTime: BsonDocument): Unit = | ||
nativeSession.advanceClusterTime(clusterTime) | ||
|
||
def snapshotTimestamp: BsonTimestamp = | ||
nativeSession.getSnapshotTimestamp | ||
|
||
def setSnapshotTimestamp(snapshotTimestamp: BsonTimestamp): Unit = | ||
nativeSession.setSnapshotTimestamp(snapshotTimestamp) | ||
|
||
def clusterTime: BsonDocument = | ||
nativeSession.getClusterTime | ||
|
||
def close(): Unit = | ||
nativeSession.close() | ||
} |
108 changes: 108 additions & 0 deletions
108
commons-mongo/jvm/src/main/scala/com/avsystem/commons/mongo/typed/TypedMongoClient.scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package com.avsystem.commons | ||
package mongo.typed | ||
|
||
import com.avsystem.commons.mongo.BsonValueInput | ||
import com.avsystem.commons.serialization.GenCodec | ||
import com.mongodb._ | ||
import com.mongodb.connection.ClusterDescription | ||
import com.mongodb.reactivestreams.client.{MongoClient, MongoClients} | ||
import monix.eval.Task | ||
import monix.reactive.Observable | ||
import org.bson.Document | ||
|
||
import java.io.Closeable | ||
|
||
object TypedMongoClient { | ||
def apply(): TypedMongoClient = | ||
new TypedMongoClient(MongoClients.create()) | ||
|
||
def apply(connectionString: String): TypedMongoClient = | ||
new TypedMongoClient(MongoClients.create(connectionString)) | ||
|
||
def apply(connectionString: ConnectionString): TypedMongoClient = | ||
new TypedMongoClient(MongoClients.create(connectionString)) | ||
|
||
def apply(settings: MongoClientSettings): TypedMongoClient = | ||
new TypedMongoClient(MongoClients.create(settings)) | ||
|
||
def apply( | ||
connectionString: ConnectionString, | ||
driverInformation: MongoDriverInformation, | ||
): TypedMongoClient = | ||
new TypedMongoClient(MongoClients.create(connectionString, driverInformation)) | ||
|
||
def apply( | ||
settings: MongoClientSettings, | ||
driverInformation: MongoDriverInformation, | ||
): TypedMongoClient = | ||
new TypedMongoClient(MongoClients.create(settings, driverInformation)) | ||
} | ||
|
||
/** | ||
* A better-typed wrapper over [[MongoClient]]. Uses Monix [[Task]] and [[Observable]] instead of | ||
* [[org.reactivestreams.Publisher]]. Returns similar better-typed wrappers for database and client session objects. | ||
*/ | ||
class TypedMongoClient( | ||
wodomierz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val nativeClient: MongoClient, | ||
wodomierz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
val clientSession: OptArg[TypedClientSession] = OptArg.Empty, | ||
) extends TypedMongoUtils with Closeable { | ||
private val sessionOrNull = clientSession.toOpt.map(_.nativeSession).orNull | ||
|
||
def withSession(session: TypedClientSession): TypedMongoClient = | ||
new TypedMongoClient(nativeClient, session) | ||
|
||
def getDatabase(name: String): TypedMongoDatabase = | ||
new TypedMongoDatabase(nativeClient.getDatabase(name)) | ||
|
||
def listDatabaseNames: Observable[String] = | ||
multi(optionalizeFirstArg(nativeClient.listDatabaseNames(sessionOrNull))) | ||
|
||
def listDatabases: Observable[Document] = | ||
multi(optionalizeFirstArg(nativeClient.listDatabases(sessionOrNull))) | ||
|
||
def listDatabases[T: GenCodec]: Observable[T] = | ||
listDatabases.map(doc => BsonValueInput.read[T](doc.toBsonDocument)) | ||
|
||
//TODO: `watch` methods | ||
ghik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def startSession( | ||
options: ClientSessionOptions = ClientSessionOptions.builder().build(), | ||
): Task[TypedClientSession] = | ||
single(nativeClient.startSession(options)).map(new TypedClientSession(_)) | ||
ghik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** | ||
* Executes some code in context of a MongoDB client session. The session is closed afterwards. | ||
* | ||
* Note: in order for actual MongoDB operations to be associated with the session, you need to use | ||
* `withSession` on [[TypedMongoClient]], [[TypedMongoDatabase]] or [[TypedMongoCollection]] and use the | ||
* returned copy of these objects. | ||
*/ | ||
def inSession[T]( | ||
ghik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
options: ClientSessionOptions = ClientSessionOptions.builder().build(), | ||
)( | ||
task: TypedClientSession => Task[T], | ||
): Task[T] = | ||
startSession(options).bracket(task)(s => Task(s.close())) | ||
|
||
/** | ||
* Executes some code in context of a MongoDB client session, within a transaction. | ||
* After the [[Task]] finishes, fails or is cancelled, the transaction is either committed or aborted depending | ||
* on the outcome and the session is closed. | ||
* | ||
* Note: in order for actual MongoDB operations to be associated with the session and the transaction, | ||
* you need to use `withSession` on [[TypedMongoClient]], [[TypedMongoDatabase]] or [[TypedMongoCollection]] | ||
* and use the returned copy of these objects. | ||
*/ | ||
def inTransaction[T]( | ||
sessionOptions: ClientSessionOptions = ClientSessionOptions.builder().build(), | ||
transactionOptions: TransactionOptions = TransactionOptions.builder().build(), | ||
)( | ||
task: TypedClientSession => Task[T], | ||
): Task[T] = | ||
inSession(sessionOptions)(s => s.inTransaction(transactionOptions)(task(s))) | ||
|
||
def clusterDescription(): ClusterDescription = | ||
nativeClient.getClusterDescription | ||
|
||
def close(): Unit = nativeClient.close() | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.