Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,24 @@ package ch.epfl.bluebrain.nexus.ship.files
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy.logError
import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils
import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, S3LocationGenerator, S3OperationResult}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, S3LocationGenerator}
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.FileProcessingConfig
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.FileCopyResult
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.FileCopyResult.{FileCopySkipped, FileCopySuccess}
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import software.amazon.awssdk.services.s3.model.S3Exception

import scala.concurrent.duration.DurationInt

trait FileCopier {

def copyFile(project: ProjectRef, attributes: FileAttributes, forceContentType: Boolean): IO[FileCopyResult]
def copyFile(project: ProjectRef, attributes: FileAttributes): IO[FileCopyResult]

}

Expand Down Expand Up @@ -55,7 +54,7 @@ object FileCopier {
val importBucket = config.importBucket
val targetBucket = config.targetBucket
val locationGenerator = new S3LocationGenerator(config.prefix.getOrElse(Path.Empty))
(project: ProjectRef, attributes: FileAttributes, forceContentType: Boolean) =>
(project: ProjectRef, attributes: FileAttributes) =>
{
val origin = attributes.path
val patchedFileName = if (attributes.filename.isEmpty) "file" else attributes.filename
Expand All @@ -73,15 +72,6 @@ object FileCopier {
s3StorageClient.copyObjectMultiPart(importBucket, originKey, targetBucket, targetKey, copyOptions)
} else
s3StorageClient.copyObject(importBucket, originKey, targetBucket, targetKey, copyOptions)
}.flatMap {
case S3OperationResult.Success => IO.unit
case S3OperationResult.AlreadyExists =>
IO.whenA(forceContentType) {
attributes.mediaType.traverse { mediaType =>
logger.info(s"Patching to content type $mediaType for file $patchedFileName") >>
s3StorageClient.updateContentType(targetBucket, targetKey, mediaType)
}.void
}
}.timed
.flatMap { case (duration, _) =>
IO.whenA(duration > longCopyThreshold)(
Expand All @@ -104,7 +94,6 @@ object FileCopier {
}.retry(copyRetryStrategy)
}

def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes, _: Boolean) =>
IO.pure(FileCopySuccess(attributes.path))
def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes) => IO.pure(FileCopySuccess(attributes.path))

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.http.MediaTypeDetectorConfig
import ch.epfl.bluebrain.nexus.delta.kernel.utils.FileUtils
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{Files, MediaTypeDetector}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.Files.definition
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileCommand.CancelEvent
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileEvent._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.{FileNotFound, IncorrectRev, ResourceAlreadyExists}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.{Files, MediaTypeDetector}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.FetchStorage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.LinkFileAction
Expand All @@ -27,7 +27,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import ch.epfl.bluebrain.nexus.ship._
import ch.epfl.bluebrain.nexus.ship.config.InputConfig
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.FileCopyResult.{FileCopySkipped, FileCopySuccess}
import ch.epfl.bluebrain.nexus.ship.files.FileProcessor.{forceMediaType, logger, patchMediaType}
import ch.epfl.bluebrain.nexus.ship.files.FileProcessor.{logger, patchMediaType}
import ch.epfl.bluebrain.nexus.ship.files.FileWiring._
import ch.epfl.bluebrain.nexus.ship.storages.StorageWiring
import ch.epfl.bluebrain.nexus.ship.storages.StorageWiring.linkS3FileOperationOnly
Expand Down Expand Up @@ -74,8 +74,7 @@ class FileProcessor private (
val newMediaType = patchMediaType(attrs.filename, attrs.mediaType)
val newAttrs = e.attributes.copy(mediaType = newMediaType)
val customMetadata = Some(getCustomMetadata(newAttrs))
val fct = forceMediaType(attrs.mediaType, newMediaType)
fileCopier.copyFile(e.project, newAttrs, fct).flatMap {
fileCopier.copyFile(e.project, newAttrs).flatMap {
case FileCopySuccess(newPath) =>
val linkRequest = FileLinkRequest(newPath, newMediaType, customMetadata)
files
Expand All @@ -88,8 +87,7 @@ class FileProcessor private (
val newMediaType = patchMediaType(attrs.filename, attrs.mediaType)
val newAttrs = e.attributes.copy(mediaType = newMediaType)
val customMetadata = Some(getCustomMetadata(newAttrs))
val fct = forceMediaType(attrs.mediaType, newMediaType)
fileCopier.copyFile(e.project, newAttrs, fct).flatMap {
fileCopier.copyFile(e.project, newAttrs).flatMap {
case FileCopySuccess(newPath) =>
val linkRequest = FileLinkRequest(newPath, newMediaType, customMetadata)
files
Expand Down Expand Up @@ -140,11 +138,6 @@ object FileProcessor {
.map(ContentType(_, () => HttpCharsets.`UTF-8`))
.orElse(original)

def forceMediaType(
originalMediaType: Option[ContentType],
newMediaType: Option[ContentType]
): Boolean = originalMediaType != newMediaType

private val noop = new EventProcessor[FileEvent] {
override def resourceType: EntityType = Files.entityType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,4 @@ class FileProcessorSuite extends NexusSuite {
)
}

test("Patching media type for a media type that changes") {
assertEquals(
FileProcessor.forceMediaType(
Some(ContentTypes.`application/octet-stream`),
Some(ContentTypes.`application/json`)
),
true
)
}

test("Patching media type for a media type that does not change") {
assertEquals(
FileProcessor.forceMediaType(
Some(ContentTypes.`application/json`),
Some(ContentTypes.`application/json`)
),
false
)
}

}