Skip to content

Commit 87ba3b9

Browse files
committed
Java client support for plasma
-------------------------- This commit includes the support of java client for plasma, which is part of the java worker support of Ray. In addition to some minor changes in build system, it consists of the following modules: - java/plasma: java client support for plasma - cpp/src/plasma/lib/java: JNI support for plasma client
1 parent 3d41978 commit 87ba3b9

File tree

10 files changed

+876
-0
lines changed

10 files changed

+876
-0
lines changed

cpp/src/plasma/CMakeLists.txt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,30 @@ install(
160160
FILES "${CMAKE_CURRENT_BINARY_DIR}/plasma.pc"
161161
DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
162162

163+
# Plasma java client support
164+
add_compile_options("-I$ENV{JAVA_HOME}/include/")
165+
if(WIN32)
166+
add_compile_options("-I$ENV{JAVA_HOME}/include/win32")
167+
elseif(APPLE)
168+
add_compile_options("-I$ENV{JAVA_HOME}/include/darwin")
169+
else() # linux
170+
add_compile_options("-I$ENV{JAVA_HOME}/include/linux")
171+
endif()
172+
173+
include_directories("${CMAKE_CURRENT_LIST_DIR}/lib/java")
174+
175+
file(GLOB PLASMA_LIBRARY_EXT_java_SRC
176+
lib/java/*.cc lib/*.cc)
177+
add_library(plasma_java SHARED
178+
${PLASMA_LIBRARY_EXT_java_SRC}
179+
)
180+
181+
if(APPLE)
182+
target_link_libraries(plasma_java plasma_static ${PLASMA_LINK_LIBS} "-undefined dynamic_lookup" -Wl,-force_load,${FLATBUFFERS_STATIC_LIB} ${FLATBUFFERS_STATIC_LIB} ${PTHREAD_LIBRARY})
183+
else(APPLE)
184+
target_link_libraries(plasma_java plasma_static ${PLASMA_LINK_LIBS} -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive ${FLATBUFFERS_STATIC_LIB} ${PTHREAD_LIBRARY})
185+
endif(APPLE)
186+
163187
#######################################
164188
# Unit tests
165189
#######################################
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
#include <string>
2+
#include <cstring>
3+
#include <iostream>
4+
#include <stdlib.h>
5+
#include <pthread.h>
6+
#include <unistd.h>
7+
#include "plasma/client.h"
8+
#include "org_apache_arrow_plasma_PlasmaClientJNI.h"
9+
10+
using namespace std;
11+
using namespace plasma;
12+
using namespace arrow;
13+
14+
const jsize LEN_OF_OBJECTID = sizeof(ObjectID) / sizeof(jbyte);
15+
16+
inline void jbyteArray_to_object_id(JNIEnv *env, jbyteArray a, ObjectID *oid)
17+
{
18+
env->GetByteArrayRegion(a, 0, LEN_OF_OBJECTID, (jbyte *) oid);
19+
}
20+
21+
inline void object_id_to_jbyteArray(JNIEnv *env, jbyteArray a, ObjectID *oid)
22+
{
23+
env->SetByteArrayRegion(a, 0, LEN_OF_OBJECTID, (jbyte *) oid);
24+
}
25+
26+
class JByteArrayGetter {
27+
private:
28+
JNIEnv *_env;
29+
jbyteArray _a;
30+
jbyte *bp;
31+
32+
public:
33+
34+
JByteArrayGetter(JNIEnv *env, jbyteArray a, jbyte **out) {
35+
_env = env;
36+
_a = a;
37+
38+
bp = _env->GetByteArrayElements(_a, NULL);
39+
*out = bp;
40+
}
41+
42+
~JByteArrayGetter() {
43+
_env->ReleaseByteArrayElements(_a, bp, 0);
44+
}
45+
};
46+
47+
JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_connect
48+
(JNIEnv *env, jclass cls, jstring config, jstring overwrites, jstring store_socket_name, jstring manager_socket_name, jint release_delay)
49+
{
50+
const char *s_name = env->GetStringUTFChars(store_socket_name, NULL);
51+
const char *m_name = env->GetStringUTFChars(manager_socket_name, NULL);
52+
53+
PlasmaClient *client = new PlasmaClient();
54+
ARROW_CHECK_OK(
55+
client->Connect(s_name, m_name, release_delay));
56+
57+
//fprintf (stdout, "JNI plasma client init, fd = %d\n", client->get_store_fd());
58+
//fflush (stdout);
59+
60+
env->ReleaseStringUTFChars(store_socket_name, s_name);
61+
env->ReleaseStringUTFChars(manager_socket_name, m_name);
62+
return (long) client;
63+
}
64+
65+
JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_disconnect
66+
(JNIEnv *env, jclass cls, jlong conn)
67+
{
68+
PlasmaClient *client = (PlasmaClient *) conn;
69+
70+
//fprintf (stdout, "JNI plasma client disconnect, fd = %d\n", client->get_store_fd());
71+
//fflush (stdout);
72+
73+
ARROW_CHECK_OK(client->Disconnect());
74+
delete client;
75+
return;
76+
}
77+
78+
JNIEXPORT jobject JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_create
79+
(JNIEnv *env, jclass cls, jlong conn, jbyteArray object_id, jint size, jbyteArray metadata)
80+
{
81+
PlasmaClient *client = (PlasmaClient *) conn;
82+
ObjectID oid;
83+
jbyteArray_to_object_id(env, object_id, &oid);
84+
85+
// prepare metadata buffer
86+
uint8_t *md = NULL;
87+
jsize md_size = 0;
88+
std::unique_ptr<JByteArrayGetter> md_getter;
89+
if (metadata != NULL) {
90+
md_size = env->GetArrayLength(metadata);
91+
}
92+
if (md_size > 0) {
93+
md_getter.reset(new JByteArrayGetter(env, metadata, (jbyte **)&md));
94+
}
95+
96+
std::shared_ptr<Buffer> data;
97+
Status s = client->Create(oid, size, md, md_size, &data);
98+
if (s.IsPlasmaObjectExists()) {
99+
jclass Exception = env->FindClass("org/ray/spi/impl/PlasmaObjectExistsException");
100+
env->ThrowNew(Exception, "An object with this ID already exists in the plasma store.");
101+
return NULL;
102+
}
103+
if (s.IsPlasmaStoreFull()) {
104+
jclass Exception = env->FindClass("org/ray/spi/impl/PlasmaOutOfMemoryException");
105+
env->ThrowNew(Exception, "The plasma store ran out of memory and could not create this object.");
106+
return NULL;
107+
}
108+
ARROW_CHECK(s.ok());
109+
110+
return env->NewDirectByteBuffer(data->mutable_data(), size);
111+
}
112+
113+
JNIEXPORT jbyteArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_hash
114+
(JNIEnv *env, jclass cls, jlong conn, jbyteArray object_id)
115+
{
116+
PlasmaClient *client = (PlasmaClient *) conn;
117+
ObjectID oid;
118+
jbyteArray_to_object_id(env, object_id, &oid);
119+
120+
unsigned char digest[kDigestSize];
121+
bool success = client->Hash(oid, digest).ok();
122+
123+
if (success) {
124+
jbyteArray ret = env->NewByteArray(kDigestSize);
125+
env->SetByteArrayRegion(ret, 0, kDigestSize, (jbyte *) digest);
126+
return ret;
127+
}
128+
else {
129+
return NULL;
130+
}
131+
}
132+
133+
JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_seal
134+
(JNIEnv *env, jclass cls, jlong conn, jbyteArray object_id)
135+
{
136+
PlasmaClient *client = (PlasmaClient *) conn;
137+
ObjectID oid;
138+
jbyteArray_to_object_id(env, object_id, &oid);
139+
140+
ARROW_CHECK_OK(client->Seal(oid));
141+
}
142+
143+
JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_release
144+
(JNIEnv *env, jclass cls, jlong conn, jbyteArray object_id)
145+
{
146+
PlasmaClient *client = (PlasmaClient *) conn;
147+
ObjectID oid;
148+
jbyteArray_to_object_id(env, object_id, &oid);
149+
150+
//fprintf (stdout, "JNI plasma client release, fd = %d\n", client->get_store_fd());
151+
//fflush (stdout);
152+
153+
ARROW_CHECK_OK(client->Release(oid));
154+
}
155+
156+
JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_get
157+
(JNIEnv *env, jclass cls, jlong conn, jobjectArray object_ids, jint timeout_ms)
158+
{
159+
PlasmaClient *client = (PlasmaClient *) conn;
160+
//fprintf (stdout, "JNI plasma client get, fd = %d\n", client->get_store_fd());
161+
//fflush (stdout);
162+
163+
jsize num_oids = env->GetArrayLength(object_ids);
164+
std::vector<ObjectID> oids(num_oids);
165+
std::vector<ObjectBuffer> obufs(num_oids);
166+
for (int i = 0; i < num_oids; ++i) {
167+
jbyteArray_to_object_id(env,
168+
(jbyteArray) env->GetObjectArrayElement(object_ids, i), &oids[i]);
169+
}
170+
// TODO: may be blocked. consider to add the thread support
171+
ARROW_CHECK_OK(
172+
client->Get(oids.data(), num_oids, timeout_ms, obufs.data()));
173+
174+
jclass clsByteBuffer = env->FindClass("java/nio/ByteBuffer");
175+
jclass clsByteBufferArray = env->FindClass("[Ljava/nio/ByteBuffer;");
176+
177+
jobjectArray ret = env->NewObjectArray(num_oids, clsByteBufferArray, NULL);
178+
jobjectArray o = NULL;
179+
jobject dataBuf, metadataBuf;
180+
for (int i = 0; i < num_oids; ++i) {
181+
o = env->NewObjectArray(2, clsByteBuffer, NULL);
182+
if (obufs[i].data && obufs[i].data->size() != -1 ) {
183+
dataBuf = env->NewDirectByteBuffer(const_cast<uint8_t *>(obufs[i].data->data()), obufs[i].data->size());
184+
if (obufs[i].metadata && obufs[i].metadata->size() > 0) {
185+
metadataBuf = env->NewDirectByteBuffer(const_cast<uint8_t *>(obufs[i].metadata->data()), obufs[i].metadata->size());
186+
}
187+
else {
188+
metadataBuf = NULL;
189+
}
190+
}
191+
else {
192+
dataBuf = NULL;
193+
metadataBuf = NULL;
194+
}
195+
196+
env->SetObjectArrayElement(o, 0, dataBuf);
197+
env->SetObjectArrayElement(o, 1, metadataBuf);
198+
env->SetObjectArrayElement(ret, i, o);
199+
}
200+
return ret;
201+
}
202+
203+
JNIEXPORT jboolean JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_contains
204+
(JNIEnv *env, jclass cls, jlong conn, jbyteArray object_id)
205+
{
206+
PlasmaClient *client = (PlasmaClient *) conn;
207+
ObjectID oid;
208+
jbyteArray_to_object_id(env, object_id, &oid);
209+
210+
bool has_object;
211+
ARROW_CHECK_OK(client->Contains(oid, &has_object));
212+
213+
if (has_object) {
214+
return true;
215+
}
216+
else {
217+
return false;
218+
}
219+
}
220+
221+
JNIEXPORT void JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_fetch
222+
(JNIEnv *env, jclass cls, jlong conn, jobjectArray object_ids)
223+
{
224+
PlasmaClient *client = (PlasmaClient *) conn;
225+
jsize num_oids = env->GetArrayLength(object_ids);
226+
227+
/*
228+
if (!plasma_manager_is_connected(client)) {
229+
jclass Exception = env->FindClass("java/lang/RuntimeException");
230+
env->ThrowNew(Exception, "Not connected to the plasma manager.");
231+
return;
232+
}
233+
*/
234+
235+
std::vector<ObjectID> oids(num_oids);
236+
for (int i = 0; i < num_oids; ++i) {
237+
jbyteArray_to_object_id(env,
238+
(jbyteArray) env->GetObjectArrayElement(object_ids, i), &oids[i]);
239+
}
240+
241+
ARROW_CHECK_OK(client->Fetch((int) num_oids, oids.data()));
242+
243+
return;
244+
}
245+
246+
JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_wait
247+
(JNIEnv *env, jclass cls, jlong conn, jobjectArray object_ids, jint timeout_ms, jint num_returns)
248+
{
249+
PlasmaClient *client = (PlasmaClient *) conn;
250+
jsize num_oids = env->GetArrayLength(object_ids);
251+
252+
/*
253+
if (!plasma_manager_is_connected(client)) {
254+
jclass Exception = env->FindClass("java/lang/RuntimeException");
255+
env->ThrowNew(Exception, "Not connected to the plasma manager.");
256+
return NULL;
257+
}
258+
*/
259+
260+
if (num_returns < 0) {
261+
jclass Exception = env->FindClass("java/lang/RuntimeException");
262+
env->ThrowNew(Exception, "The argument num_returns cannot be less than zero.");
263+
return NULL;
264+
}
265+
if (num_returns > num_oids) {
266+
jclass Exception = env->FindClass("java/lang/RuntimeException");
267+
env->ThrowNew(Exception,
268+
"The argument num_returns cannot be greater than len(object_ids).");
269+
return NULL;
270+
}
271+
272+
std::vector<ObjectRequest> oreqs(num_oids);
273+
274+
for (int i = 0; i < num_oids; ++i) {
275+
jbyteArray_to_object_id(env,
276+
(jbyteArray) env->GetObjectArrayElement(object_ids, i), &oreqs[i].object_id);
277+
oreqs[i].type = PLASMA_QUERY_ANYWHERE;
278+
}
279+
280+
int num_return_objects;
281+
// TODO: may be blocked. consider to add the thread support
282+
ARROW_CHECK_OK(client->Wait((int) num_oids, oreqs.data(), num_returns,
283+
(uint64_t) timeout_ms, &num_return_objects));
284+
285+
int num_to_return = min(num_return_objects, num_returns);
286+
jclass clsByteArray = env->FindClass("[B");
287+
jobjectArray ret = env->NewObjectArray(num_to_return, clsByteArray, NULL);
288+
289+
int num_returned = 0;
290+
jbyteArray oid = NULL;
291+
for (int i = 0; i < num_oids; ++i) {
292+
if (num_returned >= num_to_return) {
293+
break;
294+
}
295+
296+
if (oreqs[i].status == ObjectStatusLocal
297+
|| oreqs[i].status == ObjectStatusRemote) {
298+
oid = env->NewByteArray(LEN_OF_OBJECTID);
299+
object_id_to_jbyteArray(env, oid, &oreqs[i].object_id);
300+
env->SetObjectArrayElement(ret, num_returned, oid);
301+
num_returned++;
302+
}
303+
else {
304+
//ARROW_CHECK(oreqs[i].status == ObjectStatus_Nonexistent);
305+
}
306+
}
307+
ARROW_CHECK(num_returned == num_to_return);
308+
309+
return ret;
310+
}
311+
312+
JNIEXPORT jlong JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_evict
313+
(JNIEnv *env, jclass cls, jlong conn, jlong num_bytes)
314+
{
315+
PlasmaClient *client = (PlasmaClient *) conn;
316+
317+
int64_t evicted_bytes;
318+
ARROW_CHECK_OK(client->Evict((int64_t) num_bytes, evicted_bytes));
319+
320+
return (jlong) evicted_bytes;
321+
}
322+

0 commit comments

Comments
 (0)