Skip to content
This repository was archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Wip eventing tests √ (#168)
Browse files Browse the repository at this point in the history
* Removing the TestEventingSupport, instead opting to test against Google Pubsub

* Preparing the EventingSpec to run conditionally depending on whether there's a configured support available

* Improvements to Eventing and bugfixes
  • Loading branch information
viktorklang committed Jan 10, 2020
1 parent 5f44e09 commit 8fcad4a
Show file tree
Hide file tree
Showing 15 changed files with 357 additions and 143 deletions.
38 changes: 35 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ lazy val root = (project in file("."))
`java-support`,
`scala-support`,
`java-shopping-cart`,
`java-pingpong`,
`akka-client`,
operator,
`tck`,
Expand Down Expand Up @@ -572,7 +573,7 @@ lazy val `proxy-postgres` = (project in file("proxy/postgres"))
)

lazy val `proxy-tests` = (project in file("proxy/proxy-tests"))
.dependsOn(`proxy-core`, `akka-client`)
.dependsOn(`proxy-core`, `akka-client`, `java-pingpong`)
.settings(
common,
name := "cloudstate-proxy-tests",
Expand Down Expand Up @@ -783,7 +784,38 @@ lazy val `java-shopping-cart` = (project in file("samples/java-shopping-cart"))
// logLevel in assembly := Level.Debug,
assemblyMergeStrategy in assembly := {
/*ADD CUSTOMIZATIONS HERE*/
//case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
)

lazy val `java-pingpong` = (project in file("samples/java-pingpong"))
.dependsOn(`java-support`)
.enablePlugins(AkkaGrpcPlugin, AssemblyPlugin, JavaAppPackaging, DockerPlugin)
.settings(
name := "java-pingpong",
dockerSettings,
dockerBaseImage := "adoptopenjdk/openjdk8",
mainClass in Compile := Some("io.cloudstate.samples.pingpong.Main"),
PB.generate in Compile := (PB.generate in Compile).dependsOn(PB.generate in (`java-support`, Compile)).value,
akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Java),
PB.protoSources in Compile ++= {
val baseDir = (baseDirectory in ThisBuild).value / "protocols"
Seq(baseDir / "frontend", (sourceDirectory in Compile).value / "protos")
},
PB.targets in Compile := Seq(
PB.gens.java -> (sourceManaged in Compile).value
),
javacOptions in Compile ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8"),
mainClass in assembly := (mainClass in Compile).value,
assemblyJarName in assembly := "java-pingpong.jar",
test in assembly := {},
// logLevel in assembly := Level.Debug,
assemblyMergeStrategy in assembly := {
/*ADD CUSTOMIZATIONS HERE*/
case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
Expand All @@ -806,7 +838,7 @@ lazy val `scala-shopping-cart` = (project in file("samples/scala-shopping-cart")
// logLevel in assembly := Level.Debug,
assemblyMergeStrategy in assembly := {
/*ADD CUSTOMIZATIONS HERE*/
//case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.cloudstate.javasupport;

import com.typesafe.config.Config;
import com.google.protobuf.Descriptors;
import io.cloudstate.javasupport.crdt.CrdtEntity;
import io.cloudstate.javasupport.crdt.CrdtEntityFactory;
Expand Down Expand Up @@ -216,7 +217,37 @@ public CloudState registerCrdtEntity(
* @return a CompletionStage which will be completed when the server has shut down.
*/
public CompletionStage<Done> start() {
return new CloudStateRunner(services).run();
return createRunner().run();
}

/**
* Starts a server with the configured entities, using the supplied configuration.
*
* @return a CompletionStage which will be completed when the server has shut down.
*/
public CompletionStage<Done> start(Config config) {
return createRunner(config).run();
}

/**
* Creates a CloudStateRunner using the currently configured services. In order to start the
* server, `run()` must be invoked on the returned CloudStateRunner.
*
* @return a CloudStateRunner
*/
public CloudStateRunner createRunner() {
return new CloudStateRunner(services);
}

/**
* Creates a CloudStateRunner using the currently configured services, using the supplied
* configuration. In order to start the server, `run()` must be invoked on the returned
* CloudStateRunner.
*
* @return a CloudStateRunner
*/
public CloudStateRunner createRunner(Config config) {
return new CloudStateRunner(services, config);
}

private AnySupport newAnySupport(Descriptors.FileDescriptor[] descriptors) {
Expand Down
2 changes: 1 addition & 1 deletion java-support/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cloudstate {

coordinated-shutdown.exit-jvm = on

http.server{
http.server {
preview.enable-http2 = on

// Disable - we receive connections from localhost, so they'll never be dropped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[

// TODO JavaDoc
def this(services: java.util.Map[String, StatefulService]) {
this(
{
val conf = ConfigFactory.load()
// We do this to apply the cloud-state specific akka configuration to the ActorSystem we create for hosting the user function
ActorSystem("StatefulService", conf.getConfig("cloudstate.system").withFallback(conf))
},
services.asScala.toMap
)
this(ActorSystem("StatefulService", {
val conf = ConfigFactory.load()
conf.getConfig("cloudstate.system").withFallback(conf)
}), services.asScala.toMap)
}

// TODO JavaDoc
def this(services: java.util.Map[String, StatefulService], config: Config) {
this(ActorSystem("StatefulService", config), services.asScala.toMap)
}

private val rootContext = new Context {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class AnySupportSpec extends WordSpec with Matchers with OptionValues {
defaultAny.typeUrl should ===(AnySupport.CloudStatePrimitive + name)
defaultAny.value.size() shouldBe 0
anySupport.decode(defaultAny) should ===(defaultValue)

}

"support se/deserializing strings" in testPrimitive("string", "foo", "")
Expand Down
11 changes: 1 addition & 10 deletions proxy/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ cloudstate.proxy {

# Configures the eventing functionality
eventing {
# Valid options are: "none", "test", and "google-pubsub"
# Valid options are currently: "none", and "google-pubsub"
support = "none"

# This configuration is used when support is set to "google-pubsub"
Expand Down Expand Up @@ -115,14 +115,5 @@ cloudstate.proxy {
# NOT AVAILABLE YET: "using-crd" means that the proxy will create CRDs and a k8s operator will manage them
manage-topics-and-subscriptions = "manually"
}

test {
poll-initial-delay = 1s
poll-interval = 1s
data {
# Value of events.in on endpoint = <base64 rfc2045 encoded string literal of a Protobuf Any of the payload type denoted by the path>
# For example: foo = "CghOYWNob21hbhIGRE4tMDAxGgtEZWF0aCBOYWNobyAD"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.slf4j.{Logger, LoggerFactory}
import Serve.{CommandHandler}

trait Emitter {
def emit(payload: ProtobufAny, method: MethodDescriptor): Boolean = ???
def emit(payload: ProtobufAny, method: MethodDescriptor): Boolean
}

object Emitters {
Expand All @@ -46,36 +46,6 @@ trait EventingSupport {
def createDestination(destinationName: String, handler: CommandHandler): Flow[ProtobufAny, AnyRef, NotUsed]
}

class TestEventingSupport(config: Config, materializer: ActorMaterializer) extends EventingSupport {
final val pollInitialDelay: FiniteDuration = config.getDuration("poll-initial-delay").toMillis.millis
final val pollInterval: FiniteDuration = config.getDuration("poll-interval").toMillis.millis

final val sampleData = config.getConfig("data")

def createSource(sourceName: String, handler: CommandHandler): Source[UserFunctionCommand, Future[Cancellable]] = {
EventingManager.log.debug("Creating eventing source for {}", sourceName)
val command =
sampleData.getString(sourceName) match {
case null | "" =>
EventingManager.log.error("No sample data found for {}", handler.fullCommandName)
throw new IllegalStateException(s"No test sample data found for ${handler.fullCommandName}")
case data => handler.serializer.parse(ProtobufAny.parseFrom(Base64.rfc2045.decode(data)))
}
Source.tick(pollInitialDelay, pollInterval, command).mapMaterializedValue(_ => Future.never)
}

def createDestination(destinationName: String, handler: CommandHandler): Flow[ProtobufAny, AnyRef, NotUsed] = {
val (msg, eventMsg) =
if (destinationName == "")
("Discarding response: {}", "Discarding event: {}")
else
("Publishing response: {}", "Publishing event: {}")

Flow[ProtobufAny]
.alsoTo(Sink.foreach(m => EventingManager.log.info(msg, m)))
}
}

object EventingManager {

final val log = LoggerFactory.getLogger("EventingManager")
Expand All @@ -90,10 +60,23 @@ object EventingManager {
val endpoints =
entity.serviceDescriptor.getMethods.iterator.asScala.foldLeft(Map.empty[MethodDescriptor, Eventing]) {
case (map, method) =>
val e = EventingProto.eventing.get(Options.convertMethodOptions(method))
// FIXME Validate that in and out are set appropriately!!
log.debug("EventingProto.events for {}", method.getFullName + " " + e)
e.filter(e => e.in != "" || e.out != "").fold(map)(map.updated(method, _))
EventingProto.eventing.get(Options.convertMethodOptions(method)) match {
case None => map
case Some(e) =>
(e.in, e.out) match {
case (null, null) | ("", "") => map
case (in, out) if in == out =>
throw new IllegalStateException(
s"Endpoint '${method.getFullName}' has the same input topic as output topic ('${in}'), this is not allowed."
)
case (in, out) =>
log.debug("EventingProto.events for {}: {} -> {}",
method.getFullName: AnyRef,
in: AnyRef,
out: AnyRef)
map.updated(method, e)
}
}
}

if (endpoints.isEmpty) Nil
Expand All @@ -102,15 +85,12 @@ object EventingManager {

def createSupport(eventConfig: Config)(implicit materializer: ActorMaterializer): Option[EventingSupport] =
eventConfig.getString("support") match {
case s @ "google-pubsub" =>
log.info("Creating google-pubsub eventing support")
Some(new GCPubsubEventingSupport(eventConfig.getConfig(s), materializer))
case s @ "test" =>
log.info("Creating test eventing support")
Some(new TestEventingSupport(eventConfig.getConfig(s), materializer))
case "none" =>
log.info("Eventing support turned off in configuration")
None
case s @ "google-pubsub" =>
log.info("Creating google-pubsub eventing support")
Some(new GCPubsubEventingSupport(eventConfig.getConfig(s), materializer))
case other =>
throw new IllegalStateException(s"Check your configuration. There is no eventing support named: $other")
}
Expand All @@ -137,7 +117,7 @@ object EventingManager {
] =
for {
EventMapping(entity, routes) <- eventMappings
(mdesc, eventing) <- routes
(mdesc, eventing) <- routes.toSeq // Important since we do not want dedupe that we get from the map otherwise
} yield {
log.info("Creating route for {}", eventing)
val commandHandler = new CommandHandler(entity, mdesc, router, noEmitter, entityDiscoveryClient, log) // Could we reuse these from Serve?
Expand Down

This file was deleted.

Loading

0 comments on commit 8fcad4a

Please sign in to comment.