-
Couldn't load subscription status.
- Fork 0
DataFusion Cache Support #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: base-search-indexing
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,7 @@ | |
| * compatible open source license. | ||
| */ | ||
|
|
||
| use jni::objects::{JByteArray, JClass}; | ||
| use jni::objects::{JByteArray, JClass, JObject}; | ||
| use jni::sys::{jbyteArray, jlong, jstring}; | ||
| use jni::JNIEnv; | ||
| use std::sync::Arc; | ||
|
|
@@ -17,12 +17,12 @@ use datafusion::execution::context::SessionContext; | |
|
|
||
| use datafusion::DATAFUSION_VERSION; | ||
| use datafusion::datasource::file_format::csv::CsvFormat; | ||
| use datafusion::datasource::file_format::parquet::ParquetFormat; | ||
| use datafusion::execution::cache::cache_manager::{CacheManager, CacheManagerConfig, FileStatisticsCache}; | ||
| use datafusion::execution::cache::cache_manager::{CacheManager, CacheManagerConfig, FileMetadataCache, FileStatisticsCache}; | ||
| use datafusion::execution::cache::cache_unit::DefaultFilesMetadataCache; | ||
| use datafusion::execution::disk_manager::DiskManagerConfig; | ||
| use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; | ||
| use datafusion::prelude::SessionConfig; | ||
| use crate::util::{create_object_meta_from_filenames, parse_string_arr}; | ||
| use crate::util::{create_object_meta_from_filenames, create_object_meta_from_file, parse_string_arr, construct_file_metadata}; | ||
| use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}; | ||
| use datafusion::execution::cache::cache_unit::DefaultListFilesCache; | ||
| use datafusion::execution::cache::CacheAccessor; | ||
|
|
@@ -33,6 +33,11 @@ use jni::objects::{JObjectArray, JString}; | |
| use prost::Message; | ||
| use tokio::runtime::Runtime; | ||
| use object_store::ObjectMeta; | ||
| use chrono::Utc; | ||
| use std::collections::HashMap; | ||
| use std::sync::Mutex; | ||
| use std::ops::Deref; | ||
|
|
||
|
|
||
| /// Create a new DataFusion session context | ||
| #[no_mangle] | ||
|
|
@@ -103,7 +108,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createG | |
| } | ||
|
|
||
| #[no_mangle] | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createSessionContext( | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createSessionContextv1( | ||
| _env: JNIEnv, | ||
| _class: JClass, | ||
| runtime_id: jlong, | ||
|
|
@@ -261,30 +266,28 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeE | |
| }, | ||
| Err(e) => { | ||
| println!("SUBSTRAIT Rust: Failed to convert Substrait plan: {}", e); | ||
| return; | ||
| return 0; | ||
| } | ||
| }; | ||
|
|
||
| let dataframe = ctx.execute_logical_plan(logical_plan) | ||
| .await.expect("Failed to run Logical Plan"); | ||
|
|
||
| // TODO : check if this works | ||
| return match dataframe.execute_stream() { | ||
| match dataframe.execute_stream().await { | ||
| Ok(stream) => { | ||
| let boxed_stream = Box::new(stream); | ||
| let stream_ptr = Box::into_raw(boxed_stream); | ||
| stream_ptr as jlong | ||
| }, | ||
| Err(e) => { | ||
| println!("SUBSTRAIT Rust: Failed to execute stream: {}", e); | ||
| 0 | ||
| } | ||
| } | ||
| }) | ||
|
|
||
|
|
||
| // Create DataFrame from the converted logical plan | ||
|
|
||
|
|
||
| } | ||
|
|
||
| // If we need to create session context separately | ||
|
|
@@ -353,4 +356,170 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_nativeC | |
|
|
||
|
|
||
|
|
||
| #[no_mangle] | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_initCacheManagerConfig( | ||
| _env: JNIEnv, | ||
| _class: JClass, | ||
| ) -> jlong { | ||
| let config = CacheManagerConfig::default(); | ||
| Box::into_raw(Box::new(config)) as jlong | ||
| } | ||
|
|
||
| /// Create RuntimeEnv using the configured CacheManagerConfig | ||
| #[no_mangle] | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createGlobalRuntimeWithConfig( | ||
| _env: JNIEnv, | ||
| _class: JClass, | ||
| config_ptr: jlong, | ||
| ) -> jlong { | ||
| // Take ownership of the CacheManagerConfig | ||
| let cache_manager_config = unsafe { Box::from_raw(config_ptr as *mut CacheManagerConfig) }; | ||
|
|
||
| // Create RuntimeEnv with the configured cache manager | ||
| let runtime_env = RuntimeEnvBuilder::default() | ||
| .with_cache_manager(*cache_manager_config) | ||
| .build() | ||
| .unwrap(); | ||
|
|
||
| Box::into_raw(Box::new(runtime_env)) as jlong | ||
| } | ||
|
|
||
| /// Create a metadata cache and add it to the CacheManagerConfig | ||
| /// The config_ptr remains the same, only the contents are updated | ||
| #[no_mangle] | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_createMetadataCache( | ||
| _env: JNIEnv, | ||
| _class: JClass, | ||
| config_ptr: jlong, | ||
| size_limit: jlong, | ||
| ) -> jlong { | ||
| // Create the cache | ||
| let cache = Arc::new(DefaultFilesMetadataCache::new(size_limit.try_into().unwrap())); | ||
|
|
||
| // Update the CacheManagerConfig at the same memory location | ||
| if config_ptr != 0 { | ||
| let cache_manager_config = unsafe { &mut *(config_ptr as *mut CacheManagerConfig) }; | ||
| // This replaces the contents at the same pointer location | ||
| *cache_manager_config = cache_manager_config.clone() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need to clone? |
||
| .with_file_metadata_cache(Some(cache.clone() as Arc<dyn FileMetadataCache>)); | ||
| } | ||
|
|
||
| // Return the cache pointer | ||
| Box::into_raw(Box::new(cache)) as jlong | ||
| } | ||
|
|
||
| #[no_mangle] | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCachePut( | ||
| mut env: JNIEnv, | ||
| _class: JClass, | ||
| cache_ptr: jlong, | ||
| file_path: JString, | ||
| ) -> i32 { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to think of return type here. Are there any return codes we are following? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am thinking of having (1,0,-1) OR maybe Result() if we want to propagate error messages back to Java. Ideally we should have boolean to check if it was code other than 1, a pre-configured retry mechanism should be triggered |
||
| let file_path: String = match env.get_string(&file_path) { | ||
| Ok(s) => s.into(), | ||
| Err(_) => return -1, | ||
| }; | ||
| let cache = unsafe { &mut *(cache_ptr as *mut Arc<DefaultFilesMetadataCache>) }; | ||
| let data_format = if file_path.to_lowercase().ends_with(".parquet") { | ||
| "parquet" | ||
| } else { | ||
| return 0; // Skip unsupported formats | ||
| }; | ||
|
|
||
| let object_meta = create_object_meta_from_file(&file_path); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can fail as well, ideally the function should have Result() return to capture the error. |
||
| let store = Arc::new(object_store::local::LocalFileSystem::new()); | ||
|
|
||
| // Use Runtime to block on the async operation | ||
| let metadata = Runtime::new() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's use the global runtime instead of creating our own here. |
||
| .expect("Failed to create Tokio Runtime") | ||
| .block_on(async { | ||
| construct_file_metadata(store.as_ref(), &object_meta, data_format) | ||
| .await | ||
| .expect("Failed to construct file metadata") | ||
| }); | ||
|
|
||
| cache.put(&object_meta, metadata); | ||
|
|
||
| println!("Cached metadata for: {}", file_path); | ||
| 1 | ||
| } | ||
|
|
||
| #[no_mangle] | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheRemove( | ||
| mut env: JNIEnv, | ||
| _class: JClass, | ||
| cache_ptr: jlong, | ||
| file_path: JString, | ||
| ) -> bool { | ||
| let file_path: String = match env.get_string(&file_path) { | ||
| Ok(s) => s.into(), | ||
| Err(_) => return false, | ||
| }; | ||
| let cache = unsafe { &mut *(cache_ptr as *mut Arc<DefaultFilesMetadataCache>) }; | ||
| let object_meta = create_object_meta_from_file(&file_path); | ||
|
|
||
| // Try to get mutable access if there's only one reference | ||
| if let Some(cache_mut) = Arc::get_mut(cache) { | ||
| cache_mut.remove(&object_meta); | ||
| println!("Cache removed for: {}", file_path); | ||
| true | ||
| } else { | ||
| // If there are multiple references, we can't remove the item | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm assuming the references will be created whenever a search query is running so in that case we won't be able to free up the cache? Is there a way to mark the entries dirty? Thinking of cases when cache want to evict something and has to take the decision, probably a dirty bit can help in making the decision easier? We can keep this as an follow up as well |
||
| // This is a limitation of the current cache design | ||
| println!("Cannot remove from cache (multiple references exist): {}", file_path); | ||
| false | ||
| } | ||
| } | ||
|
|
||
| #[no_mangle] | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheGet( | ||
| mut env: JNIEnv, | ||
| _class: JClass, | ||
| cache_ptr: jlong, | ||
| file_path: JString, | ||
| ) -> jlong { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are using bool, i32, jlong in different functions.. probably we need to come to a common way. |
||
| let file_path: String = match env.get_string(&file_path) { | ||
| Ok(s) => s.into(), | ||
| Err(_) => return 0, | ||
| }; | ||
|
|
||
| let cache = unsafe { &*(cache_ptr as *const Arc<DefaultFilesMetadataCache>) }; | ||
| let object_meta = create_object_meta_from_file(&file_path); | ||
|
|
||
| match cache.get(&object_meta) { | ||
| Some(metadata) => { | ||
| println!("Retrieved metadata for: {} - size: {:?}", file_path, metadata.memory_size()); | ||
| Box::into_raw(Box::new(metadata)) as jlong | ||
| }, | ||
| None => { | ||
| println!("No metadata found for: {}", file_path); | ||
| 0 | ||
| }, | ||
| } | ||
| } | ||
|
|
||
| #[no_mangle] | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheContainsFile( | ||
| mut env: JNIEnv, | ||
| _class: JClass, | ||
| cache_ptr: jlong, | ||
| file_path: JString | ||
| ) -> bool { | ||
| let file_path: String = match env.get_string(&file_path) { | ||
| Ok(s) => s.into(), | ||
| Err(_) => return false | ||
| }; | ||
| let cache = unsafe { &*(cache_ptr as *const Arc<DefaultFilesMetadataCache>) }; | ||
| let object_meta = create_object_meta_from_file(&file_path); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we creating object meta here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are checking if a given key exists in the cache. As metadata_cache has object_meta as key, we are constructing the key here. But i think we should rather pick this up from a static entity perhaps |
||
| cache.contains_key(&object_meta) | ||
| } | ||
|
|
||
| #[no_mangle] | ||
| pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_metadataCacheGetSize( | ||
| mut env: JNIEnv, | ||
| _class: JClass, | ||
| cache_ptr: jlong | ||
| ) -> usize { | ||
| let cache = unsafe { &*(cache_ptr as *const Arc<DefaultFilesMetadataCache>) }; | ||
| cache.memory_used() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,13 +5,18 @@ | |
| use anyhow::Result; | ||
| use chrono::{DateTime, Utc}; | ||
| use datafusion::arrow::array::RecordBatch; | ||
| use datafusion::datasource::physical_plan::parquet::metadata::DFParquetMetadata; | ||
| use datafusion::datasource::physical_plan::parquet::CachedParquetMetaData; | ||
| use datafusion::execution::cache::cache_manager::FileMetadata; | ||
| use jni::objects::{JObject, JObjectArray, JString}; | ||
| use jni::sys::jlong; | ||
| use jni::JNIEnv; | ||
| use object_store::{path::Path as ObjectPath, ObjectMeta}; | ||
| use object_store::{path::Path as ObjectPath, ObjectMeta, ObjectStore}; | ||
| use std::collections::HashMap; | ||
| use std::error::Error; | ||
| use std::fs; | ||
| use std::sync::Arc; | ||
| use object_store::{local::LocalFileSystem}; | ||
|
|
||
| /// Set error message from a result using a Consumer<String> Java callback | ||
| pub fn set_error_message_batch<Err: Error>(env: &mut JNIEnv, callback: JObject, result: Result<Vec<RecordBatch>, Err>) { | ||
|
|
@@ -90,7 +95,6 @@ pub fn set_object_result_error<T: Error>(env: &mut JNIEnv, callback: JObject, er | |
| .expect("Failed to call object result callback with error"); | ||
| } | ||
|
|
||
|
|
||
| /// Parse a string map from JNI arrays | ||
| pub fn parse_string_map( | ||
| env: &mut JNIEnv, | ||
|
|
@@ -159,20 +163,44 @@ pub fn throw_exception(env: &mut JNIEnv, message: &str) { | |
|
|
||
| pub fn create_object_meta_from_filenames(base_path: &str, filenames: Vec<String>) -> Vec<ObjectMeta> { | ||
| filenames.into_iter().map(|filename| { | ||
| let filename = filename.as_str(); | ||
| let full_path = format!("{}/{}", base_path.trim_end_matches('/'), filename); | ||
| let file_size = fs::metadata(&full_path).map(|m| m.len()).unwrap_or(0); | ||
| let modified = fs::metadata(&full_path) | ||
| .and_then(|m| m.modified()) | ||
| .map(|t| DateTime::<Utc>::from(t)) | ||
| .unwrap_or_else(|_| Utc::now()); | ||
|
|
||
| ObjectMeta { | ||
| location: ObjectPath::from(filename), | ||
| last_modified: modified, | ||
| size: file_size, | ||
| e_tag: None, | ||
| version: None, | ||
| } | ||
| create_object_meta_from_file(&full_path) | ||
| }).collect() | ||
| } | ||
| } | ||
|
|
||
| pub fn create_object_meta_from_file(file_path: &str) -> ObjectMeta { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's start making function in Result() return type There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted. |
||
| let file_size = fs::metadata(&file_path).map(|m| m.len()).unwrap_or(0); | ||
| let modified = fs::metadata(&file_path) | ||
| .and_then(|m| m.modified()) | ||
| .map(|t| DateTime::<Utc>::from(t)) | ||
| .unwrap_or_else(|_| Utc::now()); | ||
|
|
||
| ObjectMeta { | ||
| location: ObjectPath::from(file_path), | ||
| last_modified: modified, | ||
| size: file_size, | ||
| e_tag: None, | ||
| version: None, | ||
| } | ||
| } | ||
|
|
||
| // Utility method to construct file metadata using DataFusion's DFParquetMetadata | ||
| pub async fn construct_file_metadata( | ||
| store: &dyn ObjectStore, | ||
| object_meta: &ObjectMeta, | ||
| data_format: &str, | ||
| ) -> Result<Arc<dyn FileMetadata>, Box<dyn std::error::Error>> { | ||
| match data_format.to_lowercase().as_str() { | ||
| "parquet" => { | ||
| let df_metadata = DFParquetMetadata::new( | ||
| store, | ||
| object_meta | ||
| ); | ||
|
|
||
| let parquet_metadata = df_metadata.fetch_metadata().await?; | ||
| let par = CachedParquetMetaData::new(parquet_metadata); | ||
| Ok(Arc::new(par)) | ||
| }, | ||
| _ => Err(format!("Unsupported data format: {}", data_format).into()) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we still keep the name `createSessionContext` for this function?