Skip to content

Commit 6ecb48d

Browse files
committed
Make the whole thing event driven
1 parent 1d6db52 commit 6ecb48d

File tree

3 files changed

+158
-64
lines changed

3 files changed

+158
-64
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ mqtt.connect
4949
Publish message to topic.
5050

5151
```ruby
52+
while !mqtt.connected do
53+
ESP32::System.delay(100)
54+
end
5255
mqtt.publish("topic", 'message')
5356
```
5457

mrblib/mrb_esp32_mqtt.rb

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,57 @@
11
module ESP32
22
module MQTT
33
class Client
4-
attr_accessor :ca, :cert, :key
4+
attr_accessor :ca, :cert, :key, :connected
55

66
def initialize(host, port)
7-
@callbacks = {}
7+
self._initialize(host, port)
88

9-
# C calls the block given here with every received message.
10-
self._initialize(host, port) do |topic, message|
11-
@callbacks[topic].call(message) if @callbacks[topic]
9+
@connected = false
10+
@connect_callbacks = []
11+
@message_callbacks = {}
12+
13+
self.set_connected_handler do
14+
@connected = true
15+
@connect_callbacks.each { |cb| cb.call }
16+
@connect_callbacks = []
17+
end
18+
19+
self.set_disconnected_handler do
20+
@connected = false
21+
end
22+
23+
self.set_unsubscribed_handler do |topic|
24+
@message_callbacks[topic] = nil
25+
end
26+
27+
# C calls this block with every received message.
28+
self.set_data_handler do |topic, message|
29+
@message_callbacks[topic].call(message) if @message_callbacks[topic]
1230
end
1331
end
1432

1533
def subscribe(topic, &block)
16-
@callbacks[topic] = block if block
17-
self._subscribe(topic)
34+
@message_callbacks[topic] = block if block
35+
36+
# Take semaphore
37+
38+
if @connected
39+
self._subscribe(topic)
40+
else
41+
self.on_connect do
42+
self._subscribe(topic)
43+
end
44+
end
45+
46+
# Release semaphore
1847
end
1948

49+
def on_connect(&block)
50+
@connect_callbacks << block
51+
end
52+
2053
def on_message_from(topic, &block)
21-
@callbacks[topic] = block if block
54+
@message_callbacks[topic] = block
2255
end
2356
end
2457
end

src/mrb_esp32_mqtt.c

Lines changed: 114 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@
1313
#include "esp_log.h"
1414
#include "mqtt_client.h"
1515

16-
1716
#define TAG ("mruby-esp32-mqtt")
18-
#define WAIT_EVENT_TIMEOUT_SEC (20)
19-
#define WAIT_EVENT_QUEUE_LEN (10)
2017

2118
static void mrb_mqtt_client_free(mrb_state *mrb, void *p);
2219

@@ -30,31 +27,62 @@ typedef struct mqtt_client_t {
3027
mrb_int port;
3128
mrb_bool ssl;
3229
esp_mqtt_client_handle_t client;
33-
QueueHandle_t queue;
34-
TaskHandle_t main_task_handle;
35-
mrb_value message_proc;
30+
31+
TaskHandle_t mruby_task_handle;
32+
mrb_value connected_proc;
33+
mrb_value disconnected_proc;
34+
mrb_value unsubscribed_proc;
35+
mrb_value data_proc;
3636
} mqtt_client_t;
3737

38+
39+
static void
40+
mqtt_connected_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) {
41+
// Get semaphore.
42+
43+
// Call @connected_proc.
44+
mrb_assert(mrb_type(client->connected_proc) == MRB_TT_PROC);
45+
mrb_yield_argv(client->mrb, client->connected_proc, 0, NULL);
46+
47+
// Release semaphore.
48+
}
49+
50+
static void
51+
mqtt_disconnected_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) {
52+
// Get semaphore.
53+
54+
// Call @disconnected_proc.
55+
mrb_assert(mrb_type(client->disconnected_proc) == MRB_TT_PROC);
56+
mrb_yield_argv(client->mrb, client->disconnected_proc, 0, NULL);
57+
58+
// Release semaphore.
59+
}
60+
3861
static void
39-
mqtt_message_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) {
40-
// Suspend main task.
41-
vTaskSuspend(client->main_task_handle);
42-
int arena_index = mrb_gc_arena_save(client->mrb);
62+
mqtt_unsubscribed_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) {
63+
// Get semaphore.
4364

44-
// Check message_proc is a a proc?
45-
mrb_assert(mrb_type(client->message_proc) == MRB_TT_PROC);
65+
// Call @unsubscribed_proc.
66+
mrb_assert(mrb_type(client->unsubscribed_proc) == MRB_TT_PROC);
67+
mrb_yield_argv(client->mrb, client->unsubscribed_proc, 0, NULL);
68+
69+
// Release semaphore.
70+
}
71+
72+
static void
73+
mqtt_data_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) {
74+
// Get semaphore.
4675

4776
// Prep arguments to pass.
4877
mrb_value args[2];
4978
args[0] = mrb_str_new_static(client->mrb, event->topic, event->topic_len);
5079
args[1] = mrb_str_new_static(client->mrb, event->data, event->data_len);
5180

52-
// Call message_proc.
53-
mrb_yield_argv(client->mrb, client->message_proc, 2, &args[0]);
81+
// Call @data_proc
82+
mrb_assert(mrb_type(client->data_proc) == MRB_TT_PROC);
83+
mrb_yield_argv(client->mrb, client->data_proc, 2, &args[0]);
5484

55-
// Resume main task.
56-
mrb_gc_arena_restore(client->mrb, arena_index);
57-
vTaskResume(client->main_task_handle);
85+
// Release semaphore.
5886
}
5987

6088
static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_id, void *event_data)
@@ -64,54 +92,40 @@ static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_i
6492
esp_mqtt_event_handle_t event = event_data;
6593

6694
switch ((esp_mqtt_event_id_t)event_id) {
95+
case MQTT_EVENT_ERROR:
96+
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
97+
break;
6798
case MQTT_EVENT_CONNECTED:
68-
ESP_LOGD(TAG, "MQTT_EVENT_CONNECTED");
69-
xQueueSend(client->queue, event_data, (TickType_t)0);
99+
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
100+
mqtt_connected_handler(client, event);
70101
break;
71102
case MQTT_EVENT_DISCONNECTED:
72103
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
73-
xQueueSend(client->queue, event_data, (TickType_t)0);
104+
mqtt_disconnected_handler(client, event);
74105
break;
75106
case MQTT_EVENT_SUBSCRIBED:
76107
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
77-
xQueueSend(client->queue, event_data, (TickType_t)0);
78108
break;
79109
case MQTT_EVENT_UNSUBSCRIBED:
80110
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
81-
xQueueSend(client->queue, event_data, (TickType_t)0);
111+
mqtt_unsubscribed_handler(client, event);
82112
break;
83113
case MQTT_EVENT_PUBLISHED:
84114
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
85115
break;
86116
case MQTT_EVENT_DATA:
87117
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
88-
mqtt_message_handler(client, event);
118+
mqtt_data_handler(client, event);
89119
break;
90-
case MQTT_EVENT_ERROR:
91-
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
120+
case MQTT_EVENT_BEFORE_CONNECT:
121+
ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT");
92122
break;
93123
default:
94124
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
95125
break;
96126
}
97127
}
98128

99-
static void
100-
mqtt_wait_for_event(mrb_state *mrb, mrb_value self, int32_t event_id) {
101-
mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self);
102-
esp_mqtt_event_t event;
103-
int wait_count;
104-
struct RClass* error_class;
105-
106-
for(wait_count = 0 ; wait_count < WAIT_EVENT_TIMEOUT_SEC ; wait_count++) {
107-
if(xQueueReceive(client->queue, (void*)&event, (TickType_t)(1000 / portTICK_PERIOD_MS))) {
108-
if(event.event_id == event_id) return;
109-
}
110-
}
111-
error_class = mrb_exc_get_id(mrb, MRB_ERROR_SYM(ESP32::MQTT::TimeoutError));
112-
mrb_raise(mrb, error_class, "Timeout wait for mqtt event.");
113-
}
114-
115129
static void
116130
mrb_mqtt_client_free(mrb_state *mrb, void *p) {
117131
mqtt_client_t *client = (mqtt_client_t *)p;
@@ -128,21 +142,14 @@ mrb_mqtt_client_initialize(mrb_state *mrb, mrb_value self) {
128142

129143
mrb_value host;
130144
mrb_int port;
131-
mrb_value block;
132-
133-
mrb_get_args(mrb, "Si&", &host, &port, &block);
145+
mrb_get_args(mrb, "Si", &host, &port);
134146

147+
client->mruby_task_handle = xTaskGetCurrentTaskHandle();
135148
client->mrb = mrb;
136149
client->host = mrb_malloc(mrb, strlen(mrb_str_to_cstr(mrb, host)));
137150
strcpy(client->host, mrb_str_to_cstr(mrb, host));
138151
client->port = port;
139152
client->ssl = FALSE;
140-
client->queue = xQueueCreate(WAIT_EVENT_QUEUE_LEN, sizeof(esp_mqtt_event_t));
141-
142-
// Save block given and main task for handling incoming messages.
143-
mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@message_proc"), block);
144-
client->message_proc = block;
145-
client->main_task_handle = xTaskGetCurrentTaskHandle();
146153

147154
mrb_data_init(self, client, &mrb_mqtt_client);
148155
ESP_LOGI(TAG, "initialize(%s, %d)", client->host, client->port);
@@ -199,7 +206,6 @@ mrb_mqtt_client_connect(mrb_state *mrb, mrb_value self) {
199206

200207
client->client = mqtt_client;
201208

202-
mqtt_wait_for_event(mrb, self, MQTT_EVENT_CONNECTED);
203209
ESP_LOGI(
204210
TAG,
205211
"connect(%s://%s:%d)",
@@ -245,7 +251,6 @@ mrb_mqtt_client_subscribe(mrb_state *mrb, mrb_value self) {
245251
struct RClass* error_class;
246252

247253
mrb_value topic;
248-
249254
mrb_get_args(mrb, "S", &topic);
250255

251256
ret = esp_mqtt_client_subscribe(
@@ -258,7 +263,6 @@ mrb_mqtt_client_subscribe(mrb_state *mrb, mrb_value self) {
258263
mrb_raise(mrb, error_class, "Failed to subscribe.");
259264
return self;
260265
}
261-
mqtt_wait_for_event(mrb, self, MQTT_EVENT_SUBSCRIBED);
262266
ESP_LOGI(TAG, "subscribe(%s)", mrb_str_to_cstr(mrb, topic));
263267

264268
return self;
@@ -283,7 +287,6 @@ mrb_mqtt_client_unsubscribe(mrb_state *mrb, mrb_value self) {
283287
mrb_raise(mrb, error_class, "Failed to unsubscribe.");
284288
return self;
285289
}
286-
mqtt_wait_for_event(mrb, self, MQTT_EVENT_UNSUBSCRIBED);
287290
ESP_LOGI(TAG, "unsubscribe(%s)", mrb_str_to_cstr(mrb, topic));
288291

289292
return self;
@@ -301,12 +304,63 @@ mrb_mqtt_client_disconnect(mrb_state *mrb, mrb_value self) {
301304
mrb_raise(mrb, error_class, "Failed to disconnect.");
302305
return self;
303306
}
304-
mqtt_wait_for_event(mrb, self, MQTT_EVENT_DISCONNECTED);
305307
ESP_LOGI(TAG, "disconnect");
306308

307309
return self;
308310
}
309311

312+
static mrb_value
313+
mrb_mqtt_client_set_connected_handler(mrb_state *mrb, mrb_value self) {
314+
mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self);
315+
316+
mrb_value block;
317+
mrb_get_args(mrb, "&", &block);
318+
319+
mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@connected_proc"), block);
320+
client->connected_proc = block;
321+
322+
return self;
323+
}
324+
325+
static mrb_value
326+
mrb_mqtt_client_set_disconnected_handler(mrb_state *mrb, mrb_value self) {
327+
mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self);
328+
329+
mrb_value block;
330+
mrb_get_args(mrb, "&", &block);
331+
332+
mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@disconnected_proc"), block);
333+
client->disconnected_proc = block;
334+
335+
return self;
336+
}
337+
338+
static mrb_value
339+
mrb_mqtt_client_set_unsubscribed_handler(mrb_state *mrb, mrb_value self) {
340+
mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self);
341+
342+
mrb_value block;
343+
mrb_get_args(mrb, "&", &block);
344+
345+
mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@unsubscribed_proc"), block);
346+
client->unsubscribed_proc = block;
347+
348+
return self;
349+
}
350+
351+
static mrb_value
352+
mrb_mqtt_client_set_data_handler(mrb_state *mrb, mrb_value self) {
353+
mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self);
354+
355+
mrb_value block;
356+
mrb_get_args(mrb, "&", &block);
357+
358+
mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@data_proc"), block);
359+
client->data_proc = block;
360+
361+
return self;
362+
}
363+
310364
void
311365
mrb_mruby_esp32_mqtt_gem_init(mrb_state* mrb) {
312366
struct RClass *esp32_module = mrb_define_module(mrb, "ESP32");
@@ -320,8 +374,12 @@ mrb_mruby_esp32_mqtt_gem_init(mrb_state* mrb) {
320374
mrb_define_method(mrb, client_class, "_subscribe", mrb_mqtt_client_subscribe, MRB_ARGS_REQ(1));
321375
mrb_define_method(mrb, client_class, "unsubscribe", mrb_mqtt_client_unsubscribe, MRB_ARGS_REQ(1));
322376
mrb_define_method(mrb, client_class, "disconnect", mrb_mqtt_client_disconnect, MRB_ARGS_NONE());
377+
378+
mrb_define_method(mrb, client_class, "set_connected_handler", mrb_mqtt_client_set_connected_handler, MRB_ARGS_BLOCK());
379+
mrb_define_method(mrb, client_class, "set_disconnected_handler", mrb_mqtt_client_set_disconnected_handler, MRB_ARGS_BLOCK());
380+
mrb_define_method(mrb, client_class, "set_unsubscribed_handler", mrb_mqtt_client_set_unsubscribed_handler, MRB_ARGS_BLOCK());
381+
mrb_define_method(mrb, client_class, "set_data_handler", mrb_mqtt_client_set_data_handler, MRB_ARGS_BLOCK());
323382

324-
mrb_define_class_under(mrb, mqtt_module, "TimeoutError", mrb->eStandardError_class);
325383
mrb_define_class_under(mrb, mqtt_module, "ConnectError", mrb->eStandardError_class);
326384
mrb_define_class_under(mrb, mqtt_module, "PublishError", mrb->eStandardError_class);
327385
mrb_define_class_under(mrb, mqtt_module, "SubscribeError", mrb->eStandardError_class);

0 commit comments

Comments
 (0)