Skip to content

Commit f9c64fd

Browse files
committed
Make message receiving non-blocking. Happens inside event handler
1 parent fe46222 commit f9c64fd

File tree

3 files changed

+71
-38
lines changed

3 files changed

+71
-38
lines changed

README.md

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,22 @@ Publish message to topic.
5252
mqtt.publish("topic", 'message')
5353
```
5454

55-
Subscribe to topic and get message.
55+
Subscribe to one or more topics to receive messages. Received messages are event driven. For each message, the main mruby task pauses temporarily, allowing the block given for the receiving topic to run.
5656

5757
```ruby
58-
mqtt.subscribe("topic")
59-
topic, message = mqtt.get
58+
mqtt.subscribe("topic1") do |message|
59+
puts "Received from topic1: #{message}"
60+
end
61+
62+
mqtt.on_message_from("topic2") do |message|
63+
puts "New message from topic2: #{message}"
64+
end
65+
mqtt.subscribe("topic2")
66+
67+
loop do
68+
# Do whatever in your main loop.
69+
ESP32::System.delay(1000)
70+
end
6071
```
6172

6273
Disconnect.

mrblib/mrb_esp32_mqtt.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,28 @@ module ESP32
22
module MQTT
33
class Client
44
attr_accessor :ca, :cert, :key
5+
6+
def initialize(host, port)
7+
@callbacks = {}
8+
9+
# C calls the block given here with every received message.
10+
self._initialize(host, port) do |topic, message|
11+
@callbacks[topic].call(message) if @callbacks[topic]
12+
end
13+
end
14+
15+
def subscribe(topic, &block)
16+
@callbacks[topic] = block if block
17+
self._subscribe(topic)
18+
end
19+
20+
def on_message_from(topic, &block)
21+
@callbacks[topic] = block if block
22+
end
23+
24+
def update(topic, message)
25+
@callbacks[topic].call(message) if @callbacks[topic]
26+
end
527
end
628
end
729
end

src/mrb_esp32_mqtt.c

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,32 @@ typedef struct mqtt_client_t {
3131
mrb_bool ssl;
3232
esp_mqtt_client_handle_t client;
3333
QueueHandle_t queue;
34+
TaskHandle_t main_task_handle;
35+
mrb_value message_proc;
3436
} mqtt_client_t;
3537

38+
static void
39+
mqtt_message_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) {
40+
// Suspend main task.
41+
vTaskSuspend(client->main_task_handle);
42+
int arena_index = mrb_gc_arena_save(client->mrb);
43+
44+
// Check message_proc is a a proc?
45+
mrb_assert(mrb_type(client->message_proc) == MRB_TT_PROC);
46+
47+
// Prep arguments to pass.
48+
mrb_value args[2];
49+
args[0] = mrb_str_new_static(client->mrb, event->topic, event->topic_len);
50+
args[1] = mrb_str_new_static(client->mrb, event->data, event->data_len);
51+
52+
// Call message_proc.
53+
mrb_yield_argv(client->mrb, client->message_proc, 2, &args[0]);
54+
55+
// Resume main task.
56+
mrb_gc_arena_restore(client->mrb, arena_index);
57+
vTaskResume(client->main_task_handle);
58+
}
59+
3660
static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_id, void *event_data)
3761
{
3862
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
@@ -61,7 +85,7 @@ static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_i
6185
break;
6286
case MQTT_EVENT_DATA:
6387
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
64-
xQueueSend(client->queue, event_data, (TickType_t)0);
88+
mqtt_message_handler(client, event);
6589
break;
6690
case MQTT_EVENT_ERROR:
6791
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
@@ -88,17 +112,6 @@ mqtt_wait_for_event(mrb_state *mrb, mrb_value self, int32_t event_id) {
88112
mrb_raise(mrb, error_class, "Timeout wait for mqtt event.");
89113
}
90114

91-
static void
92-
mqtt_wait_for_data(mrb_state *mrb, mrb_value self, esp_mqtt_event_t *event) {
93-
mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self);
94-
95-
while(true) {
96-
if(xQueueReceive(client->queue, (void*)event, (TickType_t)(1000 / portTICK_PERIOD_MS))) {
97-
if(event->event_id == MQTT_EVENT_DATA) break;
98-
}
99-
}
100-
}
101-
102115
static void
103116
mrb_mqtt_client_free(mrb_state *mrb, void *p) {
104117
mqtt_client_t *client = (mqtt_client_t *)p;
@@ -110,13 +123,14 @@ mrb_mqtt_client_free(mrb_state *mrb, void *p) {
110123
}
111124

112125
static mrb_value
113-
mrb_mqtt_client_init(mrb_state *mrb, mrb_value self) {
126+
mrb_mqtt_client_initialize(mrb_state *mrb, mrb_value self) {
114127
mqtt_client_t *client = mrb_malloc(mrb, sizeof(mqtt_client_t));
115128

116129
mrb_value host;
117130
mrb_int port;
131+
mrb_value block;
118132

119-
mrb_get_args(mrb, "Si", &host, &port);
133+
mrb_get_args(mrb, "Si&", &host, &port, &block);
120134

121135
client->mrb = mrb;
122136
client->host = mrb_malloc(mrb, strlen(mrb_str_to_cstr(mrb, host)));
@@ -125,6 +139,11 @@ mrb_mqtt_client_init(mrb_state *mrb, mrb_value self) {
125139
client->ssl = FALSE;
126140
client->queue = xQueueCreate(WAIT_EVENT_QUEUE_LEN, sizeof(esp_mqtt_event_t));
127141

142+
// Save block given and main task for handling incoming messages.
143+
mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@message_proc"), block);
144+
client->message_proc = block;
145+
client->main_task_handle = xTaskGetCurrentTaskHandle();
146+
128147
mrb_data_init(self, client, &mrb_mqtt_client);
129148
ESP_LOGI(TAG, "initialize(%s, %d)", client->host, client->port);
130149

@@ -270,24 +289,6 @@ mrb_mqtt_client_unsubscribe(mrb_state *mrb, mrb_value self) {
270289
return self;
271290
}
272291

273-
static mrb_value
274-
mrb_mqtt_client_get(mrb_state *mrb, mrb_value self) {
275-
esp_mqtt_event_t event;
276-
277-
mqtt_wait_for_data(mrb, self, &event);
278-
279-
mrb_value topic = mrb_str_new_static(mrb, event.topic, event.topic_len);
280-
mrb_value message = mrb_str_new_static(mrb, event.data, event.data_len);
281-
282-
mrb_value ary = mrb_ary_new(mrb);
283-
mrb_ary_push(mrb, ary, topic);
284-
mrb_ary_push(mrb, ary, message);
285-
286-
ESP_LOGI(TAG, "get()");
287-
288-
return ary;
289-
}
290-
291292
static mrb_value
292293
mrb_mqtt_client_disconnect(mrb_state *mrb, mrb_value self) {
293294
mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self);
@@ -312,13 +313,12 @@ mrb_mruby_esp32_mqtt_gem_init(mrb_state* mrb) {
312313
struct RClass *mqtt_module = mrb_define_module_under(mrb, esp32_module, "MQTT");
313314
struct RClass *client_class = mrb_define_class_under(mrb, mqtt_module, "Client", mrb->object_class);
314315

315-
mrb_define_method(mrb, client_class, "initialize", mrb_mqtt_client_init, MRB_ARGS_REQ(2));
316+
mrb_define_method(mrb, client_class, "_initialize", mrb_mqtt_client_initialize, MRB_ARGS_REQ(2)|MRB_ARGS_BLOCK());
316317
mrb_define_method(mrb, client_class, "ssl=", mrb_mqtt_client_set_ssl, MRB_ARGS_REQ(1));
317318
mrb_define_method(mrb, client_class, "connect", mrb_mqtt_client_connect, MRB_ARGS_NONE());
318319
mrb_define_method(mrb, client_class, "publish", mrb_mqtt_client_publish, MRB_ARGS_REQ(2));
319-
mrb_define_method(mrb, client_class, "subscribe", mrb_mqtt_client_subscribe, MRB_ARGS_REQ(1));
320+
mrb_define_method(mrb, client_class, "_subscribe", mrb_mqtt_client_subscribe, MRB_ARGS_REQ(1));
320321
mrb_define_method(mrb, client_class, "unsubscribe", mrb_mqtt_client_unsubscribe, MRB_ARGS_REQ(1));
321-
mrb_define_method(mrb, client_class, "get", mrb_mqtt_client_get, MRB_ARGS_NONE());
322322
mrb_define_method(mrb, client_class, "disconnect", mrb_mqtt_client_disconnect, MRB_ARGS_NONE());
323323

324324
mrb_define_class_under(mrb, mqtt_module, "TimeoutError", mrb->eStandardError_class);

0 commit comments

Comments
 (0)