Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ as well as the messages received.
| Publish telemetry (WSS - Secure MQTT over WebSocket) | | [Sample](./samples/python/publish-websockets/) |
| Raw command-response scenario | | [Sample](./samples/python/command-response/) |
| Direct database connection — query telemetry | [Sample](./samples/script/query-db/) | [Sample](./samples/python/query-db/) |
| Streaming IoT Platform data (Database queues) | | [Sample](./samples/python/queues/) |

## Documentation

Expand Down
189 changes: 189 additions & 0 deletions samples/python/queues/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# Streaming IoT Platform data

This example demonstrates how to use
[Transactional Event Queues](https://www.oracle.com/database/advanced-queuing/)
to stream IoT Platform data.

You should already be familiar with connecting to the IoT Platform database.
If not, review the [Direct database connection example](../query-db/README.md)
in this repository.

The provided scripts allow you to subscribe and stream from the raw and normalized
message queues.

## Concepts

Digital Twin Instances data is available through data tables, but it can also be
streamed using database Transactional Event Queues.

The following queues are available:

| Queue name | Data type | Description |
| ---------------- | --------------------- | -------------------------------- |
| raw_data_in | raw_data_in_type | Incoming raw messages |
| raw_data_out | raw_data_out_type | Outgoing raw messages (commands) |
| normalized_data | JSON | Normalized data |
| rejected_data_in | rejected_data_in_type | Rejected incoming messages |

The `normalized_data` queues is a JSON queue, while the others use an Abstract Data Type
(ADT).
More information on the data model is available in the
[Transactional Event Queues](https://docs.oracle.com/en-us/iaas/Content/internet-of-things/iot-domain-database-schema.htm#queues)
section of the IoT Platform documentation.

Queue subscribers can be implemented in a durable or non-durable way:

- Durable subscribers: messages are kept in the queue until a client connects and read
available messages.
Note that the retention for the IoT Platform queues is set to 24 hours.
- Non-durable subscribers: only receive messages issued while the client is
connected.
The Python SDK does not support non-durable subscribers as such, but this can be
emulated by registering an ephemeral subscriber when a client connects.

## Sample scripts

Two sample scripts are provided:

- `sub-raw`: subscribe and stream all incoming raw data (ADT). It is implemented
as a non-durable subscriber.
- `sub-norm`: subscribe and stream the normalized data (JSON), using a durable subscriber.

Both scripts demonstrate how to use rules to filter data based on the Digital Twin
Instance Id and/or the endpoint (raw data) or content path (normalized data).

More information on using queues with the Python SDK is available on
[Using Oracle Transactional Event Queues and Advanced Queuing](https://python-oracledb.readthedocs.io/en/stable/user_guide/aq.html).

## Prerequisites

Install the Python dependencies.
(Using a [Python virtual environment](https://docs.python.org/3/library/venv.html) is recommended):

```sh
pip install -r requirements.txt
```

When using `oracledb` in _Thick_ mode, the
[Oracle Instant Client](https://www.oracle.com/europe/database/technologies/instant-client.html)
must be installed (the 23ai Release Update or newer is recommended).
The `sqlnet.ora` parameter `SSL_SERVER_DN_MATCH` must also be set to `true`.

## Configure and run the scripts

Copy `config.distr.py` to `config.py` and set the following variables:

- `db_connect_string`: The `dbConnectionString` property of your IoT Domain Group.
- `db_token_scope`: The `dbTokenScope` property of your IoT Domain Group.
- `iot_domain_short_name`: The hostname part of the `deviceHost` property of your IoT Domain.
- `oci_auth_type`: The OCI authentication type. Use "ConfigFileAuthentication"
for API key authentication, or "InstancePrincipal".
- `oci_profile`: OCI CLI profile to use for token retrieval (API key authentication only).
- `thick_mode`: Set to `True` to use the `oracledb` thick mode driver.
- `subscriber_name`: Name of the durable subscriber for the `sub-norm` sample.

### `sub-raw`

Run the script. Without parameter, it will show all messages.
You can filter by Digital Twin Instance OCID, display name, or endpoint (MQTT topic).

```text
$ ./sub-raw.py --help
usage: sub-raw.py [-h] [-v] [-d] [--id ID | --display-name DISPLAY_NAME] [--endpoint ENDPOINT]

Subscribe to the raw messages stream from IoT Platform.

options:
-h, --help show this help message and exit
-v, --verbose Enable verbose (INFO level) logging.
-d, --debug Enable debug (DEBUG level) logging.
--id ID The Digital Twin Instance OCID (mutually exclusive with --display-name).
--display-name DISPLAY_NAME
The Digital Twin Instance display name (mutually exclusive with --id).
--endpoint ENDPOINT The message endpoint (topic).
./sub-raw.py -v
2025-11-10 14:24:03,316 - INFO - sub-raw.py - Connected
2025-11-10 14:24:06,785 - INFO - sub-raw.py - Subscriber aq_sub_183a199f_8820_449e_a912_99339877c23a registered
2025-11-10 14:24:06,807 - INFO - sub-raw.py - Listening for messages
....
OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
Time received: 2025-11-10 13:24:52.109055
Endpoint : zigbee2mqtt/sonoff-temp-04
Content : {"temperature":19.8,"humidity":64.9,"battery":71,"linkquality":51}
.....
OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
Time received: 2025-11-10 13:25:48.182738
Endpoint : ttn/devices/bulles-minilora-01/up
Content : {"temperature":16.1,"humidity":11,"battery":96,"rssi":-76,"snr":12}

OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
Time received: 2025-11-10 13:25:50.742284
Endpoint : zigbee2mqtt/sonoff-temp-06
Content : {"temperature":21.2,"humidity":51.7,"battery":80,"linkquality":58}
^C
Interrupted
2025-11-10 14:25:51,831 - INFO - sub-raw.py - Subscriber aq_sub_183a199f_8820_449e_a912_99339877c23a unregistered
2025-11-10 14:25:51,855 - INFO - sub-raw.py - Disconnected
```

### `sub-norm`

The `sub-norm` script is similar and provides additional commands to manage
the durable subscription:

```text
$ ./sub-norm.py --help
Usage: sub-norm.py [OPTIONS] COMMAND [ARGS]...

Stream Digital Twin normalized data.

This example illustrate the use of "durable subscribers": once the
subscriber has been created, messages are retained and returned when the
client connects.

Options:
-v, --verbose Verbose mode
-d, --debug Debug mode
--help Show this message and exit.

Commands:
stream Stream data.
subscribe Subscribe to the normalized queue.
unsubscribe Unsubscribe to the normalized queue.
$ ./sub-norm.py subscribe --help
Usage: sub-norm.py subscribe [OPTIONS]

Subscribe to the normalized queue.

Options:
--id TEXT Digital Twin Instance ID (mutually exclusive with
--display-name)
--display-name TEXT Digital Twin Instance display name (mutually exclusive
with --id)
--content-path TEXT Path to the content
--help Show this message and exit.
$ ./sub-norm.py -v subscribe --content-path temperature
2025-11-10 14:30:19,053 - INFO - sub-norm.py - Connected
2025-11-10 14:30:22,026 - INFO - sub-norm.py - Subscriber sub_norm_subscriber registered
2025-11-10 14:30:22,048 - INFO - sub-norm.py - Disconnected
$ ./sub-norm.py -v stream
2025-11-10 14:30:32,240 - INFO - sub-norm.py - Connected
2025-11-10 14:30:32,242 - INFO - sub-norm.py - Listening for messages
.
OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
Time observed: 2025-11-10T13:30:51.351412Z
Content path : temperature
Value : 15.9
..
OCID : ocid1.iotdigitaltwininstance.oc1.<redacted>
Time observed: 2025-11-10T13:31:14.526270Z
Content path : temperature
Value : 21.0
^C
Interrupted
2025-11-10 14:31:17,731 - INFO - sub-norm.py - Disconnected
$ ./sub-norm.py -v unsubscribe
2025-11-10 14:31:26,963 - INFO - sub-norm.py - Connected
2025-11-10 14:31:27,275 - INFO - sub-norm.py - Subscriber sub_norm_subscriber unregistered
2025-11-10 14:31:27,296 - INFO - sub-norm.py - Disconnected
```
44 changes: 44 additions & 0 deletions samples/python/queues/config.distr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
"""Configuration constants for the IoT Platform queue samples.

This file defines database and authentication parameters. Copy and rename to
config.py, then edit values as needed.

See in-file comments for details on each variable.
"""
import os

# Database connect string and token scope as provided by the OCI IoT Platform.
# These are the dbConnectionString and dbTokenScope properties of your IoT Domain Group.
# They can be retrieved with:
# oci iot domain-group get --iot-domain-group-id <IoT Domain Group OCID> \
# --query 'data.["db-connection-string", "db-token-scope"]'
db_connect_string = "tcps:adb.<region>.oraclecloud.com:1521/<redacted>"
db_token_scope = "urn:oracle:db::id::<Compartment OCID>"

# Domain short name.
# This is the hostname part of the IoT Domain device host and can be retrieved using:
# oci iot domain get --iot-domain-id <IoT Domain OCID> |
# jq -r '.'data."device-host"' | split(".")[0]'
iot_domain_short_name = "<Domain SHort Name>"

# OCI Authentication type. Must be either "ConfigFileAuthentication" or "InstancePrincipal"
# oci_auth_type = "ConfigFileAuthentication"
oci_auth_type = "InstancePrincipal"

# OCI CLI profile to use for token retrieval when authentication type is "ConfigFileAuthentication"
oci_profile = os.getenv("OCI_CLI_PROFILE", "DEFAULT")

# Select Thick or Thin mode for oracledb.
# TL;DR: use Thin mode unless you specifically need the Thick driver.
# See
# https://python-oracledb.readthedocs.io/en/latest/user_guide/appendix_b.html
# for a detailed explanation.
thick_mode = False
# In Thick mode, if the Oracle Client libraries can't be found, set the location below. See
# https://python-oracledb.readthedocs.io/en/latest/user_guide/initialization.html#enabling-python-oracledb-thick-mode
# for more information on setting lib_dir for your operating system.
lib_dir = None

# For the "durable" sample (sub_norm.py), the name of the durable subscriber
subscriber_name = "sub_norm_subscriber"
3 changes: 3 additions & 0 deletions samples/python/queues/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
click~=8.2.0
oci~=2.0,>=2.161
oracledb>=3.2.0
1 change: 1 addition & 0 deletions samples/python/queues/sqlnet.ora
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SSL_SERVER_DN_MATCH=TRUE
Loading