Skip to content

ensure all Liquibase schema evolutions are complete when Future completes #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits on
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ language: scala
dist: trusty
sudo: false
scala:
- 2.12.4
- 2.12.12
jdk:
- openjdk8
cache:
Expand All @@ -27,6 +27,7 @@ script:
- sbt ++$TRAVIS_SCALA_VERSION compile
- sbt ++$TRAVIS_SCALA_VERSION test:compile
- sbt ++$TRAVIS_SCALA_VERSION coverage test
- sbt driver/it:test
after_success:
- find $HOME/.sbt -name "*.lock" | xargs rm
- find $HOME/.ivy2 -name "ivydata-*.properties" | xargs rm
Expand Down
17 changes: 13 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import sbt.Keys.sbtPlugin
import Dependencies.Library
import sbt.Keys._

lazy val driver = project.in(file("slick-postgres-driver"))
.enablePlugins(BasicSettings)
.settings(
name := "slick-postgres-driver",
crossScalaVersions := Seq("2.12.4", "2.11.8")
crossScalaVersions := Seq("2.12.12", "2.11.12")
)
.settings(
libraryDependencies ++= Seq(
Expand All @@ -17,14 +15,25 @@ lazy val driver = project.in(file("slick-postgres-driver"))
Library.Slick.slickPg,
Library.Slick.slickPgCore,
Library.Slick.slickPgJoda,
Library.Slick.slickPgPlayJson)
Library.Slick.slickPgPlayJson,
Library.ScalaTest.test,
Library.TestContainers.scalaTest,
Library.TestContainers.postgresql
)
)
.settings(
publishTo := {
val prefix = if (isSnapshot.value) "snapshots" else "releases"
Some(s3resolver.value("HAT Library Artifacts " + prefix, s3("library-artifacts-" + prefix + ".hubofallthings.com")) withMavenPatterns)
}
)
.configs(IntegrationTest)
.settings(
Defaults.itSettings,
fork in IntegrationTest := true,
envVars in IntegrationTest := Map("TESTCONTAINERS_RYUK_DISABLED" -> "true")
)


lazy val plugin = project.in(file("sbt-slick-postgres-generator"))
.enablePlugins(BasicSettings)
Expand Down
2 changes: 1 addition & 1 deletion project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object BasicSettings extends AutoPlugin {

override def projectSettings = Seq(
organization := "org.hatdex",
version := "0.0.11",
version := "0.0.12",
resolvers ++= Dependencies.resolvers,
scalaVersion := Dependencies.Versions.scalaVersion,
crossScalaVersions := Dependencies.Versions.crossScala,
Expand Down
21 changes: 18 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ import sbt._
object Dependencies {

object Versions {
val crossScala = Seq("2.12.4")
val crossScala = Seq("2.12.12")
val scalaVersion = crossScala.head
}

val resolvers = Seq(
"Atlassian Releases" at "https://maven.atlassian.com/public/",
"scalaz-bintray" at "http://dl.bintray.com/scalaz/releases",
"Sonatype snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/")
Resolver.bintrayRepo("scalaz", "releases"),
Resolver.sonatypeRepo("snapshots")
)

object Library {

object Db {
val liquibase = "org.liquibase" % "liquibase-maven-plugin" % "3.6.0"
}
Expand All @@ -44,5 +46,18 @@ object Dependencies {
val jodaTime = "joda-time" % "joda-time" % "2.9.9"
val slf4j = "org.slf4j" % "slf4j-api" % "1.7.18"
}

object ScalaTest {
private val version = "3.2.2"
val test = "org.scalatest" %% "scalatest" % version % IntegrationTest
}

object TestContainers {
private val version = "0.38.4"
val scalaTest = "com.dimafeng" %% "testcontainers-scala-scalatest" % version % IntegrationTest
val postgresql = "com.dimafeng" %% "testcontainers-scala-postgresql" % version % IntegrationTest
}

}

}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
# Written by Andrius Aucinas <andrius.aucinas@hatdex.org>, 14/08/17 09:17
#

sbt.version=1.0.2
sbt.version=1.3.8
4 changes: 4 additions & 0 deletions slick-postgres-driver/src/it/resources/evolution_01.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Integration-test fixture: first schema evolution applied by the migration spec.
-- Creates table_one with a JSONB column (exercises the slick-pg JSONB mapping).
CREATE TABLE table_one(
id INTEGER NOT NULL PRIMARY KEY,
value JSONB NOT NULL
);
2 changes: 2 additions & 0 deletions slick-postgres-driver/src/it/resources/evolution_02.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Integration-test fixture: second schema evolution; must run AFTER evolution_01
-- (the spec asserts the resulting column order: id, value, type).
ALTER TABLE table_one
ADD COLUMN type VARCHAR;
11 changes: 11 additions & 0 deletions slick-postgres-driver/src/it/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<!-- Logback config for the integration tests: console-only appender so
     Liquibase/Slick output is visible in CI logs; timestamps rendered in UTC
     for reproducible log lines across build agents. -->
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC} %level %logger - %msg%n</pattern>
</encoder>
</appender>

<!-- info level keeps changeset listings without per-statement debug noise -->
<root level="info">
<appender-ref ref="CONSOLE" />
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.hatdex.libs.dal

import java.sql.ResultSet

import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.slf4j.{Logger, LoggerFactory}
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcProfile

import scala.collection.JavaConverters._
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext}

/** Integration spec: runs the Liquibase-based schema migration against a real
  * PostgreSQL container and verifies that both evolutions were applied in order
  * before the returned Future completed (the behavior this PR guarantees).
  */
class BaseSchemaMigrationImplSpec extends AnyFlatSpec with ForAllTestContainer with Matchers {

  override val container: PostgreSQLContainer = PostgreSQLContainer()

  it should "apply each schema evolution in order" in {
    val schemaMigration = new SchemaMigrationImpl(container)

    // Await.result (not Await.ready): a failed migration Future then fails the
    // test here with its real cause, instead of surfacing later as a confusing
    // "table does not exist" assertion error in verifyTableCreated.
    Await.result(
      schemaMigration.run(Seq("evolution_01.sql", "evolution_02.sql")),
      10.seconds
    )

    verifyTableCreated(schemaMigration.db)
  }

  /** Asserts table_one exists with the columns from evolution_01 (id, value)
    * plus the `type` column added by evolution_02, in ordinal position order.
    */
  private def verifyTableCreated(db: JdbcProfile#Backend#Database): Unit = {
    val session = db.createSession()
    try {
      val rs = session.conn.createStatement().executeQuery("SELECT column_name, data_type FROM information_schema.columns WHERE table_name='table_one' ORDER BY ordinal_position")
      rsToIterator(rs).toList shouldBe List(
        "id" -> "integer",
        "value" -> "jsonb",
        "type" -> "character varying"
      )
    } finally session.close()
  }

  // Walks a JDBC ResultSet lazily as (column_name, data_type) pairs; hasNext
  // advances the cursor, so each element must be consumed exactly once.
  private def rsToIterator(rs: ResultSet): Iterator[(String, String)] =
    new Iterator[(String, String)] {
      def hasNext: Boolean = rs.next()
      def next(): (String, String) = rs.getString(1) -> rs.getString(2)
    }

}

/** Concrete migration implementation wired to the throwaway PostgreSQL
  * container started by the spec. Connection pooling is disabled since each
  * test run is short-lived and uses a single keep-alive connection.
  */
private class SchemaMigrationImpl(container: PostgreSQLContainer)
  extends BaseSchemaMigrationImpl {

  // global EC is acceptable here: test-only code, caller controls nothing.
  override protected implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
  override protected val logger: Logger = LoggerFactory.getLogger(this.getClass)
  override protected val configuration: Config = ConfigFactory.empty()

  override val db: JdbcProfile#Backend#Database = {
    // Single source of truth for the config path: the same key is used both to
    // build the config tree and for the Database.forConfig lookup, so the two
    // cannot silently drift apart.
    val dbConfigKey = "dockerpg"
    val cfg = ConfigFactory.parseMap(
      Map(
        dbConfigKey -> Map(
          "url" -> container.container.getJdbcUrl,
          "driver" -> container.container.getDriverClassName,
          "connectionPool" -> "disabled",
          "keepAliveConnection" -> true,
          "user" -> container.container.getUsername,
          "password" -> container.container.getPassword
        ).asJava
      ).asJava
    )
    Database.forConfig(dbConfigKey, cfg)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import slick.jdbc.JdbcProfile

import scala.collection.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future, blocking }
import scala.util.Try
import scala.util.control.NonFatal

/**
* Runs Liquibase based database schema and data migrations. This is the only place for all related
Expand All @@ -41,7 +41,7 @@ trait BaseSchemaMigrationImpl extends SchemaMigration {
protected val defaultSchemaName = "hat"
protected val liquibaseSchemaName = "public"

def run(evolutionsConfig: String = "db.default.evolutions"): Future[Unit] = {
def run(evolutionsConfig: String = "db.default.evolutions"): Future[Unit] =
Option(configuration.getStringList(evolutionsConfig))
.map(_.asScala)
.map { migrations =>
Expand All @@ -51,118 +51,76 @@ trait BaseSchemaMigrationImpl extends SchemaMigration {
logger.warn("No evolutions configured")
Future.successful(())
}
}

/**
* Invoke this method to apply all DB migrations.
*/
def run(changeLogFiles: Seq[String]): Future[Unit] = {
logger.error(s"Running schema migrations: ${changeLogFiles.mkString(", ")}")
Future(db.createSession().conn).map { dbConnection ⇒
changeLogFiles.foldLeft(Future.successful(())) { (execution, evolution) ⇒
execution.flatMap { _ ⇒
logger.error(s"Running evolution $evolution")
updateDb(evolution, dbConnection)
}
} onComplete {
_ ⇒ dbConnection.close()
logger.info(s"Running schema migrations: ${changeLogFiles.mkString(", ")}")
Future(db.createSession().conn).flatMap { dbConnection =>
val sequencedEvolutions: Future[Unit] = changeLogFiles.foldLeft(Future.successful(())) { (execution, evolution) =>
execution.flatMap(_ => updateDb(evolution, dbConnection))
}
sequencedEvolutions.onComplete(_ => dbConnection.close())
sequencedEvolutions
} recover {
case e logger.error(s"Running database evolutions failed: ${e.getMessage}", e)
case e => logger.error("Running database evolutions failed", e)
}
}

def resetDatabase(): Future[Unit] = {
val eventuallyEvolved = Future {
val dbConnection = db.createSession().conn

val liquibase = blocking {
createLiquibase(dbConnection, "")
}
def resetDatabase(): Future[Unit] =
Future {
blocking {
Try(liquibase.dropAll())
.recover {
case e =>
liquibase.forceReleaseLocks()
logger.error(s"Error dropping all database information")
throw e
}
liquibase.forceReleaseLocks()
val dbConnection = db.createSession().conn
val liquibase = createLiquibase(dbConnection, "")
try liquibase.dropAll()
catch {
case NonFatal(th) =>
logger.error("Error dropping all database information", th)
throw th
} finally liquibase.forceReleaseLocks()
}
}

eventuallyEvolved.failed.foreach { e =>
logger.error(s"Error updating database: ${e.getMessage}")
}

eventuallyEvolved
}

def rollback(changeLogFiles: Seq[String]): Future[Unit] = {
logger.info(s"Rolling back schema migrations: ${changeLogFiles.mkString(", ")}")
changeLogFiles.foldLeft(Future.successful(())) { (execution, evolution) => execution.flatMap { _ => rollbackDb(evolution) } }
}

private def updateDb(diffFilePath: String, dbConnection: Connection): Future[Unit] = {
val eventuallyEvolved = Future {

logger.info(s"Liquibase running evolutions $diffFilePath on db: [${dbConnection.getMetaData.getURL}]")
val liquibase = blocking {
createLiquibase(dbConnection, diffFilePath)
}
private def updateDb(diffFilePath: String, dbConnection: Connection): Future[Unit] =
Future {
blocking {
logger.info(s"Liquibase running evolutions $diffFilePath on db: [${dbConnection.getMetaData.getURL}]")
val liquibase = createLiquibase(dbConnection, diffFilePath)
listChangesets(liquibase, new Contexts(changeContexts))
Try(liquibase.update(changeContexts))
.recover {
case e =>
liquibase.forceReleaseLocks()
logger.error(s"Error executing schema evolutions: ${e.getMessage}")
throw e
}
liquibase.forceReleaseLocks()
try liquibase.update(changeContexts)
catch {
case NonFatal(th) =>
logger.error(s"Error executing schema evolutions: ${th.getMessage}")
throw th
} finally liquibase.forceReleaseLocks()
}
}

eventuallyEvolved.failed.foreach { e =>
logger.error(s"Error updating database: ${e.getMessage}")
}

eventuallyEvolved
}

private def rollbackDb(diffFilePath: String): Future[Unit] = {
val eventuallyEvolved = Future {

val dbConnection = db.createSession().conn

logger.info(s"Liquibase rolling back evolutions $diffFilePath on db: [${dbConnection.getMetaData.getURL}]")
val liquibase = blocking {
createLiquibase(dbConnection, diffFilePath)
}
private def rollbackDb(diffFilePath: String): Future[Unit] =
Future {
blocking {
val dbConnection = db.createSession().conn
logger.info(s"Liquibase rolling back evolutions $diffFilePath on db: [${dbConnection.getMetaData.getURL}]")
val liquibase = createLiquibase(dbConnection, diffFilePath)
val contexts = new Contexts(changeContexts)
val changesetsExecuted = liquibase.getChangeSetStatuses(contexts, new LabelExpression()).asScala.filterNot(_.getWillRun)
Try(liquibase.rollback(changesetsExecuted.length, contexts, new LabelExpression()))
.recover {
case e =>
liquibase.forceReleaseLocks()
logger.error(s"Error executing schema evolutions: ${e.getMessage}")
throw e
}
liquibase.forceReleaseLocks()
try liquibase.rollback(changesetsExecuted.length, contexts, new LabelExpression())
catch {
case NonFatal(th) =>
logger.error(s"Error rolling back schema evolutions: ${th.getMessage}")
throw th
} finally liquibase.forceReleaseLocks()
}
}

eventuallyEvolved.failed.foreach { e =>
logger.error(s"Error updating database: ${e.getMessage}")
}

eventuallyEvolved
}

private def listChangesets(liquibase: Liquibase, contexts: Contexts): Unit = {
val changesetStatuses = liquibase.getChangeSetStatuses(contexts, new LabelExpression()).asScala

logger.info("Existing changesets:")
changesetStatuses.foreach { cs =>
if (cs.getWillRun) {
Expand All @@ -176,7 +134,6 @@ trait BaseSchemaMigrationImpl extends SchemaMigration {
protected def createLiquibase(dbConnection: Connection, diffFilePath: String): Liquibase = {
val classLoader = configuration.getClass.getClassLoader
val resourceAccessor = new ClassLoaderResourceAccessor(classLoader)

val database = DatabaseFactory.getInstance()
.findCorrectDatabaseImplementation(new JdbcConnection(dbConnection))
database.setDefaultSchemaName(defaultSchemaName)
Expand Down