Skip to content

Commit 42b00d6

Browse files
authored
[IOTDB-4135] Merge thrift-sync into ClientRPC (apache#7004)
1 parent 0292905 commit 42b00d6

File tree

46 files changed

+310
-1279
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+310
-1279
lines changed

antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ utilityStatement
6565
| loadConfiguration | loadTimeseries | loadFile | removeFile | unloadFile;
6666

6767
syncStatement
68-
: startPipeServer | stopPipeServer | showPipeServer
69-
| createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
68+
: createPipeSink | showPipeSinkType | showPipeSink | dropPipeSink
7069
| createPipe | showPipe | stopPipe | startPipe | dropPipe;
7170

7271
/**
@@ -764,18 +763,6 @@ syncAttributeClauses
764763
: attributePair (COMMA attributePair)*
765764
;
766765

767-
// sync receiver
768-
startPipeServer
769-
: START PIPESERVER
770-
;
771-
772-
stopPipeServer
773-
: STOP PIPESERVER
774-
;
775-
776-
showPipeServer
777-
: SHOW PIPESERVER
778-
;
779766

780767
/**
781768
* 7. Common Clauses

antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -379,10 +379,6 @@ PIPES
379379
: P I P E S
380380
;
381381

382-
PIPESERVER
383-
: P I P E S E R V E R
384-
;
385-
386382
PIPESINK
387383
: P I P E S I N K
388384
;

integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,17 @@
2121
import org.apache.iotdb.commons.path.PartialPath;
2222
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2323
import org.apache.iotdb.db.engine.modification.Deletion;
24-
import org.apache.iotdb.db.exception.sync.PipeServerException;
2524
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
2625
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
2726
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
2827
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
29-
import org.apache.iotdb.db.sync.SyncService;
3028
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
3129
import org.apache.iotdb.db.sync.pipedata.PipeData;
3230
import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
3331
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
3432
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
3533
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
36-
import org.apache.iotdb.db.sync.transport.client.IoTDBSInkTransportClient;
34+
import org.apache.iotdb.db.sync.transport.client.IoTDBSinkTransportClient;
3735
import org.apache.iotdb.db.utils.EnvironmentUtils;
3836
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
3937
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -51,7 +49,6 @@
5149
import org.slf4j.LoggerFactory;
5250

5351
import java.io.File;
54-
import java.net.Socket;
5552
import java.util.ArrayList;
5653
import java.util.Arrays;
5754
import java.util.List;
@@ -71,7 +68,7 @@ public class IoTDBSyncReceiverIT {
7168
String remoteIp1;
7269
long createdTime1 = System.currentTimeMillis();
7370
String showPipeSql = "SHOW PIPE";
74-
IoTDBSInkTransportClient client;
71+
IoTDBSinkTransportClient client;
7572

7673
@Before
7774
public void setUp() throws Exception {
@@ -94,15 +91,9 @@ public void setUp() throws Exception {
9491
FileUtils.moveDirectory(srcDir, tmpDir);
9592
EnvironmentUtils.cleanEnv();
9693
EnvironmentUtils.envSetUp();
97-
try {
98-
SyncService.getInstance().startPipeServer(true);
99-
new Socket("localhost", 6670).close();
100-
} catch (Exception e) {
101-
Assert.fail("Failed to start pipe server because " + e.getMessage());
102-
}
10394
Pipe pipe = new TsFilePipe(createdTime1, pipeName1, null, 0, false);
10495
remoteIp1 = "127.0.0.1";
105-
client = new IoTDBSInkTransportClient(pipe, remoteIp1, 6670, "127.0.0.1");
96+
client = new IoTDBSinkTransportClient(pipe, remoteIp1, 6667, "127.0.0.1");
10697
client.handshake();
10798
}
10899

@@ -120,16 +111,6 @@ public void tearDown() throws Exception {
120111
EnvironmentUtils.cleanEnv();
121112
}
122113

123-
@Test
124-
public void testStopPipeServer() {
125-
logger.info("testStopPipeServerCheck");
126-
try {
127-
SyncService.getInstance().stopPipeServer();
128-
} catch (PipeServerException e) {
129-
Assert.fail("Can not stop pipe server");
130-
}
131-
}
132-
133114
@Test
134115
public void testReceiveDataAndLoad() {
135116
logger.info("testReceiveDataAndLoad");

node-commons/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,6 @@
7777
<groupId>com.google.guava</groupId>
7878
<artifactId>guava</artifactId>
7979
</dependency>
80-
<dependency>
81-
<groupId>org.apache.iotdb</groupId>
82-
<artifactId>iotdb-thrift</artifactId>
83-
<version>${project.version}</version>
84-
</dependency>
8580
<dependency>
8681
<groupId>org.apache.iotdb</groupId>
8782
<artifactId>iotdb-thrift-confignode</artifactId>

node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncConstant.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,6 @@ public class SyncConstant {
7272
public static final int DATA_CHUNK_SIZE =
7373
Math.min(16 * 1024 * 1024, RpcUtils.THRIFT_FRAME_MAX_SIZE);
7474

75-
public static final int SUCCESS_CODE = 1;
76-
public static final int ERROR_CODE = -1;
77-
public static final int REBASE_CODE = -2;
78-
public static final int RETRY_CODE = -3;
79-
public static final int CONFLICT_CODE = -4;
80-
8175
/** receiver */
8276
public static final String RECEIVER_DIR_NAME = "receiver";
8377

node-commons/src/main/java/org/apache/iotdb/commons/sync/SyncPathUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.iotdb.commons.sync;
2121

2222
import org.apache.iotdb.commons.conf.CommonDescriptor;
23-
import org.apache.iotdb.service.transport.thrift.IdentityInfo;
23+
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
2424

2525
import java.io.File;
2626
import java.io.IOException;
@@ -105,12 +105,12 @@ public static String getReceiverFileDataDir(String pipeName, String remoteIp, lo
105105
+ SyncConstant.FILE_DATA_DIR_NAME;
106106
}
107107

108-
public static String getFileDataDirPath(IdentityInfo identityInfo) {
108+
public static String getFileDataDirPath(TSyncIdentityInfo identityInfo) {
109109
return SyncPathUtil.getReceiverFileDataDir(
110110
identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
111111
}
112112

113-
public static String getPipeLogDirPath(IdentityInfo identityInfo) {
113+
public static String getPipeLogDirPath(TSyncIdentityInfo identityInfo) {
114114
return SyncPathUtil.getReceiverPipeLogDir(
115115
identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
116116
}

pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@
8585
<module>thrift-commons</module>
8686
<module>thrift-confignode</module>
8787
<module>thrift-multi-leader-consensus</module>
88-
<module>thrift-sync</module>
8988
<module>thrift-influxdb</module>
9089
<module>service-rpc</module>
9190
<module>jdbc</module>

server/src/assembly/resources/conf/iotdb-datanode.properties

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -675,10 +675,6 @@ timestamp_precision=ms
675675
####################
676676
### PIPE Server Configuration
677677
####################
678-
# PIPE server port to listen
679-
# Datatype: int
680-
# pipe_server_port=6670
681-
682678
# White IP list of Sync client.
683679
# Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16
684680
# If there are more than one IP segment, please separate them by commas

server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -501,9 +501,6 @@ public class IoTDBConfig {
501501
*/
502502
private int externalSortThreshold = 1000;
503503

504-
/** If this IoTDB instance is a receiver of sync, set the server port. */
505-
private int pipeServerPort = 6670;
506-
507504
/** White list for sync */
508505
private String ipWhiteList = "0.0.0.0/0";
509506

@@ -1441,14 +1438,6 @@ public void setmRemoteSchemaCacheSize(int mRemoteSchemaCacheSize) {
14411438
this.mRemoteSchemaCacheSize = mRemoteSchemaCacheSize;
14421439
}
14431440

1444-
public int getPipeServerPort() {
1445-
return pipeServerPort;
1446-
}
1447-
1448-
public void setPipeServerPort(int pipeServerPort) {
1449-
this.pipeServerPort = pipeServerPort;
1450-
}
1451-
14521441
public int getMaxNumberOfSyncFileRetry() {
14531442
return maxNumberOfSyncFileRetry;
14541443
}

server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -417,11 +417,6 @@ private void loadProps() {
417417
properties.getProperty(
418418
"session_timeout_threshold",
419419
Integer.toString(conf.getSessionTimeoutThreshold()))));
420-
conf.setPipeServerPort(
421-
Integer.parseInt(
422-
properties
423-
.getProperty("pipe_server_port", Integer.toString(conf.getPipeServerPort()))
424-
.trim()));
425420
conf.setMaxNumberOfSyncFileRetry(
426421
Integer.parseInt(
427422
properties
@@ -1417,10 +1412,6 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep
14171412
String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
14181413

14191414
// update sync config
1420-
conf.setPipeServerPort(
1421-
Integer.parseInt(
1422-
properties.getProperty(
1423-
"pipe_server_port", String.valueOf(conf.getPipeServerPort()))));
14241415
conf.setMaxNumberOfSyncFileRetry(
14251416
Integer.parseInt(
14261417
properties

0 commit comments

Comments
 (0)