-
Notifications
You must be signed in to change notification settings - Fork 0
Use Alpakka Cassandra 2.0.2 instead of DataStax Java Driver 3.11.0 #54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
df339c4
b9691b4
9064023
bfe5bb7
6c57802
785eee8
9efa34f
1696490
ed3ab0d
e60e510
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| # | ||
| # Breaks the backward compatibility | ||
| # | ||
| ProblemFilters.exclude[MissingClassProblem]("lerna.util.sequence.FutureConverters") | ||
| ProblemFilters.exclude[MissingClassProblem]("lerna.util.sequence.FutureConverters$") | ||
|
|
||
|
|
||
| # | ||
| # Package private classes: The following changes don't break the backward compatibility | ||
| # | ||
| ProblemFilters.exclude[MissingClassProblem]("lerna.util.sequence.FixedRetryPolicy") | ||
| ProblemFilters.exclude[MissingClassProblem]("lerna.util.sequence.FutureConverters$ListenableFutureConverter") | ||
| ProblemFilters.exclude[MissingClassProblem]("lerna.util.sequence.FutureConverters$ListenableFutureConverter$") | ||
| ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.util.sequence.SequenceStore#SessionContext.session") | ||
| ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.util.sequence.SequenceStore#SessionContext.selectSequenceReservationStatement") | ||
| ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.util.sequence.SequenceStore#SessionContext.insertSequenceReservationStatement") | ||
| ProblemFilters.exclude[IncompatibleMethTypeProblem]("lerna.util.sequence.SequenceStore#SessionContext.copy") | ||
| ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.util.sequence.SequenceStore#SessionContext.copy$default$1") | ||
| ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.util.sequence.SequenceStore#SessionContext.copy$default$2") | ||
| ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.util.sequence.SequenceStore#SessionContext.copy$default$3") | ||
| ProblemFilters.exclude[IncompatibleMethTypeProblem]("lerna.util.sequence.SequenceStore#SessionContext.apply") | ||
| ProblemFilters.exclude[IncompatibleSignatureProblem]("lerna.util.sequence.SequenceStore#SessionContext.unapply") | ||
| ProblemFilters.exclude[IncompatibleMethTypeProblem]("lerna.util.sequence.SequenceStore#SessionContext.this") | ||
| ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.util.sequence.SequenceStore#SessionOpened.session") | ||
| ProblemFilters.exclude[IncompatibleMethTypeProblem]("lerna.util.sequence.SequenceStore#SessionOpened.copy") | ||
| ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.util.sequence.SequenceStore#SessionOpened.copy$default$1") | ||
| ProblemFilters.exclude[IncompatibleMethTypeProblem]("lerna.util.sequence.SequenceStore#SessionOpened.this") | ||
| ProblemFilters.exclude[IncompatibleMethTypeProblem]("lerna.util.sequence.SequenceStore#SessionOpened.apply") | ||
| ProblemFilters.exclude[IncompatibleSignatureProblem]("lerna.util.sequence.SequenceStore#SessionOpened.unapply") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,58 +32,87 @@ lerna.util.sequence { | |
| // example = ${lerna.util.sequence.cassandra.default} | ||
| } | ||
| default { | ||
| # コンタクトポイント | ||
| contact-points = ["127.0.0.1"] | ||
xirc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| # Full path to the Datastax Java driver's configuration. | ||
| # Some settings is not overridable in a profile. | ||
| # If we prefer to override such settings for each tenant, | ||
| # we must use a different path for each tenant by overriding this setting. | ||
| # See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/configuration/ | ||
| datastax-java-driver-config = "datastax-java-driver" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 次の driver に関連する項目は
|
||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @xirc 設定項目 default = ${alpakka.cassandra} {
# Profiles to use.
# We can set different profiles for reads and writes.
# See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/configuration/#execution-profiles
read-profile = "lerna-util-sequence-profile"
... 略 ...キーの衝突を避けるため、次のように設定を継承するための新しい設定項目を作っても良いかもしれません。 defualt {
alpakka.cassandra = ${alpakka.cassandra}
# Profiles to use.
# We can set different profiles for reads and writes.
# See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/configuration/#execution-profiles
read-profile = "lerna-util-sequence-profile"
... 略 ...
}config の継承については次のページに説明があります。
このようにしておくと、ユーザーが また、Alpakka Cassandra が読み込む設定が増えたとしても lerna-util-sequence の reference.conf を改修する必要がなくなるはずです。
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 継承することも可能だと思います。
|
||
| # Profiles to use. | ||
| # We can set different profiles for reads and writes. | ||
| # See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/configuration/#execution-profiles | ||
| read-profile = "lerna-util-sequence-profile" | ||
| write-profile = "lerna-util-sequence-profile" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. profile 機能を活用し、consistency level 等が設定できます。 デフォルト構成では、read, write の profile は同じものを使います。 |
||
|
|
||
| # シーケンスに関する情報を保存するキースペース | ||
| keyspace = "sequence" | ||
|
|
||
| # シーケンスの確保済み最大値を永続化するテーブル | ||
| table = "sequence_reservation" | ||
|
|
||
| # 書き込み時の整合性レベル | ||
| # see com.datastax.driver.core.ConsistencyLevel | ||
| write-consistency = QUORUM | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| # insert など書き込みの失敗でリトライする回数 | ||
| write-retries = 3 | ||
|
|
||
| # 読み込み時の整合性レベル | ||
| # see com.datastax.driver.core.ConsistencyLevel | ||
| read-consistency = QUORUM | ||
|
|
||
| # select など読み込みの失敗でリトライする回数 | ||
| read-retries = 3 | ||
|
|
||
| # キースペースのレプリケーション係数 | ||
| data-center-replication-factors = [ | ||
| "datacenter1:1" | ||
| ] | ||
|
|
||
| # 接続先を指定したローカルデータセンターに制限する設定 | ||
| # この設定を有効にした場合は DCAwareRoundRobinPolicy を使用する | ||
| # この設定を指定しない場合はデフォルトで Datastax 標準 の RoundRobinPolicy が使われる | ||
| # マルチデータセンター構成でより効率的に処理するためには設定を推奨 | ||
| local-datacenter = "" | ||
|
|
||
| # 認証 | ||
| authentication { | ||
xirc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # ユーザー名 | ||
| username = "cassandra" | ||
| # パスワード | ||
| password = "cassandra" | ||
| } | ||
| } | ||
| } | ||
|
|
||
| # Default datastax java driver profiles to use. | ||
| # See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/configuration/reference/ | ||
| datastax-java-driver { | ||
|
|
||
| profiles { | ||
| lerna-util-sequence-profile { | ||
| basic.request { | ||
| consistency = QUORUM | ||
| } | ||
|
|
||
| # See: https://docs.datastax.com/en/developer/java-driver/3.6/manual/socket_options/#socket-options | ||
| socket { | ||
| # If we prefer to set the request timeout, we must overrride this setting. | ||
| # See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/configuration/reference/ | ||
| // basic.request.timeout = 2 seconds | ||
|
|
||
| # コネクションを確立するまでのタイムアウト | ||
| connection-timeout = 5000 millis | ||
xirc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # The default load balancing policy requires a local datacenter to be specified. | ||
| # See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/load_balancing/#built-in-policies | ||
| // basic.load-balancing-policy { | ||
| // class = DefaultLoadBalancingPolicy | ||
| // local-datacenter = datacenter1 | ||
| // } | ||
|
|
||
| # If we prefer to use own retry policy, we must override this setting. | ||
| # See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/retries/ | ||
| // advanced.retry-policy { | ||
| // class = DefaultRetryPolicy | ||
| // } | ||
|
|
||
| # ホストごとの read timeout。Cassandra 側のタイムアウト時間よりも大きい値を設定すること | ||
| # 最悪の場合、1つのクエリの実行で read-timeout * (write-retries or read-retires) の時間がかかる | ||
| read-timeout = 12000 millis | ||
xirc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| # If we prefer to use different contact points rather than the default, we must override this setting. | ||
| # See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/#contact-points | ||
| // basic.contact-points = [ "127.0.0.1:9042" ] | ||
|
|
||
| # If we prefer to retry connecting at an inititialization time, we must override this setting. | ||
| # See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/reconnection/#at-init-time | ||
| // advanced.reconnect-on-init = true | ||
|
|
||
| # If we prefer to use authentication, we must override these settings. | ||
| # See also https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/authentication/ | ||
| // advanced.auth-provider { | ||
| // class = PlainTextAuthProvider | ||
| // username = cassandra | ||
| // password = cassandra | ||
| // } | ||
|
|
||
| # If we prefer to set connection timeout, we must override this setting. | ||
| # See also | ||
| # * https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/pooling/ | ||
| # * https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/configuration/reference/#:~:text=connect-timeout | ||
| // advanced.connection { | ||
| // connect-timeout = 5 seconds | ||
| // } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,21 +1,31 @@ | ||
| package lerna.util.sequence | ||
|
|
||
| import com.datastax.oss.driver.api.core.cql.SimpleStatement | ||
|
|
||
| private[sequence] final class CassandraStatements(config: SequenceFactoryCassandraConfig) { | ||
|
|
||
| val createKeyspace: String = | ||
| val createKeyspaceCQL: String = | ||
| s""" | ||
| |CREATE KEYSPACE IF NOT EXISTS ${config.cassandraKeyspace} | ||
| | WITH REPLICATION = { | ||
| | ${config.cassandraReplication} | ||
| | } | ||
| """.stripMargin | ||
| val createKeyspace: SimpleStatement = | ||
| SimpleStatement | ||
| .newInstance(createKeyspaceCQL) | ||
| .setIdempotent(true) | ||
|
|
||
| val useKeyspace: String = | ||
| val useKeyspaceCQL: String = | ||
| s""" | ||
| |USE ${config.cassandraKeyspace} | ||
| """.stripMargin | ||
| val useKeyspace: SimpleStatement = | ||
| SimpleStatement | ||
| .newInstance(useKeyspaceCQL) | ||
| .setIdempotent(true) | ||
|
|
||
| val createTable: String = | ||
| val createTableCQL: String = | ||
| s""" | ||
| |CREATE TABLE IF NOT EXISTS ${config.cassandraTable} ( | ||
| | sequence_id varchar, | ||
|
|
@@ -25,18 +35,30 @@ private[sequence] final class CassandraStatements(config: SequenceFactoryCassand | |
| | PRIMARY KEY ((sequence_id, sequence_sub_id, node_id)) | ||
| |) | ||
| """.stripMargin | ||
| val createTable: SimpleStatement = | ||
| SimpleStatement | ||
| .newInstance(createTableCQL) | ||
| .setIdempotent(true) | ||
|
|
||
| val selectSequenceReservation: String = | ||
| val selectSequenceReservationCQL: String = | ||
| s""" | ||
| |SELECT max_reserved_value FROM ${config.cassandraTable} | ||
| | WHERE sequence_id = ? | ||
| | AND sequence_sub_id = ? | ||
| | AND node_id = ? | ||
| """.stripMargin | ||
| val selectSequenceReservation: SimpleStatement = | ||
| SimpleStatement | ||
| .newInstance(selectSequenceReservationCQL) | ||
| .setIdempotent(true) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
QueryBuilder を使わない限り、冪等かどうかは手動で設定する必要があります。 https://docs.datastax.com/en/developer/java-driver/3.11/manual/retries/#retries
冪等の設定は、prepared statement 、 bounded statement を介した場合でも引き継がれます。 また、4.x では冪等を明示しない場合のデフォルト値を、設定 |
||
|
|
||
| val insertSequenceReservation: String = | ||
| val insertSequenceReservationCQL: String = | ||
| s""" | ||
| |INSERT INTO ${config.cassandraTable} (sequence_id, sequence_sub_id, node_id, max_reserved_value) VALUES (?, ?, ?, ?) | ||
| """.stripMargin | ||
| val insertSequenceReservation: SimpleStatement = | ||
| SimpleStatement | ||
| .newInstance(insertSequenceReservationCQL) | ||
| .setIdempotent(true) | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| package lerna.util.sequence | ||
|
|
||
| import akka.actor.ClassicActorSystemProvider | ||
| import com.datastax.oss.driver.api.core.CqlSession | ||
|
|
||
| import scala.compat.java8.FutureConverters._ | ||
| import scala.concurrent.{ ExecutionContextExecutor, Future } | ||
|
|
||
| private[sequence] object CqlSessionProvider { | ||
|
|
||
| /** Connect to the Cassandra cluster and returns a [[Future]] instance containing [[CqlSession]]. | ||
| * | ||
| * A driver configuration is resolved from the given [[ClassicActorSystemProvider]] and [[SequenceFactoryCassandraConfig]]. | ||
| * The connection initialization will be done asynchronously in a driver internal thread pool. | ||
| */ | ||
| def connect( | ||
| systemProvider: ClassicActorSystemProvider, | ||
| config: SequenceFactoryCassandraConfig, | ||
| ): Future[CqlSession] = { | ||
| implicit val executionContext: ExecutionContextExecutor = systemProvider.classicSystem.dispatcher | ||
| // NOTE: | ||
| // Session 確立に Alpakka Cassandra の機能を活用することを検討しました。 | ||
| // 例えば akka.stream.alpakka.cassandra.CqlSessionProvider を使用することで、 | ||
| // Akka Discovery を使用した Cassandra の contact points 検出などより高度な機能を提供できます。 | ||
| // 幾つかの検討事項があり、採用を見送ることとしました。 | ||
| // https://github.com/lerna-stack/lerna-app-library/issues/58 | ||
| for { | ||
| configLoader <- Future.fromTry(config.resolveDriverConfigLoader(systemProvider)) | ||
| session <- CqlSession.builder().withConfigLoader(configLoader).buildAsync().toScala | ||
| } yield { | ||
| session | ||
| } | ||
| } | ||
|
|
||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scala/scala-java8-compat: A Java 8 compatibility kit for Scala.
幾つか追加したテストで Java Optional => Scala Option に変換する等のために必要です。