Fix unsigned bigint in group by columns #780

Merged: 5 commits into master from fix-bigint-groupby on May 31, 2019

Conversation

birdstorm (Contributor)

Closes #779
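
A minimal reproduction of the linked issue, for context (table and column names here are hypothetical; the grouped column is an unsigned BIGINT in TiDB):

spark.sql(
  """
    |select id
    |from t
    |group by id
  """.stripMargin)
  .show()
// Before this fix, grouping on the unsigned BIGINT column could fail with:
// scala.MatchError: ... (of class java.lang.Long)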

@birdstorm (Contributor, Author)

/run-all-tests

@zhexuany (Contributor) left a comment

LGTM

@birdstorm (Contributor, Author)

/run-all-tests

@birdstorm (Contributor, Author)

/run-all-tests

@soufunlab

I added the code to master and built it locally, but when I run it I still get the same error.

@birdstorm (Contributor, Author)

I added the code to master and built it locally, but when I run it I still get the same error.

@soufunlab Could you provide the error stack trace? Also, please check the output of spark.sql("SELECT ti_version()").show(false).

@soufunlab

Caused by: scala.MatchError: 128929508491259904 (of class java.lang.Long)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$DecimalConverter.toCatalystImpl(CatalystTypeConverters.scala:307)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$DecimalConverter.toCatalystImpl(CatalystTypeConverters.scala:304)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:62)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:59)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

@soufunlab

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|UDF:ti_version() |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Release Version: 2.1-SNAPSHOT
Supported Spark Version: spark-2.3
Git Commit Hash: 0e8efdf
Git Branch: master
UTC Build Time: 2019-05-31 11:08:16|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

@birdstorm (Contributor, Author)

@soufunlab What query are you running? Could you post it so I can check whether it reproduces?

@soufunlab

The problem is here, in spark-core /org/apache/spark/sql/catalyst/CatalystTypeConverters.scala:

private class DecimalConverter(dataType: DecimalType)
  extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
  override def toCatalystImpl(scalaValue: Any): Decimal = {
    val decimal = scalaValue match {
      case d: BigDecimal => Decimal(d)
      case d: JavaBigDecimal => Decimal(d)
      case d: JavaBigInteger => Decimal(d)
      case d: Decimal => d
    }
    decimal.toPrecision(dataType.precision, dataType.scale)
  }
  override def toScala(catalystValue: Decimal): JavaBigDecimal = {
    if (catalystValue == null) null
    else catalystValue.toJavaBigDecimal
  }
  override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal =
    row.getDecimal(column, dataType.precision, dataType.scale).toJavaBigDecimal
}
The scalaValue passed in here is a Long, so the error is thrown before execution ever reaches the code you fixed.
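
A self-contained sketch of the failing match (the cases mirror the Spark converter above; the Long value is the one from the stack trace):

import java.math.{BigDecimal => JavaBigDecimal, BigInteger => JavaBigInteger}

object MatchErrorSketch {
  // Mirrors DecimalConverter.toCatalystImpl: there is no `case d: Long`,
  // so a raw Long falls through every case and throws scala.MatchError.
  def toDecimalLike(scalaValue: Any): JavaBigDecimal = scalaValue match {
    case d: BigDecimal     => d.bigDecimal
    case d: JavaBigDecimal => d
    case d: JavaBigInteger => new JavaBigDecimal(d)
  }

  def main(args: Array[String]): Unit = {
    // Throws: scala.MatchError: 128929508491259904 (of class java.lang.Long)
    toDecimalLike(128929508491259904L)
  }
}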

@soufunlab commented May 31, 2019

spark.sql(
  s"""
     |select sku_id
     |from table_a
     |where biz_type=6 and out_int_type='出库' and act_stock_type=1
     |group by sku_id
   """.stripMargin)
  .show()

@birdstorm (Contributor, Author) commented May 31, 2019

@soufunlab I can't reproduce it. Can you confirm that your jar was actually replaced, and that there are no leftover tispark jars in $SPARK_HOME/jars?
Also, try removing the where clause to see whether it still reproduces. If convenient, please paste the explain output as well.

@soufunlab

Without the where clause, the query returns results. The execution plan is:

== Physical Plan ==
*(2) HashAggregate(keys=[sku_id#33], functions=[])
+- Exchange hashpartitioning(sku_id#33, 200)
   +- *(1) HashAggregate(keys=[sku_id#33], functions=[])
      +- TiSpark CoprocessorRDD{[table: vv_stock_flow_log] CoveringIndexScan[Index: idx_sku_id] , Columns: [sku_id], KeyRange: ([t\200\000\000\000\000\000\006\217_i\200\000\000\000\000\000\000\001\000], [t\200\000\000\000\000\000\006\217_i\200\000\000\000\000\000\000\001\372]), Aggregates: First([sku_id]), Group By: [[sku_id] ASC], startTs: 408762435619258378}

With the where clause, the query errors out. The execution plan is:

== Physical Plan ==
*(2) HashAggregate(keys=[sku_id#90], functions=[])
+- Exchange hashpartitioning(sku_id#90, 200)
   +- *(1) HashAggregate(keys=[sku_id#90], functions=[])
      +- TiSpark RegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=[[[biz_type] EQUAL 6], [[act_stock_type] EQUAL 1], Not(IsNull([out_int_type])), [[out_int_type] EQUAL "出库"]]
         +- TiDB HandleRDD{[table: vv_stock_flow_log] IndexScan[Index: idx_sku_id] , Columns: [sku_id], [out_int_type], [act_stock_type], [biz_type], Residual Filter: [[biz_type] EQUAL 6], [[act_stock_type] EQUAL 1], Not(IsNull([out_int_type])), [[out_int_type] EQUAL "出库"], KeyRange: ([t\200\000\000\000\000\000\006\217_i\200\000\000\000\000\000\000\001\000], [t\200\000\000\000\000\000\006\217_i\200\000\000\000\000\000\000\001\372]), Group By: [[sku_id] ASC], startTs: 408762436746477588}

@soufunlab

I'm running this locally.

@birdstorm (Contributor, Author)

/run-all-tests

@birdstorm merged commit 0dff4f3 into master on May 31, 2019
@birdstorm deleted the fix-bigint-groupby branch on May 31, 2019 13:00
@marsishandsome (Collaborator)

@birdstorm why does TiDB return a Long value?
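
A sketch of the range mismatch behind the question, assuming unsigned BIGINT maps to a 20-digit DecimalType on the Spark side: values that fit in a signed 64-bit long can come back from row decoding as a plain java.lang.Long, which the decimal converter does not accept.

import java.math.BigInteger

object UnsignedRangeSketch extends App {
  // Unsigned BIGINT spans [0, 2^64 - 1]; a signed Long spans [-2^63, 2^63 - 1].
  val maxUnsigned = BigInteger.ONE.shiftLeft(64).subtract(BigInteger.ONE)
  val maxSigned   = BigInteger.valueOf(Long.MaxValue)

  // The key from the stack trace fits in a signed Long, so a decoder can
  // legitimately hand back a java.lang.Long for it...
  assert(BigInteger.valueOf(128929508491259904L).compareTo(maxSigned) <= 0)

  // ...but the full unsigned range does not fit, which is why the column is
  // typed as a decimal rather than LongType.
  assert(maxUnsigned.compareTo(maxSigned) > 0)
}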

zhexuany pushed a commit to zhexuany/tispark that referenced this pull request Jun 4, 2019
wfxxh pushed a commit to wanfangdata/tispark that referenced this pull request Jun 30, 2023
Successfully merging this pull request may close these issues.

MatchError when using unsigned bigint as group by column
4 participants