diff --git a/common/src/main/java/org/apache/comet/NativeBase.java b/common/src/main/java/org/apache/comet/NativeBase.java index 42357b9bf..6133a2d8b 100644 --- a/common/src/main/java/org/apache/comet/NativeBase.java +++ b/common/src/main/java/org/apache/comet/NativeBase.java @@ -81,7 +81,7 @@ static synchronized void load() { // Try to load Comet library from the java.library.path. try { - System.loadLibrary(libraryToLoad); + System.loadLibrary(NATIVE_LIB_NAME); loaded = true; } catch (UnsatisfiedLinkError ex) { // Doesn't exist, so proceed to loading bundled library. diff --git a/native/core/src/jvm_bridge/mod.rs b/native/core/src/jvm_bridge/mod.rs index 4a5f72f48..4936b1c5b 100644 --- a/native/core/src/jvm_bridge/mod.rs +++ b/native/core/src/jvm_bridge/mod.rs @@ -339,7 +339,7 @@ fn get_throwable_message( throwable: &JThrowable, ) -> CometResult { unsafe { - let message = env + let message: JString = env .call_method_unchecked( throwable, jvm_classes.throwable_get_message_method, @@ -348,7 +348,11 @@ fn get_throwable_message( )? .l()? .into(); - let message_str = env.get_string(&message)?.into(); + let message_str = if !message.is_null() { + env.get_string(&message)?.into() + } else { + String::from("null") + }; let cause: JThrowable = env .call_method_unchecked( diff --git a/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala b/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala new file mode 100644 index 000000000..ef0485dfe --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/CometNativeSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import org.apache.spark.SparkException +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.catalyst.expressions.PrettyAttribute +import org.apache.spark.sql.comet.{CometExec, CometExecUtils} +import org.apache.spark.sql.types.LongType +import org.apache.spark.sql.vectorized.ColumnarBatch + +class CometNativeSuite extends CometTestBase { + test("test handling NPE thrown by JVM") { + val rdd = spark.range(0, 1).rdd.map { value => + val limitOp = + CometExecUtils.getLimitNativePlan(Seq(PrettyAttribute("test", LongType)), 100).get + val cometIter = CometExec.getCometIterator( + Seq(new Iterator[ColumnarBatch] { + override def hasNext: Boolean = true + override def next(): ColumnarBatch = throw new NullPointerException() + }), + 1, + limitOp) + cometIter.next() + cometIter.close() + value + } + + val exception = intercept[SparkException] { + rdd.collect() + } + assert(exception.getMessage contains "java.lang.NullPointerException") + } +}