Skip to content
This repository was archived by the owner on Jul 8, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions cpp_test_suite/new_tests/cxx_dserver_misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,26 @@
#undef SUITE_NAME
#define SUITE_NAME DServerMiscTestSuite

struct EventCallback : public Tango::CallBack
{
EventCallback()
: num_of_all_events(0)
, num_of_error_events(0)
{}

void push_event(Tango::EventData* event)
{
num_of_all_events++;
if (event->err)
{
num_of_error_events++;
}
}

int num_of_all_events;
int num_of_error_events;
};

class DServerMiscTestSuite: public CxxTest::TestSuite
{
protected:
Expand Down Expand Up @@ -203,6 +223,38 @@ cout << "str = " << str << endl;
TS_ASSERT(dserver->info().server_id == full_ds_name);
TS_ASSERT(dserver->info().server_version == server_version);
}

/* Tests that subscriber can receive events immediately after
* a device restart without a need to wait for re-subscription.
*/
void test_event_subscription_recovery_after_device_restart()
{
EventCallback callback{};

std::string attribute_name = "event_change_tst";

TS_ASSERT_THROWS_NOTHING(device1->subscribe_event(
attribute_name,
Tango::USER_EVENT,
&callback));

TS_ASSERT_THROWS_NOTHING(device1->command_inout("IOPushEvent"));
Tango_sleep(2);
TS_ASSERT_EQUALS(2, callback.num_of_all_events);
TS_ASSERT_EQUALS(0, callback.num_of_error_events);

{
Tango::DeviceData input{};
input << device1_name;
TS_ASSERT_THROWS_NOTHING(dserver->command_inout("DevRestart", input));
}

TS_ASSERT_THROWS_NOTHING(device1->command_inout("IOPushEvent"));
Tango_sleep(2);
TS_ASSERT_EQUALS(3, callback.num_of_all_events);
TS_ASSERT_EQUALS(0, callback.num_of_error_events);
}

};
#undef cout
#endif // DServerMiscTestSuite_h
3 changes: 2 additions & 1 deletion cppapi/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ set(HEADERS attrdesc.h
w_pipe.tpp
subdev_diag.h
encoded_attribute.h
encoded_format.h)
encoded_format.h
event_subscription_state.h)

add_subdirectory(idl)
add_subdirectory(jpeg)
Expand Down
21 changes: 9 additions & 12 deletions cppapi/server/attribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5895,21 +5895,18 @@ bool Attribute::data_ready_event_subscribed()
//
//--------------------------------------------------------------------------------------------------------------------

void Attribute::set_client_lib(int _l,std::string &ev_name)
void Attribute::set_client_lib(int client_lib_version, EventType event_type)
{
cout4 << "Attribute::set_client_lib(" << _l << "," << ev_name << ")" << std::endl;
int i;
for (i = 0; i < numEventType; i++)
{
if (ev_name == EventName[i])
{
break;
}
}
cout4 << "Attribute::set_client_lib("
<< client_lib_version << ","
<< EventName[event_type] << ")" << std::endl;

if (count(client_lib[i].begin(), client_lib[i].end(), _l) == 0)
if (0 == count(
client_lib[event_type].begin(),
client_lib[event_type].end(),
client_lib_version))
{
client_lib[i].push_back(_l);
client_lib[event_type].push_back(client_lib_version);
}
}

Expand Down
4 changes: 2 additions & 2 deletions cppapi/server/attribute.h
Original file line number Diff line number Diff line change
Expand Up @@ -2302,9 +2302,9 @@ class Attribute
bool is_mem_exception() {return att_mem_exception;}
virtual bool is_fwd_att() {return false;}

void set_client_lib(int,std::string &);
void set_client_lib(int, EventType);
std::vector<int> &get_client_lib(EventType _et) {return client_lib[_et];}
void remove_client_lib(int,const std::string &);
void remove_client_lib(int, const std::string &);

void add_config_5_specific(AttributeConfig_5 &);
void add_startup_exception(std::string,const DevFailed &);
Expand Down
10 changes: 5 additions & 5 deletions cppapi/server/device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6106,17 +6106,17 @@ void DeviceImpl::remove_local_command(const std::string &cmd_name)
//
//------------------------------------------------------------------------------------------------------------------

void DeviceImpl::get_event_param(std::vector<EventPar> &eve)
void DeviceImpl::get_event_param(EventSubscriptionStates& eve)
{
ZmqEventSupplier *event_supplier_zmq = Util::instance()->get_zmq_event_supplier();

if (event_supplier_zmq->any_dev_intr_client(this) == true)
{
EventPar ep;
EventSubscriptionState ep;

ep.notifd = false;
ep.zmq = true;
ep.attr_id = -1;
ep.attribute_name = "";
ep.quality = false;
ep.data_ready = false;
ep.dev_intr_change = true;
Expand All @@ -6139,11 +6139,11 @@ void DeviceImpl::get_event_param(std::vector<EventPar> &eve)
//
//------------------------------------------------------------------------------------------------------------------

void DeviceImpl::set_event_param(std::vector<EventPar> &eve)
void DeviceImpl::set_event_param(const EventSubscriptionStates& eve)
{
for (size_t loop = 0; loop < eve.size(); loop++)
{
if (eve[loop].attr_id == -1)
if (eve[loop].attribute_name.empty())
{
if (eve[loop].dev_intr_change == true)
{
Expand Down
5 changes: 3 additions & 2 deletions cppapi/server/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <deviceclass.h>
#include <devintr.h>
#include <dintrthread.h>
#include "event_subscription_state.h"

namespace Tango
{
Expand Down Expand Up @@ -3423,8 +3424,8 @@ class DeviceImpl : public virtual POA_Tango::Device
void disable_intr_change_ev() {intr_change_ev = false;}
bool is_intr_change_ev_enable() {return intr_change_ev;}

void get_event_param(std::vector<EventPar> &);
void set_event_param(std::vector<EventPar> &);
void get_event_param(EventSubscriptionStates&);
void set_event_param(const EventSubscriptionStates&);

void set_client_lib(int _l) {if (count(client_lib.begin(),client_lib.end(),_l)==0)client_lib.push_back(_l);}

Expand Down
14 changes: 7 additions & 7 deletions cppapi/server/dserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ void DServer::restart(std::string &d_name)

std::vector<PollObj *> &p_obj = dev_to_del->get_poll_obj_list();
std::vector<Pol> dev_pol;
std::vector<EventPar> eve;
EventSubscriptionStates eve;

for (i = 0;i < p_obj.size();i++)
{
Expand Down Expand Up @@ -1193,7 +1193,7 @@ void ServRestartThread::run(void *ptr)
// Memorize event parameters and devices interface
//

std::map<std::string,std::vector<EventPar> > map_events;
ServerEventSubscriptionState map_events;
std::map<std::string,DevIntr> map_dev_inter;

dev->mem_event_par(map_events);
Expand Down Expand Up @@ -1977,14 +1977,14 @@ void DServer::mcast_event_for_att(std::string &dev_name,std::string &att_name,st
//
//------------------------------------------------------------------------------------------------------------------

void DServer::mem_event_par(std::map<std::string,std::vector<EventPar> > &_map)
void DServer::mem_event_par(ServerEventSubscriptionState& _map)
{
for (size_t i = 0;i < class_list.size();i++)
{
std::vector<DeviceImpl *> &dev_list = class_list[i]->get_device_list();
for (size_t j = 0;j < dev_list.size();j++)
{
std::vector<EventPar> eve;
EventSubscriptionStates eve;
dev_list[j]->get_device_attr()->get_event_param(eve);
dev_list[j]->get_event_param(eve);

Expand All @@ -2010,16 +2010,16 @@ void DServer::mem_event_par(std::map<std::string,std::vector<EventPar> > &_map)
//
//------------------------------------------------------------------------------------------------------------------

void DServer::apply_event_par(std::map<std::string,std::vector<EventPar> > &_map)
void DServer::apply_event_par(const ServerEventSubscriptionState& _map)
{
for (size_t i = 0;i < class_list.size();i++)
{
std::vector<DeviceImpl *> &dev_list = class_list[i]->get_device_list();
for (size_t j = 0;j < dev_list.size();j++)
{
std::string &dev_name = dev_list[j]->get_name();
std::map<std::string,std::vector<EventPar> >::iterator ite;
ite = _map.find(dev_name);

const auto ite = _map.find(dev_name);

if (ite != _map.end())
{
Expand Down
5 changes: 3 additions & 2 deletions cppapi/server/dserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#define _DSERVER_H

#include <tango.h>
#include "event_subscription_state.h"

namespace Tango
{
Expand Down Expand Up @@ -128,8 +129,8 @@ public :
void _create_cpp_class(const char *c1,const char *c2) {this->create_cpp_class(c1,c2);}

void mcast_event_for_att(std::string &,std::string &,std::vector<std::string> &);
void mem_event_par(std::map<std::string, std::vector<EventPar> > &);
void apply_event_par(std::map<std::string,std::vector<EventPar> > &);
void mem_event_par(ServerEventSubscriptionState&);
void apply_event_par(const ServerEventSubscriptionState&);

void mem_devices_interface(std::map<std::string,DevIntr> &);
void changed_devices_interface(std::map<std::string,DevIntr> &);
Expand Down
37 changes: 37 additions & 0 deletions cppapi/server/event_subscription_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#ifndef _EVENT_SUBSCRIPTION_STATE_H
#define _EVENT_SUBSCRIPTION_STATE_H

#include <map>
#include <vector>
#include <string>

namespace Tango
{

using EventClientLibVersion = int;
using EventClientLibVersions = std::vector<EventClientLibVersion>;

struct EventSubscriptionState
{
std::string attribute_name;

EventClientLibVersions change;
EventClientLibVersions archive;
EventClientLibVersions periodic;
EventClientLibVersions user;
EventClientLibVersions att_conf;

bool quality;
bool data_ready;
bool dev_intr_change;

bool notifd;
bool zmq;
};

using EventSubscriptionStates = std::vector<EventSubscriptionState>;
using ServerEventSubscriptionState = std::map<std::string, EventSubscriptionStates>;

}

#endif
11 changes: 7 additions & 4 deletions cppapi/server/eventcmds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,10 +581,13 @@ void DServer::event_subscription(std::string &dev_name,std::string &obj_name,std
//

if (client_lib != 0)
{
omni_mutex_lock oml(EventSupplier::get_event_mutex());
attribute.set_client_lib(client_lib,event);
}
{
EventType event_type = CHANGE_EVENT;
tg->event_name_2_event_type(event, event_type);

omni_mutex_lock oml(EventSupplier::get_event_mutex());
attribute.set_client_lib(client_lib, event_type);
}
}
else if (event == EventName[PIPE_EVENT])
{
Expand Down
29 changes: 22 additions & 7 deletions cppapi/server/multiattribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1449,7 +1449,7 @@ void MultiAttribute::read_alarm(std::string &status)
//
//------------------------------------------------------------------------------------------------------------------

void MultiAttribute::get_event_param(std::vector<EventPar> &eve)
void MultiAttribute::get_event_param(EventSubscriptionStates& eve)
{
unsigned int i;

Expand Down Expand Up @@ -1508,7 +1508,7 @@ void MultiAttribute::get_event_param(std::vector<EventPar> &eve)

if (once_more == true)
{
EventPar ep;
EventSubscriptionState ep;

if (attr_list[i]->use_notifd_event() == true)
ep.notifd = true;
Expand All @@ -1520,7 +1520,7 @@ void MultiAttribute::get_event_param(std::vector<EventPar> &eve)
else
ep.zmq = false;

ep.attr_id = i;
ep.attribute_name = attr_list[i]->get_name();
ep.change = ch;
ep.quality = qu;
ep.archive = ar;
Expand Down Expand Up @@ -1548,46 +1548,61 @@ void MultiAttribute::get_event_param(std::vector<EventPar> &eve)
//
//------------------------------------------------------------------------------------------------------------------

void MultiAttribute::set_event_param(std::vector<EventPar> &eve)
void MultiAttribute::set_event_param(const EventSubscriptionStates& eve)
{
for (size_t i = 0;i < eve.size();i++)
{
if (eve[i].attr_id != -1)
if (! eve[i].attribute_name.empty())
{
Tango::Attribute &att = get_attr_by_ind(eve[i].attr_id);
Tango::Attribute &att = get_attr_by_name(eve[i].attribute_name.c_str());

{
omni_mutex_lock oml(EventSupplier::get_event_mutex());
std::vector<int>::iterator ite;
std::vector<int>::const_iterator ite;

if (eve[i].change.empty() == false)
{
for (ite = eve[i].change.begin();ite != eve[i].change.end();++ite)
{
att.set_change_event_sub(*ite);
att.set_client_lib(*ite, CHANGE_EVENT);
}
}

if (eve[i].periodic.empty() == false)
{
for (ite = eve[i].periodic.begin();ite != eve[i].periodic.end();++ite)
{
att.set_periodic_event_sub(*ite);
att.set_client_lib(*ite, PERIODIC_EVENT);
}
}

if (eve[i].archive.empty() == false)
{
for (ite = eve[i].archive.begin();ite != eve[i].archive.end();++ite)
{
att.set_archive_event_sub(*ite);
att.set_client_lib(*ite, ARCHIVE_EVENT);
}
}

if (eve[i].att_conf.empty() == false)
{
for (ite = eve[i].att_conf.begin();ite != eve[i].att_conf.end();++ite)
{
att.set_att_conf_event_sub(*ite);
att.set_client_lib(*ite, ATTR_CONF_EVENT);
}
}

if (eve[i].user.empty() == false)
{
for (ite = eve[i].user.begin();ite != eve[i].user.end();++ite)
{
att.set_user_event_sub(*ite);
att.set_client_lib(*ite, USER_EVENT);
}
}

if (eve[i].quality == true)
Expand Down
Loading