Skip to content
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ lazy val `flowly-mongodb` = project
.settings(
name := "flowly-mongodb",
libraryDependencies ++= Seq(
"org.mongojack" % "mongojack" % "2.10.0",
"org.mongojack" % "mongojack" % "4.0.0",
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion
Expand Down
11 changes: 4 additions & 7 deletions flowly-demo/src/main/scala/flowly/demo/MainTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,30 @@ package flowly.demo
* limitations under the License.
*/

import java.io.IOError
import java.time.Instant

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.mongodb.MongoClient
import com.mongodb.client.MongoClients
import flowly.core.context.{ExecutionContextFactory, Key, ReadableExecutionContext, WritableExecutionContext}
import flowly.core.events.EventListener
import flowly.core.repository.model.Attempts
import flowly.core.repository.{InMemoryRepository, Repository}
import flowly.core.repository.Repository
import flowly.core.tasks.basic._
import flowly.core.tasks.compose.{Alternative, Retry, Retryable}
import flowly.core.tasks.compose.{Retry, Retryable}
import flowly.core.tasks.strategies.scheduling.SchedulingStrategy
import flowly.core.tasks.strategies.stopping.StoppingStrategy
import flowly.core.Workflow
import flowly.mongodb.{CustomDateModule, MongoDBRepository}

import scala.util.Try


object MainTest extends App {

trait RepositoryComponent {
this: ObjectMapperRepositoryComponent =>
val client = new MongoClient("localhost")
val client = MongoClients.create("localhost")
lazy val repository = new MongoDBRepository(client, "flowly", "demo", objectMapperRepository)
// lazy val repository = new InMemoryRepository
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import java.time.Instant
import java.util.Date
import com.fasterxml.jackson.databind._
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.mongodb.MongoClient
import com.mongodb.client.MongoClient
import com.mongodb.client.MongoCursor
import com.mongodb.client.model.IndexOptions
import com.mongodb.client.model.{FindOneAndUpdateOptions, IndexOptions, ReplaceOptions}
import flowly.core.compat.CompatUtils
import flowly.core.repository.Repository
import flowly.core.repository.model.{Session, Status}
import flowly.core.repository.model.Session.{SessionId, Status}
import flowly.core.repository.model.Session.SessionId
import flowly.core.{ErrorOr, SessionNotFound}

import javax.persistence.{OptimisticLockException, PersistenceException}
import org.bson.Document
import org.bson.{Document, UuidRepresentation}
import org.mongojack.JacksonMongoCollection

import scala.language.implicitConversions
Expand All @@ -24,10 +24,7 @@ import scala.util.{Failure, Success, Try}
class MongoDBRepository(client: MongoClient, databaseName: String, collectionName: String, objectMapper: ObjectMapper with ScalaObjectMapper) extends Repository {

protected val collection: JacksonMongoCollection[Session] = {
val mongoCollection = client.getDatabase(databaseName).getCollection(collectionName)

val builder: JacksonMongoCollection.JacksonMongoCollectionBuilder[Session] = JacksonMongoCollection.builder()
val coll = builder.withObjectMapper(objectMapper).build(mongoCollection, classOf[Session])
val coll = JacksonMongoCollection.builder().withObjectMapper(objectMapper).build(client, databaseName, collectionName, classOf[Session], UuidRepresentation.STANDARD)

// Initialize sessionId index
coll.createIndex(Document("sessionId" -> 1.asJava), new IndexOptions().unique(true))
Expand Down Expand Up @@ -60,16 +57,13 @@ class MongoDBRepository(client: MongoClient, databaseName: String, collectionNam

private[flowly] def update(session: Session): ErrorOr[Session] = {
Try {
// Update will replace every document field and it is going to increment in one unit its version
val document = JacksonMongoCollection.convertToDocument(session, objectMapper, classOf[Session])
document.remove("version")

val update = Document("$set" -> document, "$inc" -> Document("version" -> 1.asJava))

// Condition: there is a session with the same sessionId and version
val query = Document("sessionId" -> session.sessionId, "version" -> session.version.asJava)

collection.findAndModify(query, Document(), Document(), collection.serializeFields(update), true, false)
// Update will replace every document field and it is going to increment in one unit its version
val newSession = session.copy(version = session.version + 1)
val updateOptions = new ReplaceOptions().upsert(true)
collection.replaceOne(query, newSession, updateOptions)
newSession

} match {
case Success(null) => Left(new OptimisticLockException(s"Session ${session.sessionId} was modified by another transaction"))
Expand Down
4 changes: 2 additions & 2 deletions project/CommonSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ object CommonSettings {

val settings: Seq[Def.Setting[_]] =
Seq(organization := "com.despegar.flowly",
publishTo := Some("Nexus Despegar" at s"https://cnd-backend/nexus/repository/${if (isSnapshot.value) "maven-snapshots" else "maven-releases"}/"),
publishTo := Some("Nexus Despegar" at s"https://backoffice-secure.despegar.com/nexus/repository/${if (isSnapshot.value) "maven-snapshots" else "maven-releases"}/"),
resolvers += Opts.resolver.mavenLocalFile,
resolvers += Resolver.mavenLocal,
resolvers += "Typesafe repository" at "https://repo.typesafe.com/typesafe/releases/",
resolvers += "Nexus Public Repository" at "https://cnd-backend/nexus/repository/maven-all/",
resolvers += "Nexus Public Repository" at "https://backoffice-secure.despegar.com/nexus/repository/maven-all/",
scalaVersion := "2.13.0",
crossScalaVersions := Seq("2.12.8", "2.13.0"))
}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.2.8-SNAPSHOT"
version in ThisBuild := "0.2.9.3-SNAPSHOT"