-
Notifications
You must be signed in to change notification settings - Fork 543
feat(java): brings transaction api to Java module and support project #4219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
d2d0ce0
feat(java): brings transaction api to Java module and support project
majin1102 67bc2be
optimize code
majin1102 37c5208
Merge remote-tracking branch 'majin/main' into transaction_project
majin1102 150818d
use junit5 @TempDir in TransactionTest
majin1102 3f4f74f
Merge branch 'main' into transaction_project
majin1102 d773376
fix merge error
majin1102 886e4f7
Fix merge issue
majin1102 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| use crate::blocking_dataset::{BlockingDataset, NATIVE_DATASET}; | ||
| use crate::error::Result; | ||
| use crate::traits::IntoJava; | ||
| use crate::utils::to_rust_map; | ||
| use arrow::datatypes::Schema; | ||
| use arrow_schema::ffi::FFI_ArrowSchema; | ||
| use jni::objects::{JMap, JObject, JString}; | ||
| use jni::JNIEnv; | ||
| use lance::dataset::transaction::{Operation, Transaction}; | ||
| use lance_core::datatypes::Schema as LanceSchema; | ||
|
|
||
| #[no_mangle] | ||
| pub extern "system" fn Java_com_lancedb_lance_Transaction_commitNative<'local>( | ||
| mut env: JNIEnv<'local>, | ||
| jtransaction: JObject, | ||
| ) -> JObject<'local> { | ||
| ok_or_throw!(env, inner_commit_transaction(&mut env, jtransaction)) | ||
| } | ||
|
|
||
| fn inner_commit_transaction<'local>( | ||
| env: &mut JNIEnv<'local>, | ||
| java_tx: JObject, | ||
| ) -> Result<JObject<'local>> { | ||
| let java_dataset: JObject = env | ||
| .call_method(&java_tx, "dataset", "()Lcom/lancedb/lance/Dataset;", &[])? | ||
| .l()?; | ||
| let write_param_jobj = env | ||
| .call_method(&java_tx, "writeParams", "()Ljava/util/Map;", &[])? | ||
| .l()?; | ||
| let write_param_jmap = JMap::from_env(env, &write_param_jobj)?; | ||
| let write_param = to_rust_map(env, &write_param_jmap)?; | ||
| let transaction = convert_to_rust_transaction(env, java_tx)?; | ||
| let new_blocking_ds = { | ||
| let mut dataset_guard = | ||
| unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; | ||
| dataset_guard.commit_transaction(transaction, write_param)? | ||
| }; | ||
| new_blocking_ds.into_java(env) | ||
| } | ||
|
|
||
| fn convert_to_rust_transaction(env: &mut JNIEnv, java_tx: JObject) -> Result<Transaction> { | ||
| let read_ver = env.call_method(&java_tx, "readVersion", "()J", &[])?.j()?; | ||
| let uuid = env | ||
| .call_method(&java_tx, "uuid", "()Ljava/lang/String;", &[])? | ||
| .l()?; | ||
| let uuid = JString::from(uuid); | ||
| let uuid = env.get_string(&uuid)?.into(); | ||
| let op = env | ||
| .call_method( | ||
| &java_tx, | ||
| "operation", | ||
| "()Lcom/lancedb/lance/operation/Operation;", | ||
| &[], | ||
| )? | ||
| .l()?; | ||
| let op = convert_to_rust_operation(env, op)?; | ||
|
|
||
| let blobs_op = env | ||
| .call_method( | ||
| &java_tx, | ||
| "blobsOperation", | ||
| "()Lcom/lancedb/lance/operation/Operation;", | ||
| &[], | ||
| )? | ||
| .l()?; | ||
| let blobs_op = if blobs_op.is_null() { | ||
| None | ||
| } else { | ||
| Some(convert_to_rust_operation(env, blobs_op)?) | ||
| }; | ||
|
|
||
| Ok(Transaction { | ||
| read_version: read_ver as u64, | ||
| uuid, | ||
| operation: op, | ||
| blobs_op, | ||
| tag: None, | ||
| }) | ||
| } | ||
|
|
||
| fn convert_to_rust_operation(env: &mut JNIEnv, java_operation: JObject) -> Result<Operation> { | ||
| let name = env | ||
| .call_method(&java_operation, "name", "()Ljava/lang/String;", &[])? | ||
| .l()?; | ||
| let name = JString::from(name); | ||
| let name: String = env.get_string(&name)?.into(); | ||
| let op = match name.as_str() { | ||
| "Project" => { | ||
| let schema_ptr = env | ||
| .call_method(&java_operation, "exportSchema", "()J", &[])? | ||
| .j()?; | ||
| log::info!("Schema pointer: {:?}", schema_ptr); | ||
| let c_schema_ptr = schema_ptr as *mut FFI_ArrowSchema; | ||
| let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) }; | ||
| let schema = Schema::try_from(&c_schema)?; | ||
|
|
||
| Operation::Project { | ||
| schema: LanceSchema::try_from(&schema) | ||
| .expect("Failed to convert from arrow schema to lance schema"), | ||
| } | ||
| } | ||
| _ => unimplemented!(), | ||
| }; | ||
| Ok(op) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
132 changes: 132 additions & 0 deletions
132
java/core/src/main/java/com/lancedb/lance/Transaction.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.lancedb.lance; | ||
|
|
||
| import com.lancedb.lance.operation.Operation; | ||
| import com.lancedb.lance.operation.Project; | ||
|
|
||
| import org.apache.arrow.util.Preconditions; | ||
| import org.apache.arrow.vector.types.pojo.Schema; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.UUID; | ||
|
|
||
| /** | ||
| * Align with the Transaction struct in rust. The transaction won't commit the status to original | ||
| * dataset. It will return a new dataset after committed. | ||
| */ | ||
| public class Transaction { | ||
|
|
||
| private final long readVersion; | ||
| private final String uuid; | ||
| private final Map<String, String> writeParams; | ||
| // Mainly for JNI usage | ||
| private final Dataset dataset; | ||
| private final Operation operation; | ||
| private final Operation blobOp; | ||
|
|
||
| private Transaction( | ||
| Dataset dataset, | ||
| long readVersion, | ||
| String uuid, | ||
| Operation operation, | ||
| Operation blobOp, | ||
| Map<String, String> writeParams) { | ||
| this.dataset = dataset; | ||
| this.readVersion = readVersion; | ||
| this.uuid = uuid; | ||
| this.operation = operation; | ||
| this.blobOp = blobOp; | ||
| this.writeParams = writeParams != null ? writeParams : new HashMap<>(); | ||
| } | ||
|
|
||
| public Dataset dataset() { | ||
| return dataset; | ||
| } | ||
|
|
||
| public long readVersion() { | ||
| return readVersion; | ||
| } | ||
|
|
||
| public String uuid() { | ||
| return uuid; | ||
| } | ||
|
|
||
| public Operation operation() { | ||
| return operation; | ||
| } | ||
|
|
||
| public Operation blobsOperation() { | ||
| return blobOp; | ||
| } | ||
|
|
||
| public Map<String, String> writeParams() { | ||
| return writeParams; | ||
| } | ||
|
|
||
| public Dataset commit() { | ||
| try { | ||
| Dataset committed = commitNative(); | ||
| committed.allocator = dataset.allocator; | ||
| return committed; | ||
| } finally { | ||
| operation.release(); | ||
| if (blobOp != null) { | ||
| blobOp.release(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private native Dataset commitNative(); | ||
|
|
||
| public static class Builder { | ||
| private final String uuid; | ||
| private final Dataset dataset; | ||
| private long readVersion; | ||
| private Operation operation; | ||
| private Operation blobOp; | ||
| private Map<String, String> writeParams; | ||
|
|
||
| public Builder(Dataset dataset) { | ||
| this.dataset = dataset; | ||
| this.uuid = UUID.randomUUID().toString(); | ||
| } | ||
|
|
||
| public Builder readVersion(long readVersion) { | ||
| this.readVersion = readVersion; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder writeParams(Map<String, String> writeParams) { | ||
| this.writeParams = writeParams; | ||
| return this; | ||
| } | ||
|
|
||
| public Builder project(Schema newSchema) { | ||
| validateState(); | ||
| this.operation = new Project.Builder().schema(newSchema).allocator(dataset.allocator).build(); | ||
| return this; | ||
| } | ||
|
|
||
| private void validateState() { | ||
| Preconditions.checkState(operation == null, "Operation " + operation + " already set"); | ||
| } | ||
|
|
||
| public Transaction build() { | ||
| Preconditions.checkState(operation != null, "TransactionBuilder has no operations"); | ||
| return new Transaction(dataset, readVersion, uuid, operation, blobOp, writeParams); | ||
| } | ||
| } | ||
| } |
28 changes: 28 additions & 0 deletions
28
java/core/src/main/java/com/lancedb/lance/operation/Operation.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package com.lancedb.lance.operation; | ||
|
|
||
| /** Operation interface. */ | ||
| public interface Operation { | ||
|
|
||
| /** | ||
| * We use this name to align with the Rust operation enum underlying in JNI. | ||
| * | ||
| * @return the name of the operation. | ||
| */ | ||
| String name(); | ||
|
|
||
| /** Release the underlying JNI resource including arrow c schema */ | ||
| void release(); | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.