Skip to content

Add functions to opt-out "smart heartbeat" #218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 105 additions & 28 deletions core/pbauto_heartbeat.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,43 @@

static struct HeartbeatWatcherData m_watcher;

static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb);

static void start_heartbeat_timer(unsigned thumper_index)
{
size_t period_sec;
pubnub_mutex_lock(m_watcher.mutw);
struct pubnub_heartbeat_data* thumper =
&m_watcher.heartbeat_data[thumper_index];
pubnub_mutex_lock(m_watcher.timerlock);
bool const is_started = m_watcher.heartbeat_timers[thumper_index] > 0;
pubnub_mutex_unlock(m_watcher.timerlock);
pubnub_t* pb = thumper->pb;
pubnub_t* heartbeat_pb = thumper->heartbeat_pb;
bool announce_presence = false;
if (NULL != pb) {
pubnub_mutex_lock(pb->thumper_monitor);
announce_presence = pb->should_announce_presence;
pb->should_announce_presence = false;
if (announce_presence) pb->waiting_announce_presence = true;
pubnub_mutex_unlock(pb->thumper_monitor);
}
pubnub_mutex_unlock(m_watcher.mutw);
// Making initial heartbeat only if user disabled "smart heartbeat" for
// backward compatibility.
if (announce_presence && NULL != pb && !pb->use_smart_heartbeat)
heartbeat_thump(pb, heartbeat_pb);

if (is_started && NULL != pb && !pb->use_smart_heartbeat) {
PUBNUB_LOG_TRACE("--->start_heartbeat_timer(%u). Already started, Skip.\n",
thumper_index);
return;
}

PUBNUB_ASSERT_OPT(thumper_index < PUBNUB_MAX_HEARTBEAT_THUMPERS);

PUBNUB_LOG_TRACE("--->start_heartbeat_timer(%u).\n", thumper_index);
PUBNUB_LOG_TRACE("--->start_heartbeat_timer(%u). Start.\n", thumper_index);
pubnub_mutex_lock(m_watcher.mutw);
period_sec = m_watcher.heartbeat_data[thumper_index].period_sec;
const size_t period_sec = thumper->period_sec;
pubnub_mutex_unlock(m_watcher.mutw);

pubnub_mutex_lock(m_watcher.timerlock);
Expand Down Expand Up @@ -102,7 +129,6 @@ static int copy_context_settings(pubnub_t* pb_clone, pubnub_t const* pb)
return 0;
}


static bool pubsub_keys_changed(pubnub_t const* pb_clone, pubnub_t const* pb)
{
return (pb_clone->core.publish_key != pb->core.publish_key)
Expand All @@ -129,7 +155,7 @@ static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
PUBNUB_ASSERT_OPT(pb_valid_ctx_ptr(pb));
PUBNUB_ASSERT_OPT(pb_valid_ctx_ptr(heartbeat_pb));

pubnub_mutex_lock(pb->monitor);
pubnub_mutex_lock(pb->thumper_monitor);
pubnub_mutex_lock(heartbeat_pb->monitor);
keys_changed = pubsub_keys_changed(heartbeat_pb, pb);
pubnub_mutex_unlock(heartbeat_pb->monitor);
Expand All @@ -143,7 +169,7 @@ static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
#else
add_heartbeat_in_progress(pb->thumperIndex);
#endif
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_unlock(pb->thumper_monitor);
pubnub_cancel(heartbeat_pb);
return;
}
Expand Down Expand Up @@ -175,7 +201,7 @@ static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
add_heartbeat_in_progress(pb->thumperIndex);
#endif
}
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_unlock(pb->thumper_monitor);
}


Expand All @@ -191,16 +217,27 @@ static void auto_heartbeat_callback(pubnub_t* heartbeat_pb,
/* Maybe something should be done with this */
pubnub_get(heartbeat_pb);

pubnub_mutex_lock(m_watcher.mutw);
struct pubnub_heartbeat_data thumper = m_watcher.heartbeat_data[thumper_index];
pubnub_t *pb = thumper.pb;
// Check whether this is awaited announce on channels / groups change or not.
if (NULL != pb && !pb->use_smart_heartbeat) {
pubnub_mutex_lock(pb->thumper_monitor);
if (pb->waiting_announce_presence) {
pb->waiting_announce_presence = false;
pubnub_mutex_unlock(pb->thumper_monitor);
pubnub_mutex_unlock(m_watcher.mutw);
return;
}
pubnub_mutex_unlock(pb->thumper_monitor);
}
pubnub_mutex_unlock(m_watcher.mutw);

if (PNR_OK == result) {
/* Start heartbeat timer */
start_heartbeat_timer(thumper_index);
}
else {
pubnub_t* pb;
pubnub_mutex_lock(m_watcher.mutw);
pb = m_watcher.heartbeat_data[thumper_index].pb;
pubnub_mutex_unlock(m_watcher.mutw);

if (result != PNR_CANCELLED) {
PUBNUB_LOG_WARNING("punbub_heartbeat(heartbeat_pb=%p) failed with "
"code: %d('%s') - "
Expand All @@ -212,13 +249,12 @@ static void auto_heartbeat_callback(pubnub_t* heartbeat_pb,
heartbeat_thump(pb, heartbeat_pb);
}
else if (pb != NULL) {
pubnub_mutex_lock(pb->monitor);
if (pubsub_keys_changed(heartbeat_pb, pb)) {
pubnub_init(
heartbeat_pb, pb->core.publish_key, pb->core.subscribe_key);

heartbeat_thump(pb, heartbeat_pb);
}
pubnub_mutex_unlock(pb->monitor);
}
}
}
Expand Down Expand Up @@ -297,6 +333,7 @@ static void handle_heartbeat_timers(int elapsed_ms)
unsigned thumper_index = indexes[i];
int remains_ms = m_watcher.heartbeat_timers[thumper_index] - elapsed_ms;
if (remains_ms <= 0) {
m_watcher.heartbeat_timers[thumper_index] = 0;
struct pubnub_heartbeat_data* thumper =
&m_watcher.heartbeat_data[thumper_index];
pubnub_t* pb;
Expand Down Expand Up @@ -439,17 +476,17 @@ int pubnub_set_heartbeat_period(pubnub_t* pb, size_t period_sec)
PUBNUB_ASSERT(pb_valid_ctx_ptr(pb));
PUBNUB_ASSERT(period_sec > 0);

pubnub_mutex_lock(pb->monitor);
pubnub_mutex_lock(pb->thumper_monitor);
if (UNASSIGNED == pb->thumperIndex) {
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_unlock(pb->thumper_monitor);
PUBNUB_LOG_ERROR("Error: pubnub_set_heartbeat_period(pb=%p) - "
"Auto heartbeat is desabled.\n",
pb);
return -1;
}
pubnub_mutex_lock(m_watcher.mutw);
m_watcher.heartbeat_data[pb->thumperIndex].period_sec = period_sec;
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_unlock(pb->thumper_monitor);
pubnub_mutex_unlock(m_watcher.mutw);

return 0;
Expand All @@ -460,15 +497,15 @@ int pubnub_enable_auto_heartbeat(pubnub_t* pb, size_t period_sec)
{
PUBNUB_ASSERT(pb_valid_ctx_ptr(pb));

pubnub_mutex_lock(pb->monitor);
pubnub_mutex_lock(pb->thumper_monitor);
if (UNASSIGNED == pb->thumperIndex) {
if (form_heartbeat_thumper(pb) != 0) {
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_unlock(pb->thumper_monitor);

return -1;
}
}
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_unlock(pb->thumper_monitor);

return pubnub_set_heartbeat_period(pb, period_sec);
}
Expand Down Expand Up @@ -550,14 +587,14 @@ void pubnub_disable_auto_heartbeat(pubnub_t* pb)
{
PUBNUB_ASSERT(pb_valid_ctx_ptr(pb));

pubnub_mutex_lock(pb->monitor);
pubnub_mutex_lock(pb->thumper_monitor);
if (is_exempted(pb, pb->thumperIndex)) {
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_unlock(pb->thumper_monitor);
return;
}
release_thumper(pb->thumperIndex);
pb->thumperIndex = UNASSIGNED;
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_unlock(pb->thumper_monitor);
}


Expand All @@ -567,18 +604,37 @@ bool pubnub_is_auto_heartbeat_enabled(pubnub_t* pb)

PUBNUB_ASSERT(pb_valid_ctx_ptr(pb));

pubnub_mutex_lock(pb->monitor);
pubnub_mutex_lock(pb->thumper_monitor);
rslt = (pb->thumperIndex != UNASSIGNED);
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_unlock(pb->thumper_monitor);

return rslt;
}

void pubnub_enable_smart_heartbeat(pubnub_t* pb)
{
PUBNUB_ASSERT_OPT(pb_valid_ctx_ptr(pb));

pubnub_mutex_lock(pb->thumper_monitor);
pb->use_smart_heartbeat = true;
pubnub_mutex_unlock(pb->thumper_monitor);
}

void pubnub_disable_smart_heartbeat(pubnub_t* pb)
{
PUBNUB_ASSERT_OPT(pb_valid_ctx_ptr(pb));

pubnub_mutex_lock(pb->thumper_monitor);
pb->use_smart_heartbeat = false;
pubnub_mutex_unlock(pb->thumper_monitor);
}


void pbauto_heartbeat_free_channelInfo(pubnub_t* pb)
{
PUBNUB_ASSERT_OPT(pb_valid_ctx_ptr(pb));

pubnub_mutex_lock(pb->thumper_monitor);
if (pb->channelInfo.channel != NULL) {
free(pb->channelInfo.channel);
pb->channelInfo.channel = NULL;
Expand All @@ -587,18 +643,21 @@ void pbauto_heartbeat_free_channelInfo(pubnub_t* pb)
free(pb->channelInfo.channel_group);
pb->channelInfo.channel_group = NULL;
}
pubnub_mutex_unlock(pb->thumper_monitor);
}


void pbauto_heartbeat_read_channelInfo(pubnub_t const* pb,
void pbauto_heartbeat_read_channelInfo(pubnub_t* pb,
char const** channel,
char const** channel_group)
{
PUBNUB_ASSERT_OPT(channel != NULL);
PUBNUB_ASSERT_OPT(channel_group != NULL);

pubnub_mutex_lock(pb->thumper_monitor);
*channel = pb->channelInfo.channel;
*channel_group = pb->channelInfo.channel_group;
pubnub_mutex_unlock(pb->thumper_monitor);
}

#if defined _WIN32
Expand All @@ -625,6 +684,18 @@ static enum pubnub_res write_auto_heartbeat_channelInfo(pubnub_t* pb,
{
PUBNUB_ASSERT_OPT((channel != NULL) || (channel_group != NULL));

pubnub_mutex_lock(pb->thumper_monitor);
if (channel != NULL) {
if (NULL == pb->channelInfo.channel
|| 0 != strcmp(pb->channelInfo.channel, channel))
pb->should_announce_presence = true;
}
if (channel_group != NULL) {
if (NULL == pb->channelInfo.channel_group
|| 0 != strcmp(pb->channelInfo.channel_group, channel_group))
pb->should_announce_presence = true;
}

pbauto_heartbeat_free_channelInfo(pb);
if (channel != NULL) {
pb->channelInfo.channel = strndup(channel, PUBNUB_MAX_OBJECT_LENGTH - 1);
Expand All @@ -635,6 +706,7 @@ static enum pubnub_res write_auto_heartbeat_channelInfo(pubnub_t* pb,
"channel = '%s'\n",
pb,
channel);
pubnub_mutex_unlock(pb->thumper_monitor);
return PNR_OUT_OF_MEMORY;
}
}
Expand All @@ -652,10 +724,11 @@ static enum pubnub_res write_auto_heartbeat_channelInfo(pubnub_t* pb,
free(pb->channelInfo.channel);
pb->channelInfo.channel = NULL;
}
pubnub_mutex_unlock(pb->thumper_monitor);
return PNR_OUT_OF_MEMORY;
}
}

pubnub_mutex_unlock(pb->thumper_monitor);
return PNR_OK;
}

Expand Down Expand Up @@ -740,7 +813,8 @@ static void stop_auto_heartbeat(unsigned thumper_index)
pubnub_t* heartbeat_pb;

pubnub_mutex_lock(m_watcher.mutw);
heartbeat_pb = m_watcher.heartbeat_data[thumper_index].heartbeat_pb;
struct pubnub_heartbeat_data thump = m_watcher.heartbeat_data[thumper_index];
heartbeat_pb = thump.heartbeat_pb;
pubnub_mutex_unlock(m_watcher.mutw);

stop_heartbeat(heartbeat_pb, thumper_index);
Expand All @@ -760,6 +834,9 @@ void pbauto_heartbeat_transaction_ongoing(pubnub_t const* pb)
/*FALLTHRU*/
case PBTT_SUBSCRIBE:
case PBTT_SUBSCRIBE_V2:
// Ignore implicit heartbeat if "smart heartbeat" is disabled.
if (!pb->use_smart_heartbeat) return;

stop_auto_heartbeat(pb->thumperIndex);
break;
default:
Expand Down
17 changes: 15 additions & 2 deletions core/pbauto_heartbeat.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,24 @@ struct HeartbeatWatcherData {

/** Pubnub context fields for heartbeat info used by the module for keeping presence.
*/
#define M_heartbeatInfo() unsigned thumperIndex;
#if PUBNUB_THREADSAFE
#define M_heartbeatInfo() \
pubnub_mutex_t thumper_monitor; \
unsigned thumperIndex; \
bool use_smart_heartbeat; \
bool should_announce_presence; \
bool waiting_announce_presence;
#else
#define M_heartbeatInfo() \
unsigned thumperIndex; \
bool use_smart_heartbeat; \
bool should_announce_presence; \
bool waiting_announce_presence;
#endif

/** Reads channel and channel groups saved(subscribed on)
*/
void pbauto_heartbeat_read_channelInfo(pubnub_t const* pb,
void pbauto_heartbeat_read_channelInfo(pubnub_t* pb,
char const** channel,
char const** channel_group);

Expand Down
3 changes: 3 additions & 0 deletions core/pubnub_alloc_static.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ void pballoc_free_at_last(pubnub_t *pb)
pbpal_free(pb);
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_destroy(pb->monitor);
#if PUBNUB_USE_AUTO_HEARTBEAT
pubnub_mutex_destroy(pb->thumper_monitor);
#endif
}


Expand Down
3 changes: 3 additions & 0 deletions core/pubnub_alloc_std.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ void pballoc_free_at_last(pubnub_t* pb)
remove_allocated(pb);
pubnub_mutex_unlock(pb->monitor);
pubnub_mutex_destroy(pb->monitor);
#if PUBNUB_USE_AUTO_HEARTBEAT
pubnub_mutex_destroy(pb->thumper_monitor);
#endif
pubnub_mutex_unlock(m_lock);
free(pb);
}
Expand Down
Loading
Loading