Skip to content

Commit 68d69df

Browse files
author
Roman Janusz
authored
Merge pull request #417 from AVSystem/typedmongo-transactions
Typed Mongo API transaction support
2 parents b254808 + 33d20b5 commit 68d69df

File tree

10 files changed

+512
-98
lines changed

10 files changed

+512
-98
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,10 @@ jobs:
6262
node-version: 12
6363

6464
- name: Setup MongoDB
65-
uses: supercharge/mongodb-github-action@1.3.0
65+
uses: supercharge/mongodb-github-action@1.7.0
6666
with:
67-
mongodb-version: 4.4
67+
mongodb-version: 5.0.8
68+
mongodb-replica-set: test-rs
6869

6970
- name: Setup Redis
7071
run: ./install-redis.sh

commons-macros/src/main/scala/com/avsystem/commons/macros/misc/MiscMacros.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ final class MiscMacros(ctx: blackbox.Context) extends AbstractMacroCommons(ctx)
7575
q"new ${c.prefix}.ValName(${owner.asTerm.getter.name.decodedName.toString})"
7676
}
7777

78+
def optionalizeFirstArg(expr: Tree): Tree = expr match {
79+
case Apply(MaybeTypeApply(Select(prefix, name: TermName), targs), head :: tail) =>
80+
q"if($head ne null) $expr else ${c.untypecheck(prefix)}.$name[..$targs](..$tail)"
81+
case _ =>
82+
c.abort(expr.pos, "function application expected")
83+
}
84+
7885
def compilationError(error: Tree): Tree = error match {
7986
case StringLiteral(errorStr) =>
8087
abortAt(errorStr, error.pos)
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package com.avsystem.commons
2+
package mongo.typed
3+
4+
import cats.effect.ExitCase
5+
import com.mongodb.reactivestreams.client.ClientSession
6+
import com.mongodb.session.ServerSession
7+
import com.mongodb.{ClientSessionOptions, ServerAddress, TransactionOptions}
8+
import monix.eval.Task
9+
import org.bson.{BsonDocument, BsonTimestamp}
10+
11+
import java.io.Closeable
12+
13+
/**
14+
* Better typed wrapper over [[ClientSession]].
15+
*/
16+
class TypedClientSession(val nativeSession: ClientSession)
17+
extends Closeable with TypedMongoUtils {
18+
19+
def hasActiveTransaction: Boolean =
20+
nativeSession.hasActiveTransaction
21+
22+
def transactionOptions: TransactionOptions =
23+
nativeSession.getTransactionOptions
24+
25+
def startTransaction(
26+
transactionOptions: TransactionOptions = TransactionOptions.builder().build(),
27+
): Unit =
28+
nativeSession.startTransaction(transactionOptions)
29+
30+
def commitTransaction: Task[Unit] =
31+
empty(nativeSession.commitTransaction())
32+
33+
def abortTransaction: Task[Unit] =
34+
empty(nativeSession.abortTransaction())
35+
36+
/**
37+
* Executes a MongoDB transaction - whose contents are expressed as Monix [[Task]].
38+
* If the task succeeds, the transaction is committed. If the task fails, the transaction is aborted and the
39+
* error is propagated. The transaction is also aborted upon cancellation.
40+
*/
41+
def inTransaction[T](
42+
transactionOptions: TransactionOptions = TransactionOptions.builder().build(),
43+
)(
44+
task: Task[T],
45+
): Task[T] = Task.defer {
46+
startTransaction(transactionOptions)
47+
task.guaranteeCase {
48+
case ExitCase.Completed => commitTransaction
49+
case _ => abortTransaction
50+
}
51+
}
52+
53+
def pinnedServerAddress: Option[ServerAddress] =
54+
Option(nativeSession.getPinnedServerAddress)
55+
56+
def transactionContext(): Option[AnyRef] =
57+
Option(nativeSession.getTransactionContext)
58+
59+
def setTransactionContext(address: ServerAddress, transactionContext: Any): Unit =
60+
nativeSession.setTransactionContext(address, transactionContext)
61+
62+
def clearTransactionContext(): Unit =
63+
nativeSession.clearTransactionContext()
64+
65+
def recoveryToken(): Option[BsonDocument] =
66+
Option(nativeSession.getRecoveryToken)
67+
68+
def setRecoveryToken(recoverToken: BsonDocument): Unit =
69+
nativeSession.setRecoveryToken(recoverToken)
70+
71+
def options: ClientSessionOptions =
72+
nativeSession.getOptions
73+
74+
def casuallyConsistent: Boolean =
75+
nativeSession.isCausallyConsistent
76+
77+
def originator: AnyRef =
78+
nativeSession.getOriginator
79+
80+
def serverSession: ServerSession =
81+
nativeSession.getServerSession
82+
83+
def operationTime: BsonTimestamp =
84+
nativeSession.getOperationTime
85+
86+
def advanceOperationTime(operationTime: BsonTimestamp): Unit =
87+
nativeSession.advanceOperationTime(operationTime)
88+
89+
def advanceClusterTime(clusterTime: BsonDocument): Unit =
90+
nativeSession.advanceClusterTime(clusterTime)
91+
92+
def snapshotTimestamp: BsonTimestamp =
93+
nativeSession.getSnapshotTimestamp
94+
95+
def setSnapshotTimestamp(snapshotTimestamp: BsonTimestamp): Unit =
96+
nativeSession.setSnapshotTimestamp(snapshotTimestamp)
97+
98+
def clusterTime: BsonDocument =
99+
nativeSession.getClusterTime
100+
101+
def close(): Unit =
102+
nativeSession.close()
103+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package com.avsystem.commons
2+
package mongo.typed
3+
4+
import com.avsystem.commons.mongo.BsonValueInput
5+
import com.avsystem.commons.serialization.GenCodec
6+
import com.mongodb._
7+
import com.mongodb.connection.ClusterDescription
8+
import com.mongodb.reactivestreams.client.{MongoClient, MongoClients}
9+
import monix.eval.Task
10+
import monix.reactive.Observable
11+
import org.bson.Document
12+
13+
import java.io.Closeable
14+
15+
object TypedMongoClient {
16+
def apply(): TypedMongoClient =
17+
new TypedMongoClient(MongoClients.create())
18+
19+
def apply(connectionString: String): TypedMongoClient =
20+
new TypedMongoClient(MongoClients.create(connectionString))
21+
22+
def apply(connectionString: ConnectionString): TypedMongoClient =
23+
new TypedMongoClient(MongoClients.create(connectionString))
24+
25+
def apply(settings: MongoClientSettings): TypedMongoClient =
26+
new TypedMongoClient(MongoClients.create(settings))
27+
28+
def apply(
29+
connectionString: ConnectionString,
30+
driverInformation: MongoDriverInformation,
31+
): TypedMongoClient =
32+
new TypedMongoClient(MongoClients.create(connectionString, driverInformation))
33+
34+
def apply(
35+
settings: MongoClientSettings,
36+
driverInformation: MongoDriverInformation,
37+
): TypedMongoClient =
38+
new TypedMongoClient(MongoClients.create(settings, driverInformation))
39+
}
40+
41+
/**
42+
* A better-typed wrapper over [[MongoClient]]. Uses Monix [[Task]] and [[Observable]] instead of
43+
* [[org.reactivestreams.Publisher]]. Returns similar better-typed wrappers for database and client session objects.
44+
*/
45+
class TypedMongoClient(
46+
val nativeClient: MongoClient,
47+
val clientSession: OptArg[TypedClientSession] = OptArg.Empty,
48+
) extends TypedMongoUtils with Closeable {
49+
private val sessionOrNull = clientSession.toOpt.map(_.nativeSession).orNull
50+
51+
def withSession(session: TypedClientSession): TypedMongoClient =
52+
new TypedMongoClient(nativeClient, session)
53+
54+
def getDatabase(name: String): TypedMongoDatabase =
55+
new TypedMongoDatabase(nativeClient.getDatabase(name))
56+
57+
def listDatabaseNames: Observable[String] =
58+
multi(optionalizeFirstArg(nativeClient.listDatabaseNames(sessionOrNull)))
59+
60+
def listDatabases: Observable[Document] =
61+
multi(optionalizeFirstArg(nativeClient.listDatabases(sessionOrNull)))
62+
63+
def listDatabases[T: GenCodec]: Observable[T] =
64+
listDatabases.map(doc => BsonValueInput.read[T](doc.toBsonDocument))
65+
66+
//TODO: `watch` methods
67+
68+
def startSession(
69+
options: ClientSessionOptions = ClientSessionOptions.builder().build(),
70+
): Task[TypedClientSession] =
71+
single(nativeClient.startSession(options)).map(new TypedClientSession(_))
72+
73+
/**
74+
* Executes some code in context of a MongoDB client session. The session is closed afterwards.
75+
*
76+
* Note: in order for actual MongoDB operations to be associated with the session, you need to use
77+
* `withSession` on [[TypedMongoClient]], [[TypedMongoDatabase]] or [[TypedMongoCollection]] and use the
78+
* returned copy of these objects.
79+
*/
80+
def inSession[T](
81+
options: ClientSessionOptions = ClientSessionOptions.builder().build(),
82+
)(
83+
task: TypedClientSession => Task[T],
84+
): Task[T] =
85+
startSession(options).bracket(task)(s => Task(s.close()))
86+
87+
/**
88+
* Executes some code in context of a MongoDB client session, within a transaction.
89+
* After the [[Task]] finishes, fails or is cancelled, the transaction is either committed or aborted depending
90+
* on the outcome and the session is closed.
91+
*
92+
* Note: in order for actual MongoDB operations to be associated with the session and the transaction,
93+
* you need to use `withSession` on [[TypedMongoClient]], [[TypedMongoDatabase]] or [[TypedMongoCollection]]
94+
* and use the returned copy of these objects.
95+
*/
96+
def inTransaction[T](
97+
sessionOptions: ClientSessionOptions = ClientSessionOptions.builder().build(),
98+
transactionOptions: TransactionOptions = TransactionOptions.builder().build(),
99+
)(
100+
task: TypedClientSession => Task[T],
101+
): Task[T] =
102+
inSession(sessionOptions)(s => s.inTransaction(transactionOptions)(task(s)))
103+
104+
def clusterDescription(): ClusterDescription =
105+
nativeClient.getClusterDescription
106+
107+
def close(): Unit = nativeClient.close()
108+
}

0 commit comments

Comments
 (0)