Skip to content
Merged
140 changes: 108 additions & 32 deletions java/core/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
use crate::blocking_dataset::{BlockingDataset, NATIVE_DATASET};
use crate::error::Result;
use crate::traits::IntoJava;
use crate::traits::{import_vec, FromJObjectWithEnv, IntoJava};
use crate::utils::to_rust_map;
use arrow::datatypes::Schema;
use arrow_schema::ffi::FFI_ArrowSchema;
use jni::objects::{JMap, JObject, JString};
use jni::objects::{JMap, JObject, JString, JValue};
use jni::JNIEnv;
use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
use lance_core::datatypes::Schema as LanceSchema;
use std::sync::Arc;

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Transaction_commitNative<'local>(
pub extern "system" fn Java_com_lancedb_lance_Dataset_nativeCommitTransaction<'local>(
mut env: JNIEnv<'local>,
jtransaction: JObject,
java_dataset: JObject,
java_transaction: JObject,
) -> JObject<'local> {
ok_or_throw!(env, inner_commit_transaction(&mut env, jtransaction))
ok_or_throw!(
env,
inner_commit_transaction(&mut env, java_dataset, java_transaction)
)
}

fn inner_commit_transaction<'local>(
env: &mut JNIEnv<'local>,
java_tx: JObject,
java_dataset: JObject,
java_transaction: JObject,
) -> Result<JObject<'local>> {
let java_dataset: JObject = env
.call_method(&java_tx, "dataset", "()Lcom/lancedb/lance/Dataset;", &[])?
.l()?;
let write_param_jobj = env
.call_method(&java_tx, "writeParams", "()Ljava/util/Map;", &[])?
.call_method(&java_transaction, "writeParams", "()Ljava/util/Map;", &[])?
.l()?;
let write_param_jmap = JMap::from_env(env, &write_param_jobj)?;
let write_param = to_rust_map(env, &write_param_jmap)?;
let transaction = convert_to_rust_transaction(env, java_tx)?;
let transaction = convert_to_rust_transaction(env, java_transaction, Some(&java_dataset))?;
let new_blocking_ds = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
Expand All @@ -39,26 +41,32 @@ fn inner_commit_transaction<'local>(
new_blocking_ds.into_java(env)
}

fn convert_to_rust_transaction(env: &mut JNIEnv, java_tx: JObject) -> Result<Transaction> {
let read_ver = env.call_method(&java_tx, "readVersion", "()J", &[])?.j()?;
fn convert_to_rust_transaction(
env: &mut JNIEnv,
java_transaction: JObject,
java_dataset: Option<&JObject>,
) -> Result<Transaction> {
let read_ver = env
.call_method(&java_transaction, "readVersion", "()J", &[])?
.j()?;
let uuid = env
.call_method(&java_tx, "uuid", "()Ljava/lang/String;", &[])?
.call_method(&java_transaction, "uuid", "()Ljava/lang/String;", &[])?
.l()?;
let uuid = JString::from(uuid);
let uuid = env.get_string(&uuid)?.into();
let op = env
.call_method(
&java_tx,
&java_transaction,
"operation",
"()Lcom/lancedb/lance/operation/Operation;",
&[],
)?
.l()?;
let op = convert_to_rust_operation(env, op)?;
let op = convert_to_rust_operation(env, op, java_dataset)?;

let blobs_op = env
.call_method(
&java_tx,
&java_transaction,
"blobsOperation",
"()Lcom/lancedb/lance/operation/Operation;",
&[],
Expand All @@ -67,11 +75,16 @@ fn convert_to_rust_transaction(env: &mut JNIEnv, java_tx: JObject) -> Result<Tra
let blobs_op = if blobs_op.is_null() {
None
} else {
Some(convert_to_rust_operation(env, blobs_op)?)
Some(convert_to_rust_operation(env, blobs_op, java_dataset)?)
};

let transaction_properties = env
.call_method(&java_tx, "transactionProperties", "()Ljava/util/Map;", &[])?
.call_method(
&java_transaction,
"transactionProperties",
"()Ljava/util/Map;",
&[],
)?
.l()?;
let transaction_properties = JMap::from_env(env, &transaction_properties)?;
let transaction_properties = to_rust_map(env, &transaction_properties)?;
Expand All @@ -82,28 +95,91 @@ fn convert_to_rust_transaction(env: &mut JNIEnv, java_tx: JObject) -> Result<Tra
.build())
}

fn convert_to_rust_operation(env: &mut JNIEnv, java_operation: JObject) -> Result<Operation> {
fn convert_to_rust_operation(
env: &mut JNIEnv,
java_operation: JObject,
java_dataset: Option<&JObject>,
) -> Result<Operation> {
let name = env
.call_method(&java_operation, "name", "()Ljava/lang/String;", &[])?
.l()?;
let name = JString::from(name);
let name: String = env.get_string(&name)?.into();
let op = match name.as_str() {
"Project" => {
let schema_ptr = env
.call_method(&java_operation, "exportSchema", "()J", &[])?
.j()?;
log::info!("Schema pointer: {:?}", schema_ptr);
let c_schema_ptr = schema_ptr as *mut FFI_ArrowSchema;
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
let schema = Schema::try_from(&c_schema)?;

Operation::Project {
schema: LanceSchema::try_from(&schema)
.expect("Failed to convert from arrow schema to lance schema"),
"Project" => Operation::Project {
schema: convert_schema_from_operation(env, &java_operation, java_dataset.unwrap())?,
},
"Append" => {
let fragment_objs = env
.call_method(&java_operation, "fragments", "()Ljava/util/List;", &[])?
.l()?;
let fragment_objs = import_vec(env, &fragment_objs)?;
let mut fragments = Vec::with_capacity(fragment_objs.len());
for f in fragment_objs {
fragments.push(f.extract_object(env)?);
}
Operation::Append { fragments }
}
"Overwrite" => {
let fragment_objs = env
.call_method(&java_operation, "fragments", "()Ljava/util/List;", &[])?
.l()?;
let fragment_objs = import_vec(env, &fragment_objs)?;
let mut fragments = Vec::with_capacity(fragment_objs.len());
for f in fragment_objs {
fragments.push(f.extract_object(env)?);
}
let config_upsert_values = env
.call_method(
&java_operation,
"configUpsertValues",
"()Ljava/util/Map;",
&[],
)?
.l()?;
let config_upsert_values = if config_upsert_values.is_null() {
None
} else {
let config_upsert_values = JMap::from_env(env, &config_upsert_values)?;
Some(to_rust_map(env, &config_upsert_values)?)
};
Operation::Overwrite {
fragments,
schema: convert_schema_from_operation(env, &java_operation, java_dataset.unwrap())?,
config_upsert_values,
}
}
_ => unimplemented!(),
};
Ok(op)
}

/// Reads the Arrow schema carried by a Java operation and converts it to a Lance schema.
///
/// The dataset's `allocator()` is fetched and passed to the operation's
/// `exportSchema(BufferAllocator)` method, which returns a `long`. That value is
/// interpreted as the address of an `FFI_ArrowSchema`, imported as an Arrow
/// [`Schema`], and finally converted into a [`LanceSchema`].
///
/// # Errors
///
/// Returns an error if either JNI call fails, if the exported C schema cannot be
/// imported as an Arrow schema, or if the Arrow schema cannot be converted to a
/// Lance schema. The last case used to panic via `expect`; it is now propagated so
/// the JNI entry point can report it instead of unwinding across the FFI boundary.
fn convert_schema_from_operation(
    env: &mut JNIEnv,
    java_operation: &JObject,
    java_dataset: &JObject,
) -> Result<LanceSchema> {
    let java_buffer_allocator = env
        .call_method(
            java_dataset,
            "allocator",
            "()Lorg/apache/arrow/memory/BufferAllocator;",
            &[],
        )?
        .l()?;
    let schema_ptr = env
        .call_method(
            java_operation,
            "exportSchema",
            "(Lorg/apache/arrow/memory/BufferAllocator;)J",
            &[JValue::Object(&java_buffer_allocator)],
        )?
        .j()?;
    // SAFETY: relies on `exportSchema` returning the address of a valid, initialized
    // `FFI_ArrowSchema`. NOTE(review): nothing here checks the address (e.g. for 0);
    // confirm the Java side always returns a live exported schema.
    let c_schema = unsafe { FFI_ArrowSchema::from_raw(schema_ptr as *mut FFI_ArrowSchema) };
    let schema = Schema::try_from(&c_schema)?;
    Ok(LanceSchema::try_from(&schema)?)
}
40 changes: 37 additions & 3 deletions java/core/src/main/java/com/lancedb/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public class Dataset implements Closeable {

private long nativeDatasetHandle;

BufferAllocator allocator;
boolean selfManagedAllocator = false;
private BufferAllocator allocator;
private boolean selfManagedAllocator = false;

private final LockManager lockManager = new LockManager();

Expand Down Expand Up @@ -221,7 +221,7 @@ private static native Dataset openNative(
Map<String, String> storageOptions);

/**
* Create a new version of dataset.
* Create a new version of the dataset. Use {@link Transaction} instead.
*
* @param allocator the buffer allocator
* @param path The file path of the dataset to open.
Expand All @@ -230,6 +230,7 @@ private static native Dataset openNative(
* is not needed for overwrite or restore operations.
* @return A new instance of {@link Dataset} linked to the opened dataset.
*/
@Deprecated
public static Dataset commit(
BufferAllocator allocator,
String path,
Expand All @@ -238,6 +239,7 @@ public static Dataset commit(
return commit(allocator, path, operation, readVersion, new HashMap<>());
}

@Deprecated
public static Dataset commit(
BufferAllocator allocator,
String path,
Expand All @@ -253,19 +255,27 @@ public static Dataset commit(
return dataset;
}

/**
 * Native entry point for committing an append of the given fragments.
 *
 * <p>NOTE(review): the parameter semantics are defined by the native implementation, which is not
 * visible here; presumably {@code path} locates the dataset and {@code readVersion} is the
 * version the append was prepared against -- confirm.
 *
 * @deprecated Use {@link Transaction} instead.
 */
@Deprecated
public static native Dataset commitAppend(
String path,
Optional<Long> readVersion,
List<FragmentMetadata> fragmentsMetadata,
Map<String, String> storageOptions);

/**
 * Native entry point for committing an overwrite with the given schema and fragments.
 *
 * <p>NOTE(review): the parameter semantics are defined by the native implementation, which is not
 * visible here; {@code arrowSchemaMemoryAddress} is presumably the address of an exported Arrow
 * C-data-interface schema -- confirm.
 *
 * @deprecated Use {@link Transaction} instead.
 */
@Deprecated
public static native Dataset commitOverwrite(
String path,
long arrowSchemaMemoryAddress,
Optional<Long> readVersion,
List<FragmentMetadata> fragmentsMetadata,
Map<String, String> storageOptions);

/**
 * Accessor for the buffer allocator held by this dataset.
 *
 * @return the {@link BufferAllocator} stored in this dataset's {@code allocator} field
 */
public BufferAllocator allocator() {
  return this.allocator;
}

/**
* Create a new transaction builder at current version for the dataset. The dataset itself will
* not refresh after the transaction committed.
Expand All @@ -276,6 +286,30 @@ public Transaction.Builder newTransactionBuilder() {
return new Transaction.Builder(this).readVersion(version());
}

/**
 * Commit a single transaction and return a new Dataset with the new version. The original dataset
 * version will not be refreshed.
 *
 * <p>The returned dataset shares this dataset's allocator, unless this dataset manages its own
 * allocator; in that case the returned dataset gets a fresh {@link RootAllocator} and is marked
 * as self-managed as well. The transaction is always released, whether or not the commit
 * succeeds.
 *
 * @param transaction The transaction to commit; must not be null
 * @return A new instance of {@link Dataset} linked to committed version.
 */
public Dataset commitTransaction(Transaction transaction) {
  Preconditions.checkNotNull(transaction);
  try {
    Dataset dataset = nativeCommitTransaction(transaction);
    if (selfManagedAllocator) {
      dataset.allocator = new RootAllocator(Long.MAX_VALUE);
      // The new dataset owns the allocator created just above, so record that ownership.
      // Previously the flag kept its default of false on the returned dataset.
      // NOTE(review): presumably close() only closes the allocator when this flag is set --
      // confirm against Dataset.close().
      dataset.selfManagedAllocator = true;
    } else {
      dataset.allocator = allocator;
    }
    return dataset;
  } finally {
    // Release the transaction regardless of the commit outcome.
    transaction.release();
  }
}

/**
 * Native half of {@link #commitTransaction(Transaction)}, implemented in the JNI library. The
 * returned dataset's allocator is assigned by the Java caller after this method returns.
 */
private native Dataset nativeCommitTransaction(Transaction transaction);

/**
* Drop a Dataset.
*
Expand Down
6 changes: 3 additions & 3 deletions java/core/src/main/java/com/lancedb/lance/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public LanceScanner newScan() {
return LanceScanner.create(
dataset,
new ScanOptions.Builder().fragmentIds(Arrays.asList(fragment.getId())).build(),
dataset.allocator);
dataset.allocator());
}

/**
Expand All @@ -78,7 +78,7 @@ public LanceScanner newScan(long batchSize) {
.fragmentIds(Arrays.asList(fragment.getId()))
.batchSize(batchSize)
.build(),
dataset.allocator);
dataset.allocator());
}

/**
Expand All @@ -92,7 +92,7 @@ public LanceScanner newScan(ScanOptions options) {
return LanceScanner.create(
dataset,
new ScanOptions.Builder(options).fragmentIds(Arrays.asList(fragment.getId())).build(),
dataset.allocator);
dataset.allocator());
}

private native int countRowsNative(Dataset dataset, long fragmentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import java.util.Map;
import java.util.Optional;

/** Fragment related operations. */
@Deprecated
/** Fragment related operations. Use {@link Transaction} instead */
public abstract class FragmentOperation {
protected static void validateFragments(List<FragmentMetadata> fragments) {
if (fragments == null || fragments.isEmpty()) {
Expand Down
4 changes: 2 additions & 2 deletions java/core/src/main/java/com/lancedb/lance/SqlQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public SqlQuery withRowAddr(boolean withAddr) {
}

public ArrowReader intoBatchRecords() throws IOException {
try (ArrowArrayStream s = ArrowArrayStream.allocateNew(dataset.allocator)) {
try (ArrowArrayStream s = ArrowArrayStream.allocateNew(dataset.allocator())) {
intoBatchRecords(
dataset, sql, Optional.ofNullable(table), withRowId, withRowAddr, s.memoryAddress());
return Data.importArrayStream(dataset.allocator, s);
return Data.importArrayStream(dataset.allocator(), s);
}
}

Expand Down
Loading