Skip to content

Commit e7ffaf0

Browse files
committed
accept complete http request instead of form to upload file
1 parent 3b0b2a5 commit e7ffaf0

File tree

8 files changed

+147
-23
lines changed

8 files changed

+147
-23
lines changed

src/main/scala/com/advancedtelematic/treehub/http/ObjectResource.scala

+16-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.advancedtelematic.treehub.http
22

33
import akka.http.scaladsl.model.StatusCodes
44
import akka.http.scaladsl.model.headers.Authorization
5-
import akka.http.scaladsl.server.{Directive0, Directive1, PathMatcher1}
5+
import akka.http.scaladsl.server.{Directive0, Directive1, PathMatcher1, ValidationRejection}
66
import akka.stream.Materializer
77
import com.advancedtelematic.data.DataType.ObjectId
88
import com.advancedtelematic.libats.data.DataType.Namespace
@@ -11,6 +11,7 @@ import com.advancedtelematic.treehub.repo_metrics.UsageMetricsRouter.{UpdateBand
1111
import com.advancedtelematic.treehub.repo_metrics.UsageMetricsRouter
1212
import slick.jdbc.MySQLProfile.api._
1313
import cats.syntax.either._
14+
1415
import scala.concurrent.ExecutionContext
1516
import scala.util.Success
1617

@@ -56,11 +57,24 @@ class ObjectResource(namespace: Directive1[Namespace],
5657
complete(f)
5758
} ~
5859
(post & hintNamespaceStorage(ns)) {
60+
// TODO: Use storeUploadedFile and change ObjectStore api to accept File or DataBytes, when using databytes, require size, stream up
61+
62+
headerValueByName("x-ats-accept-redirect") { _ =>
63+
val f = objectStore.storeOutOfBand(ns, objectId, content).map(_ => StatusCodes.OK)
64+
complete(f)
65+
} ~
5966
fileUpload("file") { case (_, content) =>
6067
val f = objectStore.store(ns, objectId, content).map(_ => StatusCodes.OK)
6168
complete(f)
62-
}
69+
} ~
70+
extractRequestEntity { entity =>
71+
entity.contentLengthOption match {
72+
case Some(size) => complete(objectStore.storeStream(ns, objectId, size, entity.dataBytes).map(_ => StatusCodes.OK))
73+
case None => reject
74+
}
75+
}
6376
}
6477
}
6578
}
79+
6680
}

src/main/scala/com/advancedtelematic/treehub/object_store/BlobStore.scala

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
11
package com.advancedtelematic.treehub.object_store
22

3-
import akka.http.scaladsl.model.{HttpEntity, HttpResponse, MediaTypes, StatusCodes}
3+
import akka.http.scaladsl.model._
44
import akka.stream.scaladsl.Source
55
import akka.util.ByteString
66
import com.advancedtelematic.data.DataType.ObjectId
77
import com.advancedtelematic.libats.data.DataType.Namespace
8+
import com.advancedtelematic.treehub.object_store.BlobStore.OutOfBandStoreResult
89

910
import scala.concurrent.Future
1011
import scala.util.control.NoStackTrace
1112

13+
object BlobStore {
14+
sealed trait OutOfBandStoreResult
15+
case class UploadAt(uri: Uri) extends OutOfBandStoreResult
16+
case class Failure(cause: Throwable) extends OutOfBandStoreResult
17+
}
18+
1219
trait BlobStore {
1320
def store(namespace: Namespace, id: ObjectId, blob: Source[ByteString, _]): Future[Long]
1421

22+
// def storeStream(namespace: Namespace, id: ObjectId, size: Long, blob: Source[ByteString, _]): Future[Long]
23+
24+
def storeOutOfBand(namespace: Namespace, id: ObjectId, blob: => Source[ByteString, _]): Future[OutOfBandStoreResult]
25+
1526
def buildResponse(namespace: Namespace, id: ObjectId): Future[HttpResponse]
1627

1728
def readFull(namespace: Namespace, id: ObjectId): Future[ByteString]

src/main/scala/com/advancedtelematic/treehub/object_store/LocalFsBlobStore.scala

+3
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ class LocalFsBlobStore(root: Path)(implicit ec: ExecutionContext, mat: Materiali
4949
} yield res
5050
}
5151

52+
override def storeStream(namespace: Namespace, id: ObjectId, size: Long, blob: Source[ByteString, _]): Future[Long] =
53+
store(namespace, id, blob)
54+
5255
override def buildResponse(ns: Namespace, id: ObjectId): Future[HttpResponse] = {
5356
exists(ns, id).flatMap {
5457
case true =>

src/main/scala/com/advancedtelematic/treehub/object_store/ObjectStore.scala

+28-5
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,39 @@ class ObjectStore(blobStore: BlobStore)(implicit ec: ExecutionContext, db: Datab
1616

1717
import scala.async.Async._
1818

19-
def store(namespace: Namespace, id: ObjectId, blob: Source[ByteString, _]): Future[TObject] = {
20-
// TODO: Do we already know size somehow? Sort of...
19+
def storeOutOfBand(namespace: Namespace, id: ObjectId, size: Long, blob: Source[ByteString, _]): Future[TObject] = {
20+
blobStore.storeStream()
21+
}
22+
23+
// TODO:SM Almost the same as `store`
24+
def storeStream(namespace: Namespace, id: ObjectId, size: Long, blob: Source[ByteString, _]): Future[TObject] = {
25+
val obj = TObject(namespace, id, size, ObjectStatus.SERVER_UPLOADING)
26+
lazy val createF = objectRepository.create(obj)
27+
28+
lazy val uploadF = async {
29+
val _size = await(blobStore.storeStream(namespace, id, size, blob))
30+
val newObj = obj.copy(byteSize = _size, status = ObjectStatus.UPLOADED)
31+
await(objectRepository.update(namespace, id, _size, ObjectStatus.UPLOADED))
32+
newObj
33+
}.recoverWith {
34+
case e =>
35+
objectRepository.delete(namespace, id)
36+
.flatMap(_ => FastFuture.failed(e))
37+
.recoverWith { case _ => FastFuture.failed(e) }
38+
}
2139

40+
createF.flatMap(_ => uploadF)
41+
}
42+
43+
44+
def store(namespace: Namespace, id: ObjectId, blob: Source[ByteString, _]): Future[TObject] = {
2245
val obj = TObject(namespace, id, -1, ObjectStatus.SERVER_UPLOADING)
2346
lazy val createF = objectRepository.create(obj)
2447

2548
lazy val uploadF = async {
26-
val size = await(blobStore.store(namespace, id, blob))
27-
val newObj = obj.copy(byteSize = size, status = ObjectStatus.UPLOADED)
28-
await(objectRepository.update(namespace, id, size, ObjectStatus.UPLOADED))
49+
val _size = await(blobStore.store(namespace, id, blob))
50+
val newObj = obj.copy(byteSize = _size, status = ObjectStatus.UPLOADED)
51+
await(objectRepository.update(namespace, id, _size, ObjectStatus.UPLOADED))
2952
newObj
3053
}.recoverWith {
3154
case e =>

src/main/scala/com/advancedtelematic/treehub/object_store/S3BlobStore.scala

+26-4
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
package com.advancedtelematic.treehub.object_store
22

3-
import java.io.File
3+
import java.io.{File, InputStream}
44
import java.nio.file.Paths
55
import java.time.{Duration, Instant}
66

77
import akka.http.scaladsl.model.headers.Location
88
import akka.http.scaladsl.model.{HttpResponse, StatusCodes, Uri}
99
import akka.stream.Materializer
10-
import akka.stream.scaladsl.{FileIO, Source, StreamConverters}
10+
import akka.stream.scaladsl.{FileIO, Keep, Source, StreamConverters}
1111
import akka.util.ByteString
12+
import cats.instances.long
1213
import cats.syntax.either._
1314
import com.advancedtelematic.common.DigestCalculator
1415
import com.advancedtelematic.data.DataType.ObjectId
1516
import com.advancedtelematic.libats.data.DataType.Namespace
1617
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider}
1718
import com.amazonaws.regions.Regions
1819
import com.amazonaws.services.s3.AmazonS3ClientBuilder
19-
import com.amazonaws.services.s3.model.{CannedAccessControlList, PutObjectRequest}
20+
import com.amazonaws.services.s3.model.{CannedAccessControlList, GetObjectRequest, ObjectMetadata, PutObjectRequest}
2021
import org.slf4j.LoggerFactory
2122

2223
import scala.async.Async._
@@ -36,6 +37,27 @@ class S3BlobStore(s3Credentials: S3Credentials, allowRedirects: Boolean)
3637
.withRegion(s3Credentials.region)
3738
.build()
3839

40+
override def storeStream(namespace: Namespace, id: ObjectId, size: Long, blob: Source[ByteString, _]): Future[Long] = {
41+
val filename = objectFilename(namespace, id)
42+
43+
val sink = StreamConverters.asInputStream().mapMaterializedValue { is =>
44+
val meta = new ObjectMetadata()
45+
meta.setContentLength(size)
46+
val request = new PutObjectRequest(s3Credentials.blobBucketId, filename, is, meta).withCannedAcl(CannedAccessControlList.AuthenticatedRead)
47+
48+
log.info(s"Uploading $filename to amazon s3")
49+
50+
async {
51+
await(Future { blocking { s3client.putObject(request) } })
52+
log.info(s"$filename with size $size uploaded to s3")
53+
size
54+
}
55+
}
56+
57+
blob.runWith(sink)
58+
}
59+
60+
3961
override def store(namespace: Namespace, id: ObjectId, blob: Source[ByteString, _]): Future[Long] = {
4062
val filename = objectFilename(namespace, id)
4163
val tempFile = File.createTempFile(filename, ".tmp")
@@ -44,7 +66,7 @@ class S3BlobStore(s3Credentials: S3Credentials, allowRedirects: Boolean)
4466
// so we always need to cache the file into the filesystem before uploading
4567
val sink = FileIO.toPath(tempFile.toPath).mapMaterializedValue {
4668
_.flatMap { result =>
47-
if(result.wasSuccessful) {
69+
if (result.wasSuccessful) {
4870
upload(tempFile, filename).andThen { case _ => Try(tempFile.delete()) }
4971
} else
5072
Future.failed(result.getError)

src/test/resources/blobs/myfile.bin

1 MB
Binary file not shown.

src/test/scala/com/advancedtelematic/treehub/http/ObjectResourceSpec.scala

+29-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package com.advancedtelematic.treehub.http
66

77
import akka.http.scaladsl.model._
88
import akka.pattern.ask
9+
import akka.stream.scaladsl.Source
10+
import akka.util.ByteString
911
import com.advancedtelematic.treehub.db.ObjectRepositorySupport
1012
import com.advancedtelematic.util.FakeUsageUpdate.{CurrentBandwith, CurrentStorage}
1113
import com.advancedtelematic.util.ResourceSpec.ClientTObject
@@ -15,7 +17,7 @@ import scala.concurrent.duration._
1517

1618
class ObjectResourceSpec extends TreeHubSpec with ResourceSpec with ObjectRepositorySupport {
1719

18-
test("POST creates a new blob") {
20+
test("POST creates a new blob when uploading form with `file` field") {
1921
val obj = new ClientTObject()
2022

2123
Post(apiUri(s"objects/${obj.prefixedObjectId}"), obj.form) ~> routes ~> check {
@@ -29,6 +31,32 @@ class ObjectResourceSpec extends TreeHubSpec with ResourceSpec with ObjectReposi
2931
}
3032
}
3133

34+
// This is the same as using `curl -H "Content-Type: application/octet-stream" --data-binary @file`
35+
test("POST creates a new blob when uploading application/octet-stream directory") {
36+
val obj = new ClientTObject()
37+
38+
Post(apiUri(s"objects/${obj.prefixedObjectId}"), obj.blob) ~> routes ~> check {
39+
status shouldBe StatusCodes.OK
40+
}
41+
42+
Get(apiUri(s"objects/${obj.prefixedObjectId}")) ~> routes ~> check {
43+
status shouldBe StatusCodes.OK
44+
responseAs[Array[Byte]] shouldBe obj.blob
45+
}
46+
}
47+
48+
test("POST redirects client to redirect endpoint if client supports it") {
49+
val obj = new ClientTObject()
50+
51+
val accept = HttpHeader
52+
53+
54+
Post(apiUri(s"objects/${obj.prefixedObjectId}"), obj.blob) ~> routes ~> check {
55+
status shouldBe StatusCodes.OK
56+
}
57+
58+
}
59+
3260
test("POST hints updater to update current storage") {
3361
val obj = new ClientTObject()
3462

src/test/scala/com/advancedtelematic/treehub/object_store/S3BlobStoreIntegrationSpec.scala

+33-10
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.advancedtelematic.treehub.object_store
22

3+
import java.io.File
4+
35
import akka.actor.ActorSystem
46
import akka.http.scaladsl.model.StatusCodes
57
import akka.stream.ActorMaterializer
6-
import akka.stream.scaladsl.Source
8+
import akka.stream.scaladsl.{FileIO, Source}
79
import akka.util.ByteString
810
import com.advancedtelematic.data.DataType.{ObjectId, ObjectIdOps, ObjectStatus, TObject}
911
import com.advancedtelematic.util.TreeHubSpec
@@ -22,22 +24,24 @@ class S3BlobStoreIntegrationSpec extends TreeHubSpec {
2224

2325
val s3BlobStore = new S3BlobStore(s3Credentials, true)
2426

25-
override implicit def patienceConfig = PatienceConfig().copy(timeout = Span(3, Seconds))
27+
override implicit def patienceConfig = PatienceConfig().copy(timeout = Span(15, Seconds))
2628

2729
test("can store object") {
2830
val tobj = TObject(defaultNs, ObjectId.parse("ce720e82a727efa4b30a6ab73cefe31a8d4ec6c0d197d721f07605913d2a279a.commit").toOption.get, 0L, ObjectStatus.UPLOADED)
31+
val blob = ByteString("this is byte.")
2932

30-
val source = Source.single(ByteString("this is byte."))
33+
val source = Source.single(blob)
3134

3235
val size = s3BlobStore.store(defaultNs, tobj.id, source).futureValue
3336

3437
size shouldBe 13
3538
}
3639

37-
test("can retrieve an object") {
40+
test("can retrieve big object") {
3841
val tobj = TObject(defaultNs, ObjectId.parse("ce720e82a727efa4b30a6ab73cefe31a8d4ec6c0d197d721f07605913d2a279a.commit").toOption.get, 0L, ObjectStatus.UPLOADED)
42+
val blob = ByteString("this is byte. Call me. maybe.")
3943

40-
val source = Source.single(ByteString("this is byte. Call me. maybe."))
44+
val source = Source.single(blob)
4145

4246
val f = async {
4347
await(s3BlobStore.store(defaultNs, tobj.id, source))
@@ -48,10 +52,28 @@ class S3BlobStoreIntegrationSpec extends TreeHubSpec {
4852
s3BlobStore.exists(tobj.namespace, tobj.id).futureValue shouldBe true
4953
}
5054

55+
56+
test("XXX can retrieve an object") {
57+
val file = new File(this.getClass.getResource(s"/blobs/myfile.bin").getFile)
58+
val source = FileIO.fromPath(file.toPath)
59+
val tobj = TObject(defaultNs, ObjectId.parse("625e61876cbe98fbf164c8ce5975c6d69a4ba0e9fa57c729ea06d02fd966a9cc.file").toOption.get, 0L, ObjectStatus.UPLOADED)
60+
61+
val f = async {
62+
await(s3BlobStore.store(defaultNs, tobj.id, source))
63+
64+
println("upload finished")
65+
66+
await(s3BlobStore.readFull(tobj.namespace, tobj.id))
67+
}
68+
69+
f.futureValue.size shouldBe file.length()
70+
s3BlobStore.exists(tobj.namespace, tobj.id).futureValue shouldBe true
71+
}
72+
5173
test("build response builds a redirect") {
5274
val tobj = TObject(defaultNs, ObjectId.parse("ce720e82a727efa4b30a6ab73cefe31a8d4ec6c0d197d721f07605913d2a279a.commit").toOption.get, 0L, ObjectStatus.UPLOADED)
53-
54-
val source = Source.single(ByteString("this is byte. Call me. maybe."))
75+
val blob = ByteString("this is byte. Call me. maybe.")
76+
val source = Source.single(blob)
5577

5678
val response = async {
5779
await(s3BlobStore.store(defaultNs, tobj.id, source))
@@ -63,9 +85,11 @@ class S3BlobStoreIntegrationSpec extends TreeHubSpec {
6385
}
6486

6587
test("build response a response containing the object content") {
66-
val tobj = TObject(defaultNs, ObjectId.parse("ce720e82a727efa4b30a6ab73cefe31a8d4ec6c0d197d721f07605913d2a279a.commit").toOption.get, 0L, ObjectStatus.UPLOADED)
88+
pending
6789

68-
val source = Source.single(ByteString("this is byte. Call me. maybe."))
90+
val tobj = TObject(defaultNs, ObjectId.parse("ce720e82a727efa4b30a6ab73cefe31a8d4ec6c0d197d721f07605913d2a279a.commit").toOption.get, 0L, ObjectStatus.UPLOADED)
91+
val blob = ByteString("this is byte. Call me. maybe.")
92+
val source = Source.single(blob)
6993

7094
val response = async {
7195
await(s3BlobStore.store(defaultNs, tobj.id, source))
@@ -75,5 +99,4 @@ class S3BlobStoreIntegrationSpec extends TreeHubSpec {
7599
response.status shouldBe StatusCodes.OK
76100
response.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).futureValue.utf8String shouldBe "this is byte. Call me. maybe."
77101
}
78-
79102
}

0 commit comments

Comments
 (0)