Skip to content

Commit fc82b4b

Browse files
authored
Merge pull request dataswift#1 from dataswift/migration-run-buggy-future
ensure all Liquibase schema evolutions are complete when Future completes
2 parents 15a6699 + 0eaecf9 commit fc82b4b

File tree

10 files changed

+168
-93
lines changed

10 files changed

+168
-93
lines changed

.travis.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ language: scala
22
dist: trusty
33
sudo: false
44
scala:
5-
- 2.12.4
5+
- 2.12.12
66
jdk:
77
- openjdk8
88
cache:
@@ -27,6 +27,7 @@ script:
2727
- sbt ++$TRAVIS_SCALA_VERSION compile
2828
- sbt ++$TRAVIS_SCALA_VERSION test:compile
2929
- sbt ++$TRAVIS_SCALA_VERSION coverage test
30+
- sbt driver/it:test
3031
after_success:
3132
- find $HOME/.sbt -name "*.lock" | xargs rm
3233
- find $HOME/.ivy2 -name "ivydata-*.properties" | xargs rm

build.sbt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
import sbt.Keys.sbtPlugin
21
import Dependencies.Library
3-
import sbt.Keys._
42

53
lazy val driver = project.in(file("slick-postgres-driver"))
64
.enablePlugins(BasicSettings)
75
.settings(
86
name := "slick-postgres-driver",
9-
crossScalaVersions := Seq("2.12.4", "2.11.8")
7+
crossScalaVersions := Seq("2.12.12", "2.11.12")
108
)
119
.settings(
1210
libraryDependencies ++= Seq(
@@ -17,14 +15,25 @@ lazy val driver = project.in(file("slick-postgres-driver"))
1715
Library.Slick.slickPg,
1816
Library.Slick.slickPgCore,
1917
Library.Slick.slickPgJoda,
20-
Library.Slick.slickPgPlayJson)
18+
Library.Slick.slickPgPlayJson,
19+
Library.ScalaTest.test,
20+
Library.TestContainers.scalaTest,
21+
Library.TestContainers.postgresql
22+
)
2123
)
2224
.settings(
2325
publishTo := {
2426
val prefix = if (isSnapshot.value) "snapshots" else "releases"
2527
Some(s3resolver.value("HAT Library Artifacts " + prefix, s3("library-artifacts-" + prefix + ".hubofallthings.com")) withMavenPatterns)
2628
}
2729
)
30+
.configs(IntegrationTest)
31+
.settings(
32+
Defaults.itSettings,
33+
fork in IntegrationTest := true,
34+
envVars in IntegrationTest := Map("TESTCONTAINERS_RYUK_DISABLED" -> "true")
35+
)
36+
2837

2938
lazy val plugin = project.in(file("sbt-slick-postgres-generator"))
3039
.enablePlugins(BasicSettings)

project/BuildSettings.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ object BasicSettings extends AutoPlugin {
1818

1919
override def projectSettings = Seq(
2020
organization := "org.hatdex",
21-
version := "0.0.11",
21+
version := "0.0.12",
2222
resolvers ++= Dependencies.resolvers,
2323
scalaVersion := Dependencies.Versions.scalaVersion,
2424
crossScalaVersions := Dependencies.Versions.crossScala,

project/Dependencies.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@ import sbt._
1111
object Dependencies {
1212

1313
object Versions {
14-
val crossScala = Seq("2.12.4")
14+
val crossScala = Seq("2.12.12")
1515
val scalaVersion = crossScala.head
1616
}
1717

1818
val resolvers = Seq(
1919
"Atlassian Releases" at "https://maven.atlassian.com/public/",
20-
"scalaz-bintray" at "http://dl.bintray.com/scalaz/releases",
21-
"Sonatype snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/")
20+
Resolver.bintrayRepo("scalaz", "releases"),
21+
Resolver.sonatypeRepo("snapshots")
22+
)
2223

2324
object Library {
25+
2426
object Db {
2527
val liquibase = "org.liquibase" % "liquibase-maven-plugin" % "3.6.0"
2628
}
@@ -44,5 +46,18 @@ object Dependencies {
4446
val jodaTime = "joda-time" % "joda-time" % "2.9.9"
4547
val slf4j = "org.slf4j" % "slf4j-api" % "1.7.18"
4648
}
49+
50+
object ScalaTest {
51+
private val version = "3.2.2"
52+
val test = "org.scalatest" %% "scalatest" % version % IntegrationTest
53+
}
54+
55+
object TestContainers {
56+
private val version = "0.38.4"
57+
val scalaTest = "com.dimafeng" %% "testcontainers-scala-scalatest" % version % IntegrationTest
58+
val postgresql = "com.dimafeng" %% "testcontainers-scala-postgresql" % version % IntegrationTest
59+
}
60+
4761
}
62+
4863
}

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66
# Written by Andrius Aucinas <andrius.aucinas@hatdex.org>, 14/08/17 09:17
77
#
88

9-
sbt.version=1.0.2
9+
sbt.version=1.3.8
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
CREATE TABLE table_one(
2+
id INTEGER NOT NULL PRIMARY KEY,
3+
value JSONB NOT NULL
4+
);
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE table_one
2+
ADD COLUMN type VARCHAR;
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<configuration>
2+
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
3+
<encoder>
4+
<pattern>%d{"yyyy-MM-dd'T'HH:mm:ss.SSSXXX", UTC} %level %logger - %msg%n</pattern>
5+
</encoder>
6+
</appender>
7+
8+
<root level="info">
9+
<appender-ref ref="CONSOLE" />
10+
</root>
11+
</configuration>
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package org.hatdex.libs.dal
2+
3+
import java.sql.ResultSet
4+
5+
import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer}
6+
import com.typesafe.config.{Config, ConfigFactory}
7+
import org.scalatest.flatspec.AnyFlatSpec
8+
import org.scalatest.matchers.should.Matchers
9+
import org.slf4j.{Logger, LoggerFactory}
10+
import slick.jdbc.JdbcBackend.Database
11+
import slick.jdbc.JdbcProfile
12+
13+
import scala.collection.JavaConverters._
14+
import scala.concurrent.duration.DurationInt
15+
import scala.concurrent.{Await, ExecutionContext}
16+
17+
class BaseSchemaMigrationImplSpec extends AnyFlatSpec with ForAllTestContainer with Matchers {
18+
19+
override val container: PostgreSQLContainer = PostgreSQLContainer()
20+
21+
it should "apply each schema evolution in order" in {
22+
val schemaMigration = new SchemaMigrationImpl(container)
23+
24+
Await.ready(
25+
schemaMigration.run(Seq("evolution_01.sql", "evolution_02.sql")),
26+
10.seconds
27+
)
28+
29+
verifyTableCreated(schemaMigration.db)
30+
}
31+
32+
private def verifyTableCreated(db: JdbcProfile#Backend#Database) {
33+
val session = db.createSession()
34+
try {
35+
val rs = session.conn.createStatement().executeQuery("SELECT column_name, data_type FROM information_schema.columns WHERE table_name='table_one' ORDER BY ordinal_position")
36+
rsToIterator(rs).toList shouldBe List(
37+
"id" -> "integer",
38+
"value" -> "jsonb",
39+
"type" -> "character varying"
40+
)
41+
} finally session.close()
42+
}
43+
44+
private def rsToIterator(rs: ResultSet): Iterator[(String, String)] =
45+
new Iterator[(String, String)] {
46+
def hasNext: Boolean = rs.next()
47+
def next(): (String, String) = rs.getString(1) -> rs.getString(2)
48+
}
49+
50+
}
51+
52+
private class SchemaMigrationImpl(container: PostgreSQLContainer)
53+
extends BaseSchemaMigrationImpl {
54+
55+
override protected implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
56+
override protected val logger: Logger = LoggerFactory.getLogger(this.getClass)
57+
override protected val configuration: Config = ConfigFactory.empty()
58+
59+
override val db: JdbcProfile#Backend#Database = {
60+
val dbConfigKey = "dockerpg"
61+
val cfg = ConfigFactory.parseMap(
62+
Map(
63+
dbConfigKey -> Map(
64+
"url" -> container.container.getJdbcUrl,
65+
"driver" -> container.container.getDriverClassName,
66+
"connectionPool" -> "disabled",
67+
"keepAliveConnection" -> true,
68+
"user" -> container.container.getUsername,
69+
"password" -> container.container.getPassword
70+
).asJava
71+
).asJava
72+
)
73+
Database.forConfig("dockerpg", cfg)
74+
}
75+
76+
}

slick-postgres-driver/src/main/scala/org/hatdex/libs/dal/BaseSchemaMigrationImpl.scala

Lines changed: 40 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import slick.jdbc.JdbcProfile
2020

2121
import scala.collection.JavaConverters._
2222
import scala.concurrent.{ ExecutionContext, Future, blocking }
23-
import scala.util.Try
23+
import scala.util.control.NonFatal
2424

2525
/**
2626
* Runs Liquibase based database schema and data migrations. This is the only place for all related
@@ -41,7 +41,7 @@ trait BaseSchemaMigrationImpl extends SchemaMigration {
4141
protected val defaultSchemaName = "hat"
4242
protected val liquibaseSchemaName = "public"
4343

44-
def run(evolutionsConfig: String = "db.default.evolutions"): Future[Unit] = {
44+
def run(evolutionsConfig: String = "db.default.evolutions"): Future[Unit] =
4545
Option(configuration.getStringList(evolutionsConfig))
4646
.map(_.asScala)
4747
.map { migrations =>
@@ -51,118 +51,76 @@ trait BaseSchemaMigrationImpl extends SchemaMigration {
5151
logger.warn("No evolutions configured")
5252
Future.successful(())
5353
}
54-
}
5554

5655
/**
5756
* Invoke this method to apply all DB migrations.
5857
*/
5958
def run(changeLogFiles: Seq[String]): Future[Unit] = {
60-
logger.error(s"Running schema migrations: ${changeLogFiles.mkString(", ")}")
61-
Future(db.createSession().conn).map { dbConnection ⇒
62-
changeLogFiles.foldLeft(Future.successful(())) { (execution, evolution) ⇒
63-
execution.flatMap { _ ⇒
64-
logger.error(s"Running evolution $evolution")
65-
updateDb(evolution, dbConnection)
66-
}
67-
} onComplete {
68-
_ ⇒ dbConnection.close()
59+
logger.info(s"Running schema migrations: ${changeLogFiles.mkString(", ")}")
60+
Future(db.createSession().conn).flatMap { dbConnection =>
61+
val sequencedEvolutions: Future[Unit] = changeLogFiles.foldLeft(Future.successful(())) { (execution, evolution) =>
62+
execution.flatMap(_ => updateDb(evolution, dbConnection))
6963
}
64+
sequencedEvolutions.onComplete(_ => dbConnection.close())
65+
sequencedEvolutions
7066
} recover {
71-
case e ⇒ logger.error(s"Running database evolutions failed: ${e.getMessage}", e)
67+
case e => logger.error("Running database evolutions failed", e)
7268
}
7369
}
7470

75-
def resetDatabase(): Future[Unit] = {
76-
val eventuallyEvolved = Future {
77-
val dbConnection = db.createSession().conn
78-
79-
val liquibase = blocking {
80-
createLiquibase(dbConnection, "")
81-
}
71+
def resetDatabase(): Future[Unit] =
72+
Future {
8273
blocking {
83-
Try(liquibase.dropAll())
84-
.recover {
85-
case e =>
86-
liquibase.forceReleaseLocks()
87-
logger.error(s"Error dropping all database information")
88-
throw e
89-
}
90-
liquibase.forceReleaseLocks()
74+
val dbConnection = db.createSession().conn
75+
val liquibase = createLiquibase(dbConnection, "")
76+
try liquibase.dropAll()
77+
catch {
78+
case NonFatal(th) =>
79+
logger.error("Error dropping all database information", th)
80+
throw th
81+
} finally liquibase.forceReleaseLocks()
9182
}
9283
}
9384

94-
eventuallyEvolved.failed.foreach { e =>
95-
logger.error(s"Error updating database: ${e.getMessage}")
96-
}
97-
98-
eventuallyEvolved
99-
}
100-
10185
def rollback(changeLogFiles: Seq[String]): Future[Unit] = {
10286
logger.info(s"Rolling back schema migrations: ${changeLogFiles.mkString(", ")}")
10387
changeLogFiles.foldLeft(Future.successful(())) { (execution, evolution) => execution.flatMap { _ => rollbackDb(evolution) } }
10488
}
10589

106-
private def updateDb(diffFilePath: String, dbConnection: Connection): Future[Unit] = {
107-
val eventuallyEvolved = Future {
108-
109-
logger.info(s"Liquibase running evolutions $diffFilePath on db: [${dbConnection.getMetaData.getURL}]")
110-
val liquibase = blocking {
111-
createLiquibase(dbConnection, diffFilePath)
112-
}
90+
private def updateDb(diffFilePath: String, dbConnection: Connection): Future[Unit] =
91+
Future {
11392
blocking {
93+
logger.info(s"Liquibase running evolutions $diffFilePath on db: [${dbConnection.getMetaData.getURL}]")
94+
val liquibase = createLiquibase(dbConnection, diffFilePath)
11495
listChangesets(liquibase, new Contexts(changeContexts))
115-
Try(liquibase.update(changeContexts))
116-
.recover {
117-
case e =>
118-
liquibase.forceReleaseLocks()
119-
logger.error(s"Error executing schema evolutions: ${e.getMessage}")
120-
throw e
121-
}
122-
liquibase.forceReleaseLocks()
96+
try liquibase.update(changeContexts)
97+
catch {
98+
case NonFatal(th) =>
99+
logger.error(s"Error executing schema evolutions: ${th.getMessage}")
100+
throw th
101+
} finally liquibase.forceReleaseLocks()
123102
}
124103
}
125104

126-
eventuallyEvolved.failed.foreach { e =>
127-
logger.error(s"Error updating database: ${e.getMessage}")
128-
}
129-
130-
eventuallyEvolved
131-
}
132-
133-
private def rollbackDb(diffFilePath: String): Future[Unit] = {
134-
val eventuallyEvolved = Future {
135-
136-
val dbConnection = db.createSession().conn
137-
138-
logger.info(s"Liquibase rolling back evolutions $diffFilePath on db: [${dbConnection.getMetaData.getURL}]")
139-
val liquibase = blocking {
140-
createLiquibase(dbConnection, diffFilePath)
141-
}
105+
private def rollbackDb(diffFilePath: String): Future[Unit] =
106+
Future {
142107
blocking {
108+
val dbConnection = db.createSession().conn
109+
logger.info(s"Liquibase rolling back evolutions $diffFilePath on db: [${dbConnection.getMetaData.getURL}]")
110+
val liquibase = createLiquibase(dbConnection, diffFilePath)
143111
val contexts = new Contexts(changeContexts)
144112
val changesetsExecuted = liquibase.getChangeSetStatuses(contexts, new LabelExpression()).asScala.filterNot(_.getWillRun)
145-
Try(liquibase.rollback(changesetsExecuted.length, contexts, new LabelExpression()))
146-
.recover {
147-
case e =>
148-
liquibase.forceReleaseLocks()
149-
logger.error(s"Error executing schema evolutions: ${e.getMessage}")
150-
throw e
151-
}
152-
liquibase.forceReleaseLocks()
113+
try liquibase.rollback(changesetsExecuted.length, contexts, new LabelExpression())
114+
catch {
115+
case NonFatal(th) =>
116+
logger.error(s"Error rolling back schema evolutions: ${th.getMessage}")
117+
throw th
118+
} finally liquibase.forceReleaseLocks()
153119
}
154120
}
155121

156-
eventuallyEvolved.failed.foreach { e =>
157-
logger.error(s"Error updating database: ${e.getMessage}")
158-
}
159-
160-
eventuallyEvolved
161-
}
162-
163122
private def listChangesets(liquibase: Liquibase, contexts: Contexts): Unit = {
164123
val changesetStatuses = liquibase.getChangeSetStatuses(contexts, new LabelExpression()).asScala
165-
166124
logger.info("Existing changesets:")
167125
changesetStatuses.foreach { cs =>
168126
if (cs.getWillRun) {
@@ -176,7 +134,6 @@ trait BaseSchemaMigrationImpl extends SchemaMigration {
176134
protected def createLiquibase(dbConnection: Connection, diffFilePath: String): Liquibase = {
177135
val classLoader = configuration.getClass.getClassLoader
178136
val resourceAccessor = new ClassLoaderResourceAccessor(classLoader)
179-
180137
val database = DatabaseFactory.getInstance()
181138
.findCorrectDatabaseImplementation(new JdbcConnection(dbConnection))
182139
database.setDefaultSchemaName(defaultSchemaName)

0 commit comments

Comments
 (0)