-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remote scheme provider #53
Changes from 6 commits
3013a57
27beabe
5f9c81a
107daaf
7283bd3
9c95653
4d1ff8a
fa33b90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com> | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package freestyle.cassandra | ||
package schema.provider | ||
|
||
import cats.implicits._ | ||
import cats.~> | ||
import com.datastax.driver.core._ | ||
import freestyle._ | ||
import freestyle.FreeS | ||
import freestyle.cassandra.schema.provider.metadata.SchemaConversions | ||
import freestyle.cassandra.schema.{SchemaDefinition, SchemaDefinitionProviderError} | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.concurrent.duration._ | ||
import scala.concurrent.{Await, Future} | ||
import scala.language.postfixOps | ||
|
||
/**
 * A [[SchemaDefinitionProvider]] that derives the schema definition from the live
 * metadata exposed by a Cassandra [[Cluster]]: keyspaces, tables, indexes and
 * user-defined types are read from the cluster and converted into CQL AST
 * statements via [[SchemaConversions]].
 */
class MetadataSchemaProvider(cluster: Cluster)
    extends SchemaDefinitionProvider
    with SchemaConversions {

  /** All table metadata declared in the given keyspace. */
  def extractTables(keyspaceMetadata: KeyspaceMetadata): List[AbstractTableMetadata] =
    keyspaceMetadata.getTables.asScala.toList

  /**
   * Indexes of every table in the list. Only `TableMetadata` instances expose
   * indexes; any other `AbstractTableMetadata` subtype contributes nothing.
   */
  def extractIndexes(tableMetadataList: List[AbstractTableMetadata]): List[IndexMetadata] =
    tableMetadataList.collect {
      case table: TableMetadata => table.getIndexes.asScala.toList
    }.flatten

  /** All user-defined types declared in the given keyspace. */
  def extractUserTypes(keyspaceMetadata: KeyspaceMetadata): List[UserType] =
    keyspaceMetadata.getUserTypes.asScala.toList

  override def schemaDefinition: Either[SchemaDefinitionProviderError, SchemaDefinition] = {

    import scala.concurrent.ExecutionContext.Implicits.global

    import freestyle.async.implicits._
    import freestyle.cassandra.api._
    import freestyle.cassandra.handlers.implicits._

    // Interpreter that runs ClusterAPI operations against the provided cluster.
    implicit val clusterAPIInterpreter: ClusterAPI.Op ~> Future =
      clusterAPIHandler[Future] andThen apiInterpreter[Future, Cluster](cluster)

    // Free program: connect, read the cluster metadata, then close the connection.
    // NOTE(review): if `metadata` fails, `close` is never reached — confirm whether
    // the connection is cleaned up elsewhere on failure.
    def metadataF[F[_]](implicit clusterAPI: ClusterAPI[F]): FreeS[F, Metadata] =
      for {
        _        <- clusterAPI.connect
        metadata <- clusterAPI.metadata
        _        <- clusterAPI.close
      } yield metadata

    // Blocks the calling thread for up to 10 seconds while the metadata is fetched;
    // any failure (including the timeout) is captured as a provider error.
    val fetched: Either[SchemaDefinitionProviderError, Metadata] =
      Either.catchNonFatal {
        Await.result(metadataF[ClusterAPI.Op].interpret[Future], 10.seconds)
      } leftMap (SchemaDefinitionProviderError(_))

    fetched flatMap { metadata =>
      val keyspaces: List[KeyspaceMetadata]   = metadata.getKeyspaces.asScala.toList
      val tables: List[AbstractTableMetadata] = keyspaces.flatMap(extractTables)
      val indexes: List[IndexMetadata]        = extractIndexes(tables)
      val userTypes: List[UserType]           = keyspaces.flatMap(extractUserTypes)

      // Any single failing conversion short-circuits the whole schema definition.
      for {
        keyspaceStatements <- keyspaces.traverse(toCreateKeyspace)
        tableStatements    <- tables.traverse(toCreateTable)
        indexStatements    <- indexes.traverse(toCreateIndex(_))
        userTypeStatements <- userTypes.traverse(toUserType)
      } yield keyspaceStatements ++ tableStatements ++ indexStatements ++ userTypeStatements
    }
  }
}
|
||
object MetadataSchemaProvider {

  /** Implicit provider backed by whatever `Cluster` is in implicit scope. */
  implicit def metadataSchemaProvider(implicit cluster: Cluster): SchemaDefinitionProvider =
    new MetadataSchemaProvider(cluster)
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
/* | ||
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com> | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package freestyle.cassandra | ||
package schema.provider.metadata | ||
|
||
import cats.implicits._ | ||
import com.datastax.driver.core.{ | ||
AbstractTableMetadata, | ||
ColumnMetadata, | ||
IndexMetadata, | ||
KeyspaceMetadata, | ||
TupleType, | ||
UserType, | ||
DataType => DatastaxDataType | ||
} | ||
import freestyle.cassandra.schema.SchemaDefinitionProviderError | ||
import troy.cql.ast._ | ||
import troy.cql.ast.ddl.Keyspace.Replication | ||
import troy.cql.ast.ddl.Table.PrimaryKey | ||
import troy.cql.ast.ddl.{Field, Index, Table} | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.language.postfixOps | ||
|
||
/**
 * Conversions from Datastax driver metadata classes to Troy CQL AST statements.
 *
 * Every public conversion returns an `Either[SchemaDefinitionProviderError, _]`;
 * non-fatal exceptions thrown by the driver accessors are captured and wrapped
 * rather than propagated.
 */
trait SchemaConversions {

  /**
   * Builds a `CREATE KEYSPACE` statement from the driver keyspace metadata.
   *
   * A `null` keyspace name is reported as a `Left` with an explicit error instead
   * of throwing (the previous NullPointerException-as-control-flow was replaced
   * with an `Either`). Replication properties are emitted only when at least one
   * replication entry is present, sorted by key for a deterministic output.
   */
  def toCreateKeyspace(
      keyspaceMetadata: KeyspaceMetadata): Either[SchemaDefinitionProviderError, CreateKeyspace] =
    Either.catchNonFatal {
      Option(keyspaceMetadata.getName)
        .toRight(SchemaDefinitionProviderError("Schema name is null"))
        .map { name =>
          val replication: Option[Replication] = Option(keyspaceMetadata.getReplication)
            .flatMap { m =>
              val seq = m.asScala.toSeq
              if (seq.isEmpty) None else Option(Replication(seq.sortBy(_._1)))
            }
          CreateKeyspace(
            ifNotExists = false,
            keyspaceName = KeyspaceName(name),
            properties = replication map (Seq(_)) getOrElse Seq.empty)
        }
    } leftMap (SchemaDefinitionProviderError(_)) joinRight

  /**
   * Builds a `CREATE TABLE` statement from the driver table metadata: all columns
   * plus the primary key (partition keys and clustering columns). Table options
   * are not reconstructed.
   */
  def toCreateTable(
      metadata: AbstractTableMetadata): Either[SchemaDefinitionProviderError, CreateTable] =
    Either.catchNonFatal {
      for {
        columns <- metadata.getColumns.asScala.toList.traverse(toTableColumn)
        primaryKey <- toPrimaryKey(
          metadata.getPartitionKey.asScala.toList,
          metadata.getClusteringColumns.asScala.toList)
      } yield
        CreateTable(
          ifNotExists = false,
          tableName = TableName(Some(KeyspaceName(metadata.getKeyspace.getName)), metadata.getName),
          columns = columns,
          primaryKey = Some(primaryKey),
          options = Seq.empty
        )
    } leftMap (SchemaDefinitionProviderError(_)) joinRight

  /** Fully-qualified name of the table the given index belongs to. */
  def readTable(metadata: IndexMetadata): TableName =
    TableName(Some(KeyspaceName(metadata.getTable.getKeyspace.getName)), metadata.getTable.getName)

  /**
   * Builds a `CREATE INDEX` statement. The `readTable` function is injectable
   * (defaulting to [[readTable]]) so the table lookup can be overridden in tests.
   */
  def toCreateIndex(
      metadata: IndexMetadata,
      readTable: (IndexMetadata) => TableName = readTable): Either[
    SchemaDefinitionProviderError,
    CreateIndex] =
    Either.catchNonFatal {
      CreateIndex(
        isCustom = metadata.isCustomIndex,
        ifNotExists = false,
        indexName = Option(metadata.getName),
        tableName = readTable(metadata),
        identifier = Index.Identifier(metadata.getTarget),
        using =
          if (metadata.isCustomIndex)
            // The index options are not visible through the IndexMetadata class,
            // so only the class name can be reconstructed here.
            Some(Index.Using(metadata.getIndexClassName, None))
          else None
      )
    } leftMap (SchemaDefinitionProviderError(_))

  /** Builds a `CREATE TYPE` statement from a user-defined type, field by field. */
  def toUserType(userType: UserType): Either[SchemaDefinitionProviderError, CreateType] =
    Either.catchNonFatal {
      userType.getFieldNames.asScala.toList.traverse { fieldName =>
        toField(fieldName, userType.getFieldType(fieldName))
      } map { fields =>
        CreateType(
          ifNotExists = false,
          typeName = TypeName(Some(KeyspaceName(userType.getKeyspace)), userType.getTypeName),
          fields = fields)
      }
    } leftMap (SchemaDefinitionProviderError(_)) joinRight

  /** Converts one UDT field (name + driver type) to a CQL AST field. */
  private[this] def toField(
      name: String,
      datastaxDataType: DatastaxDataType): Either[SchemaDefinitionProviderError, Field] =
    toDataType(datastaxDataType) map { dataType =>
      Field(name, dataType)
    }

  /** Converts a driver column to a CQL AST column; key membership is set separately. */
  private[this] def toTableColumn(
      metadata: ColumnMetadata): Either[SchemaDefinitionProviderError, Table.Column] =
    toDataType(metadata.getType).map { dataType =>
      Table.Column(
        name = metadata.getName,
        dataType = dataType,
        isStatic = metadata.isStatic,
        isPrimaryKey = false)
    }

  /**
   * Maps a driver `DataType` onto the CQL AST `DataType`: native primitives,
   * custom types, collections (list/set/map of native elements), tuples of
   * native components, and user-defined types.
   */
  private[this] def toDataType(
      dataType: DatastaxDataType): Either[SchemaDefinitionProviderError, DataType] = {

    import DatastaxDataType._

    // Primitive driver type -> CQL native counterpart; anything else is an error.
    def toDataTypeNative(
        dataType: DatastaxDataType): Either[SchemaDefinitionProviderError, DataType.Native] =
      dataType.getName match {
        case Name.ASCII     => DataType.Ascii.asRight
        case Name.BIGINT    => DataType.BigInt.asRight
        case Name.BLOB      => DataType.Blob.asRight
        case Name.BOOLEAN   => DataType.Boolean.asRight
        case Name.COUNTER   => DataType.Counter.asRight
        case Name.DATE      => DataType.Date.asRight
        case Name.DECIMAL   => DataType.Decimal.asRight
        case Name.DOUBLE    => DataType.Double.asRight
        case Name.FLOAT     => DataType.Float.asRight
        case Name.INET      => DataType.Inet.asRight
        case Name.INT       => DataType.Int.asRight
        case Name.SMALLINT  => DataType.Smallint.asRight
        case Name.TEXT      => DataType.Text.asRight
        case Name.TIME      => DataType.Time.asRight
        case Name.TIMESTAMP => DataType.Timestamp.asRight
        case Name.TIMEUUID  => DataType.Timeuuid.asRight
        case Name.TINYINT   => DataType.Tinyint.asRight
        case Name.UUID      => DataType.Uuid.asRight
        case Name.VARCHAR   => DataType.Varchar.asRight
        case Name.VARINT    => DataType.Varint.asRight
        case _ =>
          Left(SchemaDefinitionProviderError(s"Native DataType ${dataType.getName} not supported"))
      }

    // Lists, sets and maps; only *native* element types are supported here, so a
    // missing or non-native type argument surfaces as a parse error.
    def toCollectionType(
        collectionType: CollectionType): Either[SchemaDefinitionProviderError, DataType] = {

      val typeArgs: List[DatastaxDataType] = collectionType.getTypeArguments.asScala.toList

      val maybeCol: Option[Either[SchemaDefinitionProviderError, DataType]] =
        collectionType.getName match {
          case Name.LIST =>
            typeArgs.headOption map (toDataTypeNative(_) map DataType.List)
          case Name.SET =>
            typeArgs.headOption map (toDataTypeNative(_) map DataType.Set)
          case Name.MAP =>
            // A map needs both a key and a value type argument; `lift(1)` avoids
            // the previous `tail.headOption` construction.
            (typeArgs.headOption, typeArgs.lift(1)) match {
              case (Some(keyArg), Some(valueArg)) =>
                Some(for {
                  keyType   <- toDataTypeNative(keyArg)
                  valueType <- toDataTypeNative(valueArg)
                } yield DataType.Map(keyType, valueType))
              case _ => None
            }
          case _ => None
        }

      maybeCol getOrElse {
        Left(
          SchemaDefinitionProviderError(
            s"Error parsing collection DataType '${collectionType.asFunctionParameterString()}'"))
      }
    }

    // Tuples of native component types; a non-native component short-circuits.
    def toTupleType(tupleType: TupleType): Either[SchemaDefinitionProviderError, DataType] =
      tupleType.getComponentTypes.asScala.toList traverse toDataTypeNative map DataType.Tuple

    dataType match {
      case nativeType: NativeType => toDataTypeNative(nativeType)
      case customType: CustomType => Right(DataType.Custom(customType.getCustomTypeClassName))
      case collectionType: CollectionType => toCollectionType(collectionType)
      case tupleType: TupleType => toTupleType(tupleType)
      case userType: UserType =>
        Right(DataType.UserDefined(KeyspaceName(userType.getKeyspace), userType.getTypeName))
    }
  }

  /** Assembles the primary key from partition-key and clustering-column names. */
  private[this] def toPrimaryKey(
      partitionKeys: List[ColumnMetadata],
      clusteringColumns: List[ColumnMetadata]): Either[SchemaDefinitionProviderError, PrimaryKey] =
    PrimaryKey(partitionKeys.map(_.getName), clusteringColumns.map(_.getName)).asRight

}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you could add a type alias for
`Either[SchemaDefinitionProviderError, ?]`
here? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely