Skip to content

Merge redesign into develop #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Nov 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
df479f6
Upgrade to OPAL 4.0
johannesduesing Jun 17, 2021
d6f09a0
Adopt tests: Change http to https, adapt to OPAL upgrade
johannesduesing Jun 17, 2021
3717948
Merge pull request #1 from johannesduesing/feature/upgrade-opal
johannesduesing Jun 18, 2021
73664f6
Upgrade to latest version of Elastic4s. Upgrade to latest AKKA versio…
johannesduesing Oct 6, 2021
b9de0e9
Upgrade to scala 2.12.15, move to SBT 1.5.5, cleanup and upgrade depe…
johannesduesing Oct 6, 2021
4453790
Upgrade to OPAL 4.0.0, fixed test errors, removed unused code
johannesduesing Oct 18, 2021
da60e4e
Fixed various deprecation issues
johannesduesing Oct 18, 2021
3412b30
Crawler now mines POM file information only. Functionality for calcul…
johannesduesing Oct 19, 2021
f5e55e5
Add publish date to index
johannesduesing Oct 19, 2021
0fe70c0
Switched to an external library for resolving effective POM models. T…
johannesduesing Oct 20, 2021
3ec7241
Reintroduced Hermes Analyzer into pipeline, fixed bug related to OPAL…
johannesduesing Oct 20, 2021
63a8469
Add error handling for different processing phases
johannesduesing Oct 20, 2021
0171ae5
Removed unused code
johannesduesing Oct 20, 2021
42b8e3b
Add Tests for PomFileReadActor. Removed unused GitIdentifier. Fixed b…
johannesduesing Oct 21, 2021
7fb1f48
Removed unused library
johannesduesing Oct 21, 2021
1d168c2
Merge remote-tracking branch 'origin/develop' into feature/redesign
johannesduesing Oct 21, 2021
422e828
Fixed some unecessary merge artefacts
johannesduesing Oct 21, 2021
8afd7cb
Upgrade to latest version of JWT core to mitigate vulnerabilties dete…
johannesduesing Oct 21, 2021
d5b1a20
Pinned secure version of guava
johannesduesing Oct 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add error handling for different processing phases
  • Loading branch information
johannesduesing committed Oct 20, 2021
commit 63a846901010a687da0b42e3160b6dbb1f8cdb5e
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@

package de.upb.cs.swt.delphi.crawler.discovery.maven

import akka.NotUsed
import akka.actor.{ActorRef, ActorSystem}
import akka.event.LoggingAdapter
import akka.pattern.ask
import akka.routing.{RoundRobinPool, SmallestMailboxPool}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import com.sksamuel.elastic4s.ElasticApi.RichFuture
import com.sksamuel.elastic4s.ElasticClient
import de.upb.cs.swt.delphi.core.model.MavenIdentifier
import de.upb.cs.swt.delphi.crawler.control.Phase
import de.upb.cs.swt.delphi.crawler.control.Phase.Phase
import de.upb.cs.swt.delphi.crawler.model.MavenArtifact
import de.upb.cs.swt.delphi.crawler.model.ProcessingPhase.ProcessingPhase
import de.upb.cs.swt.delphi.crawler.model.{MavenArtifact, ProcessingError, ProcessingPhase, ProcessingPhaseFailedException}
import de.upb.cs.swt.delphi.crawler.preprocessing.MavenDownloadActor
import de.upb.cs.swt.delphi.crawler.processing.{HermesActor, HermesResults, PomFileReadActor}
import de.upb.cs.swt.delphi.crawler.storage.ArtifactExistsQuery
Expand Down Expand Up @@ -75,7 +77,40 @@ class MavenDiscoveryProcess(configuration: Configuration, elasticPool: ActorRef)
implicit val client: ElasticClient =
ElasticHelper.buildElasticClient(configuration)

var filteredSource =
val identifierSource = buildThrottledSource()

implicit val timeout: Timeout = Timeout(5 minutes)

val preprocessing = buildPreprocessingPipeline(identifierSource)

preprocessing
.mapAsync(8)(artifact => (pomReaderPool ? artifact).mapTo[MavenArtifact])
.filter{ artifact =>

if(artifact.metadata.isEmpty){
(elasticPool ? new ProcessingError(artifact.identifier, ProcessingPhase.PomProcessing,
"Unexpected error while processing POM file.", None)).await
false
} else if(!checkValidPackaging(artifact)) {
log.error(s"Invalid packaging for ${artifact.identifier.toUniqueString}")
(elasticPool ? new ProcessingError(artifact.identifier, ProcessingPhase.PomProcessing,
"JAR file not found although packaging is JAR", None)).await
false
} else {
true
}
}
.alsoTo(storageSink[MavenArtifact]())
.mapAsync(8)(artifact => (hermesPool ? artifact).mapTo[Try[HermesResults]])
.filter(isSuccessAndReportFailure(_, ProcessingPhase.MetricsExtraction))
.map(_.get)
.runWith(storageSink[HermesResults]())

Success(0L)
}

private def buildThrottledSource()(implicit log: LoggingAdapter, client: ElasticClient): Source[MavenIdentifier, NotUsed] = {
var source =
createSource(configuration.mavenRepoBase)
.filter(m => {
val before = seen.contains(m)
Expand All @@ -85,47 +120,39 @@ class MavenDiscoveryProcess(configuration: Configuration, elasticPool: ActorRef)
.filter(m => !exists(m)) // ask elastic
.throttle(configuration.throttle.element, configuration.throttle.per, configuration.throttle.maxBurst, configuration.throttle.mode)


if (configuration.limit > 0) {
filteredSource = filteredSource.take(configuration.limit)
source = source.take(configuration.limit)
}

implicit val timeout: Timeout = Timeout(5 minutes)

val preprocessing =
filteredSource
.alsoTo(createSinkFromActorRef[MavenIdentifier](elasticPool))
.mapAsync(8)(identifier => (downloaderPool ? identifier).mapTo[Try[MavenArtifact]])
.filter(artifact => artifact.isSuccess)
.map(artifact => artifact.get)


val processing =
preprocessing
.mapAsync(8)(artifact => (pomReaderPool ? artifact).mapTo[MavenArtifact])
.filter{ artifact =>
val isValid = checkValidPackaging(artifact)

if(!isValid){
log.error(s"Invalid packaging for ${artifact.identifier.toUniqueString}")
}

isValid
}
.alsoTo(createSinkFromActorRef[MavenArtifact](elasticPool))
.mapAsync(8)(artifact => (hermesPool ? artifact).mapTo[Try[HermesResults]])
.filter(_.isSuccess)
.map(_.get)
.runWith(createSinkFromActorRef[HermesResults](elasticPool))
source
}

Success(0L)
private def buildPreprocessingPipeline(source: Source[MavenIdentifier, _])(implicit timeout: Timeout): Source[MavenArtifact, _] = {
source
.alsoTo(storageSink[MavenIdentifier]())
.mapAsync(8)(identifier => (downloaderPool ? identifier).mapTo[Try[MavenArtifact]])
.filter(isSuccessAndReportFailure(_, ProcessingPhase.Downloading))
.map(artifact => artifact.get)
}

private def checkValidPackaging(artifact: MavenArtifact): Boolean = {
artifact.jarFile.isDefined ||
artifact.metadata.isDefined && !artifact.metadata.get.packaging.equalsIgnoreCase("jar")
}

private def isSuccessAndReportFailure[T](element: Try[T], processingPhase: ProcessingPhase)
(implicit timeout: Timeout) = element match {
case Failure(ProcessingPhaseFailedException(ident, cause)) =>
(elasticPool ? new ProcessingError(ident, processingPhase, cause.getMessage, Some(cause))).await
false
case Failure(ex) =>
(elasticPool ? new ProcessingError(null, processingPhase, ex.getMessage, Some(ex))).await
false
case _ => true
}

private def storageSink[T](): Sink[T, NotUsed] = createSinkFromActorRef[T](elasticPool)

private def createSinkFromActorRef[T](actorRef: ActorRef) = {
Sink.actorRefWithAck[T](actorRef, StreamInitialized, Ack, StreamCompleted, StreamFailure)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (C) 2018 The Delphi Team.
// See the LICENCE file distributed with this work for additional
// information regarding copyright ownership.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package de.upb.cs.swt.delphi.crawler.model

import de.upb.cs.swt.delphi.core.model.MavenIdentifier
import de.upb.cs.swt.delphi.crawler.model.ProcessingPhase.ProcessingPhase

case class ProcessingError(identifier: MavenIdentifier, phase: ProcessingPhase, message: String, cause: Option[Throwable])

object ProcessingError {

def newDownloadError(ident: MavenIdentifier, cause: Throwable): ProcessingError = {
new ProcessingError(ident, ProcessingPhase.Downloading, cause.getMessage, Some(cause))
}

def newPomProcessingError(ident: MavenIdentifier, cause: Throwable): ProcessingError = {
new ProcessingError(ident, ProcessingPhase.PomProcessing, cause.getMessage, Some(cause))
}

def newMetricsExtractionError(ident: MavenIdentifier, cause: Throwable): ProcessingError = {
new ProcessingError(ident, ProcessingPhase.MetricsExtraction, cause.getMessage, Some(cause))
}

}

object ProcessingPhase extends Enumeration {
type ProcessingPhase = Value

val Downloading, PomProcessing, MetricsExtraction = Value
}

case class ProcessingPhaseFailedException(ident: MavenIdentifier, cause: Throwable) extends Throwable {
override def getMessage: String = s"Processing of ${ident.toString} failed: ${cause.getMessage}"

override def printStackTrace(): Unit = cause.printStackTrace()
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package de.upb.cs.swt.delphi.crawler.preprocessing

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import de.upb.cs.swt.delphi.core.model.MavenIdentifier
import de.upb.cs.swt.delphi.crawler.model.{JarFile, MavenArtifact, PomFile}
import de.upb.cs.swt.delphi.crawler.model.{JarFile, MavenArtifact, PomFile, ProcessingPhaseFailedException}
import de.upb.cs.swt.delphi.crawler.tools.{HttpDownloader, HttpException}
import org.joda.time.format.DateTimeFormat

Expand Down Expand Up @@ -54,19 +54,19 @@ class MavenDownloadActor extends Actor with ActorLogging {
case Success(jarStream) =>
sender() ! Success(MavenArtifact(mavenIdent, PomFile(pomStream),
Some(JarFile(jarStream, mavenIdent.toJarLocation.toURL)), pomPublicationDate, None))
case Failure(HttpException(code)) if code.intValue() == 404 =>
case Failure(ex@HttpException(code)) if code.intValue() == 404 =>
log.warning(s"No JAR file could be located for ${mavenIdent.toUniqueString}")
sender() ! Success(MavenArtifact(mavenIdent, PomFile(pomStream), None, pomPublicationDate, None))
case Failure(ex) =>
log.error(ex, s"Failed to download JAR file for ${mavenIdent.toUniqueString}")
sender() ! Failure(ex)
sender() ! Failure(ProcessingPhaseFailedException(mavenIdent, ex))
}
case Failure(HttpException(code)) if code.intValue() == 404 =>
case Failure(ex@HttpException(code)) if code.intValue() == 404 =>
log.error(s"Failed to download POM file for ${mavenIdent.toUniqueString}")
sender() ! Failure(HttpException(code))
sender() ! Failure(ProcessingPhaseFailedException(mavenIdent, ex))
case Failure(ex) =>
log.error(ex, s"Failed to download POM file for ${mavenIdent.toUniqueString}")
sender() ! Failure(ex)
sender() ! Failure(ProcessingPhaseFailedException(mavenIdent, ex))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,29 @@ package de.upb.cs.swt.delphi.crawler.processing

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import de.upb.cs.swt.delphi.core.model.MavenIdentifier
import de.upb.cs.swt.delphi.crawler.model.MavenArtifact
import de.upb.cs.swt.delphi.crawler.model.{MavenArtifact, ProcessingPhaseFailedException}

import scala.util.{Failure, Success, Try}

class HermesActor() extends Actor with ActorLogging with OPALFunctionality with HermesFunctionality {

def receive: PartialFunction[Any, Unit] = {
case m : MavenArtifact => {
log.info(s"Starting analysis for $m")
log.info(s"Starting analysis for ${m.identifier.toString}")

val hermesResult = Try {
computeHermesResult(m, reifyProject(m))
}

hermesResult match {
case Success(r) => log.info(s"Hermes run successful for ${m.identifier.toUniqueString}")
case Failure(ex) => log.error(ex, s"Hermes run failed for ${m.identifier}")
case Success(r) =>
sender() ! hermesResult
case Failure(ex) =>
log.error(ex, s"Hermes run failed for ${m.identifier}")
sender() ! Failure(ProcessingPhaseFailedException(m.identifier, ex))
}

sender() ! hermesResult

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ class PomFileReadActor(configuration: Configuration) extends Actor with ActorLog

sender() ! MavenArtifact.withMetadata(artifact, metadata)

log.info(s"Successfully processed POM file for $identifier")

case Failure(ex) =>
log.error(s"Failed to parse POM file for artifact $identifier",ex )
// Best effort semantics: If parsing fails, artifact is returned without metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import akka.event.LoggingAdapter
import com.sksamuel.elastic4s.ElasticClient
import de.upb.cs.swt.delphi.core.model.{Identifier, MavenIdentifier}
import de.upb.cs.swt.delphi.crawler.discovery.git.GitIdentifier
import de.upb.cs.swt.delphi.crawler.model.MavenArtifact
import de.upb.cs.swt.delphi.crawler.model.{MavenArtifact, ProcessingError}
import de.upb.cs.swt.delphi.crawler.tools.ActorStreamIntegrationSignals.{Ack, StreamCompleted, StreamFailure, StreamInitialized}
import de.upb.cs.swt.delphi.crawler.processing.HermesResults

Expand All @@ -47,7 +47,6 @@ class ElasticActor(client: ElasticClient) extends Actor with ActorLogging with A
log.error(ex, s"Stream failed!")

case m : MavenIdentifier =>
log.info(s"pushing $m")
store(m)
sender() ! Ack

Expand All @@ -63,6 +62,10 @@ class ElasticActor(client: ElasticClient) extends Actor with ActorLogging with A
store(h)
sender() ! Ack

case e: ProcessingError =>
store(e)
sender() ! Ack

case x => log.warning("Received unknown message: [{}] ", x)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ trait ElasticIndexMaintenance extends AppLogging {
ObjectField(name = "identifier", properties = identifierFields),
ObjectField(name = "features", properties = featureList)
)
createIndex(errorIndexName) mapping properties (
keywordField("name"),
ObjectField(name = "identifier", properties = identifierFields),
keywordField("phase"),
textField("message")
)
}.await

//Increases maximum number of nested fields
client.execute {
updateSettings(delphi).set(
updateSettings(metricIndexName).set(
"index.mapping.nested_fields.limit", "250"
)
}.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.sksamuel.elastic4s.{ElasticClient, Response}
import com.sksamuel.elastic4s.requests.indexes.IndexResponse
import de.upb.cs.swt.delphi.core.model.MavenIdentifier
import de.upb.cs.swt.delphi.crawler.discovery.git.GitIdentifier
import de.upb.cs.swt.delphi.crawler.model.MavenArtifactMetadata
import de.upb.cs.swt.delphi.crawler.model.{MavenArtifactMetadata, ProcessingError}
import de.upb.cs.swt.delphi.crawler.processing.{HermesAnalyzer, HermesResults}
import org.joda.time.DateTime

Expand Down Expand Up @@ -78,6 +78,28 @@ trait ElasticStoreQueries {
}.await
}

def store(error: ProcessingError)(implicit client: ElasticClient, log: LoggingAdapter): Option[Response[IndexResponse]] = {
elasticId(error.identifier) match {
case Some(id) =>
val result = client.execute{
indexInto(errorIndexName).fields(
"name" -> id,
"phase" -> error.phase.toString,
"message" -> error.message,
"identifier" -> Map(
"groupId" -> error.identifier.groupId,
"artifactId" -> error.identifier.artifactId,
"version" -> error.identifier.version.get
),
)
}.await
Some(result)
case None =>
log.warning(s"Tried to push error for non-existing identifier: ${error.identifier.toString}.")
None
}
}

def store(ident: MavenIdentifier, metadata: MavenArtifactMetadata, published: Option[DateTime])
(implicit client: ElasticClient, log: LoggingAdapter): Option[Response[IndexResponse]] = {
elasticId(ident) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@

package de.upb.cs.swt.delphi.crawler

import com.sksamuel.elastic4s.IndexAndType

package object storage {
val delphi = "delphi"
val project = "project"
val delphiProjectType: IndexAndType = IndexAndType(delphi,project)

val identifierIndexName = "identifiers"
val metricIndexName = "metrics"
val metadataIndexName = "metadata"
val errorIndexName = "errors"
}