Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7d5303d
🔨refactor: シーケンスに関わる処理を SequenceContext のメソッドとして実装
negokaz-tis Aug 30, 2021
31d0b71
🔨refactor: SequenceFactoryWorker の状態を ready と empty の2つの状態に変更
negokaz-tis Aug 30, 2021
59c6134
🐞fix: シーケンスの予約可能数の計算で最大シーケンス番号を考慮する
negokaz-tis Aug 30, 2021
efdb991
🚨test: 採番値の予約に失敗した場合のテストケースを追加
negokaz-tis Sep 2, 2021
2ab6ac1
✨feat: Cassandra に書き込んでいる途中に来た採番値予約の要求は最後のもの以外を無視する
negokaz-tis Sep 2, 2021
6271eaf
🔀Revert "✨feat: Cassandra に書き込んでいる途中に来た採番値予約の要求は最後のもの以外を無視する"
negokaz-tis Sep 3, 2021
104d8e4
📚doc: Update CHANGELOG
negokaz-tis Sep 3, 2021
5b3f53d
🐞fix: 採番値が枯渇しているときに予約のリトライが行われない問題を修正
negokaz-tis Sep 9, 2021
54d8f1f
🐞fix: 予約完了後に枯渇したりオーバフローするのは異常なのでエラーにする
negokaz-tis Sep 9, 2021
d5c6215
🐞fix: maxSequenceValue を超えて予約しないように修正
negokaz-tis Sep 10, 2021
6e19510
📚doc: コメントの計算式を修正
negokaz-tis Sep 10, 2021
cfc8fc5
📚doc: SequenceFactoryWorker の振る舞いに関するテストケースを洗い出し
negokaz-tis Sep 13, 2021
ee7e5f8
📚doc: InitialSequenceReserved のコメントの計算式を修正
negokaz-tis Sep 13, 2021
6401fa5
Merge branch 'feature/fix-sequence-factory' into fix-sequence-factory
tksugimoto Sep 13, 2021
acd87db
🐞fix: 採番値がoverflowしている状態で予約リトライの応答が返ってくると Worker が stop する問題を修正
negokaz-tis Sep 17, 2021
dc28d4a
🔀Revert "📚doc: SequenceFactoryWorker の振る舞いに関するテストケースを洗い出し"
negokaz-tis Sep 23, 2021
f7c7b05
🐞fix: 採番値の在庫が少なくなったときの予約で 1 つ余計に採番値を予約していた問題を修正
negokaz-tis Sep 24, 2021
152c6d5
🚨test: 採番値が枯渇しているときにリトライされたリセットの応答が遅れて返ってきてもユニークな採番値を発行できる
negokaz-tis Sep 24, 2021
b1395f6
🐞fix: リセットのリトライによりユニークな採番値が発行できないケースある問題を修正
negokaz-tis Sep 24, 2021
25bc6f2
📚doc: SequenceFactoryWorkerSpec のコメントを修正
negokaz-tis Sep 24, 2021
b459f5f
📚doc: SequenceFactoryWorkerSpec のコメントを修正
negokaz-tis Sep 24, 2021
7d2e100
🐞fix: リセット時に採番値が余分に 1 つ多く予約される問題を修正
negokaz-tis Sep 24, 2021
e186160
📚doc: CHANGELOG を更新
negokaz-tis Sep 24, 2021
8410a62
🚨test: SequenceReset の仕様変更を SequenceFactoryWorkerSpec に反映
negokaz-tis Sep 24, 2021
70835e6
🔀Merge branch 'feature/fix-sequence-factory' into fix-sequence-factory
negokaz-tis Sep 24, 2021
889b26c
🚨test: テストケースを削除します: 予約中の採番要求で次番号が上限を超えた場合はリセットする
negokaz-tis Sep 24, 2021
a2cdece
🚨test: 予約に失敗した場合、次の採番要求まで予約がリトライされていないことを確認
negokaz-tis Sep 24, 2021
3a1e86b
👷chore: LoggingTestKit を使えるように lerna-tests に logback の依存を追加する
negokaz-tis Sep 24, 2021
59d5e2f
🐞fix: ログ出力で remain が多く報告される問題を修正します
negokaz-tis Sep 24, 2021
b9813b6
🚨test: 採番値が枯渇するまでの予約要求を全て失敗させるように変更
negokaz-tis Sep 24, 2021
6c93671
🔀Merge branch 'feature/fix-sequence-factory' into fix-sequence-factory
negokaz-tis Sep 27, 2021
1854b11
Merge branch 'feature/fix-sequence-factory' into fix-sequence-factory
tksugimoto Sep 27, 2021
02214b8
🚨test: logback-test.xml を追加しログの出る量を減らします
negokaz-tis Sep 27, 2021
1f85a6a
🚨test: 重複するテストケースを削除します
negokaz-tis Sep 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- `lerna-util-sequence`
- Reserving an unnecessary extra sequence value when resetting [PR#65](https://github.com/lerna-stack/lerna-app-library/pull/65)
- Can not generating sequence number correctly on corner cases [#49](https://github.com/lerna-stack/lerna-app-library/issues/49)
- Reserving an unnecessary extra sequence value when there are not enough sequence values in stock [PR#57(comment)](https://github.com/lerna-stack/lerna-app-library/pull/57#discussion_r713544755)

## [v2.0.0] - 2021-07-16
[v2.0.0]: https://github.com/lerna-stack/lerna-app-library/compare/v1.0.0...v2.0.0
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ lazy val lernaTests = lernaModule("lerna-tests")
libraryDependencies ++= Seq(
TestDependencies.Expecty.expecty,
Dependencies.Akka.testKit,
Dependencies.Logback.classic,
),
)

Expand Down
16 changes: 16 additions & 0 deletions lerna-util-sequence/src/main/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<conversionRule conversionWord="msg" converterClass="lerna.log.logback.converter.OneLineEventConverter" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{HH:mm:ss.SSS}\t%-5level\t%logger\t%X{akkaSource:--}\t%X{traceId:--}\t%X{tenantId:--}\t%msg%xEx%nopex%n</pattern>
</encoder>
</appender>

<logger level="INFO" name="akka" />
<logger level="INFO" name="lerna" />

<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,55 @@ private[sequence] object SequenceFactoryWorker extends AppTypedActorLogging {
sealed trait DomainEvent
final case class SequenceGenerated(value: BigInt, sequenceSubId: Option[String]) extends DomainEvent

final case class SequenceContext(maxReservedValue: BigInt, nextValue: BigInt)
final case class SequenceConfig(
maxSequenceValue: BigInt,
incrementStep: Int,
reservationAmount: Int,
reservationFactor: Int,
)

/** ここに実装されているメソッドは採番前に実行されることを想定する。
* 例えば [[remainAmount]] は、[[nextValue]] がまだ消費されていない前提で残数を返す。
*/
final case class SequenceContext(maxReservedValue: BigInt, nextValue: BigInt) {

/** 採番値を次に進める */
def next()(implicit config: SequenceConfig): SequenceContext =
copy(nextValue = nextValue + config.incrementStep)

/** 採番可能なシーケンスの残数 */
def remainAmount(implicit config: SequenceConfig): BigInt =
if (isEmpty) {
BigInt(0)
} else {
val remainExceptNextValue =
if (maxReservedValue > nextValue) {
((maxReservedValue - nextValue) / config.incrementStep)
} else BigInt(0)
remainExceptNextValue + 1 // nextValue 分 + 1 する
}

/** 追加で予約可能なシーケンスの数 */
def freeAmount(implicit config: SequenceConfig): Int =
Math.min(
// 予約数制限(reservationAmount)の中で採番可能なシーケンスの数
(config.reservationAmount - remainAmount).toInt,
// 最大シーケンス番号(maxSequenceValue)までの間で採番可能なシーケンスの数
((config.maxSequenceValue - maxReservedValue) / config.incrementStep).toInt,
)

/** 発行できるシーケンスの最大値を超えている */
def isOverflow(implicit config: SequenceConfig): Boolean =
nextValue > config.maxSequenceValue

/** 発行できるシーケンスが少なくなっている */
def isStarving(implicit config: SequenceConfig): Boolean =
remainAmount <= (config.reservationAmount / config.reservationFactor)

/** 発行できるシーケンスがない */
def isEmpty: Boolean =
nextValue > maxReservedValue
}
}

private[sequence] final class SequenceFactoryWorker(
Expand Down Expand Up @@ -81,133 +129,195 @@ private[sequence] final class SequenceFactoryWorker(

def createBehavior(): Behavior[Command] = {
context.self ! Initialize
notReady
notReady(initializeTried = false)
}

private val responseMapper: ActorRef[SequenceStore.ReservationResponse] =
context.messageAdapter(response => WrappedSequenceStoreResponse(response))

private[this] def notReady = Behaviors.receiveMessage[Command] {
private[this] implicit val config: SequenceConfig = SequenceConfig(
maxSequenceValue = maxSequenceValue,
incrementStep = incrementStep,
reservationAmount = reservationAmount,
reservationFactor = reservationFactor,
)

@SuppressWarnings(Array("org.wartremover.warts.Recursion"))
private[this] def notReady(initializeTried: Boolean): Behavior[Command] = Behaviors.receiveMessage[Command] {
case Initialize =>
sequenceStore ! SequenceStore.InitialReserveSequence(firstValue, reservationAmount, sequenceSubId, responseMapper)
Behaviors.same
case WrappedSequenceStoreResponse(msg: SequenceStore.InitialSequenceReserved) =>
if (msg.initialValue > maxSequenceValue) {
sequenceStore ! SequenceStore.ResetReserveSequence(firstValue, reservationAmount, sequenceSubId, responseMapper)
resetting
} else {
logger.info("initial reserved: max:{}, initial:{}", msg.maxReservedValue, msg.initialValue)
stashBuffer.unstashAll(ready(SequenceContext(msg.maxReservedValue, nextValue = msg.initialValue)))
}
logger.info("initial reserved: max:{}, initial:{}", msg.maxReservedValue, msg.initialValue)
val sequenceContext: SequenceContext =
SequenceContext(msg.maxReservedValue, nextValue = msg.initialValue)
stashBuffer.unstashAll(prepareNextSequence(nextSequence = sequenceContext))
case WrappedSequenceStoreResponse(SequenceStore.ReservationFailed) =>
notReady(initializeTried = true)
case message: GenerateSequence =>
if (initializeTried) {
context.self ! Initialize
}
stashBuffer.stash(message)
Behaviors.same
case ReceiveTimeout => Behaviors.stopped
case WrappedSequenceStoreResponse(_: SequenceStore.SequenceReserved) => Behaviors.unhandled
case WrappedSequenceStoreResponse(_: SequenceStore.SequenceReset) => Behaviors.unhandled
case WrappedSequenceStoreResponse(SequenceStore.ReservationFailed) => Behaviors.unhandled // FIXME
}

@SuppressWarnings(Array("lerna.warts.CyclomaticComplexity", "org.wartremover.warts.Recursion"))
private[this] def ready(implicit sequenceContext: SequenceContext): Behavior[Command] =
Behaviors.receiveMessage[Command] {
case msg: GenerateSequence =>
if (msg.sequenceSubId === sequenceSubId) {
import sequenceContext._

if (nextValue <= maxSequenceValue) {
msg.replyTo ! SequenceGenerated(nextValue, sequenceSubId)
logger.debug("SequenceGenerated when ready: {}", nextValue)
} else {
stashBuffer.stash(msg)
}
private[this] def ready(sequenceContext: SequenceContext): Behavior[Command] = Behaviors.receiveMessage {
case msg: GenerateSequence =>
if (msg.sequenceSubId === sequenceSubId) {
acceptGenerateSequence(msg, sequenceContext)
} else Behaviors.unhandled
case WrappedSequenceStoreResponse(msg: SequenceStore.SequenceReserved) =>
handleSequenceReserved(msg, sequenceContext)
case WrappedSequenceStoreResponse(SequenceStore.ReservationFailed) => Behaviors.same
case ReceiveTimeout => Behaviors.stopped
case Initialize => Behaviors.unhandled
case WrappedSequenceStoreResponse(_: SequenceStore.InitialSequenceReserved) => Behaviors.unhandled
case WrappedSequenceStoreResponse(msg: SequenceStore.SequenceReset) =>
// reset するときは必ず empty になっているため
Behaviors.unhandled
}

val newNextValue = nextValue + incrementStep
val remainAmount = // 残数
if (maxReservedValue > newNextValue) {
(maxReservedValue - newNextValue) / incrementStep
} else BigInt(0)

if (newNextValue > maxSequenceValue) {
sequenceStore ! SequenceStore.ResetReserveSequence(
firstValue,
reservationAmount,
sequenceSubId,
responseMapper,
)
resetting
} else if (remainAmount <= (reservationAmount / reservationFactor)) {
val amount = (reservationAmount - remainAmount).toInt
logger.info(
"Reserving sequence: remain {}, add {}, current max reserved: {}",
remainAmount,
amount,
maxReservedValue,
)
sequenceStore ! SequenceStore.ReserveSequence(maxReservedValue, amount, sequenceSubId, responseMapper)
reserving(sequenceContext.copy(nextValue = newNextValue))
} else {
ready(sequenceContext.copy(nextValue = newNextValue))
}
} else {
Behaviors.unhandled
}
case ReceiveTimeout => Behaviors.stopped
case Initialize => Behaviors.unhandled
case _: WrappedSequenceStoreResponse => Behaviors.unhandled
private[this] def empty(sequenceContext: SequenceContext): Behavior[Command] = Behaviors.receiveMessage {
case msg: GenerateSequence =>
// no reply
logger.warn(
"Pending generate sequence because reserving sequence: current max reserved: {}, next sequence value: {}",
sequenceContext.maxReservedValue,
sequenceContext.nextValue,
)
stashBuffer.stash(msg)
prepareNextSequence(nextSequence = sequenceContext)
case WrappedSequenceStoreResponse(msg: SequenceStore.SequenceReserved) =>
stashBuffer.unstashAll(handleSequenceReserved(msg, sequenceContext))
case WrappedSequenceStoreResponse(msg: SequenceStore.SequenceReset) =>
stashBuffer.unstashAll(handleSequenceReset(msg, sequenceContext))
case WrappedSequenceStoreResponse(SequenceStore.ReservationFailed) => Behaviors.same
case ReceiveTimeout => Behaviors.stopped
case Initialize => Behaviors.unhandled
case WrappedSequenceStoreResponse(_: SequenceStore.InitialSequenceReserved) => Behaviors.unhandled
}

private[this] def acceptGenerateSequence(
msg: GenerateSequence,
sequenceContext: SequenceContext,
): Behavior[Command] = {
msg.replyTo ! SequenceGenerated(sequenceContext.nextValue, sequenceSubId)
logger.debug("SequenceGenerated: {}", sequenceContext.nextValue)
prepareNextSequence(nextSequence = sequenceContext.next())
}

@SuppressWarnings(Array("lerna.warts.CyclomaticComplexity"))
private[this] def prepareNextSequence(nextSequence: SequenceContext): Behavior[Command] = {
if (nextSequence.isOverflow) {
reset()
empty(nextSequence)
} else if (nextSequence.isEmpty) {
val freeAmount = nextSequence.freeAmount
if (freeAmount > 0) {
reserve(sequenceContext = nextSequence, amount = freeAmount)
empty(nextSequence)
} else {
val message =
s"freeAmount (${freeAmount.toString}) must be greater than 0 because freeAmount ≦ 0 means that the next sequence is overflow"
logger.error(new IllegalStateException(message), message)
Behaviors.stopped
}
} else if (nextSequence.isStarving) {
val freeAmount = nextSequence.freeAmount
if (freeAmount > 0) {
reserve(sequenceContext = nextSequence, amount = freeAmount)
ready(nextSequence)
} else {
ready(nextSequence)
}
} else {
ready(nextSequence)
}
}

@SuppressWarnings(Array("lerna.warts.CyclomaticComplexity", "org.wartremover.warts.Recursion"))
private[this] def reserving(implicit sequenceContext: SequenceContext): Behavior[Command] =
Behaviors.receiveMessage {
case msg: GenerateSequence =>
if (msg.sequenceSubId === sequenceSubId) {
import sequenceContext._

if (nextValue > maxSequenceValue) {
stashBuffer.stash(msg)
sequenceStore ! SequenceStore.ResetReserveSequence(
firstValue,
reservationAmount,
sequenceSubId,
responseMapper,
)
resetting
} else if (nextValue > maxReservedValue) {
logger.warn("Pending generate sequence because reserving sequence: {}", nextValue)
stashBuffer.stash(msg)
Behaviors.same
} else {
msg.replyTo ! SequenceGenerated(nextValue, sequenceSubId)
logger.debug("SequenceGenerated when reserving: {}", nextValue)
reserving(sequenceContext.copy(nextValue = nextValue + incrementStep))
}
} else {
Behaviors.unhandled
}
case WrappedSequenceStoreResponse(msg: SequenceStore.SequenceReserved) =>
stashBuffer.unstashAll(ready(sequenceContext.copy(maxReservedValue = msg.maxReservedValue)))
case WrappedSequenceStoreResponse(SequenceStore.ReservationFailed) =>
// 即座にリトライしたところで予約できる見込みは薄いケースがあるので、
// クライアントから再び採番を要求されたときにリトライする方針とする
stashBuffer.unstashAll(ready)
case ReceiveTimeout => Behaviors.stopped
case Initialize => Behaviors.unhandled
case WrappedSequenceStoreResponse(_: SequenceStore.InitialSequenceReserved) => Behaviors.unhandled
case WrappedSequenceStoreResponse(_: SequenceStore.SequenceReset) => Behaviors.unhandled
private[this] def reserve(sequenceContext: SequenceContext, amount: Int): Unit = {
logger.info(
"Reserving sequence: remain {}, add {}, current max reserved: {}",
sequenceContext.remainAmount,
amount,
sequenceContext.maxReservedValue,
)
sequenceStore ! SequenceStore.ReserveSequence(
sequenceContext.maxReservedValue,
amount,
sequenceSubId,
responseMapper,
)
}

private[this] def handleSequenceReserved(
msg: SequenceStore.SequenceReserved,
sequenceContext: SequenceContext,
): Behavior[Command] = {
if (msg.maxReservedValue > sequenceContext.maxReservedValue) {
val nextSequence = sequenceContext.copy(maxReservedValue = msg.maxReservedValue)
if (nextSequence.isOverflow) {
val message =
s"Worker should reserve sequence so that it does not overflow [${sequenceContext.toString}, ${msg.toString}]"
logger.error(new IllegalStateException(message), message)
Behaviors.stopped
} else if (nextSequence.isEmpty) {
val message =
s"Sequence normally never dries up after reserving [${sequenceContext.toString}, ${msg.toString}]"
logger.error(new IllegalStateException(message), message)
Behaviors.stopped
} else {
ready(nextSequence)
}
} else if (msg.maxReservedValue === sequenceContext.maxReservedValue) {
// 採番予約のリトライにより同じ結果が返ってきた場合
Behaviors.same
} else {
// SequenceStore が予約要求した順とはことなる順番で結果を返した場合に到達する可能性があるが、
// 現在の SequenceStore の実装では結果の順序が前後することがないので、到達しないはずのコード。
val message =
s"Ignore the maxReservedValue since it reverts to the old value [${sequenceContext.toString}, ${msg.toString}]"
logger.warn(new IllegalStateException(message), message)
Behaviors.same
}
}

private[this] def resetting: Behavior[Command] = Behaviors.receiveMessage[Command] {
case WrappedSequenceStoreResponse(msg: SequenceStore.SequenceReset) =>
private[this] def reset(): Unit = {
sequenceStore ! SequenceStore.ResetReserveSequence(
firstValue,
reservationAmount,
sequenceSubId,
responseMapper,
)
}

private[this] def handleSequenceReset(
msg: SequenceStore.SequenceReset,
sequenceContext: SequenceContext,
): Behavior[Command] = {
if (sequenceContext.isOverflow) {
logger.warn("reset sequence: {}", msg.maxReservedValue)
stashBuffer.unstashAll(ready(SequenceContext(msg.maxReservedValue, nextValue = firstValue)))
case message: GenerateSequence =>
stashBuffer.stash(message)
val nextSequence = SequenceContext(msg.maxReservedValue, nextValue = firstValue)
if (nextSequence.isOverflow) {
val message =
s"Worker should reset sequence so that it does not overflow [${sequenceContext.toString}, ${msg.toString}]"
logger.error(new IllegalStateException(message), message)
Behaviors.stopped
} else if (nextSequence.isEmpty) {
val message =
s"Sequence normally never dries up after resetting [${sequenceContext.toString}, ${msg.toString}]"
logger.error(new IllegalStateException(message), message)
Behaviors.stopped
} else {
ready(nextSequence)
}
} else {
// リセットのリトライによる応答が遅れて返ってきた場合
Behaviors.same
case ReceiveTimeout => Behaviors.stopped
case Initialize => Behaviors.unhandled
case WrappedSequenceStoreResponse(_: SequenceStore.InitialSequenceReserved) => Behaviors.unhandled
case WrappedSequenceStoreResponse(_: SequenceStore.SequenceReserved) => Behaviors.unhandled
case WrappedSequenceStoreResponse(SequenceStore.ReservationFailed) => Behaviors.unhandled // FIXME
}
}
}
Loading