Skip to content

Commit ecfd28b

Browse files
author
Jovica Andric
committed
Updated method that checks current schema version with consistency level ONE instead of ALL
1 parent 790a058 commit ecfd28b

File tree

4 files changed

+37
-16
lines changed

4 files changed

+37
-16
lines changed

src/main/java/io/smartcat/migration/CassandraVersioner.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,16 @@ public class CassandraVersioner {
2424
private static final String DESCRIPTION = "description";
2525

2626
private static final String CREATE_SCHEMA_VERSION_CQL = String.format("CREATE TABLE IF NOT EXISTS %s (",
27-
SCHEMA_VERSION_CF)
28-
+ String.format("%s text,", TYPE)
29-
+ String.format("%s int,", VERSION)
30-
+ String.format("%s bigint,", TIMESTAMP)
31-
+ String.format("%s text,", DESCRIPTION)
27+
SCHEMA_VERSION_CF) + String.format("%s text,", TYPE) + String.format("%s int,", VERSION)
28+
+ String.format("%s bigint,", TIMESTAMP) + String.format("%s text,", DESCRIPTION)
3229
+ String.format("PRIMARY KEY (%s, %s)", TYPE, VERSION)
3330
+ String.format(") WITH CLUSTERING ORDER BY (%s DESC)", VERSION) + " AND COMMENT='Schema version';";
3431

3532
private final Session session;
3633

3734
/**
3835
* Create Cassandra versioner for active session.
36+
*
3937
* @param session Active Cassandra session
4038
*/
4139
public CassandraVersioner(final Session session) {
@@ -50,16 +48,17 @@ private void createSchemaVersion() {
5048
}
5149

5250
/**
53-
* Get current database version for given migration type with ALL consistency. Select one row since
54-
* migration history is saved ordered descending by timestamp. If there are no rows in the schema_version table,
55-
* return 0 as default database version. Data version is changed by executing migrations.
51+
* Get current database version for given migration type with ALL consistency. Select one row since migration
52+
* history is saved ordered descending by timestamp. If there are no rows in the schema_version table, return 0 as
53+
* default database version. Data version is changed by executing migrations.
5654
*
5755
* @param type Migration type
56+
* @param consistencyLevel Desired consistency level
5857
* @return Database version for given type
5958
*/
60-
public int getCurrentVersion(final MigrationType type) {
59+
public int getCurrentVersion(final MigrationType type, final ConsistencyLevel consistencyLevel) {
6160
final Statement select = QueryBuilder.select().all().from(SCHEMA_VERSION_CF)
62-
.where(QueryBuilder.eq(TYPE, type.name())).limit(1).setConsistencyLevel(ConsistencyLevel.ALL);
61+
.where(QueryBuilder.eq(TYPE, type.name())).limit(1).setConsistencyLevel(consistencyLevel);
6362
final ResultSet result = session.execute(select);
6463

6564
final Row row = result.one();

src/main/java/io/smartcat/migration/MigrationEngine.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package io.smartcat.migration;
22

3-
import io.smartcat.migration.exceptions.MigrationException;
3+
import static com.datastax.driver.core.ConsistencyLevel.ALL;
4+
import static com.datastax.driver.core.ConsistencyLevel.ONE;
5+
46
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
68

79
import com.datastax.driver.core.Session;
810

11+
import io.smartcat.migration.exceptions.MigrationException;
12+
913
/**
1014
* Migration engine wraps Migrator and provides DSL like API.
11-
*
1215
*/
1316
public class MigrationEngine {
1417

@@ -19,6 +22,7 @@ private MigrationEngine() {
1922

2023
/**
2124
* Create migrator out of session fully prepared for doing migration of resources.
25+
*
2226
* @param session Datastax driver session object
2327
* @return migrator instance with versioner and session which can migrate resources
2428
*/
@@ -30,11 +34,13 @@ public static Migrator withSession(final Session session) {
3034
* Migrator handles migrations and errors.
3135
*/
3236
public static class Migrator {
37+
3338
private final Session session;
3439
private final CassandraVersioner versioner;
3540

3641
/**
3742
* Create new Migrator with active Cassandra session.
43+
*
3844
* @param session Active Cassandra session
3945
*/
4046
public Migrator(final Session session) {
@@ -55,7 +61,7 @@ public boolean migrate(final MigrationResources resources) {
5561
for (final Migration migration : resources.getMigrations()) {
5662
final MigrationType type = migration.getType();
5763
final int migrationVersion = migration.getVersion();
58-
final int version = versioner.getCurrentVersion(type);
64+
final int version = versioner.getCurrentVersion(type, ONE);
5965

6066
LOGGER.info("Db is version {} for type {}.", version, type.name());
6167
LOGGER.info("Compare {} migration version {} with description {}", type.name(), migrationVersion,
@@ -67,6 +73,8 @@ public boolean migrate(final MigrationResources resources) {
6773
continue;
6874
}
6975

76+
checkIfAllReplicasAreUp(type);
77+
7078
migration.setSession(session);
7179

7280
final long start = System.currentTimeMillis();
@@ -94,6 +102,17 @@ public boolean migrate(final MigrationResources resources) {
94102

95103
return true;
96104
}
105+
106+
/**
107+
* Method that checks if all replicas are up by getting current migration type with consistency
108+
* level ALL. This method will only execute if there are new migrations. Method will throw exception if any of
109+
* replicas is down and migration process will break
110+
*
111+
* @param type Migration type
112+
*/
113+
private void checkIfAllReplicasAreUp(final MigrationType type) {
114+
versioner.getCurrentVersion(type, ALL);
115+
}
97116
}
98117

99118
}

src/test/java/io/smartcat/migration/CassandraVersionerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.smartcat.migration;
22

3+
import static com.datastax.driver.core.ConsistencyLevel.ALL;
34
import static io.smartcat.migration.MigrationType.SCHEMA;
45
import static org.hamcrest.CoreMatchers.is;
56
import static org.junit.Assert.assertThat;
@@ -34,7 +35,7 @@ public void setUp() throws Exception {
3435
public void whenSchemaVersionTableIsEmptyThenCurrentVersionShouldBe0() throws Exception {
3536
expectRetrieveEmptyCurrentVersion();
3637

37-
int currentVersion = versioner.getCurrentVersion(SCHEMA);
38+
int currentVersion = versioner.getCurrentVersion(SCHEMA, ALL);
3839

3940
assertThat(currentVersion, is(0));
4041
}
@@ -45,7 +46,7 @@ public void whenSchemaVersionTableIsNotEmptyThenCurrentVersionShouldBeRetrievedF
4546

4647
expectRetrieveCurrentVersion(expectedVersion);
4748

48-
int currentVersion = versioner.getCurrentVersion(SCHEMA);
49+
int currentVersion = versioner.getCurrentVersion(SCHEMA, ALL);
4950

5051
assertThat(currentVersion, is(expectedVersion));
5152
}

src/test/java/io/smartcat/migration/MigratorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.smartcat.migration;
22

3+
import static com.datastax.driver.core.ConsistencyLevel.ALL;
34
import static org.hamcrest.CoreMatchers.is;
45
import static org.hamcrest.CoreMatchers.not;
56
import static org.junit.Assert.assertFalse;
@@ -24,6 +25,7 @@
2425
import io.smartcat.migration.migrations.schema.AddBookISBNFieldMigration;
2526

2627
public class MigratorTest extends BaseTest {
28+
2729
private static final Logger LOGGER = LoggerFactory.getLogger(MigratorTest.class);
2830

2931
private static final String CONTACT_POINT = "localhost";
@@ -138,7 +140,7 @@ public void skipMigrationWithSameVersionThanCurrentSchemaVersion() throws Except
138140
}
139141

140142
private int getCurrentVersion() {
141-
return versioner.getCurrentVersion(MigrationType.SCHEMA);
143+
return versioner.getCurrentVersion(MigrationType.SCHEMA, ALL);
142144
}
143145

144146
private void assertTableDoesntContainsColumns(String table, String... columns) {

0 commit comments

Comments
 (0)