Skip to content

Commit

Permalink
updated lwmqtt
Browse files Browse the repository at this point in the history
  • Loading branch information
256dpi committed Feb 5, 2023
1 parent eae214b commit b72a94d
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 184 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ fmt:

update:
rm -rf ./lwmqtt
git clone --branch v0.8.0 https://github.com/256dpi/lwmqtt.git ./lwmqtt
git clone --branch v0.9.0 https://github.com/256dpi/lwmqtt.git ./lwmqtt
mkdir -p ./src/lwmqtt
cp -r ./lwmqtt/src/*.c ./src/lwmqtt/
cp -r ./lwmqtt/src/*.h ./src/lwmqtt/
Expand Down
11 changes: 8 additions & 3 deletions src/MQTTClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ bool MQTTClient::connect(const char clientID[], const char username[], const cha
}

// prepare options
lwmqtt_options_t options = lwmqtt_default_options;
lwmqtt_connect_options_t options = lwmqtt_default_connect_options;
options.keep_alive = this->keepAlive;
options.clean_session = this->cleanSession;
options.client_id = lwmqtt_string(clientID);
Expand All @@ -359,7 +359,12 @@ bool MQTTClient::connect(const char clientID[], const char username[], const cha
}

// connect to broker
this->_lastError = lwmqtt_connect(&this->client, options, this->will, &this->_returnCode, this->timeout);
this->_lastError = lwmqtt_connect(&this->client, &options, this->will, this->timeout);

// copy return code
this->_returnCode = options.return_code;

// handle error
if (this->_lastError != LWMQTT_SUCCESS) {
// close connection
this->close();
Expand Down Expand Up @@ -387,7 +392,7 @@ bool MQTTClient::publish(const char topic[], const char payload[], int length, b
message.qos = lwmqtt_qos_t(qos);

// publish message
this->_lastError = lwmqtt_publish(&this->client, lwmqtt_string(topic), message, this->timeout, nullptr);
this->_lastError = lwmqtt_publish(&this->client, nullptr, lwmqtt_string(topic), message, this->timeout);
if (this->_lastError != LWMQTT_SUCCESS) {
// close connection
this->close();
Expand Down
227 changes: 115 additions & 112 deletions src/lwmqtt/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ static lwmqtt_err_t lwmqtt_write_to_network(lwmqtt_client_t *client, uint8_t *bu

// write
size_t partial_write = 0;
lwmqtt_err_t err = client->network_write(client->network, buf + written, len - written, &partial_write,
(uint32_t)remaining_time);
lwmqtt_err_t err =
client->network_write(client->network, buf + written, len - written, &partial_write, (uint32_t)remaining_time);
if (err != LWMQTT_SUCCESS) {
return err;
}
Expand Down Expand Up @@ -246,7 +246,7 @@ static lwmqtt_err_t lwmqtt_send_packet_in_buffer(lwmqtt_client_t *client, size_t
return LWMQTT_SUCCESS;
}

static lwmqtt_err_t lwmqtt_cycle(lwmqtt_client_t *client, size_t *read, lwmqtt_packet_type_t *packet_type) {
static lwmqtt_err_t lwmqtt_cycle_once(lwmqtt_client_t *client, size_t *read, lwmqtt_packet_type_t *packet_type) {
// read next packet from the network
lwmqtt_err_t err = lwmqtt_read_packet_in_buffer(client, read, packet_type);
if (err != LWMQTT_SUCCESS) {
Expand Down Expand Up @@ -288,7 +288,7 @@ static lwmqtt_err_t lwmqtt_cycle(lwmqtt_client_t *client, size_t *read, lwmqtt_p

// encode ack packet
size_t len;
err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, ack_type, false, packet_id);
err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, ack_type, packet_id);
if (err != LWMQTT_SUCCESS) {
return err;
}
Expand All @@ -305,16 +305,15 @@ static lwmqtt_err_t lwmqtt_cycle(lwmqtt_client_t *client, size_t *read, lwmqtt_p
// handle pubrec packets
case LWMQTT_PUBREC_PACKET: {
// decode pubrec packet
bool dup;
uint16_t packet_id;
err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, LWMQTT_PUBREC_PACKET, &dup, &packet_id);
err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, LWMQTT_PUBREC_PACKET, &packet_id);
if (err != LWMQTT_SUCCESS) {
return err;
}

// encode pubrel packet
size_t len;
err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, LWMQTT_PUBREL_PACKET, 0, packet_id);
err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, LWMQTT_PUBREL_PACKET, packet_id);
if (err != LWMQTT_SUCCESS) {
return err;
}
Expand All @@ -331,16 +330,15 @@ static lwmqtt_err_t lwmqtt_cycle(lwmqtt_client_t *client, size_t *read, lwmqtt_p
// handle pubrel packets
case LWMQTT_PUBREL_PACKET: {
// decode pubrec packet
bool dup;
uint16_t packet_id;
err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, LWMQTT_PUBREL_PACKET, &dup, &packet_id);
err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, LWMQTT_PUBREL_PACKET, &packet_id);
if (err != LWMQTT_SUCCESS) {
return err;
}

// encode pubcomp packet
size_t len;
err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, LWMQTT_PUBCOMP_PACKET, 0, packet_id);
err = lwmqtt_encode_ack(client->write_buf, client->write_buf_size, &len, LWMQTT_PUBCOMP_PACKET, packet_id);
if (err != LWMQTT_SUCCESS) {
return err;
}
Expand Down Expand Up @@ -379,7 +377,7 @@ static lwmqtt_err_t lwmqtt_cycle_until(lwmqtt_client_t *client, lwmqtt_packet_ty
// loop until timeout has been reached
do {
// do one cycle
lwmqtt_err_t err = lwmqtt_cycle(client, &read, packet_type);
lwmqtt_err_t err = lwmqtt_cycle_once(client, &read, packet_type);
if (err != LWMQTT_SUCCESS) {
return err;
}
Expand All @@ -398,36 +396,29 @@ static lwmqtt_err_t lwmqtt_cycle_until(lwmqtt_client_t *client, lwmqtt_packet_ty
return LWMQTT_SUCCESS;
}

lwmqtt_err_t lwmqtt_yield(lwmqtt_client_t *client, size_t available, uint32_t timeout) {
// set command timer
client->timer_set(client->command_timer, timeout);

// cycle until timeout has been reached
lwmqtt_packet_type_t packet_type = LWMQTT_NO_PACKET;
lwmqtt_err_t err = lwmqtt_cycle_until(client, &packet_type, available, LWMQTT_NO_PACKET);
if (err != LWMQTT_SUCCESS) {
return err;
lwmqtt_err_t lwmqtt_connect(lwmqtt_client_t *client, lwmqtt_connect_options_t *options, lwmqtt_will_t *will,
uint32_t timeout) {
// ensure default options
static lwmqtt_connect_options_t def_options = lwmqtt_default_connect_options;
if (options == NULL) {
options = &def_options;
}

return LWMQTT_SUCCESS;
}

lwmqtt_err_t lwmqtt_connect(lwmqtt_client_t *client, lwmqtt_options_t options, lwmqtt_will_t *will,
lwmqtt_return_code_t *return_code, uint32_t timeout) {
// set command timer
client->timer_set(client->command_timer, timeout);

// save keep alive interval
client->keep_alive_interval = (uint32_t)(options.keep_alive) * 1000;
client->keep_alive_interval = (uint32_t)(options->keep_alive) * 1000;

// set keep alive timer
client->timer_set(client->keep_alive_timer, client->keep_alive_interval);

// reset pong pending flag
client->pong_pending = false;

// initialize return code
*return_code = LWMQTT_UNKNOWN_RETURN_CODE;
// reset return code and session present
options->return_code = LWMQTT_UNKNOWN_RETURN_CODE;
options->session_present = false;

// encode connect packet
size_t len;
Expand All @@ -452,20 +443,103 @@ lwmqtt_err_t lwmqtt_connect(lwmqtt_client_t *client, lwmqtt_options_t options, l
}

// decode connack packet
bool session_present;
err = lwmqtt_decode_connack(client->read_buf, client->read_buf_size, &session_present, return_code);
err =
lwmqtt_decode_connack(client->read_buf, client->read_buf_size, &options->session_present, &options->return_code);
if (err != LWMQTT_SUCCESS) {
return err;
}

// return error if connection was not accepted
if (*return_code != LWMQTT_CONNECTION_ACCEPTED) {
if (options->return_code != LWMQTT_CONNECTION_ACCEPTED) {
return LWMQTT_CONNECTION_DENIED;
}

return LWMQTT_SUCCESS;
}

lwmqtt_err_t lwmqtt_publish(lwmqtt_client_t *client, lwmqtt_publish_options_t *options, lwmqtt_string_t topic,
lwmqtt_message_t msg, uint32_t timeout) {
// ensure default options
static lwmqtt_publish_options_t def_options = lwmqtt_default_publish_options;
if (options == NULL) {
options = &def_options;
}

// set command timer
client->timer_set(client->command_timer, timeout);

// add packet id if at least qos 1
bool dup = false;
uint16_t packet_id = 0;
if (msg.qos == LWMQTT_QOS1 || msg.qos == LWMQTT_QOS2) {
if (options->dup_id != NULL && *options->dup_id > 0) {
dup = true;
packet_id = *options->dup_id;
} else {
packet_id = lwmqtt_get_next_packet_id(client);
if (options->dup_id != NULL) {
*options->dup_id = packet_id;
}
}
}

// encode publish packet
size_t len = 0;
lwmqtt_err_t err = lwmqtt_encode_publish(client->write_buf, client->write_buf_size, &len, dup, packet_id, topic, msg);
if (err != LWMQTT_SUCCESS) {
return err;
}

// send packet (without payload)
err = lwmqtt_send_packet_in_buffer(client, len);
if (err != LWMQTT_SUCCESS) {
return err;
}

// send payload if available
if (msg.payload_len > 0) {
err = lwmqtt_write_to_network(client, msg.payload, msg.payload_len);
if (err != LWMQTT_SUCCESS) {
return err;
}
}

// immediately return on qos zero
if (msg.qos == LWMQTT_QOS0) {
return LWMQTT_SUCCESS;
}

// skip if requested
if (options->skip_ack) {
return LWMQTT_SUCCESS;
}

// define ack packet
lwmqtt_packet_type_t ack_type = LWMQTT_NO_PACKET;
if (msg.qos == LWMQTT_QOS1) {
ack_type = LWMQTT_PUBACK_PACKET;
} else if (msg.qos == LWMQTT_QOS2) {
ack_type = LWMQTT_PUBCOMP_PACKET;
}

// wait for ack packet
lwmqtt_packet_type_t packet_type = LWMQTT_NO_PACKET;
err = lwmqtt_cycle_until(client, &packet_type, 0, ack_type);
if (err != LWMQTT_SUCCESS) {
return err;
} else if (packet_type != ack_type) {
return LWMQTT_MISSING_OR_WRONG_PACKET;
}

// decode ack packet
err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, ack_type, &packet_id);
if (err != LWMQTT_SUCCESS) {
return err;
}

return LWMQTT_SUCCESS;
}

lwmqtt_err_t lwmqtt_subscribe(lwmqtt_client_t *client, int count, lwmqtt_string_t *topic_filter, lwmqtt_qos_t *qos,
uint32_t timeout) {
// set command timer
Expand Down Expand Up @@ -546,9 +620,8 @@ lwmqtt_err_t lwmqtt_unsubscribe(lwmqtt_client_t *client, int count, lwmqtt_strin
}

// decode unsuback packet
bool dup;
uint16_t packet_id;
err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, LWMQTT_UNSUBACK_PACKET, &dup, &packet_id);
err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, LWMQTT_UNSUBACK_PACKET, &packet_id);
if (err != LWMQTT_SUCCESS) {
return err;
}
Expand All @@ -560,103 +633,33 @@ lwmqtt_err_t lwmqtt_unsubscribe_one(lwmqtt_client_t *client, lwmqtt_string_t top
return lwmqtt_unsubscribe(client, 1, &topic_filter, timeout);
}

lwmqtt_err_t lwmqtt_publish(lwmqtt_client_t *client, lwmqtt_string_t topic, lwmqtt_message_t message, uint32_t timeout,
lwmqtt_publish_options_t *options) {
// ensure default options
static lwmqtt_publish_options_t def_options = lwmqtt_default_publish_options;
if (options == NULL) {
options = &def_options;
}

lwmqtt_err_t lwmqtt_disconnect(lwmqtt_client_t *client, uint32_t timeout) {
// set command timer
client->timer_set(client->command_timer, timeout);

// add packet id if at least qos 1
bool dup = false;
uint16_t packet_id = 0;
if (message.qos == LWMQTT_QOS1 || message.qos == LWMQTT_QOS2) {
if (options->dup_id != NULL && *options->dup_id > 0) {
dup = true;
packet_id = *options->dup_id;
} else {
packet_id = lwmqtt_get_next_packet_id(client);
if (options->dup_id != NULL) {
*options->dup_id = packet_id;
}
}
}

// encode publish packet
size_t len = 0;
lwmqtt_err_t err =
lwmqtt_encode_publish(client->write_buf, client->write_buf_size, &len, dup, packet_id, topic, message);
// encode disconnect packet
size_t len;
lwmqtt_err_t err = lwmqtt_encode_zero(client->write_buf, client->write_buf_size, &len, LWMQTT_DISCONNECT_PACKET);
if (err != LWMQTT_SUCCESS) {
return err;
}

// send packet (without payload)
// send disconnected packet
err = lwmqtt_send_packet_in_buffer(client, len);
if (err != LWMQTT_SUCCESS) {
return err;
}

// send payload if available
if (message.payload_len > 0) {
err = lwmqtt_write_to_network(client, message.payload, message.payload_len);
if (err != LWMQTT_SUCCESS) {
return err;
}
}

// immediately return on qos zero
if (message.qos == LWMQTT_QOS0) {
return LWMQTT_SUCCESS;
}

// skip if requested
if (options->skip_ack) {
return LWMQTT_SUCCESS;
}

// define ack packet
lwmqtt_packet_type_t ack_type = LWMQTT_NO_PACKET;
if (message.qos == LWMQTT_QOS1) {
ack_type = LWMQTT_PUBACK_PACKET;
} else if (message.qos == LWMQTT_QOS2) {
ack_type = LWMQTT_PUBCOMP_PACKET;
}

// wait for ack packet
lwmqtt_packet_type_t packet_type = LWMQTT_NO_PACKET;
err = lwmqtt_cycle_until(client, &packet_type, 0, ack_type);
if (err != LWMQTT_SUCCESS) {
return err;
} else if (packet_type != ack_type) {
return LWMQTT_MISSING_OR_WRONG_PACKET;
}

// decode ack packet
err = lwmqtt_decode_ack(client->read_buf, client->read_buf_size, ack_type, &dup, &packet_id);
if (err != LWMQTT_SUCCESS) {
return err;
}

return LWMQTT_SUCCESS;
}

lwmqtt_err_t lwmqtt_disconnect(lwmqtt_client_t *client, uint32_t timeout) {
lwmqtt_err_t lwmqtt_yield(lwmqtt_client_t *client, size_t available, uint32_t timeout) {
// set command timer
client->timer_set(client->command_timer, timeout);

// encode disconnect packet
size_t len;
lwmqtt_err_t err = lwmqtt_encode_zero(client->write_buf, client->write_buf_size, &len, LWMQTT_DISCONNECT_PACKET);
if (err != LWMQTT_SUCCESS) {
return err;
}

// send disconnected packet
err = lwmqtt_send_packet_in_buffer(client, len);
// cycle until timeout has been reached
lwmqtt_packet_type_t packet_type = LWMQTT_NO_PACKET;
lwmqtt_err_t err = lwmqtt_cycle_until(client, &packet_type, available, LWMQTT_NO_PACKET);
if (err != LWMQTT_SUCCESS) {
return err;
}
Expand Down
Loading

0 comments on commit b72a94d

Please sign in to comment.