Remote scheme provider #53

Merged (8 commits, Aug 29, 2017). Diff shown from 6 commits.
5 changes: 5 additions & 0 deletions core/src/main/scala/schema/package.scala
@@ -28,6 +28,11 @@ package object schema {
case class SchemaDefinitionProviderError(msg: String, maybeCause: Option[Throwable] = None)
extends SchemaError(msg, maybeCause)

object SchemaDefinitionProviderError {
def apply(e: Throwable): SchemaDefinitionProviderError =
SchemaDefinitionProviderError(e.getMessage, Some(e))
}

Member:

Maybe you could add a type alias for Either[SchemaDefinitionProviderError, ?] here?

Contributor Author:

Definitely
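
A sketch of the suggested alias (the names `SchemaAliasSketch` and `SchemaResult` are illustrative, not from the PR):

```scala
// Hedged sketch of the reviewer's suggestion: a type alias that shortens
// every Either[SchemaDefinitionProviderError, A] signature in the providers.
object SchemaAliasSketch {
  final case class SchemaDefinitionProviderError(
      msg: String,
      maybeCause: Option[Throwable] = None)

  type SchemaResult[A] = Either[SchemaDefinitionProviderError, A]

  // Example: Either[SchemaDefinitionProviderError, String] becomes SchemaResult[String].
  def nonEmptyCql(raw: String): SchemaResult[String] =
    if (raw.trim.nonEmpty) Right(raw)
    else Left(SchemaDefinitionProviderError("empty CQL source"))
}
```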

case class SchemaValidatorError(msg: String, maybeCause: Option[Throwable] = None)
extends SchemaError(msg, maybeCause)

91 changes: 91 additions & 0 deletions core/src/main/scala/schema/provider/MetadataSchemaProvider.scala
@@ -0,0 +1,91 @@
/*
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.cassandra
package schema.provider

import cats.implicits._
import cats.~>
import com.datastax.driver.core._
import freestyle._
import freestyle.FreeS
import freestyle.cassandra.schema.provider.metadata.SchemaConversions
import freestyle.cassandra.schema.{SchemaDefinition, SchemaDefinitionProviderError}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.language.postfixOps

class MetadataSchemaProvider(cluster: Cluster)
extends SchemaDefinitionProvider
with SchemaConversions {

def extractTables(keyspaceMetadata: KeyspaceMetadata): List[AbstractTableMetadata] =
keyspaceMetadata.getTables.asScala.toList

def extractIndexes(tableMetadataList: List[AbstractTableMetadata]): List[IndexMetadata] =
tableMetadataList.flatMap {
case (t: TableMetadata) => t.getIndexes.asScala.toList
case _ => Nil
}

def extractUserTypes(keyspaceMetadata: KeyspaceMetadata): List[UserType] =
keyspaceMetadata.getUserTypes.asScala.toList

override def schemaDefinition: Either[SchemaDefinitionProviderError, SchemaDefinition] = {

import scala.concurrent.ExecutionContext.Implicits.global

import freestyle.async.implicits._
import freestyle.cassandra.api._
import freestyle.cassandra.handlers.implicits._

implicit val clusterAPIInterpreter: ClusterAPI.Op ~> Future =
clusterAPIHandler[Future] andThen apiInterpreter[Future, Cluster](cluster)

def metadataF[F[_]](implicit clusterAPI: ClusterAPI[F]): FreeS[F, Metadata] =
for {
_ <- clusterAPI.connect
metadata <- clusterAPI.metadata
_ <- clusterAPI.close
} yield metadata

Member:

You could write this as

  • clusterAPI.connect >> clusterAPI.metadata << clusterAPI.close or
  • clusterAPI.connect *> clusterAPI.metadata <* clusterAPI.close.

The first one is a bit closer to what you wrote (it is defined in flatMap syntax), while the second one (defined in cartesian syntax) is probably used more often. In this case there is no real difference.
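
The two combinators can be sketched for plain Either with stdlib flatMap/map (assumed semantics, no cats dependency; the names mirror cats syntax):

```scala
// Sketch of the keep-right (*>) and keep-left (<*) combinators for Either.
// fa *> fb: sequence both, keep fb's result; fa <* fb: sequence both, keep fa's.
object ApplySketch {
  implicit class EitherApplyOps[E, A](private val fa: Either[E, A]) {
    def *>[B](fb: Either[E, B]): Either[E, B] = fa.flatMap(_ => fb)
    def <*[B](fb: Either[E, B]): Either[E, A] = fa.flatMap(a => fb.map(_ => a))
  }
}
```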

Contributor Author:

Cool, I have a doubt here. I was trying to add a recover to the clusterAPI.metadata call, because we want to call clusterAPI.close even when the previous call fails. What's the best way to achieve this from your point of view? Thanks

Member:

There is this open PR in Cats: typelevel/cats#1662 (issue typelevel/cats#1630).

Doobie uses something similar in its Transactor. ConnectionIO can contain side effects though (and can thus define a MonadError instance), and that is not something you can do with every Free / FreeS.

You can use the guarantee implementation from the PR linked above after you interpret to an M with a MonadError[M, Throwable].
It is possible we can do something like they do in quasar to create a MonadError for FreeS if your algebras contain the Error effect, but that would need some more thought / work.
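
Once interpreted to a target like Future (which can express failure), the "close even on failure" pattern can be sketched as a small bracket-style helper. The names below are illustrative, not the cats PR's API:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Sketch of a guarantee/bracket helper over Future: `close` runs whether
// `body` succeeds or fails, and the original outcome is surfaced afterwards.
object GuaranteeSketch {
  def bracket[A](connect: () => Future[Unit], close: () => Future[Unit])(
      body: => Future[A]): Future[A] =
    connect().flatMap { _ =>
      body.transformWith { outcome =>
        close().flatMap(_ => Future.fromTry(outcome))
      }
    }
}
```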


Either.catchNonFatal {
Await.result(metadataF[ClusterAPI.Op].interpret[Future], 10.seconds)

Member:

I can see that schemaDefinition needs to return an Either[...] at the moment; I am not sure if we can change that somehow.

Currently it is unfortunate twice: first we immediately have to interpret a FreeS, and then we have to block on a Future. It doesn't appear to be trivial to change this though ...

If there is no way we can do that, I would put the Await at the end.

val fut: Future[Either[SchemaDefinitionProviderError, SchemaDefinition]] =
  metadataF[ClusterAPI.Op].interpret[Future].attempt.map {
    _.leftMap (SchemaDefinitionProviderError(_)) flatMap { metadata =>
      // ...
    }
  }
Await.result(fut, 10.seconds) // :(

Contributor Author:

You're right, clearly the worst part of the PR :(

I created a ticket for changing the return type to M[_] (#52)

} leftMap (SchemaDefinitionProviderError(_)) flatMap { metadata =>
val keyspaceList: List[KeyspaceMetadata] = metadata.getKeyspaces.asScala.toList
val tableList: List[AbstractTableMetadata] = keyspaceList.flatMap(extractTables)
val indexList: List[IndexMetadata] = extractIndexes(tableList)
val userTypeList: List[UserType] = keyspaceList.flatMap(extractUserTypes)

for {
keyspaces <- keyspaceList.traverse(toCreateKeyspace)
tables <- tableList.traverse(toCreateTable)
indexes <- indexList.traverse(toCreateIndex(_))
userTypes <- userTypeList.traverse(toUserType)
} yield keyspaces ++ tables ++ indexes ++ userTypes

Member:

You could also use |+| here (Either's Semigroup uses the Semigroup of the right side, and Semigroup[List].combine is ::: / ++).

keyspaceList.traverse(toCreateKeyspace) |+|
tableList.traverse(toCreateTable) |+|
indexList.traverse(toCreateIndex(_)) |+|
userTypeList.traverse(toUserType)
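
The |+| on Either with a List on the right behaves roughly like this stdlib sketch (assumed cats semantics: the first Left short-circuits, Rights combine by list concatenation):

```scala
// Stdlib sketch of Either's Semigroup combine for Either[E, List[A]]:
// a Left wins, two Rights concatenate their lists.
object CombineSketch {
  def combine[E, A](x: Either[E, List[A]], y: Either[E, List[A]]): Either[E, List[A]] =
    for {
      xs <- x
      ys <- y
    } yield xs ++ ys
}
```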


}
}
}

object MetadataSchemaProvider {

implicit def metadataSchemaProvider(implicit cluster: Cluster): SchemaDefinitionProvider =
new MetadataSchemaProvider(cluster)

}
36 changes: 28 additions & 8 deletions core/src/main/scala/schema/provider/TroySchemaProvider.scala
@@ -17,18 +17,38 @@
package freestyle.cassandra
package schema.provider

import java.io.InputStream

import cats.syntax.either._
import freestyle.cassandra.schema._
import troy.cql.ast.CqlParser

class TroySchemaProvider(cql: String) extends SchemaDefinitionProvider {
class TroySchemaProvider(cqlF: => Either[SchemaDefinitionProviderError, String])
extends SchemaDefinitionProvider {

override def schemaDefinition: Either[SchemaDefinitionProviderError, SchemaDefinition] =
CqlParser.parseSchema(cql) match {
case CqlParser.Success(res, _) => Right(res)
case CqlParser.Failure(msg, next) =>
Left(
SchemaDefinitionProviderError(
s"Parse Failure: $msg, line = ${next.pos.line}, column = ${next.pos.column}"))
case CqlParser.Error(msg, _) => Left(SchemaDefinitionProviderError(msg))
cqlF.flatMap { cql =>
CqlParser.parseSchema(cql) match {
case CqlParser.Success(res, _) => Right(res)
case CqlParser.Failure(msg, next) =>
Left(
SchemaDefinitionProviderError(
s"Parse Failure: $msg, line = ${next.pos.line}, column = ${next.pos.column}"))
case CqlParser.Error(msg, _) => Left(SchemaDefinitionProviderError(msg))
}
}
}

object TroySchemaProvider {

def apply(cql: String): TroySchemaProvider = new TroySchemaProvider(Right(cql))

def apply(is: InputStream): TroySchemaProvider = new TroySchemaProvider(
Either.catchNonFatal {
scala.io.Source.fromInputStream(is).mkString
} leftMap { e =>
SchemaDefinitionProviderError(e.getMessage, Some(e))
}
)

}
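
A stdlib-only sketch of what the new InputStream overload does: read the stream eagerly and capture any non-fatal failure on the Left (`Try.toEither` needs Scala 2.12+; the object name is illustrative):

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import scala.util.Try

// Sketch of the companion's InputStream overload without cats:
// Try catches non-fatal exceptions, mirroring Either.catchNonFatal.
object StreamReadSketch {
  def readCql(is: java.io.InputStream): Either[Throwable, String] =
    Try(scala.io.Source.fromInputStream(is).mkString).toEither
}
```
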
209 changes: 209 additions & 0 deletions core/src/main/scala/schema/provider/metadata/SchemaConversions.scala
@@ -0,0 +1,209 @@
/*
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.cassandra
package schema.provider.metadata

import cats.implicits._
import com.datastax.driver.core.{
AbstractTableMetadata,
ColumnMetadata,
IndexMetadata,
KeyspaceMetadata,
TupleType,
UserType,
DataType => DatastaxDataType
}
import freestyle.cassandra.schema.SchemaDefinitionProviderError
import troy.cql.ast._
import troy.cql.ast.ddl.Keyspace.Replication
import troy.cql.ast.ddl.Table.PrimaryKey
import troy.cql.ast.ddl.{Field, Index, Table}

import scala.collection.JavaConverters._
import scala.language.postfixOps

trait SchemaConversions {

def toCreateKeyspace(
keyspaceMetadata: KeyspaceMetadata): Either[SchemaDefinitionProviderError, CreateKeyspace] =
Either.catchNonFatal {
val name: String = Option(keyspaceMetadata.getName)
.getOrElse(throw new NullPointerException("Schema name is null"))

Contributor:

Probably better to use an IllegalArgumentException in this case

val replication: Option[Replication] = Option(keyspaceMetadata.getReplication)
.flatMap { m =>
val seq = m.asScala.toSeq
if (seq.isEmpty) None else Option(Replication(seq.sortBy(_._1)))
}
CreateKeyspace(
ifNotExists = false,
keyspaceName = KeyspaceName(name),
properties = replication map (Seq(_)) getOrElse Seq.empty)
} leftMap (SchemaDefinitionProviderError(_))

def toCreateTable(
metadata: AbstractTableMetadata): Either[SchemaDefinitionProviderError, CreateTable] =
Either.catchNonFatal {
for {
columns <- metadata.getColumns.asScala.toList.traverse(toTableColumn)
primaryKey <- toPrimaryKey(
metadata.getPartitionKey.asScala.toList,
metadata.getClusteringColumns.asScala.toList)
} yield
CreateTable(
ifNotExists = false,
tableName = TableName(Some(KeyspaceName(metadata.getKeyspace.getName)), metadata.getName),
columns = columns,
primaryKey = Some(primaryKey),
options = Seq.empty
)
} leftMap (SchemaDefinitionProviderError(_)) joinRight

def readTable(metadata: IndexMetadata): TableName =
TableName(Some(KeyspaceName(metadata.getTable.getKeyspace.getName)), metadata.getTable.getName)

def toCreateIndex(
metadata: IndexMetadata,
readTable: (IndexMetadata) => TableName = readTable): Either[
SchemaDefinitionProviderError,
CreateIndex] =
Either.catchNonFatal {
CreateIndex(
isCustom = metadata.isCustomIndex,
ifNotExists = false,
indexName = Option(metadata.getName),
tableName = readTable(metadata),
identifier = Index.Identifier(metadata.getTarget),
using =
if (metadata.isCustomIndex)
// The options are not visible in the IndexMetadata class
Some(Index.Using(metadata.getIndexClassName, None))
else None
)
} leftMap (SchemaDefinitionProviderError(_))

def toUserType(userType: UserType): Either[SchemaDefinitionProviderError, CreateType] =
Either.catchNonFatal {
userType.getFieldNames.asScala.toList.traverse { fieldName =>
toField(fieldName, userType.getFieldType(fieldName))
} map { list =>
CreateType(
ifNotExists = false,
typeName = TypeName(Some(KeyspaceName(userType.getKeyspace)), userType.getTypeName),
fields = list)
}
} leftMap (SchemaDefinitionProviderError(_)) joinRight

private[this] def toField(
name: String,
datastaxDataType: DatastaxDataType): Either[SchemaDefinitionProviderError, Field] =
toDataType(datastaxDataType) map { dataType =>
Field(name, dataType)
}

private[this] def toTableColumn(
metadata: ColumnMetadata): Either[SchemaDefinitionProviderError, Table.Column] =
toDataType(metadata.getType).map { dataType =>
Table.Column(
name = metadata.getName,
dataType = dataType,
isStatic = metadata.isStatic,
isPrimaryKey = false)
}

private[this] def toDataType(
dataType: DatastaxDataType): Either[SchemaDefinitionProviderError, DataType] = {

import DatastaxDataType._

def toDataTypeNative(
dataType: DatastaxDataType): Either[SchemaDefinitionProviderError, DataType.Native] =
dataType.getName match {
case Name.ASCII => DataType.Ascii.asRight

Contributor:

Where is asRight defined? as usually denotes an internal cast, and if this is just lifting the value into Either.Right we should use toRight or the right() syntax from cats.

Member:

asRight is the new syntax in Cats (see PR typelevel/cats#1454).

asRight and asLeft were chosen so they can't be confused with the .right and .left methods on Either for RightProjection and LeftProjection.
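
A minimal sketch of what that syntax provides: an enrichment that lifts a value into Right (or Left) while letting the other type parameter be chosen at the call site (stdlib only; the names match cats syntax):

```scala
// Sketch of cats' asRight/asLeft enrichment on arbitrary values.
object EitherSyntaxSketch {
  implicit class EitherIdOps[A](private val a: A) {
    def asRight[L]: Either[L, A] = Right(a)
    def asLeft[R]: Either[A, R] = Left(a)
  }
}
```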

case Name.BIGINT => DataType.BigInt.asRight
case Name.BLOB => DataType.Blob.asRight
case Name.BOOLEAN => DataType.Boolean.asRight
case Name.COUNTER => DataType.Counter.asRight
case Name.DATE => DataType.Date.asRight
case Name.DECIMAL => DataType.Decimal.asRight
case Name.DOUBLE => DataType.Double.asRight
case Name.FLOAT => DataType.Float.asRight
case Name.INET => DataType.Inet.asRight
case Name.INT => DataType.Int.asRight
case Name.SMALLINT => DataType.Smallint.asRight
case Name.TEXT => DataType.Text.asRight
case Name.TIME => DataType.Time.asRight
case Name.TIMESTAMP => DataType.Timestamp.asRight
case Name.TIMEUUID => DataType.Timeuuid.asRight
case Name.TINYINT => DataType.Tinyint.asRight
case Name.UUID => DataType.Uuid.asRight
case Name.VARCHAR => DataType.Varchar.asRight
case Name.VARINT => DataType.Varint.asRight
case _ =>
Left(SchemaDefinitionProviderError(s"Native DataType ${dataType.getName} not supported"))
}

def toCollectionType(
collectionType: CollectionType): Either[SchemaDefinitionProviderError, DataType] = {

val typeArgs: List[DatastaxDataType] = collectionType.getTypeArguments.asScala.toList

val maybeCol = collectionType.getName match {
case Name.LIST =>
typeArgs.headOption map { typeArg =>
toDataTypeNative(typeArg) map DataType.List
}
case Name.SET =>
typeArgs.headOption map { typeArg =>
toDataTypeNative(typeArg) map DataType.Set
}
case Name.MAP =>
typeArgs.headOption.flatMap(t1 => typeArgs.tail.headOption.map(t2 => (t1, t2))) map {
tupleArgs =>
for {
dataNative1 <- toDataTypeNative(tupleArgs._1)
dataNative2 <- toDataTypeNative(tupleArgs._2)
} yield DataType.Map(dataNative1, dataNative2)

Member:

Maybe:

for {
  t1 <- typeArgs.headOption
  t2 <- typeArgs.tail.headOption
} yield (toDataTypeNative(t1) |@| toDataTypeNative(t2)).map(DataType.Map)

}
case _ => None
}

maybeCol getOrElse {
Left(
SchemaDefinitionProviderError(
s"Error parsing collection DataType '${collectionType.asFunctionParameterString()}'"))
}
}

def toTupleType(tupleType: TupleType): Either[SchemaDefinitionProviderError, DataType] =
tupleType.getComponentTypes.asScala.toList traverse toDataTypeNative map DataType.Tuple

dataType match {
case nativeType: NativeType => toDataTypeNative(nativeType)
case customType: CustomType => Right(DataType.Custom(customType.getCustomTypeClassName))
case collectionType: CollectionType => toCollectionType(collectionType)
case tupleType: TupleType => toTupleType(tupleType)
case userType: UserType =>
Right(DataType.UserDefined(KeyspaceName(userType.getKeyspace), userType.getTypeName))
}
}

private[this] def toPrimaryKey(
partitionKeys: List[ColumnMetadata],
clusteringColumns: List[ColumnMetadata]): Either[SchemaDefinitionProviderError, PrimaryKey] =
PrimaryKey(partitionKeys.map(_.getName), clusteringColumns.map(_.getName)).asRight

}