|
| 1 | +--- |
| 2 | +id: sink-to-mqtt |
| 3 | +title: Sink data from RisingWave to MQTT |
| 4 | +description: Sink data from RisingWave to MQTT. |
| 5 | +slug: /sink-to-mqtt |
| 6 | +--- |
| 7 | +This guide describes how to sink data from RisingWave to the MQTT topic using the MQTT sink connector in RisingWave. |
| 8 | + |
| 9 | +The [Message Queuing Telemetry Transport](https://mqtt.org/) (MQTT) protocol is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc. |
| 10 | + |
| 11 | +## Prerequisites |
| 12 | + |
| 13 | +Before sinking data from RisingWave to an MQTT topic, please ensure the following: |
| 14 | + |
| 15 | +- The RisingWave cluster is running. |
| 16 | +- An MQTT broker is running and accessible from your RisingWave cluster. |
| 17 | +- Create an MQTT topic that you want to sink data to. |
| 18 | +- You have permission to publish data to the MQTT topic. |
| 19 | + |
| 20 | +For example, we have an `iot_sensor_data` table in RisingWave that stores data from various IoT devices at a given timestamp, including temperature and humidity readings, along with a status field indicating whether the device is in a normal or abnormal state. For more information to learn about MQTT and get started with it, refer to the [MQTT guide](https://mqtt.org/getting-started/). |
| 21 | +### Syntax |
| 22 | +To sink data from RisingWave to an MQTT topic, create a sink using the syntax below: |
| 23 | + |
| 24 | +```sql |
| 25 | +CREATE SINK [ IF NOT EXISTS ] sink_name |
| 26 | +[FROM sink_from | AS select_query] |
| 27 | +WITH ( |
| 28 | + connector='mqtt', |
| 29 | + url = '<your MQTT server>:<port>', |
| 30 | + topic = '<topic>', |
| 31 | + qos = '<qos_level>', |
| 32 | + type = '<append-only>' |
| 33 | + username = '<your user name>', |
| 34 | + password = '<your password>') |
| 35 | +FORMAT PLAIN ENCODE data_encode -- Format options: plain (encode BYTES and JSON) ( |
| 36 | + force_append_only='true', |
| 37 | +); |
| 38 | +``` |
| 39 | +This query sets up an MQTT sink `mqtt_sink` to forward data from `iot_sensor_data` to an MQTT server. It configures the MQTT connector, server URL, target topic, data type, message retention, quality of service, and JSON encoding. |
| 40 | + |
| 41 | +```sql |
| 42 | +CREATE SINK mqtt_sink |
| 43 | +FROM iot_sensor_data |
| 44 | +WITH |
| 45 | +( |
| 46 | + connector='mqtt', |
| 47 | + url='tcp://mqtt-server', |
| 48 | + topic= 'sink_iot_data', |
| 49 | + type = 'append-only', |
| 50 | + retain = 'true', |
| 51 | + qos = 'at_least_once', |
| 52 | +) FORMAT PLAIN ENCODE JSON ( |
| 53 | + force_append_only='true', |
| 54 | +); |
| 55 | +``` |
| 56 | +After the sink is created, you will continuously consume the data in the MQTT topic from RisingWave in append-only mode. |
| 57 | + |
| 58 | +### Parameters |
| 59 | + |
| 60 | +| Field | Notes | |
| 61 | +|--------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |
| 62 | +| `url` | Required. The URL of the broker to connect to, e.g., `tcp://localhost`. Must be prefixed with `tcp://`, `mqtt://`, `ssl://`, or `mqtts://` to denote the protocol. `mqtts://` and `ssl://` use native certificates if no CA is specified. | |
| 63 | +| `qos` | Optional. The quality of service for publishing messages. Defaults to `at_most_once`. Options include `at_most_once`, `at_least_once`, or `exactly_once`. | |
| 64 | +| `username` | Optional. Username for the MQTT broker. | |
| 65 | +| `password` | Optional. Password for the MQTT broker. | |
| 66 | +| `client_prefix` | Optional. Prefix for the MQTT client ID. Defaults to "risingwave". | |
| 67 | +| `clean_start` | Optional. Determines if all states from queues are removed when the client disconnects. If true, the broker clears all client states upon disconnect; if false, the broker retains the client state and resumes pending operations upon reconnection. | |
| 68 | +| `inflight_messages`| Optional. Maximum number of inflight messages. Defaults to 100. | |
| 69 | +| `tls.client_cert` | Optional. Path to the client's certificate file (PEM) or a string with the certificate content. Required for client authentication. Can use `fs://` prefix for file paths. | |
| 70 | +| `tls.client_key` | Optional. Path to the client's private key file (PEM) or a string with the private key content. Required for client authentication. Can use `fs://` prefix for file paths. | |
| 71 | +| `topic` | Required. The topic name to subscribe or publish to. Can include wildcard topics, e.g., `/topic/#`. | |
| 72 | +| `retain` | Optional. Whether the message should be retained by the broker. | |
| 73 | +| `r#type` | Required. Type identifier. | |
| 74 | + |
0 commit comments