[SPARK-5684][SQL]: Pass in partition name along with location information, as the location can be different (that is may not contain the partition keys) #4469


Closed
saucam wants to merge 2 commits

Conversation

@saucam

saucam commented Feb 9, 2015

While parsing the partition keys from the locations in parquetRelations, it is assumed that the location path string will always contain the partition keys, which is not true. A different location can be specified while adding partitions to a table, which results in a key-not-found exception while reading from such partitions:

1. Create a partitioned parquet table:
   create table test_table (dummy string) partitioned by (timestamp bigint) stored as parquet;
2. Add a partition to the table, specifying a different location (one whose path does not contain timestamp=9):
   alter table test_table add partition (timestamp=9) location '/data/pth/different';
3. Run a simple select * query; we get an exception:
15/02/09 08:27:25 ERROR thriftserver.SparkSQLDriver: Failed in [select * from db4_mi2mi_binsrc1_default limit 5]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 21, localhost): java.util.NoSuchElementException: key not found: timestamp
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$6.apply(ParquetTableOperations.scala:141)
at org.apache.spark.sql.parquet.ParquetTableScan$$anonfun$execute$4$$anonfun$6.apply(ParquetTableOperations.scala:141)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
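
To illustrate the failure mode, here is a condensed sketch (not Spark's actual source; the helper name is hypothetical) of what path-based partition-key parsing can and cannot recover:

def parsePartitionKeys(path: String): Map[String, String] =
  // Only path segments of the form key=value yield partition keys.
  path.split("/").flatMap { segment =>
    segment.split("=", 2) match {
      case Array(k, v) => Some(k -> v)
      case _           => None
    }
  }.toMap

parsePartitionKeys("/warehouse/test_table/timestamp=9")  // Map(timestamp -> 9)
parsePartitionKeys("/data/pth/different")("timestamp")   // java.util.NoSuchElementException: key not found: timestamp

Since the custom location carries no timestamp=... segment, the parsed map is empty and the later lookup fails; hence this patch passes in the partition name (which does contain key=value) along with the location.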

@@ -362,7 +362,7 @@ case object BooleanType extends NativeType with PrimitiveType {
  * @group dataType
  */
 @DeveloperApi
-case object TimestampType extends NativeType {
+case object TimestampType extends NativeType with PrimitiveType {
saucam (Author) commented on the diff:

This is done because, if the table is partitioned on a timestamp-type column, the parquet iterator returns a GenericRow, due to this check in ParquetTypes.scala:

def isPrimitiveType(ctype: DataType): Boolean =
  classOf[PrimitiveType] isAssignableFrom ctype.getClass

and in ParquetConverter.scala we have:

protected[parquet] def createRootConverter(
    parquetSchema: MessageType,
    attributes: Seq[Attribute]): CatalystConverter = {
  // For non-nested types we use the optimized Row converter
  if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) {
    new CatalystPrimitiveRowConverter(attributes.toArray)
  } else {
    new CatalystGroupConverter(attributes.toArray)
  }
}

which later fails here:

new Iterator[Row] {
  def hasNext = iter.hasNext
  def next() = {
    val row = iter.next()._2.asInstanceOf[SpecificMutableRow]

throwing a ClassCastException: GenericRow cannot be cast to SpecificMutableRow.
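
To make the dispatch concrete, a self-contained sketch (the type names here are simplified stand-ins, not Spark's actual definitions):

trait DataType
trait PrimitiveType extends DataType
case object TimestampTypeOld extends DataType                     // before this patch: no PrimitiveType mix-in
case object TimestampTypeNew extends DataType with PrimitiveType  // after this patch

def isPrimitiveType(ctype: DataType): Boolean =
  classOf[PrimitiveType] isAssignableFrom ctype.getClass

isPrimitiveType(TimestampTypeOld)  // false -> CatalystGroupConverter        -> GenericRow -> ClassCastException
isPrimitiveType(TimestampTypeNew)  // true  -> CatalystPrimitiveRowConverter -> SpecificMutableRow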

Am I missing something here?

@saucam
Author

saucam commented Feb 9, 2015

@liancheng please suggest ...

@sryza
Contributor

sryza commented Feb 9, 2015

Mind tagging this with [SQL] so it can get properly sorted?

@saucam saucam changed the title SPARK-5684: Pass in partition name along with location information, as the location can be different (that is may not contain the partition keys) [SPARK-5684][SQL]: Pass in partition name along with location information, as the location can be different (that is may not contain the partition keys) Feb 10, 2015
@liancheng
Contributor

ok to test

@liancheng
Contributor

Hey @saucam, partitioning support in the old Parquet code path is quite limited (it only handles 1 partition column, whose type must be INT). PR #4308 and upcoming follow-up PRs aim to provide full support for multi-level partitioning and schema merging. Also, Parquet tables converted from Hive metastore tables will retain the schema and location information inherited from the metastore. We plan to deprecate the old Parquet implementation in favor of the new Parquet data source in 1.3, and would like to remove the old one once the new implementation proves stable enough.

@SparkQA

SparkQA commented Feb 10, 2015

Test build #27224 has finished for PR 4469 at commit 30fdcec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@saucam
Author

saucam commented Feb 11, 2015

Hi @liancheng, thanks for the comments. We are using Spark 1.2.1, where the old parquet support is used. Can this be merged so that we have proper partitioning with different locations as well? I tried partitioning on 2 columns and it worked fine (and also applied this patch for specifying a different location).

@SparkQA

SparkQA commented Feb 20, 2015

Test build #27778 has finished for PR 4469 at commit 2dd9dbb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@saucam
Author

saucam commented Mar 9, 2015

Hi @liancheng, any update on this one? I think it will be useful for people using Spark 1.2.1, since the old parquet path might suit their needs better in that version.

@marmbrus
Contributor

marmbrus commented Apr 3, 2015

Hey @saucam, I'm pretty hesitant to make big changes to branch-1.2 unless a lot of users are reporting a problem. Do the problems you describe still exist in branch-1.3, or should we close this issue?

Yash Datta added 2 commits April 4, 2015 21:51
@saucam
Author

saucam commented Apr 4, 2015

Hi @marmbrus, this is a pretty common scenario in production: data is generated in some directory, and partitions are later added to the table using alter table ... add partition (key=value) location '<directory where the data is generated, whose path does not contain key=value>'.
In the old parquet path in v1.2.1, this is not possible.
It is doable in the new parquet path in Spark 1.3, though; see the sketch below.
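
For reference, a minimal end-to-end sketch of that scenario (assuming a spark-shell with a Hive-enabled build and an existing SparkContext sc; table and path names are the illustrative ones from above):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// The partition data lives at a path that carries no key=value segment:
hiveContext.sql("create table test_table (dummy string) partitioned by (timestamp bigint) stored as parquet")
hiveContext.sql("alter table test_table add partition (timestamp=9) location '/data/pth/different'")
// Without this patch, the old parquet path fails with: key not found: timestamp
hiveContext.sql("select * from test_table limit 5").collect()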

@asfgit asfgit closed this in 0cc8fcb Apr 12, 2015