Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{IndexingViewDef, SparqlSink}
Expand Down Expand Up @@ -33,6 +34,8 @@ final class SparqlIndexingAction(
override val timeout: FiniteDuration
) extends IndexingAction {

override protected def kamonMetricComponent: KamonMetricComponent = KamonMetricComponent("blazegraph-indexing")

private def compile(view: IndexingViewDef, elem: Elem[GraphResource]): IO[Option[CompiledProjection]] = view match {
// Synchronous indexing only applies to views that index the latest version
case active: ActiveViewDef if active.selectFilter.tag == Tag.Latest =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.IndexingViewDef.{ActiveViewDef, DeprecatedViewDef}
Expand All @@ -10,8 +11,8 @@ import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.*
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import fs2.Stream

import scala.concurrent.duration.FiniteDuration
Expand All @@ -35,6 +36,10 @@ final class ElasticSearchIndexingAction(
)(implicit cr: RemoteContextResolution)
extends IndexingAction {

override protected def kamonMetricComponent: KamonMetricComponent = KamonMetricComponent(
"elasticsearch-custom-indexing"
)

private def compile(view: IndexingViewDef, elem: Elem[GraphResource]): IO[Option[CompiledProjection]] = view match {
// Synchronous indexing only applies to views that index the latest version
case active: ActiveViewDef if active.selectFilter.tag == Tag.latest =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.MainIndexConfig
Expand All @@ -23,6 +24,10 @@ final class MainIndexingAction(sink: Sink, override val timeout: FiniteDuration)
cr: RemoteContextResolution
) extends IndexingAction {

override protected def kamonMetricComponent: KamonMetricComponent = KamonMetricComponent(
"elasticsearch-main-indexing"
)

private def compile(project: ProjectRef, elem: Elem[GraphResource]) =
CompiledProjection.compile(
mainIndexingProjectionMetadata(project),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import cats.data.NonEmptyList
import cats.effect.{IO, Ref}
import cats.syntax.all.*
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.syntax.*
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction.logger
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingMode.{Async, Sync}
Expand All @@ -21,6 +23,8 @@ trait IndexingAction {

implicit private val bc: BatchConfig = BatchConfig.individual

protected def kamonMetricComponent: KamonMetricComponent

/**
* The maximum duration accepted to perform the synchronous indexing
* @return
Expand Down Expand Up @@ -53,7 +57,7 @@ trait IndexingAction {
.toList
errors <- errorsRef.get
} yield errors
}
}.span("sync-indexing")(kamonMetricComponent)

private def runProjection(compiled: CompiledProjection, saveFailedElems: List[FailedElem] => IO[Unit]) = {
for {
Expand Down