[SPARK-5684][SQL]: Pass in partition name along with location information, as the location can be different (that is may not contain the partition keys) #4469
Conversation
@@ -362,7 +362,7 @@ case object BooleanType extends NativeType with PrimitiveType {
  * @group dataType
  */
 @DeveloperApi
-case object TimestampType extends NativeType {
+case object TimestampType extends NativeType with PrimitiveType {
This change is needed because, when a table is partitioned on a timestamp-type column, the Parquet iterator returns a GenericRow due to this check in ParquetTypes.scala:
def isPrimitiveType(ctype: DataType): Boolean =
classOf[PrimitiveType] isAssignableFrom ctype.getClass
and in ParquetConverter.scala we have:
protected[parquet] def createRootConverter(
    parquetSchema: MessageType,
    attributes: Seq[Attribute]): CatalystConverter = {
  // For non-nested types we use the optimized Row converter
  if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) {
    new CatalystPrimitiveRowConverter(attributes.toArray)
  } else {
    new CatalystGroupConverter(attributes.toArray)
  }
}
which fails later here:
new Iterator[Row] {
  def hasNext = iter.hasNext
  def next() = {
    val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
throwing a ClassCastException, because a GenericRow cannot be cast to a SpecificMutableRow.
Am I missing something here?
@liancheng please suggest ...
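As a side note on the ClassCastException above, a minimal hedged sketch (the setup is illustrative, not the actual Parquet read path; the class names are from the Spark 1.2 catalyst package): a GenericRow is not a SpecificMutableRow, so the unconditional cast fails as soon as the group converter is chosen.

import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, SpecificMutableRow}

// What the group converter hands back for a non-primitive schema (illustrative values).
val fromGroupConverter: Row = new GenericRow(Array[Any]("dummy", 9L))
// Throws ClassCastException: GenericRow cannot be cast to SpecificMutableRow.
val row = fromGroupConverter.asInstanceOf[SpecificMutableRow]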
Mind tagging this with [SQL] so it can get properly sorted?
Force-pushed from 1eab60c to 30fdcec.
ok to test
Hey @saucam, partitioning support in the old Parquet code path is quite limited (it only handles one partition column, whose type must be INT). PR #4308 and upcoming follow-up PRs aim to provide full support for multi-level partitioning and schema merging. Also, Parquet tables converted from Hive metastore tables will retain the schema and location information inherited from the metastore. We plan to deprecate the old Parquet implementation in favor of the new Parquet data source in 1.3, and would like to remove the old one once the new implementation proves stable enough.
Test build #27224 has finished for PR 4469 at commit
Hi @liancheng, thanks for the comments. We are using Spark 1.2.1, where the old Parquet support is in use. Can this be merged so that we have proper partitioning with different locations as well? I tried partitioning on 2 columns and it worked fine (and I also applied this patch to specify a different location).
Force-pushed from 30fdcec to 2dd9dbb.
Test build #27778 has finished for PR 4469 at commit
Hi @liancheng, any update on this one? I think it will be useful for people using Spark 1.2.1, since the old Parquet path might suit their needs better in that version.
Hey @saucam, I'm pretty hesitant to make big changes to branch-1.2 unless a lot of users are reporting a problem. Do the problems you describe still exist in branch-1.3, or should we close this issue?
Hi @marmbrus, this is a pretty common scenario in production: the data is generated in some directory, and partitions are later added to the table using ALTER TABLE ... ADD PARTITION (key=value) LOCATION '<directory where the data is generated, whose path does not contain key=value>'.
While parsing partition keys from the partition locations in ParquetRelation, it is assumed that the location path string always contains the partition keys, which is not true. A different location can be specified when adding partitions to the table, which results in a "key not found" exception when reading from such partitions (a sketch of the assumption follows, then the reproduction steps).
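To make the assumption concrete, here is a hedged sketch (the helper partitionValueFromPath and the example paths are illustrative, not code from Spark or from this patch) of what deriving a partition value from the location path amounts to, and why it throws for a custom location:

// Hypothetical helper, simplified: recover a partition value by scanning the
// location path for "key=value" segments.
def partitionValueFromPath(path: String, key: String): String =
  path.split("/")
    .map(_.split("=", 2))
    .collectFirst { case Array(k, v) if k == key => v }
    .getOrElse(throw new NoSuchElementException(s"key not found: $key"))

partitionValueFromPath("/warehouse/test_table/timestamp=9", "timestamp") // "9"
partitionValueFromPath("/data/pth/different", "timestamp")               // throws: key not found: timestamp

A custom partition location has no "timestamp=9" segment to scan, which is exactly the NoSuchElementException shown in the stack trace below.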
Create a partitioned Parquet table:
create table test_table (dummy string) partitioned by (timestamp bigint) stored as parquet;
Add a partition to the table and specify a different location:
alter table test_table add partition (timestamp=9) location '/data/pth/different'
Run a simple SELECT * query and we get an exception:
15/02/09 08:27:25 ERROR thriftserver.SparkSQLDriver: Failed in [select * from db4_mi2mi_binsrc1_default limit 5]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 21, localhost): java.util.NoSuchElementException: key not found: timestamp
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$6.apply(ParquetTableOperations.scala:141)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$6.apply(ParquetTableOperations.scala:141)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
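For reference, a hedged sketch of the direction the PR title describes (the PartitionInput name and helper below are illustrative, not the actual patch): pass the partition spec from the metastore along with each partition location, so partition values never have to be re-parsed out of the path.

// Hypothetical shape of the idea: keep the metastore's partition spec next to
// the location instead of deriving it from the path string.
case class PartitionInput(location: String, spec: Map[String, String])

def partitionValue(input: PartitionInput, key: String): String =
  input.spec.getOrElse(key,
    throw new NoSuchElementException(s"key not found: $key"))

val p = PartitionInput("/data/pth/different", Map("timestamp" -> "9"))
partitionValue(p, "timestamp") // "9", even though the path contains no "timestamp=9"

With the spec carried explicitly, the lookup succeeds regardless of how the partition directory is named.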