Skip to content

Commit

Permalink
4.0.3: Update quasar to 68.2.0
Browse files Browse the repository at this point in the history
(Merge branch 'djspiewak-update-quasar')

This is an extension of #166.
  • Loading branch information
jsantos17 committed Sep 15, 2018
2 parents 9e37ac8 + fc1302a commit 1501621
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 97 deletions.
28 changes: 18 additions & 10 deletions datasource/src/main/scala/quasar/physical/s3/S3DataSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package quasar.physical.s3

import quasar.api.QueryEvaluator
import quasar.api.datasource.DatasourceType
import quasar.api.resource.ResourcePath.{Leaf, Root}
import quasar.api.resource.{ResourceName, ResourcePath, ResourcePathType}
import quasar.common.data.Data
import quasar.connector.MonadResourceErr
import quasar.connector.datasource.LightweightDatasource
import quasar.contrib.pathy.APath
Expand All @@ -33,28 +33,36 @@ import cats.effect.{Effect, Timer}
import cats.syntax.flatMap._
import cats.syntax.option._
import fs2.Stream
import jawn.Facade
import org.http4s.{Request, Header, Headers}
import org.http4s.client.Client
import pathy.Path
import pathy.Path.{DirName, FileName}
import qdata.QDataEncode
import qdata.json.QDataFacade
import scalaz.syntax.applicative._
import scalaz.{\/-, -\/, OptionT}
import shims._

final class S3DataSource[F[_]: Effect: Timer: MonadResourceErr](
client: Client[F],
config: S3Config)
extends LightweightDatasource[F, Stream[F, ?], Stream[F, Data]] {
extends LightweightDatasource[F, Stream[F, ?]] {
def kind: DatasourceType = s3.datasourceKind

def evaluate(path: ResourcePath): F[Stream[F, Data]] =
path match {
case Root =>
Stream.empty.covaryAll[F, Data].pure[F]
case Leaf(file) =>
impl.evaluate[F](config.parsing, client, config.bucket, file, signRequest(config)) map {
case None => Stream.empty
case Some(s) => s
def evaluator[R: QDataEncode]: QueryEvaluator[F, ResourcePath, Stream[F, R]] =
new QueryEvaluator[F, ResourcePath, Stream[F, R]] {
implicit val facade: Facade[R] = QDataFacade.qdata[R]

def evaluate(path: ResourcePath): F[Stream[F, R]] =
path match {
case Root =>
Stream.empty.covaryAll[F, R].pure[F]
case Leaf(file) =>
impl.evaluate[F, R](config.parsing, client, config.bucket, file, signRequest(config)) map {
case None => Stream.empty
case Some(s) => s
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import quasar.api.datasource.DatasourceError.{
}
import quasar.api.datasource.DatasourceType
import quasar.api.resource.ResourcePath
import quasar.common.data.Data
import quasar.connector.Datasource
import quasar.connector.LightweightDatasourceModule
import quasar.connector.MonadResourceErr
Expand All @@ -46,12 +45,12 @@ object S3DataSourceModule extends LightweightDatasourceModule {
def kind: DatasourceType = s3.datasourceKind

def lightweightDatasource[F[_]: ConcurrentEffect: MonadResourceErr: Timer](config: Json)
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]] = {
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]] = {
config.as[S3Config].result match {
case Right(s3Config) => {
Http1Client[F]() flatMap { client =>
val s3Ds = new S3DataSource[F](client, s3Config)
val ds: Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]] = s3Ds
val ds: Datasource[F, Stream[F, ?], ResourcePath] = s3Ds

s3Ds.isLive.ifM({
val disposable = Disposable(ds, client.shutdown)
Expand All @@ -62,14 +61,14 @@ object S3DataSourceModule extends LightweightDatasourceModule {
val err: InitializationError[Json] =
InvalidConfiguration(s3.datasourceKind, config, NonEmptyList(msg))

err.left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]].point[F]
err.left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]].point[F]
})
}
}

case Left((msg, _)) =>
(MalformedConfiguration(kind, config, msg): InitializationError[Json])
.left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]].point[F]
.left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]].point[F]
}
}

Expand Down
138 changes: 60 additions & 78 deletions datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
package quasar.physical.s3
package impl

import quasar.common.data.Data
import quasar.api.resource.ResourcePath
import quasar.connector.{MonadResourceErr, ResourceError}
import quasar.contrib.pathy._
import quasar.physical.s3.S3JsonParsing
import quasar.connector.{MonadResourceErr, ResourceError}
import quasar.api.resource.ResourcePath

import slamdata.Predef._

Expand All @@ -30,34 +29,69 @@ import cats.effect.{Effect, Timer, Sync}
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.option._

import fs2.{Pipe, Stream}
import io.circe.{Json, ParsingFailure}
import io.circe.fs2.{byteArrayParser, byteStreamParser}
import jawn.{Facade, ParseException}
import jawnfs2._
import org.http4s.client._
import org.http4s.{Request, Response, Status, Uri}
import pathy.Path
import shims._

object evaluate {
// circe's streaming parser, which we select based on the
// passed S3JsonParsing
private def circePipe[F[_]](jsonParsing: S3JsonParsing): Pipe[F, Byte, Json] = jsonParsing match {
case S3JsonParsing.JsonArray => byteArrayParser[F]
case S3JsonParsing.LineDelimited => byteStreamParser[F]

def apply[F[_]: Effect: Timer, R: Facade](
jsonParsing: S3JsonParsing,
client: Client[F],
uri: Uri,
file: AFile,
sign: Request[F] => F[Request[F]])
(implicit MR: MonadResourceErr[F])
: F[Option[Stream[F, R]]] = {
// Convert the pathy Path to a POSIX path, dropping
// the first slash, which is what S3 expects for object paths
val objectPath = Path.posixCodec.printPath(file).drop(1)

// Put the object path after the bucket URI
val queryUri = appendPathUnencoded(uri, objectPath)
val request = Request[F](uri = queryUri)

sign(request) >>= { req =>
val stream = OptionT(
streamRequest[F, R](client, req) { resp =>
resp.body.chunks.map(_.toByteBuffer).through(parse(jsonParsing))
})

stream.map(_.handleErrorWith {
case ParseException(message, _, _, _) =>
Stream.eval(MR.raiseError(parseError(file, jsonParsing, message)))
}).value
}
}

// as it says on the tin, converts circe's JSON type to
// quasar's Data type
@SuppressWarnings(Array("org.wartremover.warts.Recursion"))
private def circeJsonToData(json: Json): Data = {
json.fold(
Data.Null,
Data.Bool,
n => Data.Dec(n.toBigDecimal.getOrElse(n.toDouble)),
Data.Str,
js => Data.Arr(js.map(circeJsonToData)(scala.collection.breakOut)),
js => Data.Obj(ListMap(js.toList.map { case (k, v) => k -> circeJsonToData(v) }: _*))
)
////

private def parse[F[_], R: Facade](jsonParsing: S3JsonParsing)
: Pipe[F, ByteBuffer, R] =
jsonParsing match {
case S3JsonParsing.JsonArray => unwrapJsonArray[F, ByteBuffer, R]
case S3JsonParsing.LineDelimited => parseJsonStream[F, ByteBuffer, R]
}

private def parseError(path: AFile, parsing: S3JsonParsing, message: String)
: ResourceError = {
val msg: String =
s"Could not parse the file as JSON. Ensure you've configured the correct jsonParsing option for this bucket: $message"

val expectedFormat: String = parsing match {
case S3JsonParsing.LineDelimited => "Newline-delimited JSON"
case S3JsonParsing.JsonArray => "Array-wrapped JSON"
}

ResourceError.malformedResource(
ResourcePath.Leaf(path),
expectedFormat,
msg)
}

// there is no method in http4s 0.16.6a that does what we
Expand All @@ -67,7 +101,10 @@ object evaluate {
// if it's `Some(resp)` we compute an fs2 stream from
// it using `f` and then call `dispose` on that response
// once we've finished streaming.
private def streamRequestThroughFs2[F[_]: Sync, A](client: Client[F], req: Request[F])(f: Response[F] => Stream[F, A]): F[Option[Stream[F, A]]] = {
private def streamRequest[F[_]: Sync, A](
client: Client[F], req: Request[F])(
f: Response[F] => Stream[F, A])
: F[Option[Stream[F, A]]] =
client.open(req).flatMap {
case DisposableResponse(response, dispose) =>
response.status match {
Expand All @@ -76,59 +113,4 @@ object evaluate {
case s => Sync[F].raiseError(new Exception(s"Unexpected status $s"))
}
}
}

private def noParseError(path: AFile, parsing: S3JsonParsing, message: String): ResourceError = {
val noParseMsg = s"Could not parse the file as JSON. Ensure you've configured the correct jsonParsing option for this bucket: $message"
val expectedFormat = parsing match {
case S3JsonParsing.LineDelimited => "Newline-delimited JSON"
case S3JsonParsing.JsonArray => "Array-wrapped JSON"
}

ResourceError.malformedResource(
ResourcePath.Leaf(path),
expectedFormat,
noParseMsg)
}

// putting it all together.
def apply[F[_]: Effect: Timer](
jsonParsing: S3JsonParsing,
client: Client[F],
uri: Uri,
file: AFile,
sign: Request[F] => F[Request[F]])
(implicit MR: MonadResourceErr[F])
: F[Option[Stream[F, Data]]] = {
// convert the pathy Path to a POSIX path, dropping
// the first slash, like S3 expects for object paths
val objectPath = Path.posixCodec.printPath(file).drop(1)

// Put the object path after the bucket URI
val queryUri = appendPathUnencoded(uri, objectPath)
val request = Request[F](uri = queryUri)

// figure out how we're going to parse the object as JSON
val circeJsonPipe = circePipe[F](jsonParsing)

sign(request) >>= { r =>
val stream =
OptionT(streamRequestThroughFs2[F, Data](client, r) { resp =>
// convert the data to JSON, using the parsing method
// of our choice
val asJson: Stream[F, Json] = resp.body.through(circeJsonPipe)

// convert the JSON from circe's representation to ours
val asData: Stream[F, Data] = asJson.map(circeJsonToData)

// and we're done.
asData
})

stream.map(_.handleErrorWith {
case ParsingFailure(message, _) =>
Stream.eval(MR.raiseError(noParseError(file, jsonParsing, message)))
}).value
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package quasar.physical.s3
import slamdata.Predef._

import quasar.api.resource.{ResourceName, ResourcePath, ResourcePathType}
import quasar.common.data.Data
import quasar.connector.DatasourceSpec
import quasar.connector.ResourceError
import quasar.contrib.scalaz.MonadError_
Expand Down Expand Up @@ -88,8 +89,8 @@ class S3DataSourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] {
}

"read line-delimited and array JSON" >>* {
val ld = datasourceLD.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json"))
val array = datasource.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("array.json"))
val ld = datasourceLD.evaluator[Data].evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json"))
val array = datasource.evaluator[Data].evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("array.json"))

(ld |@| array).tupled.flatMap {
case (readLD, readArray) => {
Expand Down
4 changes: 4 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ object Dependencies {
private val circeJawnVersion = "0.9.3"
private val fs2Version = "0.10.5"
private val quasarVersion = IO.read(file("./quasar-version")).trim
private val qdataVersion = IO.read(file("./qdata-version")).trim
private val jawnFs2Version = "0.12.2"
private val shimsVersion = "1.2.1"
private val specsVersion = "4.1.2"

// http4s-blaze-client's version has to be in sync with
// quasar's http4s version. The same goes for any
// dependencies, transitive or otherwise.
def datasourceCore = Seq(
"org.http4s" %% "jawn-fs2" % jawnFs2Version,
"com.slamdata" %% "qdata-json" % qdataVersion,
"org.http4s" %% "http4s-scala-xml" % http4sVersion,
"org.http4s" %% "http4s-blaze-client" % http4sVersion,
"org.scala-lang.modules" %% "scala-xml" % scalaXmlVersion,
Expand Down
1 change: 1 addition & 0 deletions qdata-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2.1.0
2 changes: 1 addition & 1 deletion quasar-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
65.1.0
68.2.0
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "4.0.2"
version in ThisBuild := "4.0.3"

0 comments on commit 1501621

Please sign in to comment.