Skip to content

Typed Mongo API transaction support #417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ jobs:
node-version: 12

- name: Setup MongoDB
uses: supercharge/mongodb-github-action@1.3.0
uses: supercharge/mongodb-github-action@1.7.0
with:
mongodb-version: 4.4
mongodb-version: 5.0.8
mongodb-replica-set: test-rs

- name: Setup Redis
run: ./install-redis.sh
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ final class MiscMacros(ctx: blackbox.Context) extends AbstractMacroCommons(ctx)
q"new ${c.prefix}.ValName(${owner.asTerm.getter.name.decodedName.toString})"
}

def optionalizeFirstArg(expr: Tree): Tree = expr match {
case Apply(MaybeTypeApply(Select(prefix, name: TermName), targs), head :: tail) =>
q"if($head ne null) $expr else ${c.untypecheck(prefix)}.$name[..$targs](..$tail)"
case _ =>
c.abort(expr.pos, "function application expected")
}

def compilationError(error: Tree): Tree = error match {
case StringLiteral(errorStr) =>
abortAt(errorStr, error.pos)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.avsystem.commons
package mongo.typed

import cats.effect.ExitCase
import com.mongodb.reactivestreams.client.ClientSession
import com.mongodb.session.ServerSession
import com.mongodb.{ClientSessionOptions, ServerAddress, TransactionOptions}
import monix.eval.Task
import org.bson.{BsonDocument, BsonTimestamp}

import java.io.Closeable

/**
* Better typed wrapper over [[ClientSession]].
*/
class TypedClientSession(val nativeSession: ClientSession)
extends Closeable with TypedMongoUtils {

def hasActiveTransaction: Boolean =
nativeSession.hasActiveTransaction

def transactionOptions: TransactionOptions =
nativeSession.getTransactionOptions

def startTransaction(
transactionOptions: TransactionOptions = TransactionOptions.builder().build(),
): Unit =
nativeSession.startTransaction(transactionOptions)

def commitTransaction: Task[Unit] =
empty(nativeSession.commitTransaction())

def abortTransaction: Task[Unit] =
empty(nativeSession.abortTransaction())

/**
* Executes a MongoDB transaction - whose contents are expressed as Monix [[Task]].
* If the task succeeds, the transaction is committed. If the task fails, the transaction is aborted and the
* error is propagated. The transaction is also aborted upon cancellation.
*/
def inTransaction[T](
transactionOptions: TransactionOptions = TransactionOptions.builder().build(),
)(
task: Task[T],
): Task[T] = Task.defer {
startTransaction(transactionOptions)
task.guaranteeCase {
case ExitCase.Completed => commitTransaction
case _ => abortTransaction
}
}

def pinnedServerAddress: Option[ServerAddress] =
Option(nativeSession.getPinnedServerAddress)

def transactionContext(): Option[AnyRef] =
Option(nativeSession.getTransactionContext)

def setTransactionContext(address: ServerAddress, transactionContext: Any): Unit =
nativeSession.setTransactionContext(address, transactionContext)

def clearTransactionContext(): Unit =
nativeSession.clearTransactionContext()

def recoveryToken(): Option[BsonDocument] =
Option(nativeSession.getRecoveryToken)

def setRecoveryToken(recoverToken: BsonDocument): Unit =
nativeSession.setRecoveryToken(recoverToken)

def options: ClientSessionOptions =
nativeSession.getOptions

def casuallyConsistent: Boolean =
nativeSession.isCausallyConsistent

def originator: AnyRef =
nativeSession.getOriginator

def serverSession: ServerSession =
nativeSession.getServerSession

def operationTime: BsonTimestamp =
nativeSession.getOperationTime

def advanceOperationTime(operationTime: BsonTimestamp): Unit =
nativeSession.advanceOperationTime(operationTime)

def advanceClusterTime(clusterTime: BsonDocument): Unit =
nativeSession.advanceClusterTime(clusterTime)

def snapshotTimestamp: BsonTimestamp =
nativeSession.getSnapshotTimestamp

def setSnapshotTimestamp(snapshotTimestamp: BsonTimestamp): Unit =
nativeSession.setSnapshotTimestamp(snapshotTimestamp)

def clusterTime: BsonDocument =
nativeSession.getClusterTime

def close(): Unit =
nativeSession.close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.avsystem.commons
package mongo.typed

import com.avsystem.commons.mongo.BsonValueInput
import com.avsystem.commons.serialization.GenCodec
import com.mongodb._
import com.mongodb.connection.ClusterDescription
import com.mongodb.reactivestreams.client.{MongoClient, MongoClients}
import monix.eval.Task
import monix.reactive.Observable
import org.bson.Document

import java.io.Closeable

object TypedMongoClient {
def apply(): TypedMongoClient =
new TypedMongoClient(MongoClients.create())

def apply(connectionString: String): TypedMongoClient =
new TypedMongoClient(MongoClients.create(connectionString))

def apply(connectionString: ConnectionString): TypedMongoClient =
new TypedMongoClient(MongoClients.create(connectionString))

def apply(settings: MongoClientSettings): TypedMongoClient =
new TypedMongoClient(MongoClients.create(settings))

def apply(
connectionString: ConnectionString,
driverInformation: MongoDriverInformation,
): TypedMongoClient =
new TypedMongoClient(MongoClients.create(connectionString, driverInformation))

def apply(
settings: MongoClientSettings,
driverInformation: MongoDriverInformation,
): TypedMongoClient =
new TypedMongoClient(MongoClients.create(settings, driverInformation))
}

/**
* A better-typed wrapper over [[MongoClient]]. Uses Monix [[Task]] and [[Observable]] instead of
* [[org.reactivestreams.Publisher]]. Returns similar better-typed wrappers for database and client session objects.
*/
class TypedMongoClient(
val nativeClient: MongoClient,
val clientSession: OptArg[TypedClientSession] = OptArg.Empty,
) extends TypedMongoUtils with Closeable {
private val sessionOrNull = clientSession.toOpt.map(_.nativeSession).orNull

def withSession(session: TypedClientSession): TypedMongoClient =
new TypedMongoClient(nativeClient, session)

def getDatabase(name: String): TypedMongoDatabase =
new TypedMongoDatabase(nativeClient.getDatabase(name))

def listDatabaseNames: Observable[String] =
multi(optionalizeFirstArg(nativeClient.listDatabaseNames(sessionOrNull)))

def listDatabases: Observable[Document] =
multi(optionalizeFirstArg(nativeClient.listDatabases(sessionOrNull)))

def listDatabases[T: GenCodec]: Observable[T] =
listDatabases.map(doc => BsonValueInput.read[T](doc.toBsonDocument))

//TODO: `watch` methods

def startSession(
options: ClientSessionOptions = ClientSessionOptions.builder().build(),
): Task[TypedClientSession] =
single(nativeClient.startSession(options)).map(new TypedClientSession(_))

/**
* Executes some code in context of a MongoDB client session. The session is closed afterwards.
*
* Note: in order for actual MongoDB operations to be associated with the session, you need to use
* `withSession` on [[TypedMongoClient]], [[TypedMongoDatabase]] or [[TypedMongoCollection]] and use the
* returned copy of these objects.
*/
def inSession[T](
options: ClientSessionOptions = ClientSessionOptions.builder().build(),
)(
task: TypedClientSession => Task[T],
): Task[T] =
startSession(options).bracket(task)(s => Task(s.close()))

/**
* Executes some code in context of a MongoDB client session, within a transaction.
* After the [[Task]] finishes, fails or is cancelled, the transaction is either committed or aborted depending
* on the outcome and the session is closed.
*
* Note: in order for actual MongoDB operations to be associated with the session and the transaction,
* you need to use `withSession` on [[TypedMongoClient]], [[TypedMongoDatabase]] or [[TypedMongoCollection]]
* and use the returned copy of these objects.
*/
def inTransaction[T](
sessionOptions: ClientSessionOptions = ClientSessionOptions.builder().build(),
transactionOptions: TransactionOptions = TransactionOptions.builder().build(),
)(
task: TypedClientSession => Task[T],
): Task[T] =
inSession(sessionOptions)(s => s.inTransaction(transactionOptions)(task(s)))

def clusterDescription(): ClusterDescription =
nativeClient.getClusterDescription

def close(): Unit = nativeClient.close()
}
Loading