22 changes: 19 additions & 3 deletions java/core/lance-jni/src/blocking_dataset.rs
@@ -36,10 +36,10 @@ use jni::{objects::JObject, JNIEnv};
 use lance::dataset::builder::DatasetBuilder;
 use lance::dataset::refs::TagContents;
 use lance::dataset::statistics::{DataStatistics, DatasetStatisticsExt};
-use lance::dataset::transaction::Operation;
+use lance::dataset::transaction::{Operation, Transaction};
 use lance::dataset::{
-    ColumnAlteration, Dataset, NewColumnTransform, ProjectionRequest, ReadParams, Version,
-    WriteParams,
+    ColumnAlteration, CommitBuilder, Dataset, NewColumnTransform, ProjectionRequest, ReadParams,
+    Version, WriteParams,
 };
 use lance::io::{ObjectStore, ObjectStoreParams};
 use lance::table::format::Fragment;
@@ -239,6 +239,22 @@ impl BlockingDataset {
         Ok(())
     }
 
+    pub fn commit_transaction(
+        &mut self,
+        transaction: Transaction,
+        write_params: HashMap<String, String>,
+    ) -> Result<Self> {
+        let new_dataset = RT.block_on(
+            CommitBuilder::new(Arc::new(self.clone().inner))
+                .with_store_params(ObjectStoreParams {
+                    storage_options: Some(write_params),
+                    ..Default::default()
+                })
+                .execute(transaction),
+        )?;
+        Ok(BlockingDataset { inner: new_dataset })
+    }
+
     pub fn replace_schema_metadata(&mut self, metadata: HashMap<String, String>) -> Result<()> {
         RT.block_on(self.inner.replace_schema_metadata(metadata))?;
         Ok(())
1 change: 1 addition & 0 deletions java/core/lance-jni/src/lib.rs
@@ -59,6 +59,7 @@ mod file_writer;
 mod fragment;
 mod schema;
 pub mod traits;
+mod transaction;
 pub mod utils;
 
 pub use error::Error;
105 changes: 105 additions & 0 deletions java/core/lance-jni/src/transaction.rs
@@ -0,0 +1,105 @@
use crate::blocking_dataset::{BlockingDataset, NATIVE_DATASET};
use crate::error::Result;
use crate::traits::IntoJava;
use crate::utils::to_rust_map;
use arrow::datatypes::Schema;
use arrow_schema::ffi::FFI_ArrowSchema;
use jni::objects::{JMap, JObject, JString};
use jni::JNIEnv;
use lance::dataset::transaction::{Operation, Transaction};
use lance_core::datatypes::Schema as LanceSchema;

#[no_mangle]
pub extern "system" fn Java_com_lancedb_lance_Transaction_commitNative<'local>(
    mut env: JNIEnv<'local>,
    jtransaction: JObject,
) -> JObject<'local> {
    ok_or_throw!(env, inner_commit_transaction(&mut env, jtransaction))
}

fn inner_commit_transaction<'local>(
    env: &mut JNIEnv<'local>,
    java_tx: JObject,
) -> Result<JObject<'local>> {
    let java_dataset: JObject = env
        .call_method(&java_tx, "dataset", "()Lcom/lancedb/lance/Dataset;", &[])?
        .l()?;
    let write_param_jobj = env
        .call_method(&java_tx, "writeParams", "()Ljava/util/Map;", &[])?
        .l()?;
    let write_param_jmap = JMap::from_env(env, &write_param_jobj)?;
    let write_param = to_rust_map(env, &write_param_jmap)?;
    let transaction = convert_to_rust_transaction(env, java_tx)?;
    let new_blocking_ds = {
        let mut dataset_guard =
            unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
        dataset_guard.commit_transaction(transaction, write_param)?
    };
    new_blocking_ds.into_java(env)
}

fn convert_to_rust_transaction(env: &mut JNIEnv, java_tx: JObject) -> Result<Transaction> {
    let read_ver = env.call_method(&java_tx, "readVersion", "()J", &[])?.j()?;
    let uuid = env
        .call_method(&java_tx, "uuid", "()Ljava/lang/String;", &[])?
        .l()?;
    let uuid = JString::from(uuid);
    let uuid = env.get_string(&uuid)?.into();
    let op = env
        .call_method(
            &java_tx,
            "operation",
            "()Lcom/lancedb/lance/operation/Operation;",
            &[],
        )?
        .l()?;
    let op = convert_to_rust_operation(env, op)?;

    let blobs_op = env
        .call_method(
            &java_tx,
            "blobsOperation",
            "()Lcom/lancedb/lance/operation/Operation;",
            &[],
        )?
        .l()?;
    let blobs_op = if blobs_op.is_null() {
        None
    } else {
        Some(convert_to_rust_operation(env, blobs_op)?)
    };

    Ok(Transaction {
        read_version: read_ver as u64,
        uuid,
        operation: op,
        blobs_op,
        tag: None,
    })
}

fn convert_to_rust_operation(env: &mut JNIEnv, java_operation: JObject) -> Result<Operation> {
    let name = env
        .call_method(&java_operation, "name", "()Ljava/lang/String;", &[])?
        .l()?;
    let name = JString::from(name);
    let name: String = env.get_string(&name)?.into();
    let op = match name.as_str() {
        "Project" => {
            let schema_ptr = env
                .call_method(&java_operation, "exportSchema", "()J", &[])?
                .j()?;
            log::info!("Schema pointer: {:?}", schema_ptr);
            let c_schema_ptr = schema_ptr as *mut FFI_ArrowSchema;
            let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
            let schema = Schema::try_from(&c_schema)?;

            Operation::Project {
                schema: LanceSchema::try_from(&schema)
                    .expect("Failed to convert from arrow schema to lance schema"),
            }
        }
        _ => unimplemented!(),
    };
    Ok(op)
}
10 changes: 10 additions & 0 deletions java/core/src/main/java/com/lancedb/lance/Dataset.java
@@ -266,6 +266,16 @@ public static native Dataset commitOverwrite(
       List<FragmentMetadata> fragmentsMetadata,
       Map<String, String> storageOptions);
 
+  /**
+   * Create a new transaction builder at the current version of the dataset. The dataset itself
+   * will not be refreshed after the transaction is committed; commit() returns a new Dataset.
+   *
+   * @return A new instance of {@link Transaction.Builder} linked to the opened dataset.
+   */
+  public Transaction.Builder newTransactionBuilder() {
+    return new Transaction.Builder(this).readVersion(version());
+  }
+
   /**
    * Drop a Dataset.
    *
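A minimal usage sketch of the builder added above, assuming a dataset opened with `Dataset.open` and an already prepared `newSchema`; the path and variable names are placeholders for illustration, not part of this change:

```java
// Sketch only: stage a Project (schema change) operation and commit it.
// `Dataset.open`, the path, and `newSchema` are placeholder assumptions.
try (BufferAllocator allocator = new RootAllocator();
    Dataset dataset = Dataset.open("/tmp/example.lance", allocator)) {
  Transaction txn =
      dataset
          .newTransactionBuilder() // readVersion is pinned to the opened version
          .project(newSchema) // stage a schema-change (Project) operation
          .build();
  try (Dataset committed = txn.commit()) {
    // `dataset` still points at the old version; `committed` is a new instance.
    System.out.println(committed.version());
  }
}
```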
132 changes: 132 additions & 0 deletions java/core/src/main/java/com/lancedb/lance/Transaction.java
@@ -0,0 +1,132 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lancedb.lance;

import com.lancedb.lance.operation.Operation;
import com.lancedb.lance.operation.Project;

import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Schema;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Aligns with the Transaction struct in Rust. Committing a transaction does not modify the
 * original dataset; commit() returns a new Dataset reflecting the committed version.
 */
public class Transaction {

  private final long readVersion;
  private final String uuid;
  private final Map<String, String> writeParams;
  // Mainly for JNI usage
  private final Dataset dataset;
  private final Operation operation;
  private final Operation blobOp;

  private Transaction(
      Dataset dataset,
      long readVersion,
      String uuid,
      Operation operation,
      Operation blobOp,
      Map<String, String> writeParams) {
    this.dataset = dataset;
    this.readVersion = readVersion;
    this.uuid = uuid;
    this.operation = operation;
    this.blobOp = blobOp;
    this.writeParams = writeParams != null ? writeParams : new HashMap<>();
  }

  public Dataset dataset() {
    return dataset;
  }

  public long readVersion() {
    return readVersion;
  }

  public String uuid() {
    return uuid;
  }

  public Operation operation() {
    return operation;
  }

  public Operation blobsOperation() {
    return blobOp;
  }

  public Map<String, String> writeParams() {
    return writeParams;
  }

  public Dataset commit() {
    try {
      Dataset committed = commitNative();
      committed.allocator = dataset.allocator;
      return committed;
    } finally {
      operation.release();
      if (blobOp != null) {
        blobOp.release();
      }
    }
  }

  private native Dataset commitNative();

  public static class Builder {
    private final String uuid;
    private final Dataset dataset;
    private long readVersion;
    private Operation operation;
    private Operation blobOp;
    private Map<String, String> writeParams;

    public Builder(Dataset dataset) {
      this.dataset = dataset;
      this.uuid = UUID.randomUUID().toString();
    }

    public Builder readVersion(long readVersion) {
      this.readVersion = readVersion;
      return this;
    }

    public Builder writeParams(Map<String, String> writeParams) {
      this.writeParams = writeParams;
      return this;
    }

    public Builder project(Schema newSchema) {
      validateState();
      this.operation = new Project.Builder().schema(newSchema).allocator(dataset.allocator).build();
      return this;
    }

    private void validateState() {
      Preconditions.checkState(operation == null, "Operation " + operation + " already set");
    }

    public Transaction build() {
      Preconditions.checkState(operation != null, "TransactionBuilder has no operations");
      return new Transaction(dataset, readVersion, uuid, operation, blobOp, writeParams);
    }
  }
}
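A sketch of how the pieces fit together: `writeParams` supplied to the builder are handed to the Rust `commit_transaction`, which forwards them as `ObjectStoreParams::storage_options` to `CommitBuilder`. The option key below is a placeholder, not a documented setting:

```java
// Sketch only: the option key/value are placeholders, not documented settings.
Map<String, String> writeParams = new HashMap<>();
writeParams.put("some_storage_option", "value");

Transaction txn =
    dataset
        .newTransactionBuilder()
        .writeParams(writeParams) // forwarded as storage_options on the Rust side
        .project(newSchema)
        .build();

// commit() releases the operation's native resources even if the commit fails,
// and returns a new Dataset; the original `dataset` stays at its old version.
Dataset committed = txn.commit();
```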
28 changes: 28 additions & 0 deletions java/core/src/main/java/com/lancedb/lance/operation/Operation.java
@@ -0,0 +1,28 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lancedb.lance.operation;

/** Operation interface. */
public interface Operation {

  /**
   * The name is used to align with the underlying Rust Operation enum in the JNI layer.
   *
   * @return the name of the operation
   */
  String name();

  /** Releases the underlying JNI resources, including the exported Arrow C schema. */
  void release();
}
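To make the `name()`/`release()` contract concrete, a hypothetical implementation sketch follows. The actual `Project` class is not part of this diff, so its fields and the `exportSchema()` accessor (which the JNI side calls in `convert_to_rust_operation`) are assumptions about its shape, not its real code:

```java
// Hypothetical sketch of an Operation implementation; not the actual Project class.
package com.lancedb.lance.operation;

import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.types.pojo.Schema;

public class ProjectSketch implements Operation {
  private final Schema schema;
  private final BufferAllocator allocator;
  private ArrowSchema exported; // C Data Interface struct handed over JNI

  public ProjectSketch(Schema schema, BufferAllocator allocator) {
    this.schema = schema;
    this.allocator = allocator;
  }

  @Override
  public String name() {
    // Must match the Rust Operation enum variant matched in transaction.rs.
    return "Project";
  }

  // Called from JNI: export the Arrow schema through the C Data Interface
  // and return the address of the exported struct.
  public long exportSchema() {
    exported = ArrowSchema.allocateNew(allocator);
    Data.exportSchema(allocator, schema, null, exported);
    return exported.memoryAddress();
  }

  @Override
  public void release() {
    if (exported != null) {
      exported.close();
      exported = null;
    }
  }
}
```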