Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
021d12b
Add UnicastGS and MultiGS based on GoalStateV2
VanderChen Jun 4, 2021
18aaa21
Add VpcUnicastGoalStateByte and VpcMulticastGoalStateByte
VanderChen Jun 5, 2021
2970780
Implment DpmService V2
VanderChen Jun 13, 2021
b6ab7c4
Implement PortService.buildPortStateV2
VanderChen Jun 15, 2021
6b21941
Implement VpcService.buildVpcStates V2
VanderChen Jun 16, 2021
09f118c
Implement VpcService.buildVpcStates V2
VanderChen Jun 16, 2021
e15f232
Build UnicastGoalStateV2 and MulticastGoalStateV2
VanderChen Jun 16, 2021
2f64247
Build UnicastGoalStateV2 and MulticastGoalStateV2
VanderChen Jun 16, 2021
5d1e2e5
Merge branch 'master' of https://github.com/futurewei-cloud/alcor int…
VanderChen Jul 8, 2021
73f44cc
Merge branch 'master' of https://github.com/futurewei-cloud/alcor int…
VanderChen Aug 2, 2021
0fa81ed
Fix patchGoalstateForNeighbor
VanderChen Aug 4, 2021
b4ccd2f
Fix patchGoalstateForNeighbor
VanderChen Aug 5, 2021
4ff2ff8
Fix DpmService
VanderChen Aug 11, 2021
a8e0eea
Solve conflicts
VanderChen Aug 15, 2021
59913ae
Merge branch 'master' of https://github.com/futurewei-cloud/alcor int…
VanderChen Sep 8, 2021
39673c3
Support GSv2 in DataPlaneClint
VanderChen Sep 11, 2021
3ca09c0
Merge branch 'master' of https://github.com/futurewei-cloud/alcor int…
VanderChen Sep 11, 2021
e5e6083
Merge branch 'mq-dpmv2' of https://github.com/VanderChen/alcor into m…
VanderChen Sep 14, 2021
cceb87c
Implement DataPlaneClientV2 for gRPC
VanderChen Sep 15, 2021
988d65a
Merge branch 'mq-dpmv2' of https://github.com/VanderChen/alcor into m…
VanderChen Sep 15, 2021
6bfe9e2
Implement DataPlaneClientV2 for gRPC
VanderChen Sep 22, 2021
9b6bc7f
Implement DataPlaneClientV2 for gRPC
VanderChen Sep 22, 2021
a575a98
Merge branch 'mq-dpmv2' of https://github.com/VanderChen/alcor into m…
VanderChen Sep 29, 2021
14a4738
Fix service bean bug
VanderChen Sep 29, 2021
d157af9
Merge branch 'master' of https://github.com/futurewei-cloud/alcor int…
VanderChen Oct 16, 2021
aa76721
Fix bug
VanderChen Oct 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion services/data_plane_manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Copyright(c) 2020 Futurewei Cloud
<java.version>11</java.version>
<asciidoctor-plugin.version>1.5.6</asciidoctor-plugin.version>
<snippets>${project.basedir}/target/generated-snippets/</snippets>
<pulsar.version>2.6.1</pulsar.version>
<pulsar.version>2.8.0</pulsar.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ free of charge, to any person obtaining a copy of this software and associated d
package com.futurewei.alcor.dataplane.client;

import com.futurewei.alcor.dataplane.entity.MulticastGoalState;
import com.futurewei.alcor.dataplane.entity.MulticastGoalStateV2;
import com.futurewei.alcor.dataplane.entity.UnicastGoalState;
import com.futurewei.alcor.dataplane.entity.UnicastGoalStateV2;
import com.futurewei.alcor.schema.Goalstate.GoalState;
import com.futurewei.alcor.schema.Goalstateprovisioner.GoalStateOperationReply.GoalStateOperationStatus;
import org.springframework.stereotype.Component;
Expand All @@ -25,8 +27,8 @@ free of charge, to any person obtaining a copy of this software and associated d
import java.util.Map;

@Component
public interface DataPlaneClient {
List<String> sendGoalStates(List<UnicastGoalState> unicastGoalStates) throws Exception;
List<String> sendGoalStates(List<UnicastGoalState> unicastGoalStates,
MulticastGoalState multicastGoalState) throws Exception;
public interface DataPlaneClient <U, T> {
List<String> sendGoalStates(List<U> unicastGoalStates) throws Exception;
List<String> sendGoalStates(List<U> unicastGoalStates,
T multicastGoalState) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ free of charge, to any person obtaining a copy of this software and associated d
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

//@Component
@Service("grpcDataPlaneClient")
public class DataPlaneClientImpl implements DataPlaneClient {
@ConditionalOnProperty(prefix = "protobuf.goal-state-message", name = "version", havingValue = "101")
public class DataPlaneClientImpl implements DataPlaneClient<UnicastGoalState, MulticastGoalState> {
private static final Logger LOG = LoggerFactory.getLogger(DataPlaneClientImpl.class);

private int grpcPort;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
/*
*
* MIT License
* Copyright(c) 2020 Futurewei Cloud
*
* Permission is hereby granted,
* free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and / or sell copies of the Software, and to permit persons
* to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
* /
*/

package com.futurewei.alcor.dataplane.client.grpc;

import com.futurewei.alcor.dataplane.client.DataPlaneClient;
import com.futurewei.alcor.dataplane.config.Config;
import com.futurewei.alcor.dataplane.entity.MulticastGoalStateV2;
import com.futurewei.alcor.dataplane.entity.UnicastGoalStateV2;
import com.futurewei.alcor.schema.GoalStateProvisionerGrpc;
import com.futurewei.alcor.schema.Goalstate;
import com.futurewei.alcor.schema.Goalstateprovisioner;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.*;

@Service("grpcDataPlaneClient")
@ConditionalOnProperty(prefix = "protobuf.goal-state-message", name = "version", havingValue = "102")
public class DataPlaneClientImplV2 implements DataPlaneClient<UnicastGoalStateV2, MulticastGoalStateV2> {

private static DataPlaneClientImplV2 instance = null;

private static final Logger LOG = LoggerFactory.getLogger(DataPlaneClientImplV2.class);

private int hostAgentPort;

private final ExecutorService executor;

// each host_ip should have this amount of gRPC channels
private int numberOfGrpcChannelPerHost;

// when a channel is set up, send this amount of default GoalStates for warmup.
private int numberOfWarmupsPerChannel;

// prints out UUID and time, when sending a GoalState to any of the monitorHosts
private ArrayList<String> monitorHosts;

private ConcurrentHashMap<String, ArrayList<GrpcChannelStub>> hostIpGrpcChannelStubMap;

@Override
public List<String> sendGoalStates(List<UnicastGoalStateV2> unicastGoalStates) throws Exception {
List<String> results = new ArrayList<>();
for (UnicastGoalStateV2 unicastGoalState : unicastGoalStates) {
results.add(
doSendGoalState(unicastGoalState)
);
}

return results;
}

public static DataPlaneClientImplV2 getInstance(Config globalConfig, ArrayList<String> monitorHosts) {
if (instance == null) {
instance = new DataPlaneClientImplV2(globalConfig, monitorHosts);
}
return instance;
}

public DataPlaneClientImplV2(Config globalConfig, ArrayList<String> monitorHosts) {
// each host should have at least 1 gRPC channel
if(numberOfGrpcChannelPerHost < 1) {
numberOfGrpcChannelPerHost = 1;
}

// allow users to not send warmups, if they wish to.
if(numberOfWarmupsPerChannel < 0){
numberOfWarmupsPerChannel = 0;
}

this.monitorHosts = monitorHosts;
LOG.info("Printing out all monitorHosts");
for(String host : this.monitorHosts){
LOG.info("Monitoring this host: " + host);
}
LOG.info("Done printing out all monitorHosts");
this.numberOfGrpcChannelPerHost = globalConfig.numberOfGrpcChannelPerHost;
this.numberOfWarmupsPerChannel = globalConfig.numberOfWarmupsPerChannel;
this.hostAgentPort = 50001;

this.executor = new ThreadPoolExecutor(100,
200,
50,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(),
new DefaultThreadFactory("grpc-thread-pool"));
//TODO: Setup a connection pool. one ACA, one client.
this.hostIpGrpcChannelStubMap = new ConcurrentHashMap();
LOG.info("This instance has "+ numberOfGrpcChannelPerHost+" channels, and "+ numberOfWarmupsPerChannel+" warmups");
}

@Override
public List<String> sendGoalStates(List<UnicastGoalStateV2> unicastGoalStates, MulticastGoalStateV2 multicastGoalState) throws Exception {
if (unicastGoalStates == null) {
unicastGoalStates = new ArrayList<>();
}

if (multicastGoalState != null &&
multicastGoalState.getHostIps() != null &&
multicastGoalState.getGoalState() != null) {
for (String hostIp: multicastGoalState.getHostIps()) {
UnicastGoalStateV2 unicastGoalState = new UnicastGoalStateV2();
unicastGoalState.setHostIp(hostIp);
unicastGoalState.setGoalState(multicastGoalState.getGoalState());

unicastGoalStates.add(unicastGoalState);
}
}

if (unicastGoalStates.size() > 0) {
return sendGoalStates(unicastGoalStates);
}

return null;
}

private GrpcChannelStub createGrpcChannelStub(String hostIp) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(hostIp, this.hostAgentPort)
.usePlaintext()
.keepAliveWithoutCalls(true)
.keepAliveTime(Long.MAX_VALUE, TimeUnit.SECONDS)
.build();
GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = GoalStateProvisionerGrpc.newStub(channel);

return new GrpcChannelStub(channel, asyncStub);
}

private GrpcChannelStub getOrCreateGrpcChannel(String hostIp) {
if (!this.hostIpGrpcChannelStubMap.containsKey(hostIp)) {
this.hostIpGrpcChannelStubMap.put(hostIp, createGrpcChannelStubArrayList(hostIp));
LOG.info("[getOrCreateGrpcChannel] Created a channel and stub to host IP: " + hostIp);
}
int usingChannelWithThisIndex = ThreadLocalRandom.current().nextInt(0, numberOfGrpcChannelPerHost);
ManagedChannel chan = this.hostIpGrpcChannelStubMap.get(hostIp).get(usingChannelWithThisIndex).channel;
//checks the channel status, reconnects if the channel is IDLE

ConnectivityState channelState = chan.getState(true);
if (channelState != ConnectivityState.READY && channelState != ConnectivityState.CONNECTING && channelState != ConnectivityState.IDLE) {
GrpcChannelStub newChannelStub = createGrpcChannelStub(hostIp);
this.hostIpGrpcChannelStubMap.get(hostIp).set(usingChannelWithThisIndex, newChannelStub);
LOG.info("[getOrCreateGrpcChannel] Replaced a channel and stub to host IP: " + hostIp);
}
LOG.info("[getOrCreateGrpcChannel] Using channel and stub index " + usingChannelWithThisIndex + " to host IP: " + hostIp);
return this.hostIpGrpcChannelStubMap.get(hostIp).get(usingChannelWithThisIndex);
}

private ArrayList<GrpcChannelStub> createGrpcChannelStubArrayList(String hostIp) {
long start = System.currentTimeMillis();
ArrayList<GrpcChannelStub> arr = new ArrayList<>();
for (int i = 0; i < numberOfGrpcChannelPerHost; i++) {
GrpcChannelStub channelStub = createGrpcChannelStub(hostIp);
warmUpChannelStub(channelStub, hostIp);
arr.add(channelStub);
}
long end = System.currentTimeMillis();
LOG.info("[createGrpcChannelStubArrayList] Created " + numberOfGrpcChannelPerHost + " gRPC channel stubs for host " + hostIp + ", elapsed Time in milli seconds: " + (end - start));
return arr;
}

// try to warmup a gRPC channel and its stub, by sending an empty GoalState`.
void warmUpChannelStub(GrpcChannelStub channelStub, String hostIp) {
GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = channelStub.stub;

StreamObserver<Goalstateprovisioner.GoalStateOperationReply> responseObserver = new StreamObserver<>() {
@Override
public void onNext(Goalstateprovisioner.GoalStateOperationReply reply) {
LOG.info("Receive warmup response from ACA@" + hostIp + " | " + reply.toString());
}

@Override
public void onError(Throwable t) {
LOG.warn("Receive warmup error from ACA@" + hostIp + " | " + t.getMessage());
}

@Override
public void onCompleted() {
LOG.info("Complete receiving warmup message from ACA@" + hostIp);
}
};

StreamObserver<Goalstate.GoalStateV2> requestObserver = asyncStub.pushGoalStatesStream(responseObserver);
try {
Goalstate.GoalStateV2 goalState = Goalstate.GoalStateV2.getDefaultInstance();
LOG.info("Sending GS to Host " + hostIp + " as follows | " + goalState.toString());
for (int i = 0; i < numberOfWarmupsPerChannel; i++) {
requestObserver.onNext(goalState);
}
} catch (RuntimeException e) {
// Cancel RPC
LOG.warn("[doSendGoalState] Sending GS, but error happened | " + e.getMessage());
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
LOG.info("Sending warmup GS to Host " + hostIp + " is completed");
return;
}

private String doSendGoalState(UnicastGoalStateV2 unicastGoalState) {
String hostIp = unicastGoalState.getHostIp();
LOG.info("Setting up a channel to ACA on: " + hostIp);
long start = System.currentTimeMillis();

GrpcChannelStub channelStub = getOrCreateGrpcChannel(hostIp);
long chan_established = System.currentTimeMillis();
LOG.info("[doSendGoalState] Established channel, elapsed Time in milli seconds: " + (chan_established - start));
GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = channelStub.stub;

long stub_established = System.currentTimeMillis();
LOG.info("[doSendGoalState] Established stub, elapsed Time after channel established in milli seconds: " + (stub_established - chan_established));

Map<String, List<Goalstateprovisioner.GoalStateOperationReply.GoalStateOperationStatus>> result = new HashMap<>();
StreamObserver<Goalstateprovisioner.GoalStateOperationReply> responseObserver = new StreamObserver<>() {
@Override
public void onNext(Goalstateprovisioner.GoalStateOperationReply reply) {
LOG.info("Receive response from ACA@" + hostIp + " | " + reply.toString());
result.put(hostIp, reply.getOperationStatusesList());
}

@Override
public void onError(Throwable t) {
LOG.warn("Receive error from ACA@" + hostIp + " | " + t.getMessage());
}

@Override
public void onCompleted() {
LOG.info("Complete receiving message from ACA@" + hostIp);
}
};

StreamObserver<Goalstate.GoalStateV2> requestObserver = asyncStub.pushGoalStatesStream(responseObserver);
try {
Goalstate.GoalStateV2 goalState = unicastGoalState.getGoalState();
LOG.info("Sending GS to Host " + hostIp + " as follows | " + goalState.toString());
requestObserver.onNext(goalState);
if (unicastGoalState.getGoalState().getNeighborStatesCount() == 1 && monitorHosts.contains(hostIp)) {
long sent_gs_time = System.currentTimeMillis();
// If there's only one neighbor state and it is trying to send it to aca_node_one, the IP of which is now
// hardcoded) this send goalstate action is probably caused by on-demand workflow, need to record when it
// sends this goalState so what we can look into this and the ACA log to see how much time was spent.
String neighbor_id = unicastGoalState.getGoalState().getNeighborStatesMap().keySet().iterator().next();
LOG.info("Sending neighbor ID: " + neighbor_id + " at: " + sent_gs_time);
}
} catch (RuntimeException e) {
// Cancel RPC
LOG.warn("[doSendGoalState] Sending GS, but error happened | " + e.getMessage());
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
LOG.info("Sending GS to Host " + hostIp + " is completed");

// comment out onCompleted so that the same channel/stub and keep sending next time.
// requestObserver.onCompleted();

// shutdown(channel);

return null;
}

private class GrpcChannelStub {
public ManagedChannel channel;
public GoalStateProvisionerGrpc.GoalStateProvisionerStub stub;

public GrpcChannelStub(ManagedChannel channel, GoalStateProvisionerGrpc.GoalStateProvisionerStub stub) {
this.channel = channel;
this.stub = stub;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ free of charge, to any person obtaining a copy of this software and associated d

import com.futurewei.alcor.dataplane.client.DataPlaneClient;
import com.futurewei.alcor.dataplane.entity.MulticastGoalState;
import com.futurewei.alcor.dataplane.entity.MulticastGoalStateV2;
import com.futurewei.alcor.dataplane.entity.UnicastGoalState;
import com.futurewei.alcor.dataplane.entity.UnicastGoalStateV2;
import com.futurewei.alcor.schema.Goalstate;
import com.futurewei.alcor.schema.Goalstateprovisioner;

import java.util.List;
import java.util.Map;

public class DataPlaneClientImpl implements DataPlaneClient {
public class DataPlaneClientImpl implements DataPlaneClient<UnicastGoalState, MulticastGoalState> {

@Override
public List<String> sendGoalStates(List<UnicastGoalState> unicastGoalStates) throws Exception {
Expand Down
Loading