Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: using clusterId to judge whether the target cluster is source cluster #11994

Merged
merged 24 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor
  • Loading branch information
SteveYurongSu committed Feb 2, 2024
commit 075f8bc5cd95f30c338a7907330efc9f4c35f866
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.thrift.TException;

import java.io.DataOutputStream;
Expand All @@ -49,13 +48,22 @@ public Map<String, String> getParams() {

/////////////////////////////// Thrift ///////////////////////////////

public static PipeTransferHandshakeV2Req toTPipeTransferReq(HashMap<String, String> params)
throws TException {
public static PipeTransferHandshakeV2Req toTPipeTransferReq(Map<String, String> params)
throws TException, IOException {
final PipeTransferHandshakeV2Req handshakeReq = new PipeTransferHandshakeV2Req();

handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
handshakeReq.type = PipeRequestType.HANDSHAKE_V2.getType();
handshakeReq.body = ByteBuffer.wrap(SerializationUtils.serialize(params));
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(params.size(), outputStream);
for (final Map.Entry<String, String> entry : params.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
handshakeReq.body =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}

handshakeReq.params = params;

Expand All @@ -65,7 +73,14 @@ public static PipeTransferHandshakeV2Req toTPipeTransferReq(HashMap<String, Stri
public static PipeTransferHandshakeV2Req fromTPipeTransferReq(TPipeTransferReq transferReq) {
final PipeTransferHandshakeV2Req handshakeReq = new PipeTransferHandshakeV2Req();

handshakeReq.params = SerializationUtils.deserialize(transferReq.body.array());
Map<String, String> params = new HashMap<>();
final int size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(transferReq.body);
final String value = ReadWriteIOUtils.readString(transferReq.body);
params.put(key, value);
}
handshakeReq.params = params;

handshakeReq.version = transferReq.version;
handshakeReq.type = transferReq.type;
Expand All @@ -81,7 +96,11 @@ public static byte[] toTransferHandshakeBytes(HashMap<String, String> params) th
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V2.getType(), outputStream);
ReadWriteIOUtils.write(ByteBuffer.wrap(SerializationUtils.serialize(params)), outputStream);
ReadWriteIOUtils.write(params.size(), outputStream);
for (final Map.Entry<String, String> entry : params.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
return byteArrayOutputStream.toByteArray();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.iotdb.commons.concurrent.WrappedRunnable;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
Expand Down Expand Up @@ -118,18 +117,12 @@ private void receive() throws IOException {
final ByteBuffer byteBuffer = ByteBuffer.wrap(data, LONG_LEN, data.length - LONG_LEN);

// Pseudo request, to reuse logic in IoTDBThriftReceiverAgent
byte version = ReadWriteIOUtils.readByte(byteBuffer);
short type = ReadWriteIOUtils.readShort(byteBuffer);
if (type == PipeRequestType.HANDSHAKE_V2.getType()) {
ReadWriteIOUtils.readInt(byteBuffer);
}
ByteBuffer body = byteBuffer.slice();
final AirGapPseudoTPipeTransferRequest req =
(AirGapPseudoTPipeTransferRequest)
new AirGapPseudoTPipeTransferRequest()
.setVersion(version)
.setType(type)
.setBody(body);
.setVersion(ReadWriteIOUtils.readByte(byteBuffer))
.setType(ReadWriteIOUtils.readShort(byteBuffer))
.setBody(byteBuffer.slice());
final TPipeTransferResp resp = agent.receive(req, partitionFetcher, schemaFetcher);

if (resp.getStatus().code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -81,10 +79,11 @@ public void testPipeValidateHandshakeV1Req() throws IOException {
}

@Test
public void testPipeValidateHandshakeV2Req() throws TException {
public void testPipeValidateHandshakeV2Req() throws Exception {
HashMap<String, String> params = new HashMap<>();
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, CLUSTER_ID);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, TIME_PRECISION);
params.put("Nullable", null);

PipeTransferHandshakeV2Req req = PipeTransferHandshakeV2Req.toTPipeTransferReq(params);
PipeTransferHandshakeV2Req deserializeReq =
Expand All @@ -100,6 +99,8 @@ public void testPipeValidateHandshakeV2Req() throws TException {
Assert.assertEquals(
req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION),
deserializeReq.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION));
Assert.assertEquals(
req.getParams().get("Nullable"), deserializeReq.getParams().get("Nullable"));
}

@Test
Expand All @@ -108,22 +109,25 @@ public void testPipeValidateHandshakeV2Req4AirGap() throws IOException {
HashMap<String, String> params = new HashMap<>();
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, CLUSTER_ID);
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, TIME_PRECISION);
params.put("Nullable", null);
ByteBuffer byteBuffer =
ByteBuffer.wrap(PipeTransferHandshakeV2Req.toTransferHandshakeBytes(params));

// Construct request.
byte version = ReadWriteIOUtils.readByte(byteBuffer);
short type = ReadWriteIOUtils.readShort(byteBuffer);
ReadWriteIOUtils.readInt(byteBuffer);
ByteBuffer body = byteBuffer.slice();
final AirGapPseudoTPipeTransferRequest req =
(AirGapPseudoTPipeTransferRequest)
new AirGapPseudoTPipeTransferRequest().setVersion(version).setType(type).setBody(body);
final PipeTransferHandshakeV2Req deserializeReq =
PipeTransferHandshakeV2Req.fromTPipeTransferReq(req);

// Assert.
Assert.assertEquals(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), req.getVersion());
Assert.assertEquals(PipeRequestType.HANDSHAKE_V2.getType(), req.getType());
Assert.assertArrayEquals(SerializationUtils.serialize(params), req.getBody());
Assert.assertEquals(
IoTDBConnectorRequestVersion.VERSION_1.getVersion(), deserializeReq.getVersion());
Assert.assertEquals(PipeRequestType.HANDSHAKE_V2.getType(), deserializeReq.getType());
Assert.assertEquals(params, deserializeReq.getParams());
}

@Test
Expand Down
Loading