Skip to content

Commit

Permalink
enh(gorgone): use zmq ffi binding (#1057)
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin Duret <kduret@centreon.com>
Co-authored-by: Laurent Pinsivy <lpinsivy@centreon.com>
  • Loading branch information
3 people authored and Evan-Adam committed Jul 16, 2024
1 parent 71570a9 commit 277877b
Show file tree
Hide file tree
Showing 100 changed files with 2,152 additions and 2,261 deletions.
134 changes: 91 additions & 43 deletions gorgone/gorgone/class/clientzmq.pm
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use strict;
use warnings;
use gorgone::standard::library;
use gorgone::standard::misc;
use ZMQ::LibZMQ4;
use ZMQ::Constants qw(:all);
use Crypt::Mode::CBC;
use MIME::Base64;
use Scalar::Util;
use ZMQ::FFI qw(ZMQ_DONTWAIT);
use EV;

my $connectors = {};
my $callbacks = {};
Expand All @@ -37,9 +37,11 @@ my $sockets = {};
sub new {
my ($class, %options) = @_;
my $connector = {};
$connector->{context} = $options{context};
$connector->{logger} = $options{logger};
$connector->{identity} = $options{identity};
$connector->{extra_identity} = gorgone::standard::library::generate_token(length => 12);
$connector->{core_loop} = $options{core_loop};

$connector->{verbose_last_message} = '';
$connector->{config_core} = $options{config_core};
Expand Down Expand Up @@ -92,23 +94,26 @@ sub new {

sub init {
my ($self, %options) = @_;

$self->{handshake} = 0;
$sockets->{$self->{identity}} = gorgone::standard::library::connect_com(
delete $self->{server_pubkey};
$sockets->{ $self->{identity} } = gorgone::standard::library::connect_com(
context => $self->{context},
zmq_type => 'ZMQ_DEALER',
name => $self->{identity} . '-' . $self->{extra_identity},
logger => $self->{logger},
type => $self->{target_type},
path => $self->{target_path},
zmq_ipv6 => $self->{config_core}->{ipv6}
);
$callbacks->{$self->{identity}} = $options{callback} if (defined($options{callback}));
$callbacks->{ $self->{identity} } = $options{callback} if (defined($options{callback}));
}

sub close {
my ($self, %options) = @_;

zmq_close($sockets->{$self->{identity}});
delete $self->{core_watcher};
$sockets->{ $self->{identity} }->close();
}

sub get_connect_identity {
Expand All @@ -120,14 +125,23 @@ sub get_connect_identity {
sub get_server_pubkey {
my ($self, %options) = @_;

zmq_sendmsg($sockets->{$self->{identity}}, '[GETPUBKEY]', ZMQ_DONTWAIT);
zmq_poll([$self->get_poll()], 10000);
$sockets->{ $self->{identity} }->send('[GETPUBKEY]', ZMQ_DONTWAIT);
$self->event(identity => $self->{identity});

my $w1 = $self->{connect_loop}->timer(
10,
0,
sub {
$self->{connect_loop}->break();
}
);
$self->{connect_loop}->run();
}

sub read_key_protocol {
my ($self, %options) = @_;

$self->{logger}->writeLogDebug('[clientzmq] read key protocol: ' . $options{text});
$self->{logger}->writeLogDebug('[clientzmq] ' . $self->{identity} . ' - read key protocol: ' . $options{text});

return (-1, 'Wrong protocol') if ($options{text} !~ /^\[KEY\]\s+(.*)$/);

Expand Down Expand Up @@ -167,7 +181,7 @@ sub decrypt_message {
);
};
if ($@) {
$self->{logger}->writeLogError("[clientzmq] decrypt message issue: " . $@);
$self->{logger}->writeLogError("[clientzmq] $self->{identity} - decrypt message issue: " . $@);
return (-1, $@);
}
return (0, $plaintext);
Expand All @@ -176,6 +190,11 @@ sub decrypt_message {
sub client_get_secret {
my ($self, %options) = @_;

# there is an issue
if ($options{message} =~ /^\[ACK\]/) {
return (-1, "issue: $options{message}");
}

my $plaintext;
eval {
my $cryptedtext = MIME::Base64::decode_base64($options{message});
Expand All @@ -191,8 +210,10 @@ sub client_get_secret {
sub check_server_pubkey {
my ($self, %options) = @_;

$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - get_server_pubkey check [1]");

if ($options{message} !~ /^\s*\[PUBKEY\]\s+\[(.*?)\]/) {
$self->{logger}->writeLogError('[clientzmq] Cannot read pubbkey response from server: ' . $options{message}) if (defined($self->{logger}));
$self->{logger}->writeLogError('[clientzmq] ' . $self->{identity} . ' - cannot read pubbkey response from server: ' . $options{message}) if (defined($self->{logger}));
$self->{verbose_last_message} = 'cannot read pubkey response from server';
return 0;
}
Expand All @@ -206,7 +227,7 @@ sub check_server_pubkey {
);

if ($code == 0) {
$self->{logger}->writeLogError('[clientzmq] Cannot load pubbkey') if (defined($self->{logger}));
$self->{logger}->writeLogError('[clientzmq] ' . $self->{identity} . ' cannot load pubbkey') if (defined($self->{logger}));
$self->{verbose_last_message} = 'cannot load pubkey';
return 0;
}
Expand All @@ -225,6 +246,8 @@ sub check_server_pubkey {
}
}

$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - get_server_pubkey ok [1]");

return 1;
}

Expand All @@ -250,73 +273,74 @@ sub ping {
$self->send_message(action => $action, data => $options{data}, json_encode => $options{json_encode});
$status = 1;
}

if ($self->{ping_progress} == 1 &&
time() - $self->{ping_timeout_time} > $self->{ping_timeout}) {
$self->{logger}->writeLogError("[clientzmq] No ping response") if (defined($self->{logger}));
$self->{ping_progress} = 0;
# we delete the old one
for (my $i = 0; $i < scalar(@{$options{poll}}); $i++) {
if (Scalar::Util::refaddr($sockets->{$self->{identity}}) eq Scalar::Util::refaddr($options{poll}->[$i]->{socket})) {
splice @{$options{poll}}, $i, 1;
last;
}
}
zmq_close($sockets->{$self->{identity}});
$sockets->{ $self->{identity} }->close();
delete $self->{core_watcher};

$self->init();
push @{$options{poll}}, $self->get_poll();
$status = 1;
}

return $status;
}

sub get_poll {
sub add_watcher {
my ($self, %options) = @_;

return {
socket => $sockets->{$self->{identity}},
events => ZMQ_POLLIN,
callback => sub {
event(identity => $self->{identity});
$self->{core_watcher} = $self->{core_loop}->io(
$sockets->{ $self->{identity} }->get_fd(),
EV::READ,
sub {
$self->event(identity => $self->{identity});
}
};
);
}

sub event {
my (%options) = @_;

# We have a response. So it's ok :)
if ($connectors->{ $options{identity} }->{ping_progress} == 1) {
$connectors->{ $options{identity} }->{ping_progress} = 0;
}
my ($self, %options) = @_;

$connectors->{ $options{identity} }->{ping_time} = time();
while (1) {
my $message = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{$options{identity}});
last if (!defined($message));
while ($sockets->{ $options{identity} }->has_pollin()) {
# We have a response. So it's ok :)
if ($connectors->{ $options{identity} }->{ping_progress} == 1) {
$connectors->{ $options{identity} }->{ping_progress} = 0;
}

my ($rv, $message) = gorgone::standard::library::zmq_dealer_read_message(socket => $sockets->{ $options{identity} });
last if ($rv);

# in progress
if ($connectors->{ $options{identity} }->{handshake} == 0) {
$self->{connect_loop}->break();
$connectors->{ $options{identity} }->{handshake} = 1;
if ($connectors->{ $options{identity} }->check_server_pubkey(message => $message) == 0) {
$connectors->{ $options{identity} }->{handshake} = -1;

}
} elsif ($connectors->{ $options{identity} }->{handshake} == 1) {
$self->{connect_loop}->break();

$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_get_secret recv [3]");
my ($status, $verbose, $symkey, $hostname) = $connectors->{ $options{identity} }->client_get_secret(
message => $message
);
if ($status == -1) {
$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_get_secret $verbose [3]");
$connectors->{ $options{identity} }->{handshake} = -1;
$connectors->{ $options{identity} }->{verbose_last_message} = $verbose;
return ;
}
$connectors->{ $options{identity} }->{handshake} = 2;
if (defined($connectors->{ $options{identity} }->{logger})) {
$connectors->{ $options{identity} }->{logger}->writeLogInfo(
"[clientzmq] Client connected successfully to '" . $connectors->{ $options{identity} }->{target_type} .
"[clientzmq] $self->{identity} - Client connected successfully to '" . $connectors->{ $options{identity} }->{target_type} .
"://" . $connectors->{ $options{identity} }->{target_path} . "'"
);
$self->add_watcher();
}
} else {
my ($rv, $data) = $connectors->{ $options{identity} }->decrypt_message(message => $message);
Expand All @@ -332,7 +356,7 @@ sub event {
} elsif (defined($callbacks->{$options{identity}})) {
$callbacks->{$options{identity}}->(identity => $options{identity}, data => $data);
}
}
}
}
}

Expand All @@ -357,14 +381,25 @@ sub zmq_send_message {
return undef;
}

zmq_sendmsg($options{socket}, $message, ZMQ_DONTWAIT);
$options{socket}->send($message, ZMQ_DONTWAIT);
$self->event(identity => $self->{identity});
}

sub send_message {
my ($self, %options) = @_;

if ($self->{handshake} == 0) {
$self->{connect_loop} = new EV::Loop();
$self->{connect_watcher} = $self->{connect_loop}->io(
$sockets->{ $self->{identity} }->get_fd(),
EV::READ,
sub {
$self->event(identity => $self->{identity});
}
);

if (!defined($self->{server_pubkey})) {
$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - get_server_pubkey sent [1]");
$self->get_server_pubkey();
} else {
$self->{handshake} = 1;
Expand All @@ -378,15 +413,27 @@ sub send_message {
client_pubkey => $self->{client_pubkey},
);
if ($status == -1) {
$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_helo crypt handshake issue [2]");
$self->{verbose_last_message} = 'crypt handshake issue';
return (-1, $self->{verbose_last_message});
}

$self->{logger}->writeLogDebug("[clientzmq] $self->{identity} - client_helo sent [2]");

$self->{verbose_last_message} = 'Handshake timeout';
zmq_sendmsg($sockets->{$self->{identity}}, $ciphertext, ZMQ_DONTWAIT);
zmq_poll([$self->get_poll()], 10000);
$sockets->{ $self->{identity} }->send($ciphertext, ZMQ_DONTWAIT);
$self->event(identity => $self->{identity});

my $w1 = $self->{connect_loop}->timer(
10,
0,
sub { $self->{connect_loop}->break(); }
);
$self->{connect_loop}->run();
}

undef $self->{connect_loop} if (defined($self->{connect_loop}));

if ($self->{handshake} < 2) {
$self->{handshake} = 0;
return (-1, $self->{verbose_last_message});
Expand All @@ -396,6 +443,7 @@ sub send_message {
socket => $sockets->{ $self->{identity} },
%options
);

return 0;
}

Expand Down
Loading

0 comments on commit 277877b

Please sign in to comment.