Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,27 @@ app {
slow-query-threshold = 2 seconds
}

elem-query {
type = "delay"
batch-size = 30
delay = 2s
}

project-last-update {
batch {
# the maximum batching size, corresponding to the maximum number of elements being aggregated
# at the same time before pushing the update.
max-elements = 100
# the maximum batching duration.
max-interval = 1 second
}
query {
batch-size = 30
refresh-strategy = 1 second
}
inactive-interval = 10 minutes
}

# Database export configuration
export {
batch-size = 30
Expand Down Expand Up @@ -295,20 +316,6 @@ app {
}
}

project-last-update {
batch {
# the maximum batching size, corresponding to the maximum number of elements being aggregated
# at the same time before pushing the update.
max-elements = 100
# the maximum batching duration.
max-interval = 1 seconds
}
query {
batch-size = 30
refresh-strategy = 1s
}
}

# Type hierarchy configuration
type-hierarchy {
# the type hierarchy event-log configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.resources.ResourcesConfig
import ch.epfl.bluebrain.nexus.delta.sdk.schemas.SchemasConfig
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseConfig
import ch.epfl.bluebrain.nexus.delta.sdk.typehierarchy.TypeHierarchyConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectLastUpdateConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ElemQueryConfig, ProjectLastUpdateConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.exporter.ExportConfig
import com.typesafe.config.Config
import pureconfig.ConfigReader
Expand Down Expand Up @@ -50,6 +50,7 @@ final case class AppConfig(
schemas: SchemasConfig,
typeHierarchy: TypeHierarchyConfig,
serviceAccount: ServiceAccountConfig,
elemQuery: ElemQueryConfig,
sse: SseConfig,
projections: ProjectionConfig,
projectLastUpdate: ProjectLastUpdateConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions.{projects, supervision}
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ProjectHealer, ProjectRejection, ProjectsHealth}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisedDescription
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectActivitySignals, SupervisedDescription}
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax.KeyOps
import io.circe.syntax.{EncoderOps, KeyOps}
import io.circe.{Encoder, Json}

class SupervisionRoutes(
identities: Identities,
aclCheck: AclCheck,
supervised: IO[List[SupervisedDescription]],
projectsHealth: ProjectsHealth,
projectHealer: ProjectHealer
projectHealer: ProjectHealer,
activitySignals: ProjectActivitySignals
)(implicit
baseUri: BaseUri,
cr: RemoteContextResolution,
Expand All @@ -54,6 +55,9 @@ class SupervisionRoutes(
if (projects.isEmpty) emit(StatusCodes.OK, IO.pure(allProjectsAreHealthy))
else emit(StatusCodes.InternalServerError, IO.pure(unhealthyProjectsEncoder(projects)))
}
},
(pathPrefix("activity") & pathPrefix("projects") & get & pathEndOrSingleSlash) {
emit(activitySignals.activityMap.map(_.asJson))
}
)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.plugin.PluginDef
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{OwnerPermissionsScopeInitialization, ProjectsConfig, ScopeInitializationErrorStore}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ProjectLastUpdateConfig, ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{DatabaseConfig, ElemQueryConfig, ProjectLastUpdateConfig, ProjectionConfig, QueryConfig}
import ch.megard.akka.http.cors.scaladsl.settings.CorsSettings
import com.typesafe.config.Config
import izumi.distage.model.definition.{Id, ModuleDef}
Expand All @@ -57,6 +57,7 @@ class DeltaModule(appCfg: AppConfig, config: Config)(implicit classLoader: Class
make[FusionConfig].from { appCfg.fusion }
make[ProjectsConfig].from { appCfg.projects }
make[ProjectionConfig].from { appCfg.projections }
make[ElemQueryConfig].from { appCfg.elemQuery }
make[ProjectLastUpdateConfig].from { appCfg.projectLastUpdate }
make[QueryConfig].from { appCfg.projections.query }
make[BaseUri].from { appCfg.http.baseUri }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.organizations.Organizations
import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects
import ch.epfl.bluebrain.nexus.delta.sdk.sse.{SseElemStream, SseEncoder, SseEventLog}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.query.ElemStreaming
import izumi.distage.model.definition.{Id, ModuleDef}

/**
Expand All @@ -40,9 +40,7 @@ object EventsModule extends ModuleDef {
)(jo)
}

make[SseElemStream].from { (qc: QueryConfig, xas: Transactors) =>
SseElemStream(qc, xas)
}
make[SseElemStream].from { (elemStreaming: ElemStreaming) => SseElemStream(elemStreaming) }

make[EventsRoutes].from {
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.provisioning.ProjectProvisioning
import ch.epfl.bluebrain.nexus.delta.sdk.quotas.Quotas
import ch.epfl.bluebrain.nexus.delta.sdk.sse.SseEncoder
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.ProjectLastUpdateStore
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import izumi.distage.model.definition.{Id, ModuleDef}

Expand Down Expand Up @@ -100,6 +101,7 @@ object ProjectsModule extends ModuleDef {
config: AppConfig,
serviceAccount: ServiceAccount,
supervisor: Supervisor,
projectLastUpdateStore: ProjectLastUpdateStore,
xas: Transactors,
clock: Clock[IO]
) =>
Expand All @@ -109,6 +111,7 @@ object ProjectsModule extends ModuleDef {
config.projects.deletion,
serviceAccount,
supervisor,
projectLastUpdateStore,
xas,
clock
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package ch.epfl.bluebrain.nexus.delta.wiring
import cats.effect.{Clock, IO, Sync}
import ch.epfl.bluebrain.nexus.delta.sdk.ResourceShifts
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ProjectLastUpdateConfig, ProjectionConfig, QueryConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectLastUpdateStore, ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.config.{ElemQueryConfig, ProjectLastUpdateConfig, ProjectionConfig}
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectLastUpdateStore, ProjectLastUpdateStream, ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.query.ElemStreaming
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.PurgeProjectionCoordinator.PurgeProjection
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._
Expand All @@ -18,8 +19,13 @@ import izumi.distage.model.definition.ModuleDef
object StreamModule extends ModuleDef {
addImplicit[Sync[IO]]

make[GraphResourceStream].from { (qc: QueryConfig, xas: Transactors, shifts: ResourceShifts) =>
GraphResourceStream(qc, xas, shifts)
make[ElemStreaming].from {
(xas: Transactors, queryConfig: ElemQueryConfig, activitySignals: ProjectActivitySignals) =>
new ElemStreaming(xas, queryConfig, activitySignals)
}

make[GraphResourceStream].from { (elemStreaming: ElemStreaming, shifts: ResourceShifts) =>
GraphResourceStream(elemStreaming, shifts)
}

many[PipeDef].add(DiscardMetadata)
Expand Down Expand Up @@ -54,10 +60,18 @@ object StreamModule extends ModuleDef {
}

make[ProjectLastUpdateStore].from { (xas: Transactors) => ProjectLastUpdateStore(xas) }
make[ProjectLastUpdateStream].from { (xas: Transactors, config: ProjectLastUpdateConfig) =>
ProjectLastUpdateStream(xas, config.query)
}

make[ProjectLastUpdateProjection].fromEffect {
make[ProjectLastUpdateWrites].fromEffect {
(supervisor: Supervisor, store: ProjectLastUpdateStore, xas: Transactors, config: ProjectLastUpdateConfig) =>
ProjectLastUpdateProjection(supervisor, store, xas, config.batch, config.query)
ProjectLastUpdateWrites(supervisor, store, xas, config.batch)
}

make[ProjectActivitySignals].fromEffect {
(supervisor: Supervisor, stream: ProjectLastUpdateStream, clock: Clock[IO], config: ProjectLastUpdateConfig) =>
ProjectActivitySignals(supervisor, stream, clock, config.inactiveInterval)
}

make[PurgeProjectionCoordinator.type].fromEffect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ProjectHealer, ProjectsHealth}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectActivitySignals, Supervisor}
import izumi.distage.model.definition.{Id, ModuleDef}

/**
Expand All @@ -31,14 +31,16 @@ object SupervisionModule extends ModuleDef {
rc: RemoteContextResolution @Id("aggregate"),
jo: JsonKeyOrdering,
projectsHealth: ProjectsHealth,
projectHealer: ProjectHealer
projectHealer: ProjectHealer,
projectActivitySignals: ProjectActivitySignals
) =>
new SupervisionRoutes(
identities,
aclCheck,
supervisor.getRunningProjections(),
projectsHealth,
projectHealer
projectHealer,
projectActivitySignals
)(baseUri, rc, jo)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,29 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Authent
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import fs2.concurrent.SignallingRef
import org.scalatest.Assertion

import java.time.Instant

class SupervisionRoutesSpec extends BaseRouteSpec {

private val superviser = User("superviser", realm)
private val supervisor = User("supervisor", realm)

implicit private val callerSuperviser: Caller =
Caller(superviser, Set(superviser, Anonymous, Authenticated(realm), Group("group", realm)))
implicit private val callerSupervisor: Caller =
Caller(supervisor, Set(supervisor, Anonymous, Authenticated(realm), Group("group", realm)))

private val asSuperviser = addCredentials(OAuth2BearerToken("superviser"))
private val asSupervisor = addCredentials(OAuth2BearerToken("supervisor"))

private val identities = IdentitiesDummy(callerSuperviser)
private val identities = IdentitiesDummy(callerSupervisor)
private val aclCheck = AclSimpleCheck().accepted

private val projectRef = ProjectRef(Label.unsafe("myorg"), Label.unsafe("myproject"))
private val projectRef2 = ProjectRef(Label.unsafe("myorg"), Label.unsafe("myproject2"))
private val project = ProjectRef.unsafe("myorg", "myproject")
private val project2 = ProjectRef.unsafe("myorg", "myproject2")

private val unhealthyProjects = Set(projectRef, projectRef2)
private val unhealthyProjects = Set(project, project2)

private val metadata = ProjectionMetadata("module", "name", Some(projectRef), None)
private val metadata = ProjectionMetadata("module", "name", Some(project), None)
private val progress = ProjectionProgress(Offset.start, Instant.EPOCH, 1L, 1L, 1L)
private val description1 =
SupervisedDescription(metadata, ExecutionStrategy.PersistentSingleNode, 1, ExecutionStatus.Running, progress)
Expand Down Expand Up @@ -69,21 +70,27 @@ class SupervisionRoutesSpec extends BaseRouteSpec {
override def heal(project: ProjectRef): IO[Unit] = IO.unit
}

private val activitySignals = new ProjectActivitySignals {
override def apply(project: ProjectRef): IO[Option[SignallingRef[IO, Boolean]]] = IO.none
override def activityMap: IO[Map[ProjectRef, Boolean]] = IO.pure(Map(project -> true, project2 -> false))
}

private def routesTemplate(unhealthyProjects: Set[ProjectRef], healer: ProjectHealer) = Route.seal(
new SupervisionRoutes(
identities,
aclCheck,
IO.pure { List(description1, description2) },
projectsHealth(unhealthyProjects),
healer
healer,
activitySignals
).routes
)

private val routes = routesTemplate(Set.empty, noopHealer)

override def beforeAll(): Unit = {
super.beforeAll()
aclCheck.append(AclAddress.Root, superviser -> Set(supervision.read, projects.write)).accepted
aclCheck.append(AclAddress.Root, supervisor -> Set(supervision.read, projects.write)).accepted
}

"The supervision projection endpoint" should {
Expand All @@ -95,7 +102,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec {
}

"be accessible with supervision/read permission and return expected payload" in {
Get("/v1/supervision/projections") ~> asSuperviser ~> routes ~> check {
Get("/v1/supervision/projections") ~> asSupervisor ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual jsonContentOf("supervision/supervision-running-proj-response.json")
}
Expand All @@ -113,14 +120,14 @@ class SupervisionRoutesSpec extends BaseRouteSpec {

"return a successful http code when there are no unhealthy projects" in {
val routesWithHealthyProjects = routesTemplate(Set.empty, noopHealer)
Get("/v1/supervision/projects") ~> asSuperviser ~> routesWithHealthyProjects ~> check {
Get("/v1/supervision/projects") ~> asSupervisor ~> routesWithHealthyProjects ~> check {
response.status shouldEqual StatusCodes.OK
}
}

"return an error code when there are unhealthy projects" in {
val routesWithUnhealthyProjects = routesTemplate(unhealthyProjects, noopHealer)
Get("/v1/supervision/projects") ~> asSuperviser ~> routesWithUnhealthyProjects ~> check {
Get("/v1/supervision/projects") ~> asSupervisor ~> routesWithUnhealthyProjects ~> check {
response.status shouldEqual StatusCodes.InternalServerError
response.asJson shouldEqual
json"""
Expand Down Expand Up @@ -154,7 +161,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec {
val project = ProjectRef(Label.unsafe("myorg"), Label.unsafe("myproject"))
val routesWithHealer = routesTemplate(Set.empty, projectHealer)

Post(s"/v1/supervision/projects/$project/heal") ~> asSuperviser ~> routesWithHealer ~> check {
Post(s"/v1/supervision/projects/$project/heal") ~> asSupervisor ~> routesWithHealer ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual
json"""
Expand All @@ -168,7 +175,7 @@ class SupervisionRoutesSpec extends BaseRouteSpec {

"return an error if the healing failed" in {
val routesWithFailingHealer = routesTemplate(Set.empty, failingHealer)
Post("/v1/supervision/projects/myorg/myproject/heal") ~> asSuperviser ~> routesWithFailingHealer ~> check {
Post("/v1/supervision/projects/myorg/myproject/heal") ~> asSupervisor ~> routesWithFailingHealer ~> check {
response.status shouldEqual StatusCodes.InternalServerError
response.asJson shouldEqual
json"""
Expand All @@ -184,4 +191,21 @@ class SupervisionRoutesSpec extends BaseRouteSpec {

}

"The supervision project activity endpoint" should {

"be forbidden without supervision/read permission" in {
Get("/v1/supervision/activity/projects") ~> routes ~> check {
response.shouldBeForbidden
}
}

"be accessible with supervision/read permission and return expected payload" in {
Get("/v1/supervision/activity/projects") ~> asSupervisor ~> routes ~> check {
response.status shouldEqual StatusCodes.OK
response.asJson shouldEqual json"""{ "$project": true, "$project2": false }"""
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.Identities
import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.projects.{FetchContext, Projects}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.Projections
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.query.{ElemStreaming, SelectFilter}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Supervisor
import izumi.distage.model.definition.{Id, ModuleDef}

Expand All @@ -33,8 +32,8 @@ class GraphAnalyticsPluginModule(priority: Int) extends ModuleDef {
GraphAnalytics(client, fetchContext, config.prefix, config.termAggregations)
}

make[GraphAnalyticsStream].from { (qc: QueryConfig, xas: Transactors) =>
GraphAnalyticsStream(qc, xas)
make[GraphAnalyticsStream].from { (elemStreaming: ElemStreaming, xas: Transactors) =>
GraphAnalyticsStream(elemStreaming, xas)
}

make[GraphAnalyticsCoordinator].fromEffect {
Expand Down
Loading