Skip to content

Commit

Permalink
DEX-1155 Fix sync with node (#526)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-branevskiy authored Apr 24, 2021
1 parent b88cf93 commit bedeebc
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ class OrderDynamicFeeTestSuite extends OrderFeeBaseTestSuite {

broadcastAndAwait(mkTransfer(alice, bob, defaultAssetQuantity / 2, eth, 0.005.waves))

dex1.restartWithNewSuiteConfig(ConfigFactory.parseString("waves.dex.order-fee.-1.mode = percent"))
dex1.restartWithNewSuiteConfig(ConfigFactory.parseString("waves.dex.order-fee.-1.mode = percent").withFallback(dexInitialSuiteConfig))
check()

dex1.restartWithNewSuiteConfig(ConfigFactory.parseString("waves.dex.order-fee.-1.mode = fixed").withFallback(dexInitialSuiteConfig))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import com.wavesplatform.dex.grpc.integration.clients.domain.portfolio.Implicits
import com.wavesplatform.dex.grpc.integration.clients.domain.{TransactionWithChanges, WavesNodeEvent}
import com.wavesplatform.dex.grpc.integration.protobuf.PbToDexConversions._
import com.wavesplatform.dex.grpc.integration.settings.GrpcClientSettings
import com.wavesplatform.dex.grpc.integration.tool.RestartableManagedChannel
import com.wavesplatform.dex.it.api.HasToxiProxy
import com.wavesplatform.dex.it.docker.WavesNodeContainer
import com.wavesplatform.dex.it.test.NoStackTraceCancelAfterFailure
import im.mak.waves.transactions.Transaction
import io.grpc.ManagedChannel
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioSocketChannel
import monix.eval.Task
Expand Down Expand Up @@ -63,13 +63,15 @@ class BlockchainUpdatesClientTestSuite extends IntegrationSuiteBase with HasToxi
noDataTimeout = 5.minutes
)

private lazy val blockchainUpdatesChannel: ManagedChannel =
grpcSettings.toNettyChannelBuilder
.executor((command: Runnable) => grpcExecutor.execute(command))
.eventLoopGroup(eventLoopGroup)
.channelType(classOf[NioSocketChannel])
.usePlaintext()
.build
private lazy val blockchainUpdatesChannel: RestartableManagedChannel =
new RestartableManagedChannel(() =>
grpcSettings.toNettyChannelBuilder
.executor((command: Runnable) => grpcExecutor.execute(command))
.eventLoopGroup(eventLoopGroup)
.channelType(classOf[NioSocketChannel])
.usePlaintext()
.build
)

private lazy val client =
new DefaultBlockchainUpdatesClient(eventLoopGroup, blockchainUpdatesChannel, monixScheduler, grpcSettings.noDataTimeout)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package com.wavesplatform.dex.grpc.integration.clients.blockchainupdates

import com.wavesplatform.dex.domain.utils.ScorexLogging
import com.wavesplatform.dex.grpc.integration.effect.Implicits.NettyFutureOps
import io.grpc.ManagedChannel
import com.wavesplatform.dex.grpc.integration.tool.RestartableManagedChannel
import io.netty.channel.EventLoopGroup
import monix.execution.Scheduler

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

trait BlockchainUpdatesClient {
Expand All @@ -23,7 +23,7 @@ trait BlockchainUpdatesClient {

class DefaultBlockchainUpdatesClient(
eventLoopGroup: EventLoopGroup,
channel: ManagedChannel,
channel: RestartableManagedChannel,
monixScheduler: Scheduler,
noDataTimeout: FiniteDuration
)(implicit grpcExecutionContext: ExecutionContext)
Expand All @@ -34,8 +34,7 @@ class DefaultBlockchainUpdatesClient(

override def close(): Future[Unit] = {
blockchainEvents.close()
channel.shutdown()
channel.awaitTermination(500, TimeUnit.MILLISECONDS)
channel.shutdown(500.millis)
// TODO DEX-998
if (eventLoopGroup.isShuttingDown) Future.successful(())
else eventLoopGroup.shutdownGracefully(0, 500, TimeUnit.MILLISECONDS).asScala.map(_ => ())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import com.wavesplatform.dex.domain.utils.ScorexLogging
import com.wavesplatform.dex.grpc.integration.clients.ControlledStream.SystemEvent
import com.wavesplatform.dex.grpc.integration.clients.domain.BlockRef
import com.wavesplatform.dex.grpc.integration.protobuf.PbToDexConversions._
import com.wavesplatform.dex.grpc.integration.tool.RestartableManagedChannel
import com.wavesplatform.dex.grpc.observers.IntegrationObserver
import com.wavesplatform.events.api.grpc.protobuf.{BlockchainUpdatesApiGrpc, SubscribeEvent, SubscribeRequest}
import com.wavesplatform.events.protobuf.BlockchainUpdated.Append.Body
import com.wavesplatform.events.protobuf.BlockchainUpdated.Update
import io.grpc.stub.ClientCalls
import io.grpc.{CallOptions, ClientCall, Grpc, ManagedChannel}
import io.grpc.{CallOptions, ClientCall, Grpc}
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.Observable
import monix.reactive.subjects.ConcurrentSubject
Expand All @@ -23,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration
From the docs of reactive streams: the grammar must still be respected: (onNext)* (onComplete | onError)
On error we just restart the stream, so r receives updates from a new stream. That is why we don't propagate errors to r
*/
class GrpcBlockchainUpdatesControlledStream(channel: ManagedChannel, noDataTimeout: FiniteDuration)(implicit scheduler: Scheduler)
class GrpcBlockchainUpdatesControlledStream(channel: RestartableManagedChannel, noDataTimeout: FiniteDuration)(implicit scheduler: Scheduler)
extends BlockchainUpdatesControlledStream
with ScorexLogging {

Expand All @@ -39,7 +40,8 @@ class GrpcBlockchainUpdatesControlledStream(channel: ManagedChannel, noDataTimeo
require(height >= 1, "We can not get blocks on height <= 0")
log.info(s"Connecting to Blockchain events stream, getting blocks from $height")

val call = channel.newCall(BlockchainUpdatesApiGrpc.METHOD_SUBSCRIBE, CallOptions.DEFAULT)
channel.restart()
val call = channel.getChannel.newCall(BlockchainUpdatesApiGrpc.METHOD_SUBSCRIBE, CallOptions.DEFAULT)
val observer = new BlockchainUpdatesObserver(call)
grpcObserver.getAndSet(observer.some).foreach(_.close())
ClientCalls.asyncServerStreamingCall(call, new SubscribeRequest(height), observer)
Expand Down Expand Up @@ -119,6 +121,8 @@ class GrpcBlockchainUpdatesControlledStream(channel: ManagedChannel, noDataTimeo
if (grpcObserver.get().contains(this)) {
log.warn(s"No data for $noDataTimeout, restarting!")
stopGrpcObserver()
channel.stop()
internalSystemStream.onNext(SystemEvent.Stopped)
} else log.warn("False positive no-data timeout")
}))
.foreach(_.cancel())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import com.wavesplatform.dex.grpc.integration.protobuf.DexToPbConversions._
import com.wavesplatform.dex.grpc.integration.protobuf.PbToDexConversions._
import com.wavesplatform.dex.grpc.integration.services.UtxTransaction
import com.wavesplatform.dex.grpc.integration.settings.WavesBlockchainClientSettings
import com.wavesplatform.dex.grpc.integration.tool.RestartableManagedChannel
import com.wavesplatform.protobuf.transaction.SignedTransaction
import io.grpc.ManagedChannel
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioSocketChannel
import monix.eval.Task
Expand All @@ -53,7 +53,7 @@ class CombinedWavesBlockchainClient(
type Balances = Map[Address, Map[Asset, Long]]
type Leases = Map[Address, Long]

@volatile private var blockchainStatus: Status = Starting()
@volatile private var blockchainStatus: Status = Starting()

override def status(): Status = blockchainStatus

Expand Down Expand Up @@ -242,6 +242,7 @@ class CombinedWavesBlockchainClient(

override def close(): Future[Unit] =
meClient.close().zip(bClient.close()).map(_ => ())

}

object CombinedWavesBlockchainClient extends ScorexLogging {
Expand All @@ -264,7 +265,7 @@ object CombinedWavesBlockchainClient extends ScorexLogging {
val eventLoopGroup = new NioEventLoopGroup

log.info(s"Building Matcher Extension gRPC client for server: ${wavesBlockchainClientSettings.grpc.target}")
val matcherExtensionChannel: ManagedChannel =
val matcherExtensionChannel =
wavesBlockchainClientSettings.grpc.toNettyChannelBuilder
.executor((command: Runnable) => grpcExecutionContext.execute(command))
.eventLoopGroup(eventLoopGroup)
Expand All @@ -273,13 +274,15 @@ object CombinedWavesBlockchainClient extends ScorexLogging {
.build

log.info(s"Building Blockchain Updates Extension gRPC client for server: ${wavesBlockchainClientSettings.blockchainUpdatesGrpc.target}")
val blockchainUpdatesChannel: ManagedChannel =
wavesBlockchainClientSettings.blockchainUpdatesGrpc.toNettyChannelBuilder
.executor((command: Runnable) => grpcExecutionContext.execute(command))
.eventLoopGroup(eventLoopGroup)
.channelType(classOf[NioSocketChannel])
.usePlaintext()
.build
val blockchainUpdatesChannel =
new RestartableManagedChannel(() =>
wavesBlockchainClientSettings.blockchainUpdatesGrpc.toNettyChannelBuilder
.executor((command: Runnable) => grpcExecutionContext.execute(command))
.eventLoopGroup(eventLoopGroup)
.channelType(classOf[NioSocketChannel])
.usePlaintext()
.build
)

new CombinedWavesBlockchainClient(
wavesBlockchainClientSettings.combinedClientSettings,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.wavesplatform.dex.grpc.integration.tool

import io.grpc.ManagedChannel

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

final class RestartableManagedChannel(mkManagedChannel: () => ManagedChannel) {

private var channel: ManagedChannel = _
private var isClosed: Boolean = false

def stop(): Unit = synchronized {
checkIsClosed()
if (channel != null) {
channel.shutdown()
channel = null
}
}

def restart(): Unit = synchronized {
checkIsClosed()
if (channel != null)
channel.shutdown()
channel = mkManagedChannel()
}

def getChannel: ManagedChannel = synchronized {
checkIsClosed()
if (channel == null)
channel = mkManagedChannel()
channel
}

def shutdown(awaitTime: Duration): Unit = synchronized {
mkClosed()
if (channel != null) {
channel.shutdown()
channel.awaitTermination(awaitTime.toMillis, TimeUnit.MILLISECONDS)
channel = null
}
}

private def checkIsClosed(): Unit =
if (isClosed)
throw new RuntimeException("managed channel is closed")

private def mkClosed(): Unit = {
checkIsClosed()
isClosed = true
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.wavesplatform.dex.grpc.integration.tool

import com.wavesplatform.dex.WavesIntegrationSuiteBase
import io.grpc.ManagedChannel
import org.scalamock.scalatest.MockFactory

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

class RestartableManagedChannelSuite extends WavesIntegrationSuiteBase with MockFactory {

"RestartableManagedChannel should" - {

"getChannel" in {
val channel = mock[ManagedChannel]
val maker = mockFunction[ManagedChannel]
maker.expects().returning(channel).once()
val restartableManagedChannel = new RestartableManagedChannel(maker)
restartableManagedChannel.getChannel shouldBe channel
}

"shutdown" in testShutdown { (awaitTime, restartableManagedChannel) =>
restartableManagedChannel.shutdown(awaitTime)
}

"not do any ops after shutting down" in testShutdown { (awaitTime, restartableManagedChannel) =>
restartableManagedChannel.shutdown(awaitTime)
intercept[RuntimeException](restartableManagedChannel.restart())
intercept[RuntimeException](restartableManagedChannel.getChannel)
intercept[RuntimeException](restartableManagedChannel.shutdown(awaitTime))
}

"restart without triggering getChannel" in {
val channel = mock[ManagedChannel]
val maker = mockFunction[ManagedChannel]
maker.expects().returning(channel).once()
val restartableManagedChannel = new RestartableManagedChannel(maker)
restartableManagedChannel.restart()
restartableManagedChannel.getChannel shouldBe channel
}

"restart" in {
val channel1 = mock[ManagedChannel]
val channel2 = mock[ManagedChannel]
val maker = mockFunction[ManagedChannel]
maker.expects().returning(channel1).once()
maker.expects().returning(channel2).once()
(channel1.shutdown _).expects().returning(channel1).once()
val restartableManagedChannel = new RestartableManagedChannel(maker)
restartableManagedChannel.getChannel //force channel creation
restartableManagedChannel.restart()
}

"stop current channel" in {
val channel = mock[ManagedChannel]
(channel.shutdown _).expects().returning(channel).once()
val maker = mockFunction[ManagedChannel]
maker.expects().returning(channel).once()
val restartableManagedChannel = new RestartableManagedChannel(maker)
restartableManagedChannel.getChannel //force channel creation
restartableManagedChannel.stop()
}
}

private def testShutdown(f: (Duration, RestartableManagedChannel) => Unit): Unit = {
val awaitTime = 10.seconds
val channel = mock[ManagedChannel]
(channel.shutdown _).expects().returning(channel).once()
(channel.awaitTermination(_: Long, _: TimeUnit))
.expects(awaitTime.toMillis, TimeUnit.MILLISECONDS)
.returning(true)
.once()
val maker = mockFunction[ManagedChannel]
maker.expects().returning(channel).once()
val restartableManagedChannel = new RestartableManagedChannel(maker)
restartableManagedChannel.getChannel //force channel creation
f(awaitTime, restartableManagedChannel)
}

}

0 comments on commit bedeebc

Please sign in to comment.