Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ plugins {
application

id("com.google.protobuf") version "0.9.4"
id("org.pkl-lang") version "0.30.2"
id("com.ncorti.ktfmt.gradle") version "0.24.0"
id("org.pkl-lang") version "0.30.2"

kotlin("jvm")
kotlin("kapt") version "2.3.0"
kotlin("jvm") version "2.3.0"
}

group = "ac.at.uibk.dps.cirrina"
Expand Down Expand Up @@ -53,6 +54,9 @@ dependencies {
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.15.1")
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.1")

implementation("com.google.dagger:dagger:2.59")
kapt("com.google.dagger:dagger-compiler:2.59")

implementation("com.google.guava:guava:33.0.0-jre")

implementation("com.google.protobuf:protobuf-java:4.32.0")
Expand All @@ -63,6 +67,9 @@ dependencies {

implementation("io.github.microutils:kotlin-logging:3.0.5")

implementation("io.micrometer:micrometer-core:1.17.0-M1")
implementation("io.micrometer:micrometer-registry-influx:1.17.0-M1")

implementation("io.nats:jnats:2.17.3")

implementation(platform("io.opentelemetry:opentelemetry-bom:1.38.0"))
Expand Down
10 changes: 0 additions & 10 deletions compose.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
configs:
telegraf_config:
file: ./telegraf.conf
services:
influxdb:
image: "influxdb:2"
Expand All @@ -13,13 +10,6 @@ services:
- DOCKER_INFLUXDB_INIT_ORG=org
- DOCKER_INFLUXDB_INIT_BUCKET=bucket
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=bzO10KmR8x
telegraf:
image: "telegraf:latest"
ports:
- "4317:4317"
configs:
- source: telegraf_config
target: /etc/telegraf/telegraf.conf
nats:
image: "nats:latest"
command:
Expand Down
94 changes: 20 additions & 74 deletions src/main/java/at/ac/uibk/dps/cirrina/cirrina/Cirrina.kt
Original file line number Diff line number Diff line change
@@ -1,106 +1,52 @@
package at.ac.uibk.dps.cirrina.cirrina

import at.ac.uibk.dps.cirrina.execution.`object`.context.Context
import at.ac.uibk.dps.cirrina.execution.`object`.context.EtcdContext
import at.ac.uibk.dps.cirrina.execution.`object`.event.EventHandler
import at.ac.uibk.dps.cirrina.execution.`object`.event.NatsEventHandler
import at.ac.uibk.dps.cirrina.execution.service.RandomServiceImplementationSelector
import at.ac.uibk.dps.cirrina.execution.service.ServiceImplementationBuilder
import at.ac.uibk.dps.cirrina.io.CsmParser
import java.net.URI
import at.ac.uibk.dps.cirrina.di.DaggerCirrinaComponent
import java.util.logging.LogManager
import mu.KotlinLogging
import org.apache.commons.lang3.builder.ToStringBuilder
import org.apache.commons.lang3.builder.ToStringStyle.SIMPLE_STYLE
import org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE

private val logger = KotlinLogging.logger {}

/** The primary orchestrator for the Cirrina runtime environment. */
class Cirrina {

/** Bootstraps and executes the runtime. */
fun run() {
try {
setupEventHandler().use { eventHandler ->
setupPersistentContext().use { persistentContext ->
buildRuntime(eventHandler, persistentContext).run()
}
}
} catch (ex: Exception) {
logger.error(ex) { "a fatal error occurred during runtime execution" }
}
}

private fun setupEventHandler(): EventHandler =
newEventHandler().also { handler ->
if (handler is NatsEventHandler) {
logger.info { "awaiting nats connection..." }
val component = DaggerCirrinaComponent.create()

handler.awaitReady(NATS_CONNECTION_TIMEOUT)
handler.subscribe(NatsEventHandler.GLOBAL_SOURCE, "*")
handler.subscribe(NatsEventHandler.PERIPHERAL_SOURCE, "*")
}
}

private fun setupPersistentContext(): Context =
newPersistentContext().also { context ->
if (context is EtcdContext) {
logger.info { "awaiting etcd connection..." }

context.awaitReady(ETCD_CONNECTION_TIMEOUT)
}
}
// Run the runtime
runCatching {
component.eventHandler().use { _ ->
component.persistentContext().use { _ -> component.runtime().run() }
}

private fun buildRuntime(eventHandler: EventHandler, persistentContext: Context): Runtime =
CsmParser.parseServiceImplementationBindings(
URI(EnvironmentVariables.serviceBindingsPath.get())
)
.let { bindings ->
Runtime(
URI(EnvironmentVariables.appPath.get()),
EnvironmentVariables.instantiate.get(),
eventHandler,
persistentContext,
RandomServiceImplementationSelector(
ServiceImplementationBuilder.from(bindings.bindings).build().getOrThrow()
),
)
// Flush the metrics
component.meterRegistry().close()
}

private fun newEventHandler(): EventHandler =
when (EnvironmentVariables.eventProvider.get()) {
EventProvider.NATS -> NatsEventHandler(EnvironmentVariables.natsEventUrl.get())
}

private fun newPersistentContext(): Context =
when (EnvironmentVariables.contextProvider.get()) {
PersistentContextProvider.ETCD ->
EtcdContext(listOf(EnvironmentVariables.etcdContextUrl.get()))
}
.onFailure { ex -> logger.error(ex) { "a fatal error occurred during runtime execution" } }
}

companion object {
const val NATS_CONNECTION_TIMEOUT = 60000L
const val ETCD_CONNECTION_TIMEOUT = 60000L
/** Timeout for the NATS connection. */
const val NATS_CONNECTION_TIMEOUT = 1000L

/** Timeout for the ETCD connection. */
const val ETCD_CONNECTION_TIMEOUT = 1000L

init {
ToStringBuilder.setDefaultStyle(SIMPLE_STYLE)
configureStringBuilder()
configureLogging()
startHealthService()
}

private fun configureStringBuilder() = ToStringBuilder.setDefaultStyle(SHORT_PREFIX_STYLE)

private fun configureLogging() =
runCatching {
Cirrina::class.java.getResourceAsStream("/logging.properties")?.use {
LogManager.getLogManager().readConfiguration(it)
} ?: logger.warn { "logging properties file not found" }
}
.onFailure { logger.error(it) { "could not load logging properties" } }

private fun startHealthService() =
runCatching { HealthService(EnvironmentVariables.healthPort.get()) }
.onFailure { logger.error(it) { "could not start health service" } }
}
}

/** Global application entry point. */
fun main() = Cirrina().run()
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,58 @@ enum class PersistentContextProvider {
/** An environment variable, which can be converted from a string to the required type. */
data class EnvironmentVariable<T>(
val name: String,
val required: Boolean = false,
val default: T? = null,
val default: T,
val mapper: (String) -> T = { it as T },
) {
/** Get the value of the environment variable. */
fun get(): T {
val value = System.getenv(name)
return when {
value != null -> mapper(value)
default != null -> default
required -> error("missing required environment variable: $name")
else -> error("environment variable '$name' is missing but not marked required")
else -> default
}
}
}

/** Common interface for acquiring environment variables. */
object EnvironmentVariables {
/** The NATS event server URL. */
val natsEventUrl = EnvironmentVariable("NATS_EVENT_URL", default = "nats://localhost:4222/")
val natsEventUrl = EnvironmentVariable("NATS_EVENT_URL", "nats://localhost:4222/")

/** The Etcd context server URL. */
val etcdContextUrl = EnvironmentVariable("ETCD_CONTEXT_URL", default = "http://localhost:2379")
val etcdContextUrl = EnvironmentVariable("ETCD_CONTEXT_URL", "http://localhost:2379")

/** InfluxDB metric variables. */
val influxMetricUrl = EnvironmentVariable("INFLUX_METRIC_URL", "http://localhost:8086")

val influxMetricOrg = EnvironmentVariable("INFLUX_METRIC_ORG", "org")
val influxMetricBucket = EnvironmentVariable("INFLUX_METRIC_BUCKET", "bucket")
val influxMetricToken = EnvironmentVariable("INFLUX_METRIC_TOKEN", "bzO10KmR8x")

/** The path to the CSML application. */
val appPath = EnvironmentVariable<String>("APP_PATH", required = true)
val csmMainUri = EnvironmentVariable("CSM_MAIN_URI", "file:///app/main.pkl")

/** The path to the service implementation bindings. */
val serviceBindingsPath = EnvironmentVariable<String>("SERVICE_BINDINGS_PATH", required = true)
val csmServiceBindingsUri =
EnvironmentVariable("CSM_SERVICE_BINDINGS_URL", "file:///app/services.pkl")

/** The state machine names to instantiate. */
val instantiate =
val csmStateMachines =
EnvironmentVariable(
name = "INSTANTIATE",
default = emptyList(),
mapper = { value -> value.split(",").map { it.trim() }.filter { it.isNotEmpty() } },
"CSM_STATE_MACHINES",
emptyList(),
{ value -> value.split(",").map { it.trim() }.filter { it.isNotEmpty() } },
)

/** The event provider to use. */
val eventProvider =
EnvironmentVariable(
name = "EVENT_PROVIDER",
default = EventProvider.NATS,
mapper = { value ->
"EVENT_PROVIDER",
EventProvider.NATS,
{ value ->
try {
EventProvider.valueOf(value.uppercase())
} catch (_: IllegalArgumentException) {
} catch (_: Exception) {
error("invalid value for environment variable EVENT_PROVIDER")
}
},
Expand All @@ -68,17 +73,14 @@ object EnvironmentVariables {
/** The context provider to use. */
val contextProvider =
EnvironmentVariable(
name = "CONTEXT_PROVIDER",
default = PersistentContextProvider.ETCD,
mapper = { value ->
"CONTEXT_PROVIDER",
PersistentContextProvider.ETCD,
{ value ->
try {
PersistentContextProvider.valueOf(value.uppercase())
} catch (_: IllegalArgumentException) {
} catch (_: Exception) {
error("invalid value for environment variable CONTEXT_PROVIDER")
}
},
)

/** The port to use for the health server. */
val healthPort = EnvironmentVariable("HEALTH_PORT", default = 0xCAFE) { it.toInt() }
}
37 changes: 0 additions & 37 deletions src/main/java/at/ac/uibk/dps/cirrina/cirrina/HealthService.kt

This file was deleted.

Loading
Loading