Skip to content

Commit 9da40d0

Browse files
committed
Delete the ObjectId.java and ObjectBuffer.java, change the input and output of plasma java client api from custem type to byte[]
1 parent 87ba3b9 commit 9da40d0

File tree

4 files changed

+43
-127
lines changed

4 files changed

+43
-127
lines changed

java/plasma/src/main/java/org/apache/arrow/plasma/ObjectBuffer.java

Lines changed: 0 additions & 55 deletions
This file was deleted.

java/plasma/src/main/java/org/apache/arrow/plasma/ObjectId.java

Lines changed: 0 additions & 24 deletions
This file was deleted.

java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.arrow.plasma;
1919

20-
import java.util.Collections;
2120
import java.util.List;
2221

2322
/**
@@ -32,7 +31,7 @@ public interface ObjectStoreLink {
3231
* @param value The value to put in the object store.
3332
* @param metadata encodes whatever metadata the user wishes to encode.
3433
*/
35-
void put(ObjectId objectId, byte[] value, byte[] metadata);
34+
void put(byte[] objectId, byte[] value, byte[] metadata);
3635

3736
/**
3837
* Create a buffer from the PlasmaStore based on the <tt>objectId</tt>.
@@ -43,8 +42,9 @@ public interface ObjectStoreLink {
4342
* @param isMetadata false if get data, otherwise get metadata.
4443
* @return A PlasmaBuffer wrapping the object.
4544
*/
46-
default ObjectBuffer get(ObjectId objectId, int timeoutMs, boolean isMetadata) {
47-
return get(Collections.singletonList(objectId), timeoutMs, isMetadata).get(0);
45+
default byte[] get(byte[] objectId, int timeoutMs, boolean isMetadata) {
46+
byte[][] objectIds = {objectId};
47+
return get(objectIds, timeoutMs, isMetadata).get(0);
4848
}
4949

5050
/**
@@ -56,7 +56,7 @@ default ObjectBuffer get(ObjectId objectId, int timeoutMs, boolean isMetadata) {
5656
* @param isMetadata false if get data, otherwise get metadata.
5757
* @return List of PlasmaBuffers wrapping objects.
5858
*/
59-
List<ObjectBuffer> get(List<? extends ObjectId> objectIds, int timeoutMs, boolean isMetadata);
59+
List<byte[]> get(byte[][] objectIds, int timeoutMs, boolean isMetadata);
6060

6161
/**
6262
* Wait until <tt>numReturns</tt> objects in <tt>objectIds</tt> are ready.
@@ -66,7 +66,7 @@ default ObjectBuffer get(ObjectId objectId, int timeoutMs, boolean isMetadata) {
6666
* @param numReturns We are waiting for this number of objects to be ready.
6767
* @return List of object IDs that are ready
6868
*/
69-
List<ObjectId> wait(List<? extends ObjectId> objectIds, int timeoutMs, int numReturns);
69+
List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns);
7070

7171
/**
7272
* Compute the hash of an object in the object store.
@@ -75,23 +75,24 @@ default ObjectBuffer get(ObjectId objectId, int timeoutMs, boolean isMetadata) {
7575
* @return A digest byte array contains object's SHA256 hash. <tt>null</tt> means that the object
7676
* isn't in the object store.
7777
*/
78-
byte[] hash(ObjectId objectId);
78+
byte[] hash(byte[] objectId);
7979

8080
/**
8181
* Fetch the object with the given ID from other plasma manager instances.
8282
*
8383
* @param objectId The object ID used to identify the object.
8484
*/
85-
default void fetch(ObjectId objectId) {
86-
fetch(Collections.singletonList(objectId));
85+
default void fetch(byte[] objectId) {
86+
byte[][] objectIds = {objectId};
87+
fetch(objectIds);
8788
}
8889

8990
/**
9091
* Fetch the objects with the given IDs from other plasma manager instances.
9192
*
9293
* @param objectIds List of object IDs used to identify the objects.
9394
*/
94-
void fetch(List<? extends ObjectId> objectIds);
95+
void fetch(byte[][] objectIds);
9596

9697
/**
9798
* Evict some objects to recover given count of bytes.
@@ -100,4 +101,11 @@ default void fetch(ObjectId objectId) {
100101
* @return The number of bytes that have been evicted.
101102
*/
102103
long evict(long numBytes);
104+
105+
/**
106+
* Release the reference of the object.
107+
*
108+
* @param objectId The object ID used to release the reference of the object.
109+
*/
110+
void release(byte[] objectId);
103111
}

java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ public PlasmaClient(String configFile, String configOverwrites, String storeSock
4848
// interface methods --------------------
4949

5050
@Override
51-
public void put(ObjectId objectId, byte[] value, byte[] metadata) {
51+
public void put(byte[] objectId, byte[] value, byte[] metadata) {
5252
ByteBuffer buf = null;
5353
try {
54-
buf = PlasmaClientJNI.create(conn, objectId.getBytes(), value.length, metadata);
54+
buf = PlasmaClientJNI.create(conn, objectId, value.length, metadata);
5555
} catch (Exception e) {
5656
System.err.println("ObjectId " + objectId + " error at PlasmaClient put");
5757
e.printStackTrace();
@@ -61,49 +61,37 @@ public void put(ObjectId objectId, byte[] value, byte[] metadata) {
6161
}
6262

6363
buf.put(value);
64-
PlasmaClientJNI.seal(conn, objectId.getBytes());
65-
PlasmaClientJNI.release(conn, objectId.getBytes());
64+
PlasmaClientJNI.seal(conn, objectId);
65+
PlasmaClientJNI.release(conn, objectId);
6666
}
6767

6868
@Override
69-
public List<ObjectBuffer> get(List<? extends ObjectId> objectIds, int timeoutMs, boolean isMetadata) {
70-
byte[][] ids = getIdBytes(objectIds);
71-
ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, ids, timeoutMs);
72-
assert bufs.length == objectIds.size();
69+
public List<byte[]> get(byte[][] objectIds, int timeoutMs, boolean isMetadata) {
70+
ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs);
71+
assert bufs.length == objectIds.length;
7372

74-
List<ObjectBuffer> ret = new ArrayList<>();
73+
List<byte[]> ret = new ArrayList<>();
7574
for (int i = 0; i < bufs.length; i++) {
76-
ObjectId oid = objectIds.get(i);
7775
ByteBuffer buf = bufs[i][isMetadata ? 1 : 0];
7876
if (buf == null) {
79-
ret.add(new ObjectBuffer(null, null));
77+
ret.add(null);
8078
} else {
8179
byte[] bb = new byte[buf.remaining()];
8280
buf.get(bb);
83-
ret.add(new ObjectBuffer(bb, (byte[] b) -> this.release(oid)));
81+
ret.add(bb);
8482
}
8583
}
8684
return ret;
8785
}
8886

89-
private static byte[][] getIdBytes(List<? extends ObjectId> objectIds) {
90-
int size = objectIds.size();
91-
byte[][] ids = new byte[size][];
92-
for (int i = 0; i < size; i++) {
93-
ids[i] = objectIds.get(i).getBytes();
94-
}
95-
return ids;
96-
}
97-
9887
@Override
99-
public List<ObjectId> wait(List<? extends ObjectId> objectIds, int timeoutMs, int numReturns) {
100-
byte[][] ids = getIdBytes(objectIds);
101-
byte[][] readys = PlasmaClientJNI.wait(conn, ids, timeoutMs, numReturns);
88+
public List<byte[]> wait(byte[][] objectIds, int timeoutMs, int numReturns) {
89+
byte[][] readys = PlasmaClientJNI.wait(conn, objectIds, timeoutMs, numReturns);
10290

103-
List<ObjectId> ret = new ArrayList<>();
91+
List<byte[]> ret = new ArrayList<>();
10492
for (byte[] ready : readys) {
105-
for (ObjectId id : objectIds) {
106-
if (Arrays.equals(ready, id.getBytes())) {
93+
for (byte[] id : objectIds) {
94+
if (Arrays.equals(ready, id)) {
10795
ret.add(id);
10896
break;
10997
}
@@ -115,14 +103,13 @@ public List<ObjectId> wait(List<? extends ObjectId> objectIds, int timeoutMs, in
115103
}
116104

117105
@Override
118-
public byte[] hash(ObjectId objectId) {
119-
return PlasmaClientJNI.hash(conn, objectId.getBytes());
106+
public byte[] hash(byte[] objectId) {
107+
return PlasmaClientJNI.hash(conn, objectId);
120108
}
121109

122110
@Override
123-
public void fetch(List<? extends ObjectId> objectIds) {
124-
byte[][] ids = getIdBytes(objectIds);
125-
PlasmaClientJNI.fetch(conn, ids);
111+
public void fetch(byte[][] objectIds) {
112+
PlasmaClientJNI.fetch(conn, objectIds);
126113
}
127114

128115
@Override
@@ -138,25 +125,25 @@ public long evict(long numBytes) {
138125
*
139126
* @param objectId used to identify an object.
140127
*/
141-
public void seal(ObjectId objectId) {
142-
PlasmaClientJNI.seal(conn, objectId.getBytes());
128+
public void seal(byte[] objectId) {
129+
PlasmaClientJNI.seal(conn, objectId);
143130
}
144131

145132
/**
146133
* Notify Plasma that the object is no longer needed.
147134
*
148135
* @param objectId used to identify an object.
149136
*/
150-
public void release(ObjectId objectId) {
151-
PlasmaClientJNI.release(conn, objectId.getBytes());
137+
public void release(byte[] objectId) {
138+
PlasmaClientJNI.release(conn, objectId);
152139
}
153140

154141
/**
155142
* Check if the object is present and has been sealed in the PlasmaStore.
156143
*
157144
* @param objectId used to identify an object.
158145
*/
159-
public boolean contains(ObjectId objectId) {
160-
return PlasmaClientJNI.contains(conn, objectId.getBytes());
146+
public boolean contains(byte[] objectId) {
147+
return PlasmaClientJNI.contains(conn, objectId);
161148
}
162149
}

0 commit comments

Comments
 (0)