-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-1163: [Java] Java client support for plasma #2065
Conversation
-------------------------- This commit includes the support of java client for plasma, which is part of the java worker support of Ray. In addition to some minor changes in build system, it consists of the following modules: - java/plasma: java client support for plasma - cpp/src/plasma/lib/java: JNI support for plasma client
Related to ray-project/ray#2095. |
I think this should integrate better with other Arrow Java APIs. Right now, the apis here don't interact with ValueVector nor ArrowBuf, the key components of the Java API. We should make sure to integrate at that level so that our pieces work well together. |
Also, would be good to have a jira ticket tracking this discussion/approach. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution! This looks really great :)
Before it can be merged, we should have a few things:
- Fix all the linting/test errors on Travis (e.g. org_apache_arrow_plasma_PlasmaClientJNI.cc needs a license header, <jni.h> is not found in the C++ build, so we need to install it, there is probably some linting error to be fixed once the license is there
- Add some tests to make sure the Java PlasmaClient is working
- Provide instructions on how to set it up and compile it for developers
I think the things @jacques-n brings up can be addressed in a follow-up PR, let's make sure the low-level APIs are working well in this PR.
Thanks for your attention @jacques-n. I think the comment from @jacques-n is great, and the java client of plasma better not depend on the DomainDO itself such as ObjectBuffer and ObjectId. And the scenes here is simple, so i just delete the data structure of ObjectBuffer and ObjectId and use the byte[] for the input and output of the api. The code is ready and will commit with other changes comment by @pcmoritz later. |
Thank you for your time @pcmoritz. The comment from you really helpful.
|
Thanks, that sounds good! For 2, we can do the setup similar to how we do it in C++ (see https://github.com/apache/arrow/blob/master/cpp/src/plasma/test/client_tests.cc#L61) and start an instance of the store from the Java tests (and shut it down after the tests are done). |
The public interface (which is fine in a separate patch), should be ValueVector & ArrowBuf based, not byte[] based. Before releasing this functionality we should be making sure that it works in the context of the rest of the Arrow Java code. |
In Python, we expose two different object store client APIs.
In Java, I think it makes sense to have analogs of these. The more important one for now is the first one, which could be implemented using Arrow buffers (but the stored data would be blobs and not necessarily need to be data serialized using the Arrow format). The second would use the Arrow format under the hood. |
Yes, i think it is make sense to hava analogs of these the same as python, and we could try to use the Arrow buffers to instead the byte[] in the follow-up PR, which could make sure that it works in the context of the rest of the Arrow Java code. Thank you for your time again, @pcmoritz @jacques-n @robertnishihara |
…output of plasma java client api from custem type to byte[]
-------------------------- It consists of the following changes: add java/plasma/README.md for help document add java/plasma/test.sh for plasma java client test case fix some ci error delete the useless parameters of connect api from jni
Codecov Report
@@ Coverage Diff @@
## master #2065 +/- ##
==========================================
- Coverage 86.33% 86.33% -0.01%
==========================================
Files 242 242
Lines 41103 41103
==========================================
- Hits 35487 35486 -1
- Misses 5616 5617 +1
Continue to review full report at Codecov.
|
As comment, we just fix the problem you have suggest, could you review them again? Thank you very much, @pcmoritz |
java/plasma/test.sh
Outdated
@@ -35,7 +36,7 @@ pushd ../../cpp | |||
-DCMAKE_CXX_FLAGS="-g -O3" \ | |||
-DARROW_BUILD_TESTS=off \ | |||
-DARROW_HDFS=on \ | |||
-DARROW_BOOST_USE_SHARED=off \ | |||
-DARROW_BOOST_USE_SHARED=on \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to do this to make it compile with Ubuntu 16.04, since the boost static libraries that ship with it are compiled without -fPIC.
${PLASMA_LIBRARY_EXT_java_SRC}) | ||
|
||
if(APPLE) | ||
target_link_libraries(plasma_java plasma_static ${PLASMA_LINK_LIBS} "-undefined dynamic_lookup" -Wl,-force_load,${FLATBUFFERS_STATIC_LIB} ${FLATBUFFERS_STATIC_LIB} ${PTHREAD_LIBRARY}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the -Wl,-force_load,
part here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-Wl,-force_load for add runtime library path here
@@ -0,0 +1,311 @@ | |||
// Licensed to the Apache Software Foundation (ASF) under one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add this file to the C++ linting/clang-tidy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this file has already in the C++ linting process now, because i just fix many linting errors which is feedback by the ci.
|
||
#include "plasma/client.h" | ||
|
||
const jsize LEN_OF_OBJECTID = sizeof(plasma::ObjectID) / sizeof(jbyte); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be a constexpr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it will not be changed, and worked as constant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Also, naming it OBJECT_ID_SIZE would be more consistent with the rest of the code (but up to you, since it's private in this file)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, i could rename it by the way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
constexpr
is preferable (since the compiler can optimize more)
|
||
JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_disconnect( | ||
JNIEnv* env, jclass cls, jlong conn) { | ||
plasma::PlasmaClient* client = (plasma::PlasmaClient*)conn; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's do a reinterpret_cast here, we shouldn't have old style casts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create( | ||
JNIEnv* env, jclass cls, jlong conn, jbyteArray object_id, jint size, | ||
jbyteArray metadata) { | ||
plasma::PlasmaClient* client = (plasma::PlasmaClient*)conn; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
std::shared_ptr<Buffer> data; | ||
Status s = client->Create(oid, size, md, md_size, &data); | ||
if (s.IsPlasmaObjectExists()) { | ||
jclass Exception = env->FindClass("org/ray/spi/impl/PlasmaObjectExistsException"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class needs to be integrated into this PR, right? Can we also add a test that this works?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know what you mean now, i will use the normal Exception(java.lang.Exception) instead, thanks.
|
||
JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_release( | ||
JNIEnv* env, jclass cls, jlong conn, jbyteArray object_id) { | ||
plasma::PlasmaClient* client = (plasma::PlasmaClient*)conn; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c-style cast
|
||
JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_get( | ||
JNIEnv* env, jclass cls, jlong conn, jobjectArray object_ids, jint timeout_ms) { | ||
plasma::PlasmaClient* client = (plasma::PlasmaClient*)conn; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c-style cast
std::vector<plasma::ObjectID> oids(num_oids); | ||
std::vector<plasma::ObjectBuffer> obufs(num_oids); | ||
for (int i = 0; i < num_oids; ++i) { | ||
jbyteArray_to_object_id(env, (jbyteArray)env->GetObjectArrayElement(object_ids, i), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c-style cast
jclass clsByteBufferArray = env->FindClass("[Ljava/nio/ByteBuffer;"); | ||
|
||
jobjectArray ret = env->NewObjectArray(num_oids, clsByteBufferArray, NULL); | ||
jobjectArray o = NULL; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these NULL
s be nullptr
? I'm not sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is point here, maybe nullptr better, i will fix them.
|
||
JNIEXPORT jboolean JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_contains( | ||
JNIEnv* env, jclass cls, jlong conn, jbyteArray object_id) { | ||
plasma::PlasmaClient* client = (plasma::PlasmaClient*)conn; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c-style cast
bool has_object; | ||
ARROW_CHECK_OK(client->Contains(oid, &has_object)); | ||
|
||
if (has_object) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe just return has_object
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it's equivalent, I mean I think the code would be clearer if you replace
if (has_object) {
return true;
} else {
return false;
}
with
return has_object;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right, i am changing it to return has_object.
plasma::PlasmaClient* client = (plasma::PlasmaClient*)conn; | ||
jsize num_oids = env->GetArrayLength(object_ids); | ||
|
||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused code
If this is something that needs to be handled in the future, then add a comment with a TODO
|
||
std::vector<plasma::ObjectID> oids(num_oids); | ||
for (int i = 0; i < num_oids; ++i) { | ||
jbyteArray_to_object_id(env, (jbyteArray)env->GetObjectArrayElement(object_ids, i), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
casts
JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_wait( | ||
JNIEnv* env, jclass cls, jlong conn, jobjectArray object_ids, jint timeout_ms, | ||
jint num_returns) { | ||
plasma::PlasmaClient* client = (plasma::PlasmaClient*)conn; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
casts
int num_return_objects; | ||
// TODO: may be blocked. consider to add the thread support | ||
ARROW_CHECK_OK(client->Wait(static_cast<int>(num_oids), oreqs.data(), num_returns, | ||
(uint64_t)timeout_ms, &num_return_objects)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
casts
env->SetObjectArrayElement(ret, num_returned, oid); | ||
num_returned++; | ||
} else { | ||
// ARROW_CHECK(oreqs[i].status == ObjectStatus_Nonexistent); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this commented out?
|
||
JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_evict( | ||
JNIEnv* env, jclass cls, jlong conn, jlong num_bytes) { | ||
plasma::PlasmaClient* client = (plasma::PlasmaClient*)conn; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
casts
int64_t evicted_bytes; | ||
ARROW_CHECK_OK(client->Evict((int64_t)num_bytes, evicted_bytes)); | ||
|
||
return (jlong)evicted_bytes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
casts
plasma::PlasmaClient* client = (plasma::PlasmaClient*)conn; | ||
|
||
int64_t evicted_bytes; | ||
ARROW_CHECK_OK(client->Evict((int64_t)num_bytes, evicted_bytes)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
casts
* Class: org_apache_arrow_plasma_PlasmaClientJNI | ||
* Method: connect | ||
* Signature: (Ljava/lang/String;Ljava/lang/String;I)J | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pcmoritz What should the comment style be in this file? For C++ files we've been doing //
everywhere else, and using Doxygen style ///
for documentation.
However, should JNI files be handled differently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, it is generated by javah automatically
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I see.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this PR include instructions for how to generate the files?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is only the org_apache_arrow_plasma_PlasmaClientJNI.h will by generated by javah offline, and usually we do not need to add the instructions for how to generate, because we have to write the code of org_apache_arrow_plasma_PlasmaClientJNI.cc, and just check in the org_apache_arrow_plasma_PlasmaClientJNI.h as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 This looks good to me now. I'll merge unless there are more objections.
@salah-man Are you or somebody from your team willing to take a stab at https://issues.apache.org/jira/browse/ARROW-2892? It is important and would be high impact I think (and you can probably do it much faster than me because I have limited experience with Java). |
@salah-man can you let us know your JIRA id so we can assign this issue to you? |
hi, @wesm, this is Hao Chen from Ant Financial. Sorry for the delay, we have limited bandwidth currently. |
That's great, thanks for your help! At a high level, it should be possible to store arrow tables and tensors in python (which is implemented here: https://arrow.apache.org/docs/python/plasma.html#using-arrow-and-pandas-with-plasma) and then read them out using Java with a similar API, using zero-copy if possible. I'm not very familiar with the arrow Java API, but it will probably need integration with https://github.com/apache/arrow/blob/543808d5b2b64df008b57efec2ab9057a9bdc723/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java. We can discuss more about it in https://issues.apache.org/jira/browse/ARROW-2892. The overall goal for us should be to make it possible to exchange data between Java and Python Ray tasks via Plasma I think. If we can achieve that, this will not only help Ray a lot but also provide the right capabilities for people who only use plasma and not Ray. |
@pcmoritz thanks for explaining. Regarding integrating Java API with Also, do we have any doc for Arrow Java API ( Thanks. |
@salah-man, let's take this conversation to https://issues.apache.org/jira/browse/ARROW-2892 |
This commit includes the support of java client for plasma, which is part of the java worker support of Ray.
In addition to some minor changes in build system, it consists of the following modules: