Skip to content

Opt for lsop request serialize #296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.alipay.oceanbase.rpc.protocol.payload;

import com.alipay.oceanbase.rpc.util.ObByteBuf;
import com.alipay.oceanbase.rpc.util.Serialization;
import io.netty.buffer.ByteBuf;

Expand Down Expand Up @@ -44,6 +45,7 @@ public abstract class AbstractPayload implements ObPayload {
private long version = 1;
protected long timeout = RPC_OPERATION_TIMEOUT.getDefaultLong();
protected int groupId = 0;
protected long payLoadContentSize = -1;

/*
* Get pcode.
Expand Down Expand Up @@ -176,11 +178,15 @@ public void setSequence(long sequence) {
* encode unis header
*/
protected int encodeHeader(byte[] bytes, int idx) {
int headerLen = (int) getObUniVersionHeaderLength(getVersion(), getPayloadContentSize());
System.arraycopy(encodeObUniVersionHeader(getVersion(), getPayloadContentSize()), 0, bytes,
idx, headerLen);
idx += headerLen;
byte[] versionHeaderBytes = encodeObUniVersionHeader(getVersion(), getPayloadContentSize());
System.arraycopy(versionHeaderBytes, 0, bytes,
idx, versionHeaderBytes.length);
idx += versionHeaderBytes.length;
return idx;
}

protected void encodeHeader(ObByteBuf buf) {
encodeObUniVersionHeader(buf, getVersion(), getPayloadContentSize());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,30 @@
import com.alipay.oceanbase.rpc.util.ObVString;
import io.netty.buffer.ByteBuf;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ObObj implements ObSimplePayload {

private static ObObj MAX_OBJECT;
private static ObObj MIN_OBJECT;
private static ObObj NULL_OBJECT;
private static long MAX_OBJECT_VALUE = -2L;
private static long MIN_OBJECT_VALUE = -3L;

static {
MAX_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), MAX_OBJECT_VALUE);
MIN_OBJECT = new ObObj(ObObjType.ObExtendType.getDefaultObjMeta(), MIN_OBJECT_VALUE);
NULL_OBJECT = new ObObj(ObObjType.ObNullType.getDefaultObjMeta(), null);
}
private static final int OBJ_POOL_SIZE = 100000;
private static final AtomicInteger CURRENT_INDEX = new AtomicInteger(0);
private static final ConcurrentLinkedQueue<ObObj> OBJ_POOL = new ConcurrentLinkedQueue<>();
static {
// 初始化对象池
for (int i = 0; i < OBJ_POOL_SIZE; i++) {
OBJ_POOL.offer(new ObObj());
}
}

/*
Expand Down Expand Up @@ -158,14 +172,33 @@ public void setValue(Object value) {
* Get instance.
*/
public static ObObj getInstance(Object value) {
ObObjMeta meta = ObObjType.defaultObjMeta(value);
ObObjType type = ObObjType.valueOfType(value);
ObObjMeta meta = null;
if (type == ObObjType.ObVarcharType) {
meta = ObObjMetaPool.varchrObjMeta;
} else {
meta = ObObjType.defaultObjMeta(value);
}
ObObj obj = OBJ_POOL.poll();
if (obj == null) {
// 如果池为空,创建新对象
obj = new ObObj();
}
// 初始化对象
obj.setMeta(meta);
if (value instanceof ObObj) {
return new ObObj(meta, ((ObObj) value).getValue());
obj.setValue(((ObObj) value).getValue());
} else {
return new ObObj(meta, value);
obj.setValue(value);
}
return obj;
}

public static ObObj getNullObject() {
return NULL_OBJECT;
}


/*
* Get max.
*/
Expand Down
Loading