Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions native-engine/blaze-jni-bridge/src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ define_conf!(IntConf, SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE);
define_conf!(BooleanConf, ORC_FORCE_POSITIONAL_EVOLUTION);
define_conf!(IntConf, UDAF_FALLBACK_NUM_UDAFS_TRIGGER_SORT_AGG);
define_conf!(BooleanConf, PARSE_JSON_ERROR_FALLBACK);
define_conf!(StringConf, NATIVE_LOG_LEVEL);

pub trait BooleanConf {
fn key(&self) -> &'static str;
Expand Down
7 changes: 5 additions & 2 deletions native-engine/blaze/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use datafusion::{
use datafusion_ext_plans::memmgr::MemManager;
use jni::{
JNIEnv,
objects::{JClass, JObject},
objects::{JClass, JObject, JString},
};
use once_cell::sync::OnceCell;

Expand All @@ -43,6 +43,7 @@ pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
env: JNIEnv,
_: JClass,
executor_memory_overhead: i64,
log_level: JString,
native_wrapper: JObject,
) -> i64 {
handle_unwinded_scope(|| -> Result<i64> {
Expand All @@ -61,7 +62,9 @@ pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
INIT.get_or_try_init(|| {
// logging is not initialized at this moment
eprintln!("------ initializing blaze native environment ------");
init_logging();
let log_level = env.get_string(log_level).map(|s| String::from(s)).unwrap();
eprintln!("initializing logging with level: {}", log_level);
init_logging(log_level.as_str());

// init jni java classes
log::info!("initializing JNI bridge");
Expand Down
11 changes: 7 additions & 4 deletions native-engine/blaze/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{cell::Cell, time::Instant};
use std::{cell::Cell, str::FromStr, time::Instant};

use log::{Level, LevelFilter, Log, Metadata, Record};
use once_cell::sync::OnceCell;
Expand All @@ -23,12 +23,14 @@ thread_local! {
pub static THREAD_PARTITION_ID: Cell<usize> = Cell::new(0);
}

const MAX_LEVEL: Level = Level::Info;
const DEFAULT_MAX_LEVEL: Level = Level::Info;

pub fn init_logging() {
pub fn init_logging(level: &str) {
let log_level = Level::from_str(level).unwrap_or_else(|_| DEFAULT_MAX_LEVEL);
static LOGGER: OnceCell<SimpleLogger> = OnceCell::new();
let logger = LOGGER.get_or_init(|| SimpleLogger {
start_instant: Instant::now(),
log_level,
});

log::set_logger(logger).expect("error setting logger");
Expand All @@ -38,11 +40,12 @@ pub fn init_logging() {
#[derive(Clone, Copy)]
struct SimpleLogger {
start_instant: Instant,
log_level: Level,
}

impl Log for SimpleLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= MAX_LEVEL
metadata.level() <= self.log_level
}

fn log(&self, record: &Record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ public enum BlazeConf {
// batches in memory at the same time
SUGGESTED_BATCH_MEM_SIZE_KWAY_MERGE("spark.blaze.suggested.batch.memSize.multiwayMerging", 1048576),

ORC_FORCE_POSITIONAL_EVOLUTION("spark.blaze.orc.force.positional.evolution", false);
ORC_FORCE_POSITIONAL_EVOLUTION("spark.blaze.orc.force.positional.evolution", false),

NATIVE_LOG_LEVEL("spark.blaze.native.log.level", "info");

public final String key;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
public class JniBridge {
public static final ConcurrentHashMap<String, Object> resourcesMap = new ConcurrentHashMap<>();

public static native long callNative(long initNativeMemory, BlazeCallNativeWrapper wrapper);
public static native long callNative(long initNativeMemory, String logLevel, BlazeCallNativeWrapper wrapper);

public static native boolean nextBatch(long ptr);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ case class BlazeCallNativeWrapper(
private var batchCurRowIdx = 0

logWarning(s"Start executing native plan")
private var nativeRuntimePtr = JniBridge.callNative(NativeHelper.nativeMemory, this)
private var nativeRuntimePtr =
JniBridge.callNative(NativeHelper.nativeMemory, BlazeConf.NATIVE_LOG_LEVEL.stringConf(), this)

private lazy val rowIterator = new Iterator[InternalRow] {
override def hasNext: Boolean = {
Expand Down
Loading