Submit bulk measurements with timestamps using specialized "bulk-json-compact" format #39

Open
valentinbarral opened this issue Jan 19, 2021 · 8 comments


@valentinbarral

Hello,

I've searched the documentation but couldn't find anything about this. In my case, I have a telemetry source that sends locally stored data collected over a period of time, so each message includes several individual measurements (each one with a timestamp).
As an example, one of my measurements would look like this:

{"1611082554":{"temperature":45.2}, "1611082568":{"temperature":45.3}}

Is it possible to handle such data format from Kotori or is the timestamp always set when each individual measurement is received on the server?

Greetings.

@amotl
Member

amotl commented Jan 20, 2021

Dear Valentin,

thanks for writing in and for sharing the payload format you are trying to ingest. I will try to answer as well as possible.

Measurements with timestamps

Is the timestamp always set when each individual measurement is received on the server?

Indeed, Kotori can accept timestamped measurements, see [1]. I was able to find this by searching for "timestamp" within the documentation, see [2]. Please let us know how we might improve the discoverability of that feature within the documentation.

However, Kotori currently needs those timestamps to be inlined into the individual measurement payloads, like:

{
  "time": 1611082554,
  "temperature": 45.2
}

Kotori also accepts timestamps within other fields, such as datetime, Time, dateTime and timestamp - see [3].

Bulk readings (vanilla)

Is it possible to handle such data format from Kotori?

Unfortunately, Kotori is not yet able to accept bulk readings. We have planned to implement that but haven't been able to get to it yet.

Regarding our plans, we would implement it so that Kotori accepts payloads like this:

[
  {
    "time": 1611082554,
    "temperature": 45.2
  },
  {
    "time": 1611082568,
    "temperature": 45.3
  }
]
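For comparison, a client could already convert the compact keyed-object format from above into this prospective vanilla bulk format with a few lines of code. This is a minimal sketch, assuming Unix-epoch string keys; the function name is purely illustrative:

```python
import json

def compact_to_bulk(payload: str) -> list:
    """Convert a compact {"<epoch>": {...fields...}} message into a
    list of measurements, each carrying an inline "time" key."""
    data = json.loads(payload)
    return [
        {"time": int(timestamp), **fields}
        for timestamp, fields in sorted(data.items())
    ]

message = '{"1611082554":{"temperature":45.2}, "1611082568":{"temperature":45.3}}'
print(compact_to_bulk(message))
# → [{'time': 1611082554, 'temperature': 45.2}, {'time': 1611082568, 'temperature': 45.3}]
```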

Bulk readings (specialized decoder)

Is it possible to handle such data format from Kotori?

Now that you have shown us the payload format you are trying to ingest, I would like to comment on it.

It feels a bit odd to transfer the Unix timestamp as a string and use it as a key at the same time. But okay, that's what reality looks like, right? Kotori might implement that as a specialized decoder, like the Tasmota decoder [4] we added the other day, and I am open to that in general. However, I am curious where this payload is coming from, and would humbly like to ask whether you can share some more information about it?

{
  "1611082554": {
    "temperature": 45.2
  },
  "1611082568": {
    "temperature": 45.3
  }
}

Thanks again for sharing your use case. Depending on your time frame and your response, we might be able to do something about it within the next development iteration.

With kind regards,
Andreas.

[1] https://getkotori.org/docs/handbook/acquisition/timestamp.html
[2] https://getkotori.org/docs/search.html?q=timestamp
[3] https://getkotori.org/docs/faq/payload-format.html
[4] https://getkotori.org/docs/handbook/decoders/tasmota.html

@valentinbarral
Author

Hello,

thank you for your extensive response. I hadn't seen the option of using my own timestamp in each measurement; maybe with that I could do something (extracting each measurement from the original message and sending it individually to Kotori).

The format of the measurements we use is simply intended to reduce the size of the messages. We are working with a device that must read a large amount of data from a vehicle and transmit it using a limited number of bytes per message. With this format we avoid sending a "time" key or similar with each value, which saves a lot of space while keeping the values human-readable.

After reading your comment about a future implementation of bulk readings, I think there would be another problem in our case. Although in my example the two measurements were of the same kind (temperature), in reality each of the individual measurements may include different values (e.g. one is a temperature value while another is a humidity value).

I understand that this is quite a particular case, so I don't think introducing it as built-in functionality in Kotori would be something the community could take advantage of. What would help, in my opinion, is an API or mechanism to create a custom decoder. This way we could process our "special" message and generate an array of values in the format expected by Kotori.

For now I think I will prepare an MQTT subscriber to extract the measurements from each message and republish them on different topics for Kotori to process. As our case does not require real-time operation, the small delay introduced by this pre-processing will not affect us.
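Such a republishing bridge could look roughly like the following sketch using the paho-mqtt client library. The topic names, broker address, and helper names are purely illustrative assumptions, not anything Kotori prescribes:

```python
import json

def explode(payload: str):
    """Split a compact bulk message into individual measurements,
    each carrying an inline "time" key."""
    for timestamp, fields in json.loads(payload).items():
        yield {"time": int(timestamp), **fields}

def run(source_topic="vehicle/compact",
        target_topic="mqttkit-1/network/gateway/node/data.json"):
    # Lazy import, so the pure `explode` helper works without paho-mqtt.
    import paho.mqtt.client as mqtt

    def on_message(client, userdata, msg):
        # Republish each extracted measurement individually.
        for measurement in explode(msg.payload.decode("utf-8")):
            client.publish(target_topic, json.dumps(measurement))

    client = mqtt.Client()
    client.on_message = on_message
    client.connect("localhost", 1883)
    client.subscribe(source_topic)
    client.loop_forever()
```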

Thanks again for your answers and for keeping the project active.
Greetings.

@amotl amotl changed the title Custom timestamp? Submitting bulk measurements with timestamps Jan 20, 2021
@amotl
Member

amotl commented Jan 20, 2021

Hi Valentin,

The format of the measurements we use is simply intended to reduce the size of the messages. With this format we avoid sending a "time" key or similar with each value, which saves a lot of space while keeping the values human-readable.

I see, thanks for your insights. Maybe you shouldn't be using JSON at all if you have hard requirements on message size? However, I well recognize the value of human-readable telemetry packets on the wire. We tried to balance similar aspects when conceiving BERadio the other day and, well, while it's still ASCII, it's not as human-readable as JSON. We even wrote up a summary about serialization efficiency with respect to the payload size of different marshalling formats.

So, I hear you.

I understand that this is quite a particular case, so I don't think introducing it as built-in functionality in Kotori would be something the community could take advantage of. [...] For now I think I will prepare an MQTT subscriber to extract the measurements from each message and [re]publish them [to another topic to make Kotori pick them up].

That would be an option, but you would have to run another software component side by side. I am absolutely not against supporting specific use cases for users of Kotori (you! ;]) - so thanks for outlining your scenario.

What would help, in my opinion, is an API or mechanism to create a custom decoder.

Exactly. However, currently even the Tasmota decoder mentioned above is built in, and the payload format is detected by applying some heuristics. As long as the number of different payload formats does not grow overwhelmingly high, we might well keep it like that. As soon as there is a need, we will think about implementing a real plugin/extension subsystem for hooking custom external decoders into the stream.

After reading your comment about a future implementation of bulk readings, I think there would be another problem in our case. Although in my example the two measurements were of the same kind (temperature), in reality each of the individual measurements may include different values (e.g. one is a temperature value while another is a humidity value).

I am not sure I am following here. Having readings from multiple sensors within individual measurement containers is well supported; this is actually what we already recommend when ingesting single (non-bulk) measurements. The rationale is that all those single readings will share the same timestamp when written to the database.

So, unless I am getting a detail wrong here, everything should still be perfectly aligned on this matter. Please clarify otherwise.

With kind regards,
Andreas.

@amotl
Member

amotl commented Jan 21, 2021

Hi Valentin,

#41 integrates a decoder like the one you are looking for. It is no hassle for Kotori to carry and serve this, and it will save you from having to perform any republishing operations.

I don't know how you are currently operating Kotori. Since porting it to Python 3 just recently, we have not been able to publish recent distribution packages for Debian or images for Docker. If you are lucky, just whip it up within a development sandbox as outlined at [1], switching to the bulk-json-compact branch like:

git clone --branch bulk-json-compact https://github.com/daq-tools/kotori

Remember to publish your data payloads to the /tc.json suffix as outlined at #41. Please let me know if that works for you.

With kind regards,
Andreas.

[1] https://getkotori.org/docs/setup/sandbox.html


Edit: There's also another way to install Kotori on behalf of this branch without having to clone from Git. Using a recent version of pip, this should also work well:

python3 -m venv .venv
source .venv/bin/activate
pip install --editable="git+https://github.com/daq-tools/kotori.git@bulk-json-compact#egg=kotori[daq,export]"

@valentinbarral
Author

Hi,

first of all, thank you very much for implementing the "bulk-json-compact" format; it's perfect for my case. However, I'm having trouble getting it to work (probably I have some kind of error in the sandbox setup, although I think I followed the instructions correctly ...).

First I cloned the branch with git clone --branch bulk-json-compact https://github.com/daq-tools/kotori as you indicated and then followed the instructions at https://getkotori.org/docs/setup/sandbox.html. Instead of the command make setup-virtualenv, I ran make virtualenv-dev, as I saw that it invokes the former and additionally installs the extra modules.

Finally, I launch Kotori with kotori --config etc/development.ini --debug. I don't see any errors in the console, which stays at:

2021-01-22T18:55:07+0100 [kotori.daq.services.mig            ] INFO    : [mqttkit-1   ] transactions: 0.00 tps
2021-01-22T18:55:07+0100 [kotori.daq.services.mig            ] INFO    : [basic       ] transactions: 0.00 tps
2021-01-22T18:55:07+0100 [kotori.daq.services.mig            ] INFO    : [weewx       ] transactions: 0.00 tps
2021-01-22T18:55:07+0100 [kotori.daq.services.mig            ] INFO    : [hiveeyes    ] transactions: 0.00 tps

Alongside, I launch my measurement generator to publish to the topic 359779080127729/test/test2/1/tc.json. The measurements seem to be arriving fine, since if I subscribe to that topic I get:

➜  kotori git:(bulk-json-compact) mosquitto_sub -h 127.0.0.1 -p 1883 -t '359779080127729/test/test2/1/tc.json'
{"1611337127":{"Vehicle_Speed":0,"BMS_VDC":376.47058823040004,"BMS_Power":1.6383978618250694e-8,"BMSCurrent":0.0003583999999818843,"BMS_SOC":94.921875},"1611337128":{"Vehicle_Speed":0,"BMS_VDC":376.47058823040004,"BMS_Power":1.6383978618250694e-8,"BMSCurrent":0.0003583999999818843,"BMS_SOC":94.921875}}

However, I don't see anything in Kotori's log, nor in Grafana (which is working, because when I make a query I see it in the corresponding log). I've checked that the Grafana credentials in etc/development.ini are correct.

As I say, I'm probably missing something in the Python environment part, but I've already gone over it a few times and I can't find the problem.

Any ideas?

Regards.

@amotl
Member

amotl commented Jan 22, 2021

Hi Valentin,

thanks for your efforts in trying out the new feature. I hope we will get this going on your end, and I will take the chance to explain some details about how Kotori and its configuration work. While some aspects might already be obvious to you, there might still be things to learn. I hope you appreciate it.

General information

Within the etc/development.ini shipped with the sandbox setup, you can see that four realms are defined: mqttkit-1, basic, weewx and hiveeyes. The latter two are vendor-specific configurations; the former two are generic ones.

  • mqttkit-1 defines a WAN-like addressing scheme comprised of a quadruple of realm / network / gateway / node. Because this style was actually the first one implemented, you will find respective examples throughout the documentation.
  • basic defines a more compact/narrow LAN-like addressing scheme comprised of just realm / node. We added this later because for some use cases, the WAN style might be overkill. However, this style of addressing is not yet well reflected within the documentation. Nevertheless, I wanted to tell you about it because it might fit your needs even better, as you can save a few more bytes by reducing the width of the channel address.

Those schemes implement the channel address and are applied to both the MQTT topic and the HTTP URI. While the first addressing component, realm, is always fixed through the respective configuration snippet, all other address components can be freely defined at the convenience of the user. In this manner, Kotori makes it easy to submit data on different channels without having to provision them beforehand in any way.
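To illustrate the two addressing schemes, a hypothetical topic parser might look like the following sketch. This is for illustration only; Kotori's internal routing works differently, and the function and field names are assumptions:

```python
def parse_channel(topic: str) -> dict:
    """Split an MQTT topic into channel address components,
    assuming a trailing slot suffix such as "tc.json"."""
    parts = topic.split("/")
    realm, components, slot = parts[0], parts[1:-1], parts[-1]
    if len(components) == 1:
        # LAN-like "basic" style: realm / node
        address = {"realm": realm, "node": components[0]}
    elif len(components) == 3:
        # WAN-like "mqttkit-1" style: realm / network / gateway / node
        address = {"realm": realm, "network": components[0],
                   "gateway": components[1], "node": components[2]}
    else:
        raise ValueError(f"Unrecognized channel address: {topic}")
    address["slot"] = slot
    return address

print(parse_channel("basic/node-42/tc.json"))
# → {'realm': 'basic', 'node': 'node-42', 'slot': 'tc.json'}
```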

Your scenario

So, you might be able to see Kotori pick up those measurement submissions successfully by publishing to MQTT topics like:

  • basic/node-42/tc.json
  • mqttkit-1/network-xxx/gateway-yyy/node-zzz/tc.json

With the configuration you are currently running, Kotori probably ignores all submissions to the 359779080127729 realm entirely (I've picked this up from your example), as it doesn't know anything about it.

Working example

Assuming you are using a POSIX shell, a full publishing example based on your scenario would be

# Define channel and data.
CHANNEL=basic/node-42
DATA='{"1611337127":{"Vehicle_Speed":0,"BMS_VDC":376.47058823040004,"BMS_Power":1.6383978618250694e-8,"BMSCurrent":0.0003583999999818843,"BMS_SOC":94.921875},"1611337128":{"Vehicle_Speed":0,"BMS_VDC":376.47058823040004,"BMS_Power":1.6383978618250694e-8,"BMSCurrent":0.0003583999999818843,"BMS_SOC":94.921875}}'

# Submit measurement.
echo "$DATA" | mosquitto_pub -h localhost -t "$CHANNEL/tc.json" -l

If that is successful, the kotori.log should contain something along the lines of

kotori.log: tc.json example
2021-01-22T19:55:06+0100 [kotori.daq.services.mig            ] DEBUG   : Processing message on topic 'basic/node-42/tc.json' with payload '{"1611337127":{"Vehicle_Speed":0,"BMS_VDC":376.47058823040004,"BMS_Power":1.6383978618250694e-8,"BMSCurrent":0.0003583999999818843,"BMS_SOC":94.921875},"1611337128":{"Vehicle_Speed":0,"BMS_VDC":376.47058823040004,"BMS_Power":1.6383978618250694e-8,"BMSCurrent":0.0003583999999818843,"BMS_SOC":94.921875}}'
2021-01-22T19:55:06+0100 [kotori.daq.services.mig            ] DEBUG   : Topology address: {'realm': 'basic', 'node': 'node-42', 'slot': 'tc.json'}
2021-01-22T19:55:06+0100 [kotori.daq.services.mig            ] DEBUG   : Storage location: {'realm': 'basic', 'node': 'node-42', 'slot': 'tc.json', 'label': 'node_42', 'database': 'basic_node_42', 'measurement': 'sensors', 'measurement_events': 'events'}
2021-01-22T19:55:06+0100 [kotori.daq.storage.influx          ] INFO    : Creating database "basic_node_42"
2021-01-22T19:55:07+0100 [kotori.daq.storage.influx          ] DEBUG   : Storage success: {'measurement': 'sensors', 'tags': {}, 'time_precision': 's', 'time': 1611337127, 'fields': {'Vehicle_Speed': 0.0, 'BMS_VDC': 376.47058823040004, 'BMS_Power': 1.6383978618250694e-08, 'BMSCurrent': 0.0003583999999818843, 'BMS_SOC': 94.921875}}
2021-01-22T19:55:07+0100 [kotori.daq.storage.influx          ] DEBUG   : Storage success: {'measurement': 'sensors', 'tags': {}, 'time_precision': 's', 'time': 1611337128, 'fields': {'Vehicle_Speed': 0.0, 'BMS_VDC': 376.47058823040004, 'BMS_Power': 1.6383978618250694e-08, 'BMSCurrent': 0.0003583999999818843, 'BMS_SOC': 94.921875}}
2021-01-22T19:55:07+0100 [kotori.daq.services.mig            ] DEBUG   : Provisioning Grafana with GrafanaManager
2021-01-22T19:55:07+0100 [kotori.daq.graphing.grafana.manager] INFO    : Provisioning Grafana dashboard "basic-node-42" for database "basic_node_42" and measurement "sensors"
2021-01-22T19:55:07+0100 [kotori.daq.graphing.grafana.api    ] INFO    : Checking/Creating datasource "basic_node_42"
2021-01-22T19:55:07+0100 [kotori.daq.graphing.grafana.api    ] INFO    : response: {'datasource': {'id': 182, 'uid': 'eXPho8BMz', 'orgId': 1, 'name': 'basic_node_42', 'type': 'influxdb', 'typeLogoUrl': '', 'access': 'proxy', 'url': 'http://localhost:8086/', 'password': 'root', 'user': 'root', 'database': 'basic_node_42', 'basicAuth': False, 'basicAuthUser': '', 'basicAuthPassword': '', 'withCredentials': False, 'isDefault': False, 'jsonData': {}, 'secureJsonFields': {}, 'version': 1, 'readOnly': False}, 'id': 182, 'message': 'Datasource added', 'name': 'basic_node_42'}
2021-01-22T19:55:07+0100 [kotori.daq.graphing.grafana.api    ] INFO    : Get folder with uid="instagraf"
2021-01-22T19:55:07+0100 [kotori.daq.graphing.grafana.api    ] INFO    : Getting dashboard "basic-node-42"
2021-01-22T19:55:07+0100 [kotori.daq.graphing.grafana.api    ] INFO    : Client Error 404: {"message":"Dashboard not found"}
2021-01-22T19:55:07+0100 [kotori.daq.graphing.grafana.api    ] INFO    : Creating/updating dashboard "basic-node-42"
2021-01-22T19:55:07+0100 [kotori.daq.graphing.grafana.api    ] INFO    : Grafana response: {'id': 222, 'slug': 'basic-node-42', 'status': 'success', 'uid': 'hzyhT8fMk', 'url': '/d/hzyhT8fMk/basic-node-42', 'version': 1}
2021-01-22T19:55:07+0100 [kotori.daq.graphing.grafana.api    ] INFO    : Checking dashboard "basic-node-42"

Can you give this a spin and then report back to me about the outcome?

With kind regards,
Andreas.


P.S.: If you are successful with this, while still running in development/sandbox mode (but without --debug, in order to reduce logging overhead), you might want to reconfigure metrics_logger_interval = 2 in order to get better reporting about the activity.

# How often to log metrics
metrics_logger_interval = 60

kotori.log: transactions per second
2021-01-22T20:00:52+0100 [kotori.daq.services.mig            ] INFO    : [basic       ] transactions: 2.49 tps
2021-01-22T20:00:54+0100 [kotori.daq.services.mig            ] INFO    : [basic       ] transactions: 2.00 tps
2021-01-22T20:01:02+0100 [kotori.daq.services.mig            ] INFO    : [basic       ] transactions: 1.00 tps
2021-01-22T20:01:04+0100 [kotori.daq.services.mig            ] INFO    : [basic       ] transactions: 2.50 tps

@valentinbarral
Author

Hi,

Perfect! The problem was indeed in the MQTT topic. Now I understand the nomenclature you use and how the topic path should look in order to work correctly. I've made a quick test with one of the topics from your last answer, and now I can see the measurements in Grafana, using the "bulk-json-compact" format:

[Screenshot: Grafana dashboard "basic-node-42"]

This will allow us to make great progress in our project, since until now we needed to export the data to a specific tool in order to analyze it. With your solution we can now see the values in real time without any effort.

Again, I can't thank you enough for your work on the project and on this issue in particular.

Best regards.
(P.S.: You can close the issue if you see fit.)

@amotl
Member

amotl commented Jan 23, 2021

Hi Valentin,

Until now we needed to export the data to a specific tool in order to analyze it. With your solution we can now see the values in real time without any effort.

That is really important for effortless data visualization in scenarios like yours. We are glad that we have been able to help.

Let's keep this issue open until the WIP branches have been integrated into mainline. While I believe the functionality is fine already, it still lacks documentation. Until that is added, we might continue using this issue for discussing some details around it.

I wish you much success with your project.

With kind regards,
Andreas.

@amotl amotl changed the title Submitting bulk measurements with timestamps Submitting bulk measurements with timestamps in special "bulk-json-compact" format Jan 23, 2021
@amotl amotl changed the title Submitting bulk measurements with timestamps in special "bulk-json-compact" format Submit bulk measurements with timestamps using specialized "bulk-json-compact" format Jan 23, 2021