Skip to content

Commit

Permalink
* NEW [proto/quic] Merge mqttv311 quic protocol layer and mqttv5 quic…
Browse files Browse the repository at this point in the history
… protocol layer.

Signed-off-by: wanghaemq <wangwei@emqx.io>
  • Loading branch information
wanghaEMQ committed Jul 3, 2024
1 parent 8d1d0dd commit 023ebfa
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 6 deletions.
92 changes: 87 additions & 5 deletions src/mqtt/protocol/mqtt/mqtt_quic_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ typedef nni_mqtt_packet_type packet_type_t;
static int prior_flags = QUIC_HIGH_PRIOR_MSG;

static void mqtt_quic_sock_init(void *arg, nni_sock *sock);
static void mqttv5_quic_sock_init(void *arg, nni_sock *sock);
static void mqtt_quic_sock_fini(void *arg);
static void mqtt_quic_sock_open(void *arg);
static void mqtt_quic_sock_send(void *arg, nni_aio *aio);
Expand Down Expand Up @@ -80,6 +81,7 @@ struct mqtt_quic_ctx {

// A mqtt_sock_s is our per-socket protocol private structure.
struct mqtt_sock_s {
uint8_t mqtt_ver; // mqtt version.
bool multi_stream;
bool qos_first;
nni_mtx mtx; // more fine grained mutual exclusion
Expand Down Expand Up @@ -192,6 +194,7 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s)
case NNG_MQTT_PUBREC:
case NNG_MQTT_PUBREL:
case NNG_MQTT_PUBCOMP:
// TODO MQTT V5
case NNG_MQTT_PINGREQ:
break;

Expand Down Expand Up @@ -233,7 +236,6 @@ mqtt_send_msg(nni_aio *aio, nni_msg *msg, mqtt_sock_t *s)
if (s->qos_first)
if (ptype == NNG_MQTT_SUBSCRIBE || ptype == NNG_MQTT_UNSUBSCRIBE ||
(qos > 0 && ptype == NNG_MQTT_PUBLISH)) {
// nni_mqtt_msg_encode(msg); // no need to encode here
nni_aio_set_msg(aio, msg);
nni_aio_set_prov_data(aio, &prior_flags);
nni_pipe_send(p->qpipe, aio);
Expand Down Expand Up @@ -579,7 +581,10 @@ mqtt_quic_data_strm_recv_cb(void *arg)
}

nni_mqtt_msg_proto_data_alloc(msg);
nni_mqtt_msg_decode(msg);
if (s->mqtt_ver == MQTT_PROTOCOL_VERSION_v311)
nni_mqtt_msg_decode(msg);
else
nni_mqttv5_msg_decode(msg);

packet_type_t packet_type = nni_mqtt_msg_get_packet_type(msg);

Expand Down Expand Up @@ -637,6 +642,10 @@ mqtt_quic_data_strm_recv_cb(void *arg)
nni_id_remove(&p->sent_unack, packet_id);
user_aio = nni_mqtt_msg_get_aio(cached_msg);
nni_mqtt_msg_set_aio(cached_msg, NULL);
// should we support sub/unsub cb here?
// if (packet_type == NNG_MQTT_SUBACK ||
// packet_type == NNG_MQTT_UNSUBACK) {
// got a matched callback
if (user_aio != NULL) {
nni_msg_clone(msg);
nni_aio_set_msg(user_aio, msg);
Expand Down Expand Up @@ -782,14 +791,23 @@ mqtt_quic_recv_cb(void *arg)

// nni_msg_set_pipe(msg, nni_pipe_id(p->pipe));
nni_mqtt_msg_proto_data_alloc(msg);
if ((rv = nni_mqtt_msg_decode(msg)) != MQTT_SUCCESS) {
int (*decode_func)(nni_msg *);
int (*encode_func)(nni_msg *);
if (s->mqtt_ver == MQTT_PROTOCOL_VERSION_v5) {
decode_func = nni_mqttv5_msg_decode;
encode_func = nni_mqttv5_msg_encode;
} else {
decode_func = nni_mqtt_msg_decode;
encode_func = nni_mqtt_msg_encode;
}
if ((rv = decode_func(msg)) != MQTT_SUCCESS) {
// Msg should be clear if decode failed. We reuse it to send disconnect.
// Or it would encode a malformed packet.
nni_mqtt_msg_set_packet_type(msg, NNG_MQTT_DISCONNECT);
nni_mqtt_msg_set_disconnect_reason_code(msg, rv);
nni_mqtt_msg_set_disconnect_property(msg, NULL);
// Composed a disconnect msg
if ((rv = nni_mqtt_msg_encode(msg)) != MQTT_SUCCESS) {
if ((rv = encode_func(msg)) != MQTT_SUCCESS) {
log_error("Error in encoding disconnect.\n");
nni_msg_free(msg);
nni_mtx_unlock(&p->lk);
Expand Down Expand Up @@ -1134,6 +1152,13 @@ mqtt_timer_cb(void *arg)
* Socket Implementation *
******************************************************************************/

static void mqttv5_quic_sock_init(void *arg, nni_sock *sock)
{
mqtt_sock_t *s = arg;
mqtt_quic_sock_init(arg, sock);
s->mqtt_ver = MQTT_PROTOCOL_VERSION_v5;
}

static void mqtt_quic_sock_init(void *arg, nni_sock *sock)
{
mqtt_sock_t *s = arg;
Expand Down Expand Up @@ -1166,6 +1191,7 @@ static void mqtt_quic_sock_init(void *arg, nni_sock *sock)
nni_aio_init(&s->time_aio, mqtt_timer_cb, s);

s->pipe = NULL;
s->mqtt_ver = MQTT_PROTOCOL_VERSION_v311;

s->cb.connect_cb = NULL;
s->cb.disconnect_cb = NULL;
Expand Down Expand Up @@ -1924,7 +1950,12 @@ mqtt_quic_ctx_send(void *arg, nni_aio *aio)
default:
break;
}
if (nni_mqtt_msg_encode(msg) != MQTT_SUCCESS) {
if (s->mqtt_ver == MQTT_PROTOCOL_VERSION_v5)
rv = nni_mqttv5_msg_encode(msg);
else
rv = nni_mqtt_msg_encode(msg);

if (rv != MQTT_SUCCESS) {
nni_mtx_unlock(&s->mtx);
log_error("MQTT client encoding msg failed!");
nni_msg_free(msg);
Expand Down Expand Up @@ -2177,6 +2208,17 @@ static nni_proto_sock_ops mqtt_quic_sock_ops = {
.sock_recv = mqtt_quic_sock_recv,
};

static nni_proto_sock_ops mqttv5_quic_sock_ops = {
.sock_size = sizeof(mqtt_sock_t),
.sock_init = mqttv5_quic_sock_init,
.sock_fini = mqtt_quic_sock_fini,
.sock_open = mqtt_quic_sock_open,
.sock_close = mqtt_quic_sock_close,
.sock_options = mqtt_quic_sock_options,
.sock_send = mqtt_quic_sock_send,
.sock_recv = mqtt_quic_sock_recv,
};

static nni_proto mqtt_quic_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_MQTT_SELF, NNG_MQTT_SELF_NAME },
Expand All @@ -2187,12 +2229,28 @@ static nni_proto mqtt_quic_proto = {
.proto_ctx_ops = &mqtt_quic_ctx_ops,
};

static nni_proto mqttv5_quic_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_MQTT_SELF, NNG_MQTT_SELF_NAME },
.proto_peer = { NNG_MQTT_PEER, NNG_MQTT_PEER_NAME },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &mqttv5_quic_sock_ops,
.proto_pipe_ops = &mqtt_quic_pipe_ops,
.proto_ctx_ops = &mqtt_quic_ctx_ops,
};

int
nng_mqtt_quic_client_open(nng_socket *sock)
{
return (nni_proto_open(sock, &mqtt_quic_proto));
}

int
nng_mqttv5_quic_client_open(nng_socket *sock)
{
return (nni_proto_open(sock, &mqttv5_quic_proto));
}

int
nng_mqtt_quic_set_connect_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg)
{
Expand Down Expand Up @@ -2261,6 +2319,30 @@ nng_mqtt_quic_set_disconnect_cb(nng_socket *sock, int (*cb)(void *, void *), voi
return 0;
}

int
nng_mqttv5_quic_set_connect_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg)
{
return nng_mqtt_quic_set_connect_cb(sock, cb, arg);
}

int
nng_mqttv5_quic_set_disconnect_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg)
{
return nng_mqtt_quic_set_disconnect_cb(sock, cb, arg);
}

int
nng_mqttv5_quic_set_msg_recv_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg)
{
return nng_mqtt_quic_set_msg_recv_cb(sock, cb, arg);
}

int
nng_mqttv5_quic_set_msg_send_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg)
{
return nng_mqtt_quic_set_msg_send_cb(sock, cb, arg);
}

/*
// As taking msquic as tranport, we exclude the dialer for now.
int
Expand Down
2 changes: 1 addition & 1 deletion src/mqtt/protocol/mqtt/mqttv5_quic_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ mqtt_pipe_send_msg(nni_aio *aio, nni_msg *msg, mqtt_pipe_t *p, uint16_t packet_i
return NNG_EPROTO;
}
if (!p->busy) {
// TODO: qos_first
// TODO: qos_first in data pipe
nni_aio_set_msg(&p->send_aio, msg);
p->busy = true;
nni_pipe_send(p->qpipe, &p->send_aio);
Expand Down

0 comments on commit 023ebfa

Please sign in to comment.