Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to quasar 68.1.0 #166

Merged
merged 2 commits into from
Sep 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions datasource/src/main/scala/quasar/physical/s3/S3DataSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package quasar.physical.s3

import quasar.api.QueryEvaluator
import quasar.api.datasource.DatasourceType
import quasar.api.resource.ResourcePath.{Leaf, Root}
import quasar.api.resource.{ResourceName, ResourcePath, ResourcePathType}
import quasar.common.data.Data
import quasar.connector.MonadResourceErr
import quasar.connector.datasource.LightweightDatasource
import quasar.contrib.pathy.APath
Expand All @@ -33,28 +33,36 @@ import cats.effect.{Effect, Timer}
import cats.syntax.flatMap._
import cats.syntax.option._
import fs2.Stream
import jawn.Facade
import org.http4s.{Request, Header, Headers}
import org.http4s.client.Client
import pathy.Path
import pathy.Path.{DirName, FileName}
import qdata.QDataEncode
import qdata.json.QDataFacade
import scalaz.syntax.applicative._
import scalaz.{\/-, -\/, OptionT}
import shims._

final class S3DataSource[F[_]: Effect: Timer: MonadResourceErr](
client: Client[F],
config: S3Config)
extends LightweightDatasource[F, Stream[F, ?], Stream[F, Data]] {
extends LightweightDatasource[F, Stream[F, ?]] {
def kind: DatasourceType = s3.datasourceKind

def evaluate(path: ResourcePath): F[Stream[F, Data]] =
path match {
case Root =>
Stream.empty.covaryAll[F, Data].pure[F]
case Leaf(file) =>
impl.evaluate[F](config.parsing, client, config.bucket, file, signRequest(config)) map {
case None => Stream.empty
case Some(s) => s
def evaluator[R: QDataEncode]: QueryEvaluator[F, ResourcePath, Stream[F, R]] =
new QueryEvaluator[F, ResourcePath, Stream[F, R]] {
implicit val facade: Facade[R] = QDataFacade.qdata[R]

def evaluate(path: ResourcePath): F[Stream[F, R]] =
path match {
case Root =>
Stream.empty.covaryAll[F, R].pure[F]
case Leaf(file) =>
impl.evaluate[F, R](config.parsing, client, config.bucket, file, signRequest(config)) map {
case None => Stream.empty
case Some(s) => s
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import quasar.api.datasource.DatasourceError.{
}
import quasar.api.datasource.DatasourceType
import quasar.api.resource.ResourcePath
import quasar.common.data.Data
import quasar.connector.Datasource
import quasar.connector.LightweightDatasourceModule
import quasar.connector.MonadResourceErr
Expand All @@ -46,12 +45,12 @@ object S3DataSourceModule extends LightweightDatasourceModule {
def kind: DatasourceType = s3.datasourceKind

def lightweightDatasource[F[_]: ConcurrentEffect: MonadResourceErr: Timer](config: Json)
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]] = {
: F[InitializationError[Json] \/ Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]] = {
config.as[S3Config].result match {
case Right(s3Config) => {
Http1Client[F]() flatMap { client =>
val s3Ds = new S3DataSource[F](client, s3Config)
val ds: Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]] = s3Ds
val ds: Datasource[F, Stream[F, ?], ResourcePath] = s3Ds

s3Ds.isLive.ifM({
val disposable = Disposable(ds, client.shutdown)
Expand All @@ -62,14 +61,14 @@ object S3DataSourceModule extends LightweightDatasourceModule {
val err: InitializationError[Json] =
InvalidConfiguration(s3.datasourceKind, config, NonEmptyList(msg))

err.left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]].point[F]
err.left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]].point[F]
})
}
}

case Left((msg, _)) =>
(MalformedConfiguration(kind, config, msg): InitializationError[Json])
.left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath, Stream[F, Data]]]].point[F]
.left[Disposable[F, Datasource[F, Stream[F, ?], ResourcePath]]].point[F]
}
}

Expand Down
138 changes: 60 additions & 78 deletions datasource/src/main/scala/quasar/physical/s3/impl/evaluate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
package quasar.physical.s3
package impl

import quasar.common.data.Data
import quasar.api.resource.ResourcePath
import quasar.connector.{MonadResourceErr, ResourceError}
import quasar.contrib.pathy._
import quasar.physical.s3.S3JsonParsing
import quasar.connector.{MonadResourceErr, ResourceError}
import quasar.api.resource.ResourcePath

import slamdata.Predef._

Expand All @@ -30,34 +29,69 @@ import cats.effect.{Effect, Timer, Sync}
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.option._

import fs2.{Pipe, Stream}
import io.circe.{Json, ParsingFailure}
import io.circe.fs2.{byteArrayParser, byteStreamParser}
import jawn.{Facade, ParseException}
import jawnfs2._
import org.http4s.client._
import org.http4s.{Request, Response, Status, Uri}
import pathy.Path
import shims._

object evaluate {
// circe's streaming parser, which we select based on the
// passed S3JsonParsing
private def circePipe[F[_]](jsonParsing: S3JsonParsing): Pipe[F, Byte, Json] = jsonParsing match {
case S3JsonParsing.JsonArray => byteArrayParser[F]
case S3JsonParsing.LineDelimited => byteStreamParser[F]

/** Fetches `file` from the bucket rooted at `uri` and parses the response
  * body as JSON into a stream of `R`, using the supplied jawn `Facade`.
  *
  * The request is passed through `sign` before it is sent. The outer
  * `Option` is whatever `streamRequest` produces for the signed request.
  *
  * A jawn `ParseException` raised while the body is being consumed is
  * reported through `MonadResourceErr` as a malformed-resource error for
  * `file`. Any other failure is re-raised unchanged.
  *
  * @param jsonParsing how the object is laid out (array-wrapped or line-delimited)
  * @param client      HTTP client used to issue the request
  * @param uri         bucket URI the object path is appended to
  * @param file        absolute path of the object within the bucket
  * @param sign        effectful request transformation applied before sending
  */
def apply[F[_]: Effect: Timer, R: Facade](
    jsonParsing: S3JsonParsing,
    client: Client[F],
    uri: Uri,
    file: AFile,
    sign: Request[F] => F[Request[F]])
    (implicit MR: MonadResourceErr[F])
    : F[Option[Stream[F, R]]] = {
  // Convert the pathy Path to a POSIX path, dropping
  // the first slash, which is what S3 expects for object paths
  val objectPath = Path.posixCodec.printPath(file).drop(1)

  // Put the object path after the bucket URI
  val queryUri = appendPathUnencoded(uri, objectPath)
  val request = Request[F](uri = queryUri)

  sign(request) >>= { req =>
    val stream = OptionT(
      streamRequest[F, R](client, req) { resp =>
        resp.body.chunks.map(_.toByteBuffer).through(parse(jsonParsing))
      })

    stream.map(_.handleErrorWith {
      case ParseException(message, _, _, _) =>
        Stream.eval(MR.raiseError(parseError(file, jsonParsing, message)))
      // The handler must be total: without this case any non-parse failure
      // would surface as a scala.MatchError and hide the original cause.
      case other =>
        Stream.raiseError(other).covaryAll[F, R]
    }).value
  }
}

// as it says on the tin, converts circe's JSON type to
// quasar's Data type
@SuppressWarnings(Array("org.wartremover.warts.Recursion"))
private def circeJsonToData(json: Json): Data = {
json.fold(
Data.Null,
Data.Bool,
n => Data.Dec(n.toBigDecimal.getOrElse(n.toDouble)),
Data.Str,
js => Data.Arr(js.map(circeJsonToData)(scala.collection.breakOut)),
js => Data.Obj(ListMap(js.toList.map { case (k, v) => k -> circeJsonToData(v) }: _*))
)
////

/** Selects the jawn-fs2 pipe that turns raw `ByteBuffer` chunks into `R`
  * values, according to the configured `S3JsonParsing` mode.
  */
private def parse[F[_], R: Facade](jsonParsing: S3JsonParsing)
    : Pipe[F, ByteBuffer, R] =
  jsonParsing match {
    // a stream of top-level JSON values, one after another
    case S3JsonParsing.LineDelimited =>
      parseJsonStream[F, ByteBuffer, R]

    // a single top-level JSON array whose elements are emitted individually
    case S3JsonParsing.JsonArray =>
      unwrapJsonArray[F, ByteBuffer, R]
  }

/** Builds the malformed-resource error reported when the object at `path`
  * cannot be parsed as JSON in the configured `parsing` mode.
  *
  * @param message the underlying parser message, appended to the detail text
  */
private def parseError(path: AFile, parsing: S3JsonParsing, message: String)
    : ResourceError = {
  // human-readable name of the layout we were told to expect
  val expectedFormat: String = parsing match {
    case S3JsonParsing.JsonArray => "Array-wrapped JSON"
    case S3JsonParsing.LineDelimited => "Newline-delimited JSON"
  }

  val detail: String =
    s"Could not parse the file as JSON. Ensure you've configured the correct jsonParsing option for this bucket: $message"

  ResourceError.malformedResource(ResourcePath.Leaf(path), expectedFormat, detail)
}

// there is no method in http4s 0.16.6a that does what we
Expand All @@ -67,7 +101,10 @@ object evaluate {
// if it's `Some(resp)` we compute an fs2 stream from
// it using `f` and then call `dispose` on that response
// once we've finished streaming.
private def streamRequestThroughFs2[F[_]: Sync, A](client: Client[F], req: Request[F])(f: Response[F] => Stream[F, A]): F[Option[Stream[F, A]]] = {
private def streamRequest[F[_]: Sync, A](
client: Client[F], req: Request[F])(
f: Response[F] => Stream[F, A])
: F[Option[Stream[F, A]]] =
client.open(req).flatMap {
case DisposableResponse(response, dispose) =>
response.status match {
Expand All @@ -76,59 +113,4 @@ object evaluate {
case s => Sync[F].raiseError(new Exception(s"Unexpected status $s"))
}
}
}

/** Builds the malformed-resource error reported when the object at `path`
  * cannot be parsed as JSON in the configured `parsing` mode.
  *
  * @param message the underlying parser message, appended to the detail text
  */
private def noParseError(path: AFile, parsing: S3JsonParsing, message: String): ResourceError = {
  // human-readable name of the layout we were told to expect
  val expectedFormat = parsing match {
    case S3JsonParsing.JsonArray => "Array-wrapped JSON"
    case S3JsonParsing.LineDelimited => "Newline-delimited JSON"
  }

  val detail = s"Could not parse the file as JSON. Ensure you've configured the correct jsonParsing option for this bucket: $message"

  ResourceError.malformedResource(ResourcePath.Leaf(path), expectedFormat, detail)
}

// putting it all together.
def apply[F[_]: Effect: Timer](
jsonParsing: S3JsonParsing,
client: Client[F],
uri: Uri,
file: AFile,
sign: Request[F] => F[Request[F]])
(implicit MR: MonadResourceErr[F])
: F[Option[Stream[F, Data]]] = {
// convert the pathy Path to a POSIX path, dropping
// the first slash, like S3 expects for object paths
val objectPath = Path.posixCodec.printPath(file).drop(1)

// Put the object path after the bucket URI
val queryUri = appendPathUnencoded(uri, objectPath)
val request = Request[F](uri = queryUri)

// figure out how we're going to parse the object as JSON
val circeJsonPipe = circePipe[F](jsonParsing)

sign(request) >>= { r =>
val stream =
OptionT(streamRequestThroughFs2[F, Data](client, r) { resp =>
// convert the data to JSON, using the parsing method
// of our choice
val asJson: Stream[F, Json] = resp.body.through(circeJsonPipe)

// convert the JSON from circe's representation to ours
val asData: Stream[F, Data] = asJson.map(circeJsonToData)

// and we're done.
asData
})

stream.map(_.handleErrorWith {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to still handle errors like this. Let me see what jawnfs2 does with errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reinstated it.

case ParsingFailure(message, _) =>
Stream.eval(MR.raiseError(noParseError(file, jsonParsing, message)))
}).value
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package quasar.physical.s3
import slamdata.Predef._

import quasar.api.resource.{ResourceName, ResourcePath, ResourcePathType}
import quasar.common.data.Data
import quasar.connector.DatasourceSpec
import quasar.connector.ResourceError
import quasar.contrib.scalaz.MonadError_
Expand Down Expand Up @@ -88,8 +89,8 @@ class S3DataSourceSpec extends DatasourceSpec[IO, Stream[IO, ?]] {
}

"read line-delimited and array JSON" >>* {
val ld = datasourceLD.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json"))
val array = datasource.evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("array.json"))
val ld = datasourceLD.evaluator[Data].evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("lines.json"))
val array = datasource.evaluator[Data].evaluate(ResourcePath.root() / ResourceName("testData") / ResourceName("array.json"))

(ld |@| array).tupled.flatMap {
case (readLD, readArray) => {
Expand Down
4 changes: 4 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ object Dependencies {
private val circeJawnVersion = "0.9.3"
private val fs2Version = "0.10.5"
private val quasarVersion = IO.read(file("./quasar-version")).trim
private val qdataVersion = IO.read(file("./qdata-version")).trim
private val jawnFs2Version = "0.12.2"
private val shimsVersion = "1.2.1"
private val specsVersion = "4.1.2"

// http4s-blaze-client's version has to be in sync with
// quasar's http4s version. The same goes for any
// dependencies, transitive or otherwise.
def datasourceCore = Seq(
"org.http4s" %% "jawn-fs2" % jawnFs2Version,
"com.slamdata" %% "qdata-json" % qdataVersion,
"org.http4s" %% "http4s-scala-xml" % http4sVersion,
"org.http4s" %% "http4s-blaze-client" % http4sVersion,
"org.scala-lang.modules" %% "scala-xml" % scalaXmlVersion,
Expand Down
1 change: 1 addition & 0 deletions qdata-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2.1.0
2 changes: 1 addition & 1 deletion quasar-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
65.1.0
68.1.0