Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
1c121dc
fix invalid pointer bug
luyaoluo Mar 10, 2021
a271fe3
[Fix #223]remove arp test case on one machine
luyaoluo Mar 16, 2021
5c5add0
Merge branch 'master' of https://github.com/futurewei-cloud/alcor-con…
luyaoluo Aug 9, 2021
8e019a9
some minor change
luyaoluo Sep 4, 2021
b54e3ee
Merge branch 'master' of https://github.com/futurewei-cloud/alcor-con…
luyaoluo Sep 22, 2021
5de9608
Add listeners for multicast and unicast consumer
luyaoluo Oct 20, 2021
8155093
Add CLI parameter for pulsar hashed key
luyaoluo Oct 26, 2021
00fd7c7
add mq test
Nov 29, 2021
5311466
mq publish serialized goalstate.
Nov 29, 2021
869655a
add GoalStateV2 test case
Nov 29, 2021
c6ef0fb
add unicast test cases
Dec 3, 2021
103a751
Pulsar producer can publish key-shared messages.
Fangjin98 Dec 3, 2021
c61e6d0
Merge pull request #1 from Fangjin98/master
luyaoluo Dec 3, 2021
b30c925
Merge branch 'master' of https://github.com/lly00/alcor-control-agent
luyaoluo Dec 3, 2021
44d6bd4
Merge branch 'master' of https://github.com/futurewei-cloud/alcor-con…
luyaoluo Dec 3, 2021
46b2316
add test cases for mq
luyaoluo Dec 3, 2021
dde2e94
fix pulsar path
luyaoluo Dec 3, 2021
1c89be9
add deserialize of GoalStateV2
Fangjin98 Dec 7, 2021
f89bb7e
change consumer covert message into GoalStateV2
Fangjin98 Dec 7, 2021
e9ce7e0
Add publish GoalStateV2 test
Fangjin98 Dec 7, 2021
1dc9034
Merge branch 'master' into master
luyaoluo Dec 9, 2021
3e8cd96
Merge pull request #2 from Fangjin98/master
luyaoluo Dec 9, 2021
3d4f6a0
fix pulsar producer orderingKey bugs
Dec 9, 2021
4fc7762
Merge pull request #3 from Fangjin98/master
luyaoluo Dec 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ RUN echo "5--- installing openvswitch dependancies ---" && \
test -f /usr/bin/ovs-vsctl && rm -rf /usr/local/sbin/ov* /usr/local/bin/ov* /usr/local/bin/vtep* && \
cd ~

ENV PULSAR_RELEASE_TAG='pulsar-2.6.1'
ENV PULSAR_RELEASE_TAG='pulsar-2.8.1'
RUN echo "6--- installing pulsar dependacies ---" && \
mkdir -p /var/local/git/pulsar && \
wget https://archive.apache.org/dist/pulsar/${PULSAR_RELEASE_TAG}/DEB/apache-pulsar-client.deb -O /var/local/git/pulsar/apache-pulsar-client.deb && \
Expand Down
2 changes: 1 addition & 1 deletion build/aca-machine-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ echo "6--- installing openvswitch dependancies ---" && \
test -f /usr/bin/ovs-vsctl && rm -rf /usr/local/sbin/ov* /usr/local/bin/ov* /usr/local/bin/vtep* && \
cd ~

PULSAR_RELEASE_TAG='pulsar-2.8.0'
PULSAR_RELEASE_TAG='pulsar-2.8.1'
echo "7--- installing pulsar dependacies ---" && \
mkdir -p /var/local/git/pulsar && \
wget https://archive.apache.org/dist/pulsar/${PULSAR_RELEASE_TAG}/DEB/apache-pulsar-client.deb -O /var/local/git/pulsar/apache-pulsar-client.deb && \
Expand Down
4 changes: 4 additions & 0 deletions include/aca_comm_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class Aca_Comm_Manager {
int deserialize(const unsigned char *mq_buffer, size_t buffer_length,
alcor::schema::GoalState &parsed_struct);

int deserialize(const unsigned char *mq_buffer, size_t buffer_length,
alcor::schema::GoalStateV2 &parsed_struct);

int update_goal_state(alcor::schema::GoalState &goal_state_message,
alcor::schema::GoalStateOperationReply &gsOperationReply);

Expand All @@ -47,6 +50,7 @@ class Aca_Comm_Manager {
void print_goal_state(alcor::schema::GoalState parsed_struct);

void print_goal_state(alcor::schema::GoalStateV2 parsed_struct);

};
} // namespace aca_comm_manager
#endif
51 changes: 38 additions & 13 deletions include/aca_message_pulsar_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "pulsar/ConsumerConfiguration.h"
#include "pulsar/Message.h"
#include "pulsar/Result.h"
#include "pulsar/ConsumerType.h"
#include "pulsar/KeySharedPolicy.h"


using namespace pulsar;
Expand All @@ -29,35 +31,58 @@ namespace aca_message_pulsar
{
class ACA_Message_Pulsar_Consumer {
private:
string brokers_list; //IP addresses of pulsar brokers, format: pulsar:://<pulsar_host_ip>:<port>, example: pulsar://10.213.43.188:9092
string brokers_list; // IP addresses of pulsar brokers, format: pulsar:://<pulsar_host_ip>:<port>, example: pulsar://10.213.43.188:9092

string subscription_name; //Subscription name of the pulsar consumer
string multicast_subscription_name; // Subscription name of the multicast pulsar consumer
string unicast_subscription_name; // Subscription name of the unicast pulsar consumer

string topic_name; //A string representation of the topic to be consumed, for example: /hostid/00000000-0000-0000-0000-000000000000/netwconf/
string multicast_topic_name; //A string representation of the topic to be consumed, for example: /hostid/00000000-0000-0000-0000-000000000000/netwconf/
string unicast_topic_name;

ConsumerConfiguration consumer_config; //Configuration of the pulsar consumer
ConsumerConfiguration multicast_consumer_config; //Configuration of the mulitcast pulsar consumer
ConsumerConfiguration unicast_consumer_config; //Configuration of the unicast pulsar consumer

Client *ptr_client; //A pointer to the pulsar client
Client *ptr_multicast_client; //A pointer to the multicast pulsar client
Client *ptr_unicast_client; //A pointer to the unicast pulsar client

Consumer multicast_consumer;
Consumer unicast_consumer;

private:
void setMulticastSubscriptionName(string subscription_name);

void setUnicastSubscriptionName(string subscription_name);

void setBrokers(string brokers);

void setMulticastTopicName(string topic);

void setUnicastTopicName(string topic);



public:
ACA_Message_Pulsar_Consumer(string brokers, string subscription_name);
ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name);

~ACA_Message_Pulsar_Consumer();

string getBrokers() const;

string getLastTopicName() const;
string getMulticastTopicName() const;

string getSubscriptionName() const;
string getUnicastTopicName() const;

void setSubscriptionName(string subscription_name);
string getMulticastSubscriptionName() const;

bool consumeDispatched(string topic);
string getUnicastSubscriptionName() const;

private:
void setBrokers(string brokers);
bool multicastConsumerDispatched();

bool unicastConsumerDispatched(int stickyHash);

//static void listener(Consumer consumer, const Message& message);

void setLastTopicName(string topic);

};

} // namespace aca_message_pulsar
Expand Down
4 changes: 4 additions & 0 deletions include/aca_message_pulsar_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ class ACA_Message_Pulsar_Producer {

bool publish(string message);

bool publish(string message, string key);


private:
void setBrokers(string brokers);

};

} // aca_message_pulsar
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ target_link_libraries(AlcorControlAgent grpc)
target_link_libraries(AlcorControlAgent ${PROTOBUF_LIBRARY})
target_link_libraries(AlcorControlAgent ${_GRPC_GRPCPP_UNSECURE})


add_dependencies(AlcorControlAgentLib proto grpc)
add_subdirectory(proto3)
add_subdirectory(grpc)
14 changes: 10 additions & 4 deletions src/aca_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ using std::string;
// Defines
#define ACALOGNAME "AlcorControlAgent"
static char EMPTY_STRING[] = "";
static char BROKER_LIST[] = "pulsar://localhost:6502";
static char BROKER_LIST[] = "pulsar://localhost:6650";
static char PULSAR_TOPIC[] = "Host-ts-1";
static char PULSAR_SUBSCRIPTION_NAME[] = "Test-Subscription";
static char GRPC_SERVER_PORT[] = "50001";
Expand All @@ -59,6 +59,7 @@ GoalStateProvisionerClientImpl *g_grpc_client = NULL;
string g_broker_list = EMPTY_STRING;
string g_pulsar_topic = EMPTY_STRING;
string g_pulsar_subsription_name = EMPTY_STRING;
string g_pulsar_hashed_key = "0";
string g_grpc_server_port = EMPTY_STRING;
string g_ofctl_command = EMPTY_STRING;
string g_ofctl_target = EMPTY_STRING;
Expand Down Expand Up @@ -183,7 +184,7 @@ int main(int argc, char *argv[])
signal(SIGINT, aca_signal_handler);
signal(SIGTERM, aca_signal_handler);

while ((option = getopt(argc, argv, "a:p:b:h:g:s:c:t:o:md")) != -1) {
while ((option = getopt(argc, argv, "a:p:b:h:g:k:s:c:t:o:md")) != -1) {
switch (option) {
case 'a':
g_ncm_address = optarg;
Expand All @@ -200,6 +201,9 @@ int main(int argc, char *argv[])
case 'g':
g_pulsar_subsription_name = optarg;
break;
case 'k':
g_pulsar_hashed_key = optarg;
break;
case 's':
g_grpc_server_port = optarg;
break;
Expand All @@ -226,6 +230,7 @@ int main(int argc, char *argv[])
"\t\t[-b pulsar broker list]\n"
"\t\t[-h pulsar host topic to listen]\n"
"\t\t[-g pulsar subscription name]\n"
"\t\t[-k pulsar hashed key]\n"
"\t\t[-s gRPC server port\n"
"\t\t[-c ofctl command]\n"
"\t\t[-m enable demo mode]\n"
Expand Down Expand Up @@ -287,8 +292,9 @@ int main(int argc, char *argv[])
//// monitor br-tun for arp request message
//ACA_OVS_Control::get_instance().monitor("br-tun", "resume");

ACA_Message_Pulsar_Consumer network_config_consumer(g_broker_list, g_pulsar_subsription_name);
rc = network_config_consumer.consumeDispatched(g_pulsar_topic);
ACA_Message_Pulsar_Consumer network_config_consumer(g_pulsar_topic, g_broker_list, g_pulsar_subsription_name);
//network_config_consumer.multicastConsumerDispatched();
network_config_consumer.unicastConsumerDispatched(atoi(g_pulsar_hashed_key.c_str()));

pause();
aca_cleanup();
Expand Down
32 changes: 32 additions & 0 deletions src/comm/aca_comm_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,38 @@ Aca_Comm_Manager &Aca_Comm_Manager::get_instance()
return instance;
}

int Aca_Comm_Manager::deserialize(const unsigned char *mq_buffer,
size_t buffer_length, GoalStateV2 &parsed_struct)
{
int rc;

if (mq_buffer == NULL) {
rc = -EINVAL;
ACA_LOG_ERROR("Empty mq_buffer data rc: %d\n", rc);
return rc;
}

if (parsed_struct.IsInitialized() == false) {
rc = -EINVAL;
ACA_LOG_ERROR("Uninitialized parsed_struct rc: %d\n", rc);
return rc;
}

// Verify that the version of the library that we linked against is
// compatible with the version of the headers we compiled against.
GOOGLE_PROTOBUF_VERIFY_VERSION;

if (parsed_struct.ParseFromArray(mq_buffer, buffer_length)) {
ACA_LOG_INFO("%s", "Successfully converted message to protobuf struct\n");

return EXIT_SUCCESS;
} else {
rc = -EXIT_FAILURE;
ACA_LOG_ERROR("Failed to convert message to protobuf struct rc: %d\n", rc);
return rc;
}
}

int Aca_Comm_Manager::deserialize(const unsigned char *mq_buffer,
size_t buffer_length, GoalState &parsed_struct)
{
Expand Down
Loading