Skip to content
This repository was archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Improvements to Eventing and bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorklang committed Dec 18, 2019
1 parent 5a67753 commit 2755449
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 107 deletions.
38 changes: 35 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ lazy val root = (project in file("."))
`java-support`,
`scala-support`,
`java-shopping-cart`,
`java-pingpong`,
`akka-client`,
operator,
`tck`,
Expand Down Expand Up @@ -572,7 +573,7 @@ lazy val `proxy-postgres` = (project in file("proxy/postgres"))
)

lazy val `proxy-tests` = (project in file("proxy/proxy-tests"))
.dependsOn(`proxy-core`, `akka-client`)
.dependsOn(`proxy-core`, `akka-client`, `java-pingpong`)
.settings(
common,
name := "cloudstate-proxy-tests",
Expand Down Expand Up @@ -783,7 +784,38 @@ lazy val `java-shopping-cart` = (project in file("samples/java-shopping-cart"))
// logLevel in assembly := Level.Debug,
assemblyMergeStrategy in assembly := {
/*ADD CUSTOMIZATIONS HERE*/
//case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
)

lazy val `java-pingpong` = (project in file("samples/java-pingpong"))
.dependsOn(`java-support`)
.enablePlugins(AkkaGrpcPlugin, AssemblyPlugin, JavaAppPackaging, DockerPlugin)
.settings(
name := "java-pingpong",
dockerSettings,
dockerBaseImage := "adoptopenjdk/openjdk8",
mainClass in Compile := Some("io.cloudstate.samples.pingpong.Main"),
PB.generate in Compile := (PB.generate in Compile).dependsOn(PB.generate in (`java-support`, Compile)).value,
akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Java),
PB.protoSources in Compile ++= {
val baseDir = (baseDirectory in ThisBuild).value / "protocols"
Seq(baseDir / "frontend", (sourceDirectory in Compile).value / "protos")
},
PB.targets in Compile := Seq(
PB.gens.java -> (sourceManaged in Compile).value
),
javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8"),
mainClass in assembly := (mainClass in Compile).value,
assemblyJarName in assembly := "java-pingpong.jar",
test in assembly := {},
// logLevel in assembly := Level.Debug,
assemblyMergeStrategy in assembly := {
/*ADD CUSTOMIZATIONS HERE*/
case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
Expand All @@ -806,7 +838,7 @@ lazy val `scala-shopping-cart` = (project in file("samples/scala-shopping-cart")
// logLevel in assembly := Level.Debug,
assemblyMergeStrategy in assembly := {
/*ADD CUSTOMIZATIONS HERE*/
//case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.cloudstate.javasupport;

import com.typesafe.config.Config;
import com.google.protobuf.Descriptors;
import io.cloudstate.javasupport.crdt.CrdtEntity;
import io.cloudstate.javasupport.crdt.CrdtEntityFactory;
Expand Down Expand Up @@ -216,7 +217,37 @@ public CloudState registerCrdtEntity(
* @return a CompletionStage which will be completed when the server has shut down.
*/
public CompletionStage<Done> start() {
return new CloudStateRunner(services).run();
return createRunner().run();
}

/**
* Starts a server with the configured entities, using the supplied configuration.
*
* @return a CompletionStage which will be completed when the server has shut down.
*/
public CompletionStage<Done> start(Config config) {
return createRunner(config).run();
}

/**
* Creates a CloudStateRunner using the currently configured services. In order to start the
* server, `run()` must be invoked on the returned CloudStateRunner.
*
* @return a CloudStateRunner
*/
public CloudStateRunner createRunner() {
return new CloudStateRunner(services);
}

/**
* Creates a CloudStateRunner using the currently configured services, using the supplied
* configuration. In order to start the server, `run()` must be invoked on the returned
* CloudStateRunner.
*
* @return a CloudStateRunner
*/
public CloudStateRunner createRunner(Config config) {
return new CloudStateRunner(services, config);
}

private AnySupport newAnySupport(Descriptors.FileDescriptor[] descriptors) {
Expand Down
2 changes: 1 addition & 1 deletion java-support/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cloudstate {

coordinated-shutdown.exit-jvm = on

http.server{
http.server {
preview.enable-http2 = on

// Disable - we receive connections from localhost, so they'll never be dropped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[

// TODO JavaDoc
def this(services: java.util.Map[String, StatefulService]) {
this(
{
val conf = ConfigFactory.load()
// We do this to apply the cloud-state specific akka configuration to the ActorSystem we create for hosting the user function
ActorSystem("StatefulService", conf.getConfig("cloudstate.system").withFallback(conf))
},
services.asScala.toMap
)
this(ActorSystem("StatefulService", {
val conf = ConfigFactory.load()
conf.getConfig("cloudstate.system").withFallback(conf)
}), services.asScala.toMap)
}

// TODO JavaDoc
def this(services: java.util.Map[String, StatefulService], config: Config) {
this(ActorSystem("StatefulService", config), services.asScala.toMap)
}

private val rootContext = new Context {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class AnySupportSpec extends WordSpec with Matchers with OptionValues {
defaultAny.typeUrl should ===(AnySupport.CloudStatePrimitive + name)
defaultAny.value.size() shouldBe 0
anySupport.decode(defaultAny) should ===(defaultValue)

}

"support se/deserializing strings" in testPrimitive("string", "foo", "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,23 @@ object EventingManager {
val endpoints =
entity.serviceDescriptor.getMethods.iterator.asScala.foldLeft(Map.empty[MethodDescriptor, Eventing]) {
case (map, method) =>
val e = EventingProto.eventing.get(Options.convertMethodOptions(method))
// FIXME Validate that in and out are set appropriately!!
log.debug("EventingProto.events for {}", method.getFullName + " " + e)
e.filter(e => e.in != "" || e.out != "").fold(map)(map.updated(method, _))
EventingProto.eventing.get(Options.convertMethodOptions(method)) match {
case None => map
case Some(e) =>
(e.in, e.out) match {
case (null, null) | ("", "") => map
case (in, out) if in == out =>
throw new IllegalStateException(
s"Endpoint '${method.getFullName}' has the same input topic as output topic ('${in}'), this is not allowed."
)
case (in, out) =>
log.debug("EventingProto.events for {}: {} -> {}",
method.getFullName: AnyRef,
in: AnyRef,
out: AnyRef)
map.updated(method, e)
}
}
}

if (endpoints.isEmpty) Nil
Expand Down Expand Up @@ -104,7 +117,7 @@ object EventingManager {
] =
for {
EventMapping(entity, routes) <- eventMappings
(mdesc, eventing) <- routes
(mdesc, eventing) <- routes.toSeq // Important since we do not want dedupe that we get from the map otherwise
} yield {
log.info("Creating route for {}", eventing)
val commandHandler = new CommandHandler(entity, mdesc, router, noEmitter, entityDiscoveryClient, log) // Could we reuse these from Serve?
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package io.cloudstate.proxy.eventing
import java.io.File
import java.net.ServerSocket
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import java.util.concurrent.{CompletionStage, TimeUnit}
import com.typesafe.config.Config

import akka.{Done, NotUsed}
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
Expand All @@ -23,31 +25,26 @@ import scala.concurrent.duration._
import scala.sys.process.Process
import scala.util.Random

import io.cloudstate.pingpong._
import io.cloudstate.proxy.eventing.pingpong._
import io.cloudstate.javasupport.{CloudState, CloudStateRunner}

/**
This test requires a running Google Pubsub Emulator,
easiest is to start it in an emulator, just don't forget to stop the image.
`docker run --rm --expose=8085 --volume=/data --name=googlepubsub -d -p 8085:8085 google/cloud-sdk:latest /bin/sh -c "gcloud beta emulators pubsub start --project=test --host-port=0.0.0.0:8085 --data-dir=/data"`
*/
class GooglePubsubSpec extends WordSpec with BeforeAndAfterAll with Eventually with Matchers with ScalaFutures {
// Working directory should be the base directory of the sbt build, but in case it's not, try walking up the
// directory hierarchy until we find the the samples directory.
val baseDir = {
@annotation.tailrec
def locateBaseDir(dir: File): File =
if (new File(dir, "samples").isDirectory) {
dir
} else if (dir.getParentFile == null) {
sys.error("Could not locate base directory")
} else locateBaseDir(dir.getParentFile)
locateBaseDir(new File("."))
}
val nodeSupportDir = new File(baseDir, "node_support")
val crdtExampleDir = new File(new File(baseDir, "samples"), "js-shopping-cart")

lazy val projectId = System.getenv("PUBSUB_PROJECT_ID")

final def runTheseTests: Boolean = projectId != null

// We only need to start one user function, since the multiple Akka nodes can all connect to it, it doesn't make
// a difference whether they use different services or the same.
var userFunction: Process = _
var userFunction: CloudStateRunner = _
var userFunctionDone: CompletionStage[Done] = _
var node: ActorSystem = _
var testSystem: ActorSystem = _
var support: GCPubsubEventingSupport = _
Expand All @@ -59,13 +56,20 @@ class GooglePubsubSpec extends WordSpec with BeforeAndAfterAll with Eventually w
implicit def implicitTestSystem = testSystem
implicit def implicitMaterializer = materializer
implicit def implicitDispatcher = implicitTestSystem.dispatcher
"be tested" in pending // FIXME ADD TESTS HERE
"be tested" in {
assume(runTheseTests)
}
}

override protected def beforeAll(): Unit = {
val projectId = "test"
val pubsub_port = 8085
val pubsub_host = "localhost"
def startUserFunction(config: Config): CloudStateRunner =
new CloudState()
.registerEventSourcedEntity(classOf[PingPongEntity],
Pingpong.getDescriptor.findServiceByName("PingPongService"),
Pingpong.getDescriptor)
.createRunner(config)

override protected def beforeAll(): Unit = if (runTheseTests) {
val user_function_port = freePort()

implicit val s = ActorSystem(
"GooglePubsubSpec",
Expand All @@ -82,27 +86,23 @@ class GooglePubsubSpec extends WordSpec with BeforeAndAfterAll with Eventually w

val pubsub =
EventingManager
.createSupport(ConfigFactory.parseString(s"""
|support = "google-pubsub"
|google-pubsub.host = "${pubsub_host}"
|google-pubsub.port = ${pubsub_port}
|google-pubsub.rootCa = "none"
|google-pubsub.callCredentials = "none"
|google-pubsub.project-id = "${projectId}"
|google-pubsub.poll-interval = 1s
|google-pubsub.upstream-ack-deadline = 10s
|google-pubsub.downstream-batch-deadline = 5s
|google-pubsub.downstream-batch-size = 10
|google-pubsub.manage-topics-and-subscriptions = "by-proxy"
""".stripMargin))
.createSupport(ConfigFactory.load("googlepubsub.conf").resolve())
.collect({ case g: GCPubsubEventingSupport => g })
.getOrElse(throw new IllegalStateException("Unable to create EventingSupport for Google Pubsub"))

testSystem = s
materializer = m
support = pubsub

userFunction = Process(Seq("node", "index.js"), crdtExampleDir).run()
userFunction = startUserFunction(
ConfigFactory.parseString(s"""
|cloudstate.user-function-interface = "0.0.0.0"
|cloudstate.user-function-port = ${user_function_port}
|cloudstate.eventsourced.snapshot-every = 100
""".stripMargin).withFallback(ConfigFactory.defaultReference()).resolve()
)

userFunctionDone = userFunction.run()

node = CloudStateProxyMain.start(
ConfigFactory.load("dev-mode.conf").withFallback(ConfigFactory.defaultReference()).resolve()
Expand All @@ -119,7 +119,7 @@ class GooglePubsubSpec extends WordSpec with BeforeAndAfterAll with Eventually w
}

override protected def afterAll(): Unit = {
Option(userFunction).foreach(_.destroy())
Option(userFunction).foreach(_.terminate().toCompletableFuture().get(10, TimeUnit.SECONDS)) // FIXME timeout?
Option(testSystem).foreach(_.terminate())
Option(node).foreach(_.terminate())
}
Expand All @@ -129,5 +129,4 @@ class GooglePubsubSpec extends WordSpec with BeforeAndAfterAll with Eventually w
try socket.getLocalPort
finally socket.close()
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.cloudstate.samples.pingpong;

import io.cloudstate.javasupport.CloudState;
import io.cloudstate.pingpong.Pingpong;

public final class Main {
public static final void main(String[] args) throws Exception {
new CloudState()
.registerEventSourcedEntity(
PingPongEntity.class,
Pingpong.getDescriptor().findServiceByName("PingPongService"),
Pingpong.getDescriptor())
.start()
.toCompletableFuture()
.get();
}
}
Loading

0 comments on commit 2755449

Please sign in to comment.