Skip to content

Commit

Permalink
KYLIN-3744 Add configuration and fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
hit-lacus authored and shaofengshi committed Mar 24, 2019
1 parent ad9b49b commit 9a363c1
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2059,6 +2059,18 @@ public String getStreamingSegmentRetentionPolicy() {
return getOptional("kylin.stream.segment.retention.policy", "fullBuild");
}

public String getStreamingAssigner() {
return getOptional("kylin.stream.assigner", "DefaultAssigner");
}

public int getCoordinatorHttpClientTimeout() {
return Integer.parseInt(getOptional("kylin.stream.coordinator.client.timeout.millsecond", "5000"));
}

public int getReceiverHttpClientTimeout() {
return Integer.parseInt(getOptional("kylin.stream.receiver.client.timeout.millsecond", "5000"));
}

public int getStreamingReceiverHttpMaxThreads() {
return Integer.parseInt(getOptional("kylin.stream.receiver.http.max.threads", "200"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -479,6 +480,21 @@ private boolean assignmentEqual(Map<String, List<Partition>> receiverAssignment,
if (emptyMap(receiverAssignment) && emptyMap(rsAssignment)) {
return true;
}

if (receiverAssignment != null) {
for (Map.Entry<String, List<Partition>> entry : receiverAssignment.entrySet()) {
Collections.sort(entry.getValue());
entry.setValue(entry.getValue());
}
}

if (rsAssignment != null) {
for (Map.Entry<String, List<Partition>> entry : rsAssignment.entrySet()) {
Collections.sort(entry.getValue());
entry.setValue(entry.getValue());
}
}

if (receiverAssignment != null && receiverAssignment.equals(rsAssignment)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.kylin.stream.coordinator.assign.Assigner;
import org.apache.kylin.stream.coordinator.assign.AssignmentUtil;
import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
import org.apache.kylin.stream.coordinator.assign.CubePartitionRoundRobinAssigner;
import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.coordinator.exception.ClusterStateException.TransactionStep;
Expand Down Expand Up @@ -139,7 +140,7 @@ public class Coordinator implements CoordinatorClient {
private Coordinator() {
this.streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
this.receiverAdminClient = new HttpReceiverAdminClient();
this.assigner = new DefaultAssigner();
this.assigner = getAssigner();
this.zkClient = ZKUtils.getZookeeperClient();
this.selector = new CoordinatorLeaderSelector();
this.jobStatusChecker = new StreamingBuildJobStatusChecker();
Expand All @@ -154,7 +155,7 @@ private Coordinator() {
public Coordinator(StreamMetadataStore metadataStore, ReceiverAdminClient receiverClient) {
this.streamMetadataStore = metadataStore;
this.receiverAdminClient = receiverClient;
this.assigner = new DefaultAssigner();
this.assigner = getAssigner();
this.zkClient = ZKUtils.getZookeeperClient();
this.selector = new CoordinatorLeaderSelector();
this.jobStatusChecker = new StreamingBuildJobStatusChecker();
Expand Down Expand Up @@ -479,8 +480,8 @@ void doReassign(CubeInstance cubeInstance, CubeAssignment preAssignments, CubeAs
}
}
if (needRollback.isEmpty()) {
throw new ClusterStateException(cubeName, ClusterState.ROLLBACK_SUCCESS,
TransactionStep.STOP_AND_SNYC, "", e);
throw new ClusterStateException(cubeName, ClusterState.ROLLBACK_SUCCESS, TransactionStep.STOP_AND_SNYC,
"", e);
} else {
StringBuilder str = new StringBuilder();
try {
Expand Down Expand Up @@ -604,8 +605,8 @@ void doReassign(CubeInstance cubeInstance, CubeAssignment preAssignments, CubeAs
throw new ClusterStateException(cubeName, ClusterState.ROLLBACK_FAILED, TransactionStep.ASSIGN_NEW,
failedInfo, e);
} else if (!failedReceiver.isEmpty()) {
throw new ClusterStateException(cubeName, ClusterState.ROLLBACK_FAILED,
TransactionStep.MAKE_IMMUTABLE, failedInfo, e);
throw new ClusterStateException(cubeName, ClusterState.ROLLBACK_FAILED, TransactionStep.MAKE_IMMUTABLE,
failedInfo, e);
} else {
throw new ClusterStateException(cubeName, ClusterState.ROLLBACK_SUCCESS, TransactionStep.ASSIGN_NEW,
failedInfo, e);
Expand Down Expand Up @@ -1251,6 +1252,23 @@ private boolean checkSegmentIsReadyToBuild(List<SegmentBuildState> allSegmentSta
return false;
}

private Assigner getAssigner() {
String assignerName = getConfig().getStreamingAssigner();
Assigner oneAssigner;
logger.debug("Using assigner {}", assignerName);
switch (assignerName) {
case "DefaultAssigner":
oneAssigner = new DefaultAssigner();
break;
case "CubePartitionRoundRobinAssigner":
oneAssigner = new CubePartitionRoundRobinAssigner();
break;
default:
oneAssigner = new DefaultAssigner();
}
return oneAssigner;
}

private class CoordinatorLeaderSelector extends LeaderSelectorListenerAdapter implements Closeable {
private LeaderSelector leaderSelector;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.stream.coordinator.StreamMetadataStore;
Expand All @@ -41,6 +42,7 @@
public class HttpCoordinatorClient implements CoordinatorClient {
private static final Logger logger = LoggerFactory.getLogger(HttpCoordinatorClient.class);

private static final String CUBES = "/cubes/";
private StreamMetadataStore streamMetadataStore;
private RestService restService;
private Node coordinatorNode;
Expand All @@ -52,8 +54,7 @@ public HttpCoordinatorClient(StreamMetadataStore metadataStore) {
int maxRetry = 10;

this.retryCaller = new RetryCaller(maxRetry, 1000);
int connectionTimeout = 5000; // default connection timeout is 5s, todo
// move to configuration
int connectionTimeout = KylinConfig.getInstanceFromEnv().getCoordinatorHttpClientTimeout();
int readTimeout = 10000;
this.restService = new RestService(connectionTimeout, readTimeout);
}
Expand Down Expand Up @@ -115,7 +116,7 @@ public void reBalance(Map<Integer, Map<String, List<Partition>>> reBalancePlan)
public void assignCube(String cubeName) {
logger.info("send assign request to coordinator");
try {
putRequest("/cubes/" + cubeName + "/assign");
putRequest(CUBES + cubeName + "/assign");
} catch (IOException e) {
throw new StreamingException(e);
}
Expand All @@ -125,7 +126,7 @@ public void assignCube(String cubeName) {
public void unAssignCube(String cubeName) {
logger.info("send unAssign request to coordinator");
try {
putRequest("/cubes/" + cubeName + "/unAssign");
putRequest(CUBES + cubeName + "/unAssign");
} catch (IOException e) {
throw new StreamingException(e);
}
Expand All @@ -135,7 +136,7 @@ public void unAssignCube(String cubeName) {
public void reAssignCube(String cubeName, CubeAssignment newAssignments) {
logger.info("send reassign request to coordinator");
try {
String path = "/cubes/" + cubeName + "/reAssign";
String path = CUBES + cubeName + "/reAssign";
String content = JsonUtil.writeValueAsIndentString(newAssignments);
postRequest(path, content);
} catch (IOException e) {
Expand Down Expand Up @@ -192,7 +193,7 @@ public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) {
public void pauseConsumers(String cubeName) {
logger.info("send cube pause request to coordinator: {}", cubeName);
try {
String path = "/cubes/" + cubeName + "/pauseConsume";
String path = CUBES + cubeName + "/pauseConsume";
putRequest(path);
} catch (IOException e) {
throw new StreamingException(e);
Expand All @@ -203,7 +204,7 @@ public void pauseConsumers(String cubeName) {
public void resumeConsumers(String cubeName) {
logger.info("send cube resume request to coordinator: {}", cubeName);
try {
String path = "/cubes/" + cubeName + "/resumeConsume";
String path = CUBES + cubeName + "/resumeConsume";
putRequest(path);
} catch (IOException e) {
throw new StreamingException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void setUp() throws Exception {
@After
public void tearDown() throws Exception {
coordinator = null;
System.clearProperty("kylin.stream.zookeeper");
testingServer.stop();// clear metadata
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.stream.core.model.AssignRequest;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
Expand All @@ -46,7 +47,7 @@ public class HttpReceiverAdminClient implements ReceiverAdminClient {
private RetryCaller retryCaller;

public HttpReceiverAdminClient() {
int connectionTimeout = 5000; // default connection timeout is 5s, todo move to configuration
int connectionTimeout = KylinConfig.getInstanceFromEnv().getReceiverHttpClientTimeout();
int readTimeout = 30000;
this.maxRetry = 3;
this.retryPauseTime = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@

public class LZ4CompressorTest {
public static void main(String[] args) throws Exception {

if(args.length == 0){
System.out.println("args[0] must be data file path");
return;
}
LZ4Factory factory = LZ4Factory.fastestInstance();

byte[] data = Files.toByteArray(new File("/Users/ganma/dev/githome/kylin/stream-core/stream_index/test_streaming_v2_cube/20180730070000_20180730080000/1/1.data"));
byte[] data = Files.toByteArray(new File(args[0]));
final int decompressedLength = data.length;

// compress data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public String execRequest(HttpRequestBase request, int connectionTimeout, int re
displayMessage = msg;
}
logger.trace("Send request: {}. And receive response[{}] which lenght is {}, and content is {}.", code,
request.getRequestLine().toString(), msg.length(), displayMessage);
request.getRequestLine(), msg.length(), displayMessage);
}
if (code != 200)
throw new IOException("Invalid http response " + code + " when send request: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ private void startSegmentStateChecker() {
@Override
public void run() {
Collection<StreamingSegmentManager> segmentManagers = getAllCubeSegmentManagers();
ServerContext serverContext = new ServerContext();
serverContext.setReplicaSetID(replicaSetID);
serverContext.setReceiver(currentNode);
for (StreamingSegmentManager segmentManager : segmentManagers) {
CubeInstance cubeInstance = segmentManager.getCubeInstance();
String cubeName = cubeInstance.getName();
Expand Down Expand Up @@ -193,8 +190,7 @@ public void run() {
/**
* <pre>
* When segment status was changed to immutable, the leader of replica will
* try to upload local segment cache to remote, while the follower will remove
* local segment cache.
* try to upload local segment cache to remote.
* </pre>
*/
private void handleImmutableCubeSegments(String cubeName, StreamingSegmentManager segmentManager,
Expand Down

0 comments on commit 9a363c1

Please sign in to comment.