From f37f850a698a976bfc89595b79bb5792949b62af Mon Sep 17 00:00:00 2001 From: Shawn Rutledge Date: Tue, 15 Dec 2020 15:49:57 +0100 Subject: [PATCH] WIP try to force it to store byte arrays The trouble is github.com/ferristseng/rust-ipfs-api does not offer dag_put with cbor: https://github.com/ferristseng/rust-ipfs-api/issues/63 only JSON; and serde_json won't let us sneak a byte array into a string value either, because Rust expects strings to always be valid UTF-8. It would probably be a dead end anyway because of https://github.com/ipfs/go-ipfs/issues/4313 : there's no way to read back CBOR data via the http API, so I couldn't write the select function. --- Cargo.toml | 1 + qt-schema-to-cbor/main.cpp | 77 ++++++++++++++++ qt-schema-to-cbor/schema-to-cbor.pro | 16 ++++ src/main.rs | 132 ++++++++++++++++----------- uradmonitor-schema.json | 38 ++++++++ 5 files changed, 212 insertions(+), 52 deletions(-) create mode 100644 qt-schema-to-cbor/main.cpp create mode 100644 qt-schema-to-cbor/schema-to-cbor.pro create mode 100644 uradmonitor-schema.json diff --git a/Cargo.toml b/Cargo.toml index b7477df..88c0e45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,4 @@ futures = "0.3" serde = "1.0" serde_json = "1.0" clap = "~2.27.0" +bytes = "0.4.12" diff --git a/qt-schema-to-cbor/main.cpp b/qt-schema-to-cbor/main.cpp new file mode 100644 index 0000000..ccb4d9f --- /dev/null +++ b/qt-schema-to-cbor/main.cpp @@ -0,0 +1,77 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +static const QLatin1String TypeKey("type"); +static const QLatin1String ValuesKey("values"); + +/*! + A utility to create a "head record" of the form + { + "_timestamp": { + "type": "u64", + "values": b"" + }, + "numeric_field": { + "type": "f32" + "values": b"" + } + } + + where b"" represents a byte array, NOT a string. + (This can be represented in CBOR but not in JSON.) + It can be inserted as a DAG node like this: + $ ipfs dag put --input-enc cbor headRecord.cbor + However, updating it is difficult, because of + https://github.com/ipfs/go-ipfs/issues/4313 : + the IPFS HTTP API doesn't provide a way to read it back + in CBOR format, only as JSON, but it can't be correctly + represented in JSON. +*/ +int main(int argc, char *argv[]) +{ + if (argc < 2) + qFatal("required argument: file.json"); + + QCoreApplication a(argc, argv); + + QFile j(a.arguments().last()); + if (!j.open(QIODevice::ReadOnly)) { + qFatal("couldn't open input file"); + } + + QJsonDocument jd = QJsonDocument::fromJson(j.readAll()); + QCborMap headRecord; + + Q_ASSERT(jd.isObject()); + auto jdo = jd.object(); + auto it = jdo.constBegin(); + while (it != jdo.constEnd()) { + Q_ASSERT(it.value().isObject()); + auto kv = it.value().toObject(); + qDebug() << it.key() << kv; + Q_ASSERT(kv.contains(TypeKey)); + QCborMap field; + field.insert(TypeKey, kv.value(TypeKey).toString()); + field.insert(ValuesKey, QByteArray()); +// field.insert(ValuesKey, QByteArray::fromHex("9f018202039f0405ffff")); + headRecord.insert(it.key(), field); + ++it; + } + + qDebug() << headRecord; + + QFile f("headRecord.cbor"); + if (!f.open(QIODevice::WriteOnly)) { + qWarning("Couldn't write file."); + return -1; + } + + f.write(headRecord.toCborValue().toCbor()); + f.close(); +} diff --git a/qt-schema-to-cbor/schema-to-cbor.pro b/qt-schema-to-cbor/schema-to-cbor.pro new file mode 100644 index 0000000..197e5fc --- /dev/null +++ b/qt-schema-to-cbor/schema-to-cbor.pro @@ -0,0 +1,16 @@ +QT -= gui + +CONFIG += c++11 console +CONFIG -= app_bundle + +# You can make your code fail to compile if it uses deprecated APIs. +# In order to do so, uncomment the following line. +#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000 # disables all the APIs deprecated before Qt 6.0.0 + +SOURCES += \ + main.cpp + +# Default rules for deployment. +qnx: target.path = /tmp/$${TARGET}/bin +else: unix:!android: target.path = /opt/$${TARGET}/bin +!isEmpty(target.path): INSTALLS += target diff --git a/src/main.rs b/src/main.rs index 08997f8..327e9fa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use bytes::BufMut; use clap::{App, Arg}; use futures::TryStreamExt; use ipfs_api::IpfsClient; @@ -12,15 +13,32 @@ type JsonMap = HashMap; static mut VERBOSITY: u64 = 0; #[tokio::main] -async fn insert_from_json(ipnskey: &str, rdr: R) -> String { +async fn create(ipnskey: &str, schema_rdr: R) -> String { let begintime = SystemTime::now(); let verbosity = unsafe { VERBOSITY }; - let unixtime = serde_json::json!( - match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { - Ok(n) => n.as_secs(), - Err(_) => 0, // before 1970?!? + + let schema: JsonMap = serde_json::from_reader(schema_rdr).unwrap(); + if verbosity > 1 { + println!("given schema {:?}", schema); + } + + for (key, value) in schema.iter() { + if verbosity > 1 { + println!("{:?} {:?}", key, value); } - ); + } + + return "".to_string(); +} + +#[tokio::main] +async fn insert_from_json(ipnskey: &str, rdr: R) -> String { + let begintime = SystemTime::now(); + let verbosity = unsafe { VERBOSITY }; + let unixtime: u64 = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { + Ok(n) => n.as_secs(), + Err(_) => 0, // before 1970?!? + }; let json_data: JsonMap = serde_json::from_reader(rdr).unwrap(); if verbosity > 1 { @@ -62,16 +80,24 @@ async fn insert_from_json(ipnskey: &str, rdr: R) -> String { } let mut existing: JsonMap = serde_json::from_slice(&bytes).unwrap(); for (key, value) in existing.iter_mut() { - let new_value: &serde_json::Value = match key.as_str() { - "_timestamp" => &unixtime, - _ => json_data.get(key).unwrap(), + let values_entry = value.get("values").unwrap(); + let mut buf = vec![]; + buf.put(values_entry.as_str().unwrap()); + match key.as_str() { + "_timestamp" => buf.put_u64_le(unixtime), + _ => { + //~ let v: f32 = json_data.get(key).unwrap().as_f64().unwrap() as f32; + let v = json_data.get(key).unwrap().as_f64().unwrap(); + buf.put_f64_le(v); + } }; - if verbosity > 2 { - println!("{:?} {:?} <- {:?}", key, value, new_value); - } - let vec = value.as_array_mut().unwrap(); - vec.push(new_value.clone()); + let s = unsafe { String::from_utf8_unchecked(buf) }; + value + .as_object_mut() + .unwrap() + .insert("values".to_string(), serde_json::json!(s)); } + let cursor = io::Cursor::new(serde_json::json!(existing).to_string()); let response = client.dag_put(cursor).await.expect("dag_put error"); let cid = response.cid.cid_string; @@ -85,42 +111,43 @@ async fn insert_from_json(ipnskey: &str, rdr: R) -> String { begintime.elapsed().unwrap().as_millis() ); } - client.pin_add(&cid, false).await.expect("pin error"); - if verbosity > 1 { - println!( - "pinned @ {} ms", - begintime.elapsed().unwrap().as_millis() - ); - } - let _ = client.pin_rm(&resolved.path, false).await; - if verbosity > 1 { - println!( - "unpinned old @ {} ms", - begintime.elapsed().unwrap().as_millis() - ); - } - client - .block_rm(&resolved.path) - .await - .expect("error removing last"); - if verbosity > 1 { - println!( - "block_rm(old) done @ {} ms", - begintime.elapsed().unwrap().as_millis() - ); - } - client - .name_publish(&cid, false, Some("12h"), None, Some(ipnskey)) - .await - .expect("error publishing name"); - if verbosity > 1 { - println!( - "published {} @ {} ms", - &cid, - begintime.elapsed().unwrap().as_millis() - ); - } - return cid; + return "".to_string(); + //~ client.pin_add(&cid, false).await.expect("pin error"); + //~ if verbosity > 1 { + //~ println!( + //~ "pinned @ {} ms", + //~ begintime.elapsed().unwrap().as_millis() + //~ ); + //~ } + //~ let _ = client.pin_rm(&resolved.path, false).await; + //~ if verbosity > 1 { + //~ println!( + //~ "unpinned old @ {} ms", + //~ begintime.elapsed().unwrap().as_millis() + //~ ); + //~ } + //~ client + //~ .block_rm(&resolved.path) + //~ .await + //~ .expect("error removing last"); + //~ if verbosity > 1 { + //~ println!( + //~ "block_rm(old) done @ {} ms", + //~ begintime.elapsed().unwrap().as_millis() + //~ ); + //~ } + //~ client + //~ .name_publish(&cid, false, Some("12h"), None, Some(ipnskey)) + //~ .await + //~ .expect("error publishing name"); + //~ if verbosity > 1 { + //~ println!( + //~ "published {} @ {} ms", + //~ &cid, + //~ begintime.elapsed().unwrap().as_millis() + //~ ); + //~ } + //~ return cid; } Err(e) => { eprintln!("error reading dag node: {}", e); @@ -166,7 +193,7 @@ fn main() { ) .arg( Arg::with_name("query") - .help("insert or select") + .help("create, insert or select") .required(true) .index(2), ) @@ -193,7 +220,8 @@ fn main() { let _result = match matches.value_of("query").unwrap() { "insert" => insert_from_json(key, io::stdin()), - _ => "only insert is supported so far".to_string(), + "create" => create(key, io::stdin()), + _ => "only create and insert are supported so far".to_string(), }; //~ println!("{}", result); } diff --git a/uradmonitor-schema.json b/uradmonitor-schema.json new file mode 100644 index 0000000..d00f451 --- /dev/null +++ b/uradmonitor-schema.json @@ -0,0 +1,38 @@ +{ + "_timestamp": { + "type": "u64" + }, + "ch2o": { + "type": "f32" + }, + "co2": { + "type": "u16" + }, + "cpm":{ + "type": "u16" + }, + "humidity": { + "type": "f32" + }, + "noise":{ + "type": "f32" + }, + "pm25":{ + "type": "u16" + }, + "pressure": { + "type": "u32" + }, + "temperature": { + "type": "f32" + }, + "uptime": { + "type": "u32" + }, + "voc": { + "type": "u32" + }, + "voltage": { + "type": "u16" + } +}