Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Rust): Implement TypeMeta part of the compatibility features #1789

Merged
merged 12 commits into from
Aug 5, 2024
Prev Previous commit
Next Next commit
merge main
  • Loading branch information
theweipeng committed Aug 2, 2024
commit fc0dfc9a6b61db15fb12d0dff926b4c4ddf32c10
32 changes: 28 additions & 4 deletions docs/guide/java_serialization_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ import org.apache.fury.config.*;
public class Example {
// reuse fury.
private static final ThreadSafeFury fury = new ThreadLocalFury(classLoader -> {
Fury f = Fury.builder().withLanguage(Language.JAVA)
.withClassLoader(classLoader).build();
f.register(SomeClass.class);
return f;
Fury f = Fury.builder().withLanguage(Language.JAVA)
.withClassLoader(classLoader).build();
f.register(SomeClass.class);
return f;
});

public static void main(String[] args) {
Expand Down Expand Up @@ -115,6 +115,7 @@ public class Example {
| `codeGenEnabled` | Disabling may result in faster initial serialization but slower subsequent serializations. | `true` |
| `asyncCompilationEnabled` | If enabled, serialization uses interpreter mode first and switches to JIT serialization after async serializer JIT for a class is finished. | `false` |
| `scalaOptimizationEnabled` | Enables or disables Scala-specific serialization optimization. | `false` |
| `copyRef` | When disabled, the copy performance will be better. But fury deep copy will ignore circular and shared reference. Same reference of an object graph will be copied into different objects in one `Fury#copy`. | `true` |

## Advanced Usage

Expand Down Expand Up @@ -192,6 +193,29 @@ not worthy compared to performance cost. Maybe you should try to disable long co
much
space savings.

### Object deep copy

Deep copy example:

```java
Fury fury=Fury.builder()
...
.withRefCopy(true).build();
SomeClass a=xxx;
SomeClass copied=fury.copy(a)
```

Make fury deep copy ignore circular and shared reference, this deep copy mode will ignore circular and shared reference.
Same reference of an object graph will be copied into different objects in one `Fury#copy`.

```java
Fury fury=Fury.builder()
...
.withRefCopy(false).build();
SomeClass a=xxx;
SomeClass copied=fury.copy(a)
```

### Implement a customized serializer

In some cases, you may want to implement a serializer for your type, especially some class customize serialization by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,50 @@ public Object kryo_copy(KryoState.KryoUserTypeState state) {
return state.kryo.copy(state.object);
}

@Benchmark
public Object fury_copy_string_map(FuryState.DataState state) {
return state.fury.copy(state.data.stringMap);
}

@Benchmark
public Object fury_copy_int_map(FuryState.DataState state) {
return state.fury.copy(state.data.intMap);
}

@Benchmark
public Object kryo_copy_string_map(KryoState.DataState state) {
return state.kryo.copy(state.data.stringMap);
}

@Benchmark
public Object kryo_copy_int_map(KryoState.DataState state) {
return state.kryo.copy(state.data.intMap);
}

@Benchmark
public Object fury_copy_list(FuryState.DataState state) {
return state.fury.copy(state.data.intList);
}

@Benchmark
public Object kryo_copy_list(KryoState.DataState state) {
return state.kryo.copy(state.data.intList);
}

@Benchmark
public Object fury_copy_object_array(FuryState.DataState state) {
return state.fury.copy(state.data.objectArray);
}

@Benchmark
public Object kryo_copy_object_array(KryoState.DataState state) {
return state.kryo.copy(state.data.objectArray);
}

public static void main(String[] args) throws IOException {
if (args.length == 0) {
String commandLine =
"org.apache.fury.*CopyBenchmark.* -f 1 -wi 3 -i 3 -t 1 -w 2s -r 2s -rf csv "
"org.apache.fury.*CopyBenchmark.*list -f 1 -wi 3 -i 3 -t 1 -w 2s -r 2s -rf csv "
+ "-p bufferType=array -p references=false";
System.out.println(commandLine);
args = commandLine.split(" ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

package org.apache.fury.benchmark.data;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.fury.util.StringUtils;

public class Data {
Expand Down Expand Up @@ -93,4 +97,20 @@ public static String newLongStr() {
}
return StringUtils.random(strLength);
}

public List<String> stringList = new ArrayList<>();
public List<Integer> intList = new ArrayList<>();
public Object[] objectArray = new Integer[20];
public Map<String, String> stringMap = new HashMap<>();
public Map<Integer, Integer> intMap = new HashMap<>();

{
for (int i = 0; i < 20; i++) {
stringList.add("hello, " + i);
objectArray[i] = i;
intList.add(i);
stringMap.put("key" + i, "value" + i);
intMap.put(i, i * 2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.esotericsoftware.kryo.unsafe.UnsafeByteBufferInput;
import com.esotericsoftware.kryo.unsafe.UnsafeByteBufferOutput;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.fury.benchmark.IntsSerializationSuite;
import org.apache.fury.benchmark.LongStringSerializationSuite;
import org.apache.fury.benchmark.LongsSerializationSuite;
Expand Down Expand Up @@ -85,6 +86,11 @@ public void setup() {
kryo.setRegistrationRequired(registerClass);
kryo.register(int[].class);
kryo.register(long[].class);
kryo.register(Object[].class);
kryo.register(Integer[].class);
kryo.register(String[].class);
kryo.register(ArrayList.class);
kryo.register(HashMap.class);
}
}

Expand Down
29 changes: 22 additions & 7 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private StackOverflowError processCopyStackOverflowError(StackOverflowError e) {
if (!copyRefTracking) {
String msg =
"Object may contain circular references, please enable ref tracking "
+ "by `FuryBuilder#withCopyRefTracking(true)`";
+ "by `FuryBuilder#withRefCopy(true)`";
StackOverflowError t1 = ExceptionUtils.trySetStackOverflowErrorMessage(e, msg);
if (t1 != null) {
return t1;
Expand Down Expand Up @@ -1337,9 +1337,7 @@ public <T> T copyObject(T obj) {
break;
// todo: add fastpath for other types.
default:
copyDepth++;
copy = classInfo.getSerializer().copy(obj);
copyDepth--;
copy = copyObject(obj, classInfo.getSerializer());
}
return (T) copy;
}
Expand Down Expand Up @@ -1369,10 +1367,26 @@ public <T> T copyObject(T obj, int classId) {
case ClassResolver.STRING_CLASS_ID:
return obj;
default:
return (T) classResolver.getOrUpdateClassInfo(obj.getClass()).getSerializer().copy(obj);
return copyObject(obj, classResolver.getOrUpdateClassInfo(obj.getClass()).getSerializer());
}
}

public <T> T copyObject(T obj, Serializer<T> serializer) {
copyDepth++;
T copyObject;
if (serializer.needToCopyRef()) {
copyObject = getCopyObject(obj);
if (copyObject == null) {
copyObject = serializer.copy(obj);
originToCopyMap.put(obj, copyObject);
}
} else {
copyObject = serializer.copy(obj);
}
copyDepth--;
return copyObject;
}

/**
* Track ref for copy.
*
Expand All @@ -1388,8 +1402,9 @@ public <T> void reference(T o1, T o2) {
}
}

public Object getCopyObject(Object originObj) {
return originToCopyMap.get(originObj);
@SuppressWarnings("unchecked")
public <T> T getCopyObject(T originObj) {
return (T) originToCopyMap.get(originObj);
}

private void serializeToStream(OutputStream outputStream, Consumer<MemoryBuffer> function) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,31 +80,29 @@ public T copy(T originObj) {
return originObj;
}
if (isRecord) {
Object[] fieldValues = copyFields(originObj);
try {
T t = (T) constructor.invokeWithArguments(fieldValues);
Arrays.fill(copyRecordInfo.getRecordComponents(), null);
return t;
} catch (Throwable e) {
Platform.throwException(e);
}
return originObj;
return copyRecord(originObj);
}
T newObj;
T newObj = newBean();
if (needToCopyRef) {
T copyObject = (T) fury.getCopyObject(originObj);
if (copyObject != null) {
return copyObject;
}
newObj = newBean();
fury.reference(originObj, newObj);
} else {
newObj = newBean();
}
copyFields(originObj, newObj);
return newObj;
}

private T copyRecord(T originObj) {
Object[] fieldValues = copyFields(originObj);
try {
T t = (T) constructor.invokeWithArguments(fieldValues);
Arrays.fill(copyRecordInfo.getRecordComponents(), null);
fury.reference(originObj, t);
return t;
} catch (Throwable e) {
Platform.throwException(e);
}
return originObj;
}

private Object[] copyFields(T originObj) {
InternalFieldInfo[] fieldInfos = this.fieldInfos;
if (fieldInfos == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,22 @@ public void write(MemoryBuffer buffer, T[] arr) {
public T[] copy(T[] originArray) {
int length = originArray.length;
Object[] newArray = newArray(length);
for (int i = 0; i < length; i++) {
newArray[i] = fury.copyObject(originArray[i]);
if (needToCopyRef) {
fury.reference(originArray, newArray);
}
Serializer componentSerializer = this.componentTypeSerializer;
if (componentSerializer != null) {
if (componentSerializer.isImmutable()) {
System.arraycopy(originArray, 0, newArray, 0, length);
} else {
for (int i = 0; i < length; i++) {
newArray[i] = componentSerializer.copy(originArray[i]);
}
}
} else {
for (int i = 0; i < length; i++) {
newArray[i] = fury.copyObject(originArray[i]);
}
}
return (T[]) newArray;
}
Expand All @@ -131,7 +145,6 @@ public void xwrite(MemoryBuffer buffer, T[] arr) {

@Override
public T[] read(MemoryBuffer buffer) {
// Some jdk8 will crash if use varint, why?
int numElements = buffer.readVarUint32Small7();
boolean isFinal = (numElements & 0b1) != 0;
numElements >>>= 1;
Expand All @@ -140,9 +153,6 @@ public T[] read(MemoryBuffer buffer) {
refResolver.reference(value);
if (isFinal) {
final Serializer componentTypeSerializer = this.componentTypeSerializer;
if (componentTypeSerializer == null) {
System.out.println("=======");
}
for (int i = 0; i < numElements; i++) {
Object elem;
int nextReadRefId = refResolver.tryPreserveRefId(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public T xread(MemoryBuffer buffer) {
Object fieldValue = fury.xreadRefByNullableSerializer(buffer, serializer);
fieldAccessor.set(obj, fieldValue);
if (hasGenerics) {
generics.pushGenericType(fieldGeneric);
generics.popGenericType();
}
}
return obj;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.reflect.ReflectionUtils;
import org.apache.fury.reflect.TypeRef;
import org.apache.fury.resolver.ClassInfo;
import org.apache.fury.resolver.ClassInfoHolder;
import org.apache.fury.resolver.ClassResolver;
import org.apache.fury.resolver.RefResolver;
Expand Down Expand Up @@ -417,8 +418,24 @@ public T xread(MemoryBuffer buffer) {
}

protected <K, V> void copyEntry(Map<K, V> originMap, Map<K, V> newMap) {
ClassResolver classResolver = fury.getClassResolver();
for (Map.Entry<K, V> entry : originMap.entrySet()) {
newMap.put(fury.copyObject(entry.getKey()), fury.copyObject(entry.getValue()));
K key = entry.getKey();
if (key != null) {
ClassInfo classInfo = classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache);
if (!classInfo.getSerializer().isImmutable()) {
key = fury.copyObject(key, classInfo.getClassId());
}
}
V value = entry.getValue();
if (value != null) {
ClassInfo classInfo =
classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache);
if (!classInfo.getSerializer().isImmutable()) {
value = fury.copyObject(value, classInfo.getClassId());
}
}
newMap.put(key, value);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Collection;
import org.apache.fury.Fury;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.resolver.ClassInfo;
import org.apache.fury.resolver.ClassResolver;

/** Base serializer for all java collections. */
@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -57,19 +59,23 @@ public T copy(T originCollection) {
}
Collection newCollection = newCollection(originCollection);
if (needToCopyRef) {
Collection copyObject = (Collection) fury.getCopyObject(originCollection);
if (copyObject != null) {
return (T) copyObject;
}
fury.reference(originCollection, newCollection);
}
copyElements(originCollection, newCollection);
return (T) newCollection;
}

public void copyElements(T originCollection, Collection newCollection) {
ClassResolver classResolver = fury.getClassResolver();
for (Object element : originCollection) {
newCollection.add(fury.copyObject(element));
if (element != null) {
ClassInfo classInfo =
classResolver.getClassInfo(element.getClass(), elementClassInfoHolder);
if (!classInfo.getSerializer().isImmutable()) {
element = fury.copyObject(element, classInfo.getClassId());
}
}
newCollection.add(element);
}
}

Expand Down
Loading
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.