Skip to content

Commit

Permalink
8.0.6: Handle redirects
Browse files Browse the repository at this point in the history
[ch3113]

My original intent was to use [`http4s`'s `FollowRedirect` client middleware](https://github.com/http4s/http4s/blob/v0.19.0/client/src/main/scala/org/http4s/client/middleware/FollowRedirect.scala#L39) to follow the redirects. However, that middleware does not forward query strings, breaking every request made by `FollowRedirect`. In order to try this I had to refactor all the request signing into a client middleware (`AwsV4Signing`). The current implementation does not use `FollowRedirect`, but I decided to keep `AwsV4Signing` since it simplifies stuff.

This is working just fine for `301`s and will probably work for Slalom, but I still need to finish a few things:

- [ ] Handle `301 Moved Permanently` correctly by storing the redirect URI, rather than the original. We have an extra round trip with every request at the moment.
- [ ] Handle more than one redirect.
- [ ] Handle `302 Found`, `303 See other`, `307 Temporary Redirect`, and `308 Permanent Redirect`.
- [ ] Unit tests for `preflightCheck`.
- [ ] Integration tests.

Depending on urgency, we can either merge as-is and address the TODOs in another PR or we can wait until I finish the remaining TODOs here.
  • Loading branch information
djspiewak committed Nov 7, 2018
2 parents d1e2a03 + ec2d0de commit b193bef
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 117 deletions.
44 changes: 41 additions & 3 deletions datasource/src/main/scala/quasar/physical/s3/RequestSigning.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import quasar.physical.s3.impl.s3EncodeQueryParams

import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.time.LocalDateTime
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec

import cats.effect.Sync
import cats.effect.{Bracket, Effect, Resource, Sync}
import cats.implicits._
import fs2.Stream
import org.http4s.client.Client
import org.http4s.headers.{Authorization, Date}
import org.http4s.{Header, Headers, Method, Request, Uri}
import org.http4s.{Header, Headers, Method, Request, Response, Uri}

/**
* Extracted from aws4s: https://github.com/aws4s/aws4s
Expand Down Expand Up @@ -201,3 +202,40 @@ object PayloadSigning {
/** Payload is not signed. Use only if consuming the payload twice would be problematic. */
case object Unsigned extends PayloadSigning
}

object AwsV4Signing {
def apply[F[_]: Bracket[?[_], Throwable]: Effect](conf: S3Config)(client: Client[F]): Client[F] = {
def signRequest: Request[F] => F[Request[F]] =
conf.credentials match {
case Some(creds) => {
val requestSigning = for {
time <- Effect[F].delay(OffsetDateTime.now())
datetime <- Effect[F].catchNonFatal(
LocalDateTime.ofEpochSecond(time.toEpochSecond, 0, ZoneOffset.UTC))
signing = RequestSigning(
Credentials(creds.accessKey, creds.secretKey, None),
creds.region,
ServiceName.S3,
PayloadSigning.Signed,
datetime)
} yield signing

req => {
// Requests that require signing also require `host` to always be present
val req0 = req.uri.host match {
case Some(host) => req.withHeaders(Headers(Header("host", host.value)))
case None => req
}

requestSigning >>= (_.signedHeaders[F](req0).map(req0.withHeaders(_)))
}
}
case None => req => req.pure[F]
}

def signAndSubmit: Request[F] => Resource[F, Response[F]] =
(req => Resource.suspend(signRequest(req).map(client.run(_))))

Client(signAndSubmit)
}
}
68 changes: 25 additions & 43 deletions datasource/src/main/scala/quasar/physical/s3/S3Datasource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@ import quasar.contrib.scalaz.MonadError_

import slamdata.Predef.{Stream => _, _}

import java.time.{OffsetDateTime, ZoneOffset, LocalDateTime}

import cats.data.OptionT
import cats.effect.Effect
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.apply._
import cats.syntax.functor._
import cats.syntax.option._
import fs2.Stream
import org.http4s.{Request, Header, Headers}
import org.http4s.client.Client
import pathy.Path
import pathy.Path.{DirName, FileName}
Expand All @@ -48,6 +45,7 @@ final class S3Datasource[F[_]: Effect: MonadResourceErr](
extends LightweightDatasource[F, Stream[F, ?], QueryResult[F]] {

import ParsableType.JsonVariant
import S3Datasource._

def kind: DatasourceType = s3.datasourceKind

Expand All @@ -62,16 +60,12 @@ final class S3Datasource[F[_]: Effect: MonadResourceErr](
case S3JsonParsing.LineDelimited => JsonVariant.LineDelimited
}

impl.evaluate[F](client, config.bucket, file, signRequest(config))
impl.evaluate[F](client, config.bucket, file)
.map(QueryResult.typed(ParsableType.json(jvar, false), _))
}

def prefixedChildPaths(path: ResourcePath): F[Option[Stream[F, (ResourceName, ResourcePathType)]]] =
impl.children(
client,
config.bucket,
dropEmpty(path.toPath),
signRequest(config)) map {
impl.children(client, config.bucket, dropEmpty(path.toPath)) map {
case None =>
none[Stream[F, (ResourceName, ResourcePathType)]]
case Some(paths) =>
Expand All @@ -85,12 +79,20 @@ final class S3Datasource[F[_]: Effect: MonadResourceErr](
case Root => false.pure[F]
case Leaf(file) => Path.refineType(dropEmpty(file)) match {
case -\/(_) => false.pure[F]
case \/-(f) => impl.isResource(client, config.bucket, f, signRequest(config))
case \/-(f) => impl.isResource(client, config.bucket, f)
}
}

def isLive: F[Boolean] =
OptionT(prefixedChildPaths(ResourcePath.Root)).isDefined
def isLive: F[Liveness] = {
val listing = OptionT(prefixedChildPaths(ResourcePath.Root)).isDefined
val live = impl.preflightCheck(client, config)

(listing, live).mapN {
case (true, None) => Liveness.live
case (false, Some(newConfig)) => Liveness.redirected(newConfig)
case _ => Liveness.notLive
}
}

//

Expand All @@ -100,37 +102,17 @@ final class S3Datasource[F[_]: Effect: MonadResourceErr](
case Some((d, -\/(DirName(dn)))) if dn.isEmpty => d
case _ => path
}

private def signRequest(c: S3Config): Request[F] => F[Request[F]] =
S3Datasource.signRequest(c)
}

object S3Datasource {
def signRequest[F[_]: Effect](c: S3Config): Request[F] => F[Request[F]] =
c.credentials match {
case Some(creds) => {
val requestSigning = for {
time <- Effect[F].delay(OffsetDateTime.now())
datetime <- Effect[F].catchNonFatal(
LocalDateTime.ofEpochSecond(time.toEpochSecond, 0, ZoneOffset.UTC))
signing = RequestSigning(
Credentials(creds.accessKey, creds.secretKey, None),
creds.region,
ServiceName.S3,
PayloadSigning.Signed,
datetime)
} yield signing

req => {
// Requests that require signing also require `host` to always be present
val req0 = req.uri.host match {
case Some(host) => req.withHeaders(Headers(Header("host", host.value)))
case None => req
}

requestSigning >>= (_.signedHeaders[F](req0).map(req0.withHeaders(_)))
}
}
case None => req => req.pure[F]
}
sealed abstract class Liveness
final case class Redirected(conf: S3Config) extends Liveness
final case object Live extends Liveness
final case object NotLive extends Liveness

object Liveness {
def live: Liveness = Live
def notLive: Liveness = NotLive
def redirected(conf: S3Config): Liveness = Redirected(conf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ import quasar.api.datasource.{DatasourceError, DatasourceType}
import quasar.api.datasource.DatasourceError.InitializationError
import quasar.api.resource.ResourcePath
import quasar.connector.{Datasource, LightweightDatasourceModule, MonadResourceErr, QueryResult}
import quasar.physical.s3.S3Datasource.{Live, NotLive, Redirected}

import scala.concurrent.ExecutionContext

import argonaut.{EncodeJson, Json}
import cats.effect.{ConcurrentEffect, ContextShift, Timer}
import fs2.Stream
import org.http4s.client.Client
import org.http4s.client.blaze.BlazeClientBuilder
import scalaz.{\/, NonEmptyList}
import scalaz.syntax.either._
import scalaz.syntax.functor._
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.option._
Expand All @@ -44,23 +47,24 @@ object S3DatasourceModule extends LightweightDatasourceModule {
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]]]] =
config.as[S3Config].result match {
case Right(s3Config) =>
val clientResource = BlazeClientBuilder[F](ec).resource
val c = s3.resourceToDisposable(clientResource)
mkClient(s3Config).flatMap { disposableClient =>
val s3Ds = new S3Datasource[F](disposableClient.unsafeValue, s3Config)

c.flatMap { client =>
val s3Ds = new S3Datasource[F](client.unsafeValue, s3Config)
val ds: Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]] = s3Ds

s3Ds.isLive.ifM({
Disposable(ds, client.dispose).right.pure[F]
},
{
val msg = "Unable to ListObjects at the root of the bucket"

DatasourceError
.accessDenied[Json, InitializationError[Json]](kind, config, msg)
.left.pure[F]
})
s3Ds.isLive map {
case Redirected(newConfig) =>
val ds: Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]] =
new S3Datasource[F](disposableClient.unsafeValue, newConfig)
Disposable(ds, disposableClient.dispose).right
case Live =>
val ds: Datasource[F, Stream[F, ?], ResourcePath, QueryResult[F]] =
new S3Datasource[F](disposableClient.unsafeValue, s3Config)
Disposable(ds, disposableClient.dispose).right
case NotLive =>
val msg = "Unable to ListObjects at the root of the bucket"
DatasourceError
.accessDenied[Json, InitializationError[Json]](kind, config, msg)
.left
}
}

case Left((msg, _)) =>
Expand All @@ -81,4 +85,15 @@ object S3DatasourceModule extends LightweightDatasourceModule {
c.credentials.fold(c)(_ => c.copy(credentials = redactedCreds.some)))
.fold(config)(rc => EncodeJson.of[S3Config].encode(rc))
}

///

private def mkClient[F[_]: ConcurrentEffect](conf: S3Config)
(implicit ec: ExecutionContext)
: F[Disposable[F, Client[F]]] = {
val clientResource = BlazeClientBuilder[F](ec).resource
val signingClient = clientResource.map(AwsV4Signing(conf)(_))

s3.resourceToDisposable(signingClient)
}
}
51 changes: 19 additions & 32 deletions datasource/src/main/scala/quasar/physical/s3/impl/children.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,9 @@ import cats.instances.list._
import cats.instances.option._
import cats.instances.tuple._
import cats.syntax.alternative._
import cats.syntax.applicative._
import cats.syntax.bifunctor._
import cats.syntax.either._
import cats.syntax.eq._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.option._
import cats.syntax.traverse._
Expand All @@ -62,18 +60,14 @@ object children {
// sends them in.
//
// FIXME: dir should be ADir and pathToDir should be deleted
def apply[F[_]: Effect](
client: Client[F],
bucket: Uri,
dir: APath,
sign: Request[F] => F[Request[F]])
def apply[F[_]: Effect](client: Client[F], bucket: Uri, dir: APath)
: F[Option[Stream[F, PathSegment]]] = {
val msg = "Unexpected failure when streaming a multi-page response for ListBuckets"
val stream0 =
handleS3(fetchResults(client, bucket, dir, None, sign)) map (results =>
handleS3(fetchResults(client, bucket, dir, None)) map (results =>
Stream.iterateEval(results) {
case (_, next0) =>
handleS3(fetchResults(client, bucket, dir, next0, sign))
handleS3(fetchResults(client, bucket, dir, next0))
.getOrElseF(Sync[F].raiseError(new Exception(msg)))
})

Expand All @@ -86,48 +80,41 @@ object children {

///

// converts non-recoverable errors to runtime errors. Also decide
// which errors we want to report as None rather than runtime exceptions.
private def handleS3[F[_]: Sync, A](e: EitherT[F, S3Error, A]): OptionT[F, A] =
OptionT(e.value.flatMap {
case Left(S3Error.NotFound) => none.pure[F]
case Left(S3Error.Forbidden) => none.pure[F]
case Left(S3Error.MalformedResponse) => none.pure[F]
case Left(S3Error.UnexpectedResponse(msg)) => Sync[F].raiseError(new Exception(msg))
case Right(a) => a.some.pure[F]
})
private def handleS3[F[_]: Sync, A](e: EitherT[F, S3Error, A])
: OptionT[F, A] = e.toOption

// FIXME parse the results as they arrive using an XML streaming parser, instead of paging
// one response at a time
private def fetchResults[F[_]: Effect](
client: Client[F],
bucket: Uri,
dir: APath,
next: Option[ContinuationToken],
sign: Request[F] => F[Request[F]])
next: Option[ContinuationToken])
: EitherT[F, S3Error, (Stream[F, APath], Option[ContinuationToken])] =
listObjects(client, bucket, dir, next, sign)
listObjects(client, bucket, dir, next)
.flatMap(extractList(_).toEitherT)
.map(_.leftMap(Stream.emits(_)))

private def toPathSegment[F[_]](s: Stream[F, APath], dir: APath): Stream[F, PathSegment] =
s.filter(path => Path.parentDir(path) === pathToDir(dir))
.filter(path => path =!= dir)
s.filter(Path.parentDir(_) === pathToDir(dir))
.filter(_ =!= dir)
.flatMap(p => Stream.emits(Path.peel(p).toList))
.map(_._2)

private def listObjects[F[_]: Effect](
client: Client[F],
bucket: Uri,
dir: APath,
next: Option[ContinuationToken],
sign: Request[F] => F[Request[F]])
next: Option[ContinuationToken])
: EitherT[F, S3Error, Elem] =
EitherT(sign(listingRequest(client, bucket, dir, next)).flatMap { r =>
Sync[F].recover[Either[S3Error, Elem]](client.expect[Elem](r)(utf8Xml).map(_.asRight)) {
case UnexpectedStatus(Status.Forbidden) => S3Error.Forbidden.asLeft
case MalformedMessageBodyFailure(_, _) => S3Error.MalformedResponse.asLeft
}
EitherT(Sync[F].recover[Either[S3Error, Elem]](
client.expect(listingRequest(client, bucket, dir, next))(utf8Xml).map(_.asRight)) {
case UnexpectedStatus(Status.Forbidden) =>
S3Error.Forbidden.asLeft[Elem]
case UnexpectedStatus(Status.MovedPermanently) =>
S3Error.UnexpectedResponse(Status.MovedPermanently.reason).asLeft[Elem]
case MalformedMessageBodyFailure(_, _) =>
S3Error.MalformedResponse.asLeft[Elem]
})

private def listingRequest[F[_]](
Expand All @@ -152,7 +139,7 @@ object children {
val ct0 = ct.map(_.value).map(("continuation-token", _))

val q = Query.fromString(s3EncodeQueryParams(
List(delimiter, listType, prefix, ct0).unite.toMap))
List(listType, delimiter, prefix, ct0).unite.toMap))

Request[F](uri = listingQuery.copy(query = q))
}
Expand Down
10 changes: 3 additions & 7 deletions datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ import shims._

object evaluate {

def apply[F[_]: Effect](
client: Client[F],
uri: Uri,
file: AFile,
sign: Request[F] => F[Request[F]])
(implicit MR: MonadResourceErr[F])
def apply[F[_]: Effect](client: Client[F], uri: Uri, file: AFile)
(implicit MR: MonadResourceErr[F])
: F[Stream[F, Byte]] = {
// Convert the pathy Path to a POSIX path, dropping
// the first slash, which is what S3 expects for object paths
Expand All @@ -48,7 +44,7 @@ object evaluate {
val queryUri = appendPathS3Encoded(uri, objectPath)
val request = Request[F](uri = queryUri)

sign(request) >>= (streamRequest[F, Byte](client, _, file)(_.body))
streamRequest[F, Byte](client, request, file)(_.body)
}

////
Expand Down
Loading

0 comments on commit b193bef

Please sign in to comment.