Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions plugins/engine-datafusion/jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ crate-type = ["cdylib"]

[dependencies]
# DataFusion dependencies
datafusion = "49.0.0"
datafusion-substrait = "49.0.0"
arrow = "55.2.0"
datafusion = "50.0.0"
datafusion-substrait = "50.0.0"
arrow = "56.2.0"
arrow-array = "55.2.0"
arrow-schema = "55.2.0"
arrow-buffer = "55.2.0"
Expand Down
189 changes: 179 additions & 10 deletions plugins/engine-datafusion/jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

use jni::objects::{JByteArray, JClass};
use jni::objects::{JByteArray, JClass, JObject};
use jni::sys::{jbyteArray, jlong, jstring};
use jni::JNIEnv;
use std::sync::Arc;
Expand All @@ -17,12 +17,12 @@ use datafusion::execution::context::SessionContext;

use datafusion::DATAFUSION_VERSION;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::execution::cache::cache_manager::{CacheManager, CacheManagerConfig, FileStatisticsCache};
use datafusion::execution::cache::cache_manager::{CacheManager, CacheManagerConfig, FileMetadataCache, FileStatisticsCache};
use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion::prelude::SessionConfig;
use crate::util::{create_object_meta_from_filenames, parse_string_arr};
use crate::util::{create_object_meta_from_filenames, create_object_meta_from_file, parse_string_arr, construct_file_metadata};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::execution::cache::cache_unit::DefaultListFilesCache;
use datafusion::execution::cache::CacheAccessor;
Expand All @@ -33,6 +33,11 @@ use jni::objects::{JObjectArray, JString};
use prost::Message;
use tokio::runtime::Runtime;
use object_store::ObjectMeta;
use chrono::Utc;
use std::collections::HashMap;
use std::sync::Mutex;
use std::ops::Deref;


/// Create a new DataFusion session context
#[no_mangle]
Expand Down Expand Up @@ -103,7 +108,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createG
}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createSessionContext(
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createSessionContextv1(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can still keep this createSessionContext ?

_env: JNIEnv,
_class: JClass,
runtime_id: jlong,
Expand Down Expand Up @@ -261,30 +266,28 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeE
},
Err(e) => {
println!("SUBSTRAIT Rust: Failed to convert Substrait plan: {}", e);
return;
return 0;
}
};

let dataframe = ctx.execute_logical_plan(logical_plan)
.await.expect("Failed to run Logical Plan");

// TODO : check if this works
return match dataframe.execute_stream() {
match dataframe.execute_stream().await {
Ok(stream) => {
let boxed_stream = Box::new(stream);
let stream_ptr = Box::into_raw(boxed_stream);
stream_ptr as jlong
},
Err(e) => {
println!("SUBSTRAIT Rust: Failed to execute stream: {}", e);
0
}
}
})


// Create DataFrame from the converted logical plan


}

// If we need to create session context separately
Expand Down Expand Up @@ -353,4 +356,170 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeC



#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_initCacheManagerConfig(
_env: JNIEnv,
_class: JClass,
) -> jlong {
let config = CacheManagerConfig::default();
Box::into_raw(Box::new(config)) as jlong
}

/// Create RuntimeEnv using the configured CacheManagerConfig
#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createGlobalRuntimeWithConfig(
_env: JNIEnv,
_class: JClass,
config_ptr: jlong,
) -> jlong {
// Take ownership of the CacheManagerConfig
let cache_manager_config = unsafe { Box::from_raw(config_ptr as *mut CacheManagerConfig) };

// Create RuntimeEnv with the configured cache manager
let runtime_env = RuntimeEnvBuilder::default()
.with_cache_manager(*cache_manager_config)
.build()
.unwrap();

Box::into_raw(Box::new(runtime_env)) as jlong
}

/// Create a metadata cache and add it to the CacheManagerConfig
/// The config_ptr remains the same, only the contents are updated
#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createMetadataCache(
_env: JNIEnv,
_class: JClass,
config_ptr: jlong,
size_limit: jlong,
) -> jlong {
// Create the cache
let cache = Arc::new(DefaultFilesMetadataCache::new(size_limit.try_into().unwrap()));

// Update the CacheManagerConfig at the same memory location
if config_ptr != 0 {
let cache_manager_config = unsafe { &mut *(config_ptr as *mut CacheManagerConfig) };
// This replaces the contents at the same pointer location
*cache_manager_config = cache_manager_config.clone()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to clone?

.with_file_metadata_cache(Some(cache.clone() as Arc<dyn FileMetadataCache>));
}

// Return the cache pointer
Box::into_raw(Box::new(cache)) as jlong
}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCachePut(
mut env: JNIEnv,
_class: JClass,
cache_ptr: jlong,
file_path: JString,
) -> i32 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to think of return type here. Are there any return codes we are following?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking of having (1,0,-1) OR maybe Result() if we want to propagate error messages back to Java. Ideally we should have boolean to check if it was code other than 1, a pre-configured retry mechanism should be triggered

let file_path: String = match env.get_string(&file_path) {
Ok(s) => s.into(),
Err(_) => return -1,
};
let cache = unsafe { &mut *(cache_ptr as *mut Arc<DefaultFilesMetadataCache>) };
let data_format = if file_path.to_lowercase().ends_with(".parquet") {
"parquet"
} else {
return 0; // Skip unsupported formats
};

let object_meta = create_object_meta_from_file(&file_path);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can fail as well, ideally the function should have Result() return to capture the error.

let store = Arc::new(object_store::local::LocalFileSystem::new());

// Use Runtime to block on the async operation
let metadata = Runtime::new()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use the global runtime instead of creating our own here.

.expect("Failed to create Tokio Runtime")
.block_on(async {
construct_file_metadata(store.as_ref(), &object_meta, data_format)
.await
.expect("Failed to construct file metadata")
});

cache.put(&object_meta, metadata);

println!("Cached metadata for: {}", file_path);
1
}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheRemove(
mut env: JNIEnv,
_class: JClass,
cache_ptr: jlong,
file_path: JString,
) -> bool {
let file_path: String = match env.get_string(&file_path) {
Ok(s) => s.into(),
Err(_) => return false,
};
let cache = unsafe { &mut *(cache_ptr as *mut Arc<DefaultFilesMetadataCache>) };
let object_meta = create_object_meta_from_file(&file_path);

// Try to get mutable access if there's only one reference
if let Some(cache_mut) = Arc::get_mut(cache) {
cache_mut.remove(&object_meta);
println!("Cache removed for: {}", file_path);
true
} else {
// If there are multiple references, we can't remove the item

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming the references will be created whenever a search query is running so in that case we won't be able to free up the cache? Is there a way to mark the entries dirty? Thinking of cases when cache want to evict something and has to take the decision, probably a dirty bit can help in making the decision easier? We can keep this as an follow up as well

// This is a limitation of the current cache design
println!("Cannot remove from cache (multiple references exist): {}", file_path);
false
}
}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheGet(
mut env: JNIEnv,
_class: JClass,
cache_ptr: jlong,
file_path: JString,
) -> jlong {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using bool, i32, jlong in different functions.. probably we need to come to a common way.

let file_path: String = match env.get_string(&file_path) {
Ok(s) => s.into(),
Err(_) => return 0,
};

let cache = unsafe { &*(cache_ptr as *const Arc<DefaultFilesMetadataCache>) };
let object_meta = create_object_meta_from_file(&file_path);

match cache.get(&object_meta) {
Some(metadata) => {
println!("Retrieved metadata for: {} - size: {:?}", file_path, metadata.memory_size());
Box::into_raw(Box::new(metadata)) as jlong
},
None => {
println!("No metadata found for: {}", file_path);
0
},
}
}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheContainsFile(
mut env: JNIEnv,
_class: JClass,
cache_ptr: jlong,
file_path: JString
) -> bool {
let file_path: String = match env.get_string(&file_path) {
Ok(s) => s.into(),
Err(_) => return false
};
let cache = unsafe { &*(cache_ptr as *const Arc<DefaultFilesMetadataCache>) };
let object_meta = create_object_meta_from_file(&file_path);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we creating object meta here?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are checking if a given key exists in the cache. As metadata_cache has object_meta as key, we are constructing the key here.

But i think we should rather pick this up from a static entity perhaps listing_table_cache which already has ObjectMeta stored corresponding to each filePath

cache.contains_key(&object_meta)
}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheGetSize(
mut env: JNIEnv,
_class: JClass,
cache_ptr: jlong
) -> usize {
let cache = unsafe { &*(cache_ptr as *const Arc<DefaultFilesMetadataCache>) };
cache.memory_used()
}
62 changes: 45 additions & 17 deletions plugins/engine-datafusion/jni/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use datafusion::arrow::array::RecordBatch;
use datafusion::datasource::physical_plan::parquet::metadata::DFParquetMetadata;
use datafusion::datasource::physical_plan::parquet::CachedParquetMetaData;
use datafusion::execution::cache::cache_manager::FileMetadata;
use jni::objects::{JObject, JObjectArray, JString};
use jni::sys::jlong;
use jni::JNIEnv;
use object_store::{path::Path as ObjectPath, ObjectMeta};
use object_store::{path::Path as ObjectPath, ObjectMeta, ObjectStore};
use std::collections::HashMap;
use std::error::Error;
use std::fs;
use std::sync::Arc;
use object_store::{local::LocalFileSystem};

/// Set error message from a result using a Consumer<String> Java callback
pub fn set_error_message_batch<Err: Error>(env: &mut JNIEnv, callback: JObject, result: Result<Vec<RecordBatch>, Err>) {
Expand Down Expand Up @@ -90,7 +95,6 @@ pub fn set_object_result_error<T: Error>(env: &mut JNIEnv, callback: JObject, er
.expect("Failed to call object result callback with error");
}


/// Parse a string map from JNI arrays
pub fn parse_string_map(
env: &mut JNIEnv,
Expand Down Expand Up @@ -159,20 +163,44 @@ pub fn throw_exception(env: &mut JNIEnv, message: &str) {

pub fn create_object_meta_from_filenames(base_path: &str, filenames: Vec<String>) -> Vec<ObjectMeta> {
filenames.into_iter().map(|filename| {
let filename = filename.as_str();
let full_path = format!("{}/{}", base_path.trim_end_matches('/'), filename);
let file_size = fs::metadata(&full_path).map(|m| m.len()).unwrap_or(0);
let modified = fs::metadata(&full_path)
.and_then(|m| m.modified())
.map(|t| DateTime::<Utc>::from(t))
.unwrap_or_else(|_| Utc::now());

ObjectMeta {
location: ObjectPath::from(filename),
last_modified: modified,
size: file_size,
e_tag: None,
version: None,
}
create_object_meta_from_file(&full_path)
}).collect()
}
}

pub fn create_object_meta_from_file(file_path: &str) -> ObjectMeta {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's start making function in Result() return type

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted.

let file_size = fs::metadata(&file_path).map(|m| m.len()).unwrap_or(0);
let modified = fs::metadata(&file_path)
.and_then(|m| m.modified())
.map(|t| DateTime::<Utc>::from(t))
.unwrap_or_else(|_| Utc::now());

ObjectMeta {
location: ObjectPath::from(file_path),
last_modified: modified,
size: file_size,
e_tag: None,
version: None,
}
}

// Utility method to construct file metadata using DataFusion's DFParquetMetadata
pub async fn construct_file_metadata(
store: &dyn ObjectStore,
object_meta: &ObjectMeta,
data_format: &str,
) -> Result<Arc<dyn FileMetadata>, Box<dyn std::error::Error>> {
match data_format.to_lowercase().as_str() {
"parquet" => {
let df_metadata = DFParquetMetadata::new(
store,
object_meta
);

let parquet_metadata = df_metadata.fetch_metadata().await?;
let par = CachedParquetMetaData::new(parquet_metadata);
Ok(Arc::new(par))
},
_ => Err(format!("Unsupported data format: {}", data_format).into())
}
}
Loading
Loading