SQLTransformer #126

Open · Mageswaran1989 opened this issue Feb 13, 2017 · 20 comments

@Mageswaran1989 (Contributor) commented Feb 13, 2017

We have a Spark ML pipeline that uses SQLTransformer along with a UDF.

I checked https://github.com/combust/mleap/tree/master/mleap-spark/src/main/scala/org/apache/spark/ml/bundle/ops/feature
and didn't find any relevant op.

Could you please give some direction on how to add support for it, if feasible?
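
For context, a minimal sketch (column names and SQL statement are hypothetical) of the kind of pipeline that triggers the error below:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.SQLTransformer

// A pipeline stage that rewrites the input frame with SQL; __THIS__ is
// Spark's placeholder for the incoming dataset
val sqlTrans = new SQLTransformer()
  .setStatement("SELECT *, LENGTH(text) AS text_len FROM __THIS__")
val pipeline = new Pipeline().setStages(Array(sqlTrans))
// Fitting works in Spark, but writing the fitted model to an MLeap bundle
// fails with the NoSuchElementException shown below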

Error Log:

java.util.NoSuchElementException: key not found: org.apache.spark.ml.feature.SQLTransformer
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at ml.combust.bundle.BundleRegistry.opForObj(BundleRegistry.scala:100)
at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:31)
at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:30)
at scala.util.Try$.apply(Try.scala:192)
at ml.combust.bundle.serializer.GraphSerializer.writeNode(GraphSerializer.scala:30)
at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:21)
at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:21)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at ml.combust.bundle.serializer.GraphSerializer.write(GraphSerializer.scala:20)
at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:21)
at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:14)
at ml.combust.bundle.serializer.ModelSerializer$$anonfun$write$1.apply(ModelSerializer.scala:92)
at ml.combust.bundle.serializer.ModelSerializer$$anonfun$write$1.apply(ModelSerializer.scala:88)
at scala.util.Try$.apply(Try.scala:192)
at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:88)
at ml.combust.bundle.serializer.NodeSerializer$$anonfun$write$1.apply(NodeSerializer.scala:88)
at ml.combust.bundle.serializer.NodeSerializer$$anonfun$write$1.apply(NodeSerializer.scala:84)
at scala.util.Try$.apply(Try.scala:192)
at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:84)
at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:34)
at ml.combust.bundle.serializer.GraphSerializer$$anonfun$writeNode$1.apply(GraphSerializer.scala:30)
at scala.util.Try$.apply(Try.scala:192)
at ml.combust.bundle.serializer.GraphSerializer.writeNode(GraphSerializer.scala:30)
at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:21)
at ml.combust.bundle.serializer.GraphSerializer$$anonfun$write$2.apply(GraphSerializer.scala:21)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
at ml.combust.bundle.serializer.GraphSerializer.write(GraphSerializer.scala:20)
at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:21)
at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:14)
at ml.combust.bundle.serializer.ModelSerializer$$anonfun$write$1.apply(ModelSerializer.scala:92)
at ml.combust.bundle.serializer.ModelSerializer$$anonfun$write$1.apply(ModelSerializer.scala:88)
at scala.util.Try$.apply(Try.scala:192)
at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:88)
at ml.combust.bundle.serializer.NodeSerializer$$anonfun$write$1.apply(NodeSerializer.scala:88)
at ml.combust.bundle.serializer.NodeSerializer$$anonfun$write$1.apply(NodeSerializer.scala:84)
at scala.util.Try$.apply(Try.scala:192)
at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:84)
at ml.combust.bundle.serializer.BundleSerializer$$anonfun$write$1.apply(BundleSerializer.scala:34)
at ml.combust.bundle.serializer.BundleSerializer$$anonfun$write$1.apply(BundleSerializer.scala:29)
at scala.util.Try$.apply(Try.scala:192)
at ml.combust.bundle.serializer.BundleSerializer.write(BundleSerializer.scala:29)
at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:27)
at $anonfun$2.apply(:105)
at $anonfun$2.apply(:104)
at resource.AbstractManagedResource$$anonfun$5.apply(AbstractManagedResource.scala:88)
at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
at scala.util.control.Exception$Catch.apply(Exception.scala:103)
at scala.util.control.Exception$Catch.either(Exception.scala:125)
at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
at resource.ManagedResourceOperations$class.acquireAndGet(ManagedResourceOperations.scala:25)
at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
at resource.ManagedResourceOperations$class.foreach(ManagedResourceOperations.scala:53)
at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50)
... 62 elided

@Mageswaran1989 (Author):

Will the custom transformer documentation (http://mleap-docs.combust.ml/mleap-runtime/custom-transformer.html) help here, i.e. creating a custom transformer instead of a UDF?

@seme0021 (Contributor):

@Mageswaran1989 MLeap doesn't support the SQLTransformer for a couple of reasons:

  • mleap-runtime doesn't contain a SQL engine to support arbitrary SQL operations
  • MLeap only supports row-based transformations, whereas SQL can involve groupBy, join, windowing, and many other operations

What logic do you have in your SQLTransformer? People have asked this question before, and we recommend:

  • For non-row operations, move the SQL out of the ML Pipeline that you plan to serialize (see the sketch after this list)
  • For row-based operations, use the available ML transformers or write a custom transformer <- this is where the custom transformer documentation will help.
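
To illustrate the first recommendation, a minimal sketch (input path, view name, and columns are hypothetical): run the non-row SQL as plain Spark SQL before fitting, so only row-based stages end up in the serialized pipeline.

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.VectorAssembler

val spark = SparkSession.builder().getOrCreate()

// Non-row SQL (a groupBy aggregation) runs outside the pipeline
val raw = spark.read.parquet("/tmp/events.parquet")
raw.createOrReplaceTempView("events")
val prepared = spark.sql(
  """SELECT user_id, COUNT(*) AS event_count, AVG(amount) AS avg_amount
    |FROM events GROUP BY user_id""".stripMargin)

// Only row-based stages go into the pipeline that is serialized to MLeap
val assembler = new VectorAssembler()
  .setInputCols(Array("event_count", "avg_amount"))
  .setOutputCol("features")
val model = new Pipeline().setStages(Array(assembler)).fit(prepared)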

@Mageswaran1989 (Author) commented Feb 15, 2017

@seme0021 The custom transformer documentation, along with TokenizerOp, helped me solve my simple operation of filtering words by length.

When I try loading it in a LeapFrame:

  1. I was not able to read a plain JSON file with FrameReader(). My guess is that FrameReader() needs the schema information encoded in the JSON file of interest; correct me if I am wrong.
  2. The above was bypassed with spray-json parsing and loading into a LeapFrame() with a custom MLeap schema and LocalDataset().
  3. Once loaded, the MLeap pipeline works, mirroring the Spark pipeline; however, the features vector (nullable = true) in Spark is converted to TensorType(DoubleType(false),false) in MLeap.

@hollinwilkins (Member):

@Mageswaran1989

Hey there,

I'll respond to your questions here:

  1. You are right, the JSON-encoded LeapFrame needs to have the schema information serialized as well. Please see the "Example Leap Frame" section of the documentation here: http://mleap-docs.combust.ml/core-concepts/data-frames/ (a minimal sketch also follows the code below)

  2. Glad to hear you got it working :)

  3. MLeap does not use Spark's linalg.DenseVector and SparseVector for storing tensors. We have our own Tensor format so we can support arbitrary n-dimensional data, mostly so we can easily integrate with deep learning libraries like TensorFlow and Torch. There is a very easy way to convert an MLeap tensor to a dense or sparse Spark vector if you wish, though. Please see the code here: https://github.com/combust/mleap/blob/master/mleap-runtime/src/main/scala/ml/combust/mleap/runtime/converter/VectorConverters.scala

Importing this class will give you implicit conversions between MLeap Tensors and Spark Vectors.

import ml.combust.mleap.runtime.converter.VectorConverters._
import ml.combust.mleap.tensor.Tensor
import org.apache.spark.ml.linalg.Vector

// Build a dense MLeap tensor from a plain array of doubles
val mleapTensor = Tensor.denseVector(Array[Double](23.4, 55.6, 34.4))
// The VectorConverters import converts it implicitly to a Spark Vector
val sparkVector: Vector = mleapTensor
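
Regarding (1), here is a minimal sketch of a JSON-encoded leap frame with its schema inline (field names and types are illustrative; the docs page above has the authoritative format):

{
  "schema": {
    "fields": [
      { "name": "text", "type": "string" },
      { "name": "count", "type": "double" }
    ]
  },
  "rows": [
    ["hello world", 2.0]
  ]
}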

As far as debugging the LeapFrame goes, we have an issue open for making a nice .show function as you suggest; follow it here: #121

Cheers,
Hollin

@Mageswaran1989 (Author):

@hollinwilkins Thank you!

Seems like I have to bug you for some more time ;)

How do I store/bundle a Spark org.apache.spark.ml.linalg.Matrix with ml.combust.bundle.dsl.Value?

@hollinwilkins (Member):

@Mageswaran1989 You can use the Value.tensor method to store an n-dimensional array such as a matrix.
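
A minimal sketch of that idea (the DenseTensor constructor shape is an assumption here; check the mleap-tensor API for the exact signature):

import ml.combust.bundle.dsl.Value
import ml.combust.mleap.tensor.DenseTensor
import org.apache.spark.ml.linalg.Matrix

// Flatten the Spark matrix into a 2-dimensional MLeap tensor and wrap it
// as a bundle Value. Spark's Matrix.toArray is column-major, so remember
// that ordering when reading the values back.
def matrixValue(m: Matrix): Value =
  Value.tensor(DenseTensor(m.toArray, Seq(m.numRows, m.numCols)))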

@kidtronnix:

@hollinwilkins I am trying to create some custom transformers too. I looked at the docs, and from the looks of things I need to make those changes to extend the library and then repackage it so I can upload it to something like Databricks.

@hollinwilkins (Member):

@smaxwellstewart As for custom transformers, you will be creating them in a separate library and packaging them in some way that Databricks can access. An assembly jar would accomplish this quite nicely; a sketch follows.
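
For reference, a minimal sbt-assembly sketch (plugin and library versions are illustrative):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

// build.sbt: Spark is provided by the cluster; bundle MLeap with your transformers
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-mllib" % "2.3.0" % "provided",
  "ml.combust.mleap" %% "mleap-spark" % "0.13.0"
)

Running sbt assembly then produces a fat jar you can attach to a Databricks cluster.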

Sorry our docs are a bit incomplete here, but going through this process with you will help us create them so that people will have an easier time in the future. We would be happy to set up a time to chat and walk through the process as well, if you like. Email us at hollin at combust.ml and mikhail at combust.ml

Best

@singh-dhirendra (Contributor):

@hollinwilkins I understand the issues with SQLTransformer when doing groupBy operations. However, SQLTransformer comes with lots of handy SQL functions (dateDiff(), log(), if(), etc.) that help in transforming the input data, and most of these functions operate on a single row, so they won't necessarily have the issues of a SQL transformation that operates on multiple rows. Are there any recommendations for how we can achieve the same results as these single-row SQL functions?

@hollinwilkins (Member):

@singh-dhirendra For binary/unary math operations you can use mleap-spark-extension, which provides UnaryMath and BinaryMath transformers. Eventually we may support SQLTransformer, but truly supporting it would take a bit of effort. We would need to switch from row-based to frame-based transformation, which has a few other benefits as well. Then we would need to decide on a SQL engine that we can use and wire everything up, which would also be quite a task. If we do move in that direction, I think splitting off an optional module for the SQLTransformer, including the SQL engine, would be desirable.

Short answer to your question:

  1. Use custom transformers for the operations you need, and contribute them back to MLeap and mleap-spark-extension (a sketch of the core model class follows this list)
  2. Long-term, we may switch from row-based to frame-based transformation, in which case SQLTransformer becomes more of a possibility
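
As a sketch of point 1, here is what the core model of a custom row-level transformer might look like, following the pattern in the custom-transformer docs (the class name, fields, and operation are hypothetical, and the runtime/Spark wiring described in the docs is still needed):

import ml.combust.mleap.core.Model
import ml.combust.mleap.core.types.{ScalarType, StructType}

// Row-level equivalent of a dateDiff()-style SQL function: the difference
// in days between two epoch-millisecond timestamps
case class DaysBetweenModel() extends Model {
  def apply(start: Long, end: Long): Double = (end - start) / 86400000.0

  override def inputSchema: StructType = StructType(
    "start" -> ScalarType.Long,
    "end" -> ScalarType.Long).get

  override def outputSchema: StructType = StructType(
    "days" -> ScalarType.Double).get
}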

@hollinwilkins (Member):

Also, if you are interested in helping with any of these pieces:

  1. Moving to frame-based transformation
  2. Finding a suitable SQL engine and building the SQLTransformer

We would definitely appreciate it!

@singh-dhirendra (Contributor) commented Jun 23, 2017

Thanks @hollinwilkins for the suggestion. I am happy to contribute to frame-based transformations or the SQLTransformer; let me know if there is a doc on how that would potentially work.

@PowerToThePeople111 commented Aug 4, 2018

I ran into the same issue, but mainly because I want to reduce the size of the dataframe returned by the mleap-serving service when asking it for a prediction. The idea was to use the SQLTransformer to select only certain columns of the final frame, after the prediction is made and some indexes are mapped back to labels.

But this projection could also be done differently, by simply filtering the final JSON objects for the requested properties, which would probably be much easier to implement and potentially of great interest. I think this makes especially good sense when the dataframe containing all the features and base data is really big.

Is there any mechanism to configure the service, so that only certain columns of the frame are returned?

And if not, would you like to have one added? ;-)

Edit: For anyone who needs to reduce the overhead: the place to look is ml.combust.mleap.serving.MleapResource.

If you just want to reduce the size of your resulting dataframe, you can do something like:

// transform and select both return Try, hence the .get calls
service.transform(frame)
  .get
  .select("ColumnYouNeed1", "ColumnYouNeed2", "ColumnYouNeed3", ...)
  .get

This still keeps all the overhead of the MLeap dataframes with the column descriptions.

@hollinwilkins (Member):

@PowerToThePeople111 Hey, we are about to merge this branch, which will make the existing mleap-serving obsolete: https://github.com/combust/mleap/tree/feature/mleap-grpc

With each transform request, we introduce a TransformOptions object, which includes the ability to filter down to only the fields you care about in the result. I am writing up documentation for this right now.

@femibyte commented Apr 2, 2019

Any progress on this issue, i.e. making MLeap support SQLTransformer?

@toquoccuong:

I also have a problem with SQLTransformer and a groupBy operation, so it would be nice if MLeap could support frame-based transformation soon.

@femibyte commented Aug 1, 2019

Quoting @seme0021's earlier recommendation: "For non-row operations, move the SQL out of the ML Pipeline that you plan to serialize."

How does moving the SQL out of the ML Pipeline help if you're using the SQLTransformer to train a model?

@seme0021

@drei34 commented Mar 14, 2023

@jsleight Do you know if MLeap 0.22 supports SQLTransformer? If you have a pipeline and a bunch of rows, what is the recommended way to filter rows using a pipeline stage?

@jsleight (Contributor):

SQLTransformer isn't supported and, to be honest, is hard to support. We'd essentially need to replicate Spark's SQL engine without Spark.

I would advise filtering the rows before calling the pipeline.
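
A minimal sketch of that advice (dataframe and column names are hypothetical): filter in Spark, or equivalently in your serving layer, before the frame reaches the pipeline.

import org.apache.spark.sql.functions.col

// Keep only the rows the pipeline should score; the serialized pipeline
// itself then contains no filtering stage
val filtered = df.filter(col("token_length") > 3)
val scored = pipelineModel.transform(filtered)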
