Skip to content

Commit

Permalink
WIP try to force it to store byte arrays
Browse files Browse the repository at this point in the history
The trouble is github.com/ferristseng/rust-ipfs-api does not offer
dag_put with cbor: ferristseng/rust-ipfs-api#63
only JSON; and serde_json won't let us sneak a byte array into a string
value either, because Rust expects strings to always be valid UTF-8.

It would probably be a dead end anyway because of
ipfs/kubo#4313 : there's no way to
read back CBOR data via the http API, so I couldn't write the select
function.
  • Loading branch information
ec1oud committed Dec 15, 2020
1 parent 10dd356 commit f37f850
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 52 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ futures = "0.3"
serde = "1.0"
serde_json = "1.0"
clap = "~2.27.0"
bytes = "0.4.12"
77 changes: 77 additions & 0 deletions qt-schema-to-cbor/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <QCoreApplication>
#include <QCborArray>
#include <QCborMap>
#include <QCborValue>
#include <QDebug>
#include <QFile>
#include <QJsonDocument>
#include <QJsonObject>

static const QLatin1String TypeKey("type");
static const QLatin1String ValuesKey("values");

/*!
A utility to create a "head record" of the form
{
"_timestamp": {
"type": "u64",
"values": b""
},
"numeric_field": {
"type": "f32"
"values": b""
}
}
where b"" represents a byte array, NOT a string.
(This can be represented in CBOR but not in JSON.)
It can be inserted as a DAG node like this:
$ ipfs dag put --input-enc cbor headRecord.cbor
However, updating it is difficult, because of
https://github.com/ipfs/go-ipfs/issues/4313 :
the IPFS HTTP API doesn't provide a way to read it back
in CBOR format, only as JSON, but it can't be correctly
represented in JSON.
*/
int main(int argc, char *argv[])
{
if (argc < 2)
qFatal("required argument: file.json");

QCoreApplication a(argc, argv);

QFile j(a.arguments().last());
if (!j.open(QIODevice::ReadOnly)) {
qFatal("couldn't open input file");
}

QJsonDocument jd = QJsonDocument::fromJson(j.readAll());
QCborMap headRecord;

Q_ASSERT(jd.isObject());
auto jdo = jd.object();
auto it = jdo.constBegin();
while (it != jdo.constEnd()) {
Q_ASSERT(it.value().isObject());
auto kv = it.value().toObject();
qDebug() << it.key() << kv;
Q_ASSERT(kv.contains(TypeKey));
QCborMap field;
field.insert(TypeKey, kv.value(TypeKey).toString());
field.insert(ValuesKey, QByteArray());
// field.insert(ValuesKey, QByteArray::fromHex("9f018202039f0405ffff"));
headRecord.insert(it.key(), field);
++it;
}

qDebug() << headRecord;

QFile f("headRecord.cbor");
if (!f.open(QIODevice::WriteOnly)) {
qWarning("Couldn't write file.");
return -1;
}

f.write(headRecord.toCborValue().toCbor());
f.close();
}
16 changes: 16 additions & 0 deletions qt-schema-to-cbor/schema-to-cbor.pro
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
QT -= gui

CONFIG += c++11 console
CONFIG -= app_bundle

# You can make your code fail to compile if it uses deprecated APIs.
# In order to do so, uncomment the following line.
#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000 # disables all the APIs deprecated before Qt 6.0.0

SOURCES += \
main.cpp

# Default rules for deployment.
qnx: target.path = /tmp/$${TARGET}/bin
else: unix:!android: target.path = /opt/$${TARGET}/bin
!isEmpty(target.path): INSTALLS += target
132 changes: 80 additions & 52 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bytes::BufMut;
use clap::{App, Arg};
use futures::TryStreamExt;
use ipfs_api::IpfsClient;
Expand All @@ -12,15 +13,32 @@ type JsonMap = HashMap<String, serde_json::Value>;
static mut VERBOSITY: u64 = 0;

#[tokio::main]
async fn insert_from_json<R: io::Read>(ipnskey: &str, rdr: R) -> String {
async fn create<R: io::Read>(ipnskey: &str, schema_rdr: R) -> String {
let begintime = SystemTime::now();
let verbosity = unsafe { VERBOSITY };
let unixtime = serde_json::json!(
match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => 0, // before 1970?!?

let schema: JsonMap = serde_json::from_reader(schema_rdr).unwrap();
if verbosity > 1 {
println!("given schema {:?}", schema);
}

for (key, value) in schema.iter() {
if verbosity > 1 {
println!("{:?} {:?}", key, value);
}
);
}

return "".to_string();
}

#[tokio::main]
async fn insert_from_json<R: io::Read>(ipnskey: &str, rdr: R) -> String {
let begintime = SystemTime::now();
let verbosity = unsafe { VERBOSITY };
let unixtime: u64 = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => 0, // before 1970?!?
};

let json_data: JsonMap = serde_json::from_reader(rdr).unwrap();
if verbosity > 1 {
Expand Down Expand Up @@ -62,16 +80,24 @@ async fn insert_from_json<R: io::Read>(ipnskey: &str, rdr: R) -> String {
}
let mut existing: JsonMap = serde_json::from_slice(&bytes).unwrap();
for (key, value) in existing.iter_mut() {
let new_value: &serde_json::Value = match key.as_str() {
"_timestamp" => &unixtime,
_ => json_data.get(key).unwrap(),
let values_entry = value.get("values").unwrap();
let mut buf = vec![];
buf.put(values_entry.as_str().unwrap());
match key.as_str() {
"_timestamp" => buf.put_u64_le(unixtime),
_ => {
//~ let v: f32 = json_data.get(key).unwrap().as_f64().unwrap() as f32;
let v = json_data.get(key).unwrap().as_f64().unwrap();
buf.put_f64_le(v);
}
};
if verbosity > 2 {
println!("{:?} {:?} <- {:?}", key, value, new_value);
}
let vec = value.as_array_mut().unwrap();
vec.push(new_value.clone());
let s = unsafe { String::from_utf8_unchecked(buf) };
value
.as_object_mut()
.unwrap()
.insert("values".to_string(), serde_json::json!(s));
}

let cursor = io::Cursor::new(serde_json::json!(existing).to_string());
let response = client.dag_put(cursor).await.expect("dag_put error");
let cid = response.cid.cid_string;
Expand All @@ -85,42 +111,43 @@ async fn insert_from_json<R: io::Read>(ipnskey: &str, rdr: R) -> String {
begintime.elapsed().unwrap().as_millis()
);
}
client.pin_add(&cid, false).await.expect("pin error");
if verbosity > 1 {
println!(
"pinned @ {} ms",
begintime.elapsed().unwrap().as_millis()
);
}
let _ = client.pin_rm(&resolved.path, false).await;
if verbosity > 1 {
println!(
"unpinned old @ {} ms",
begintime.elapsed().unwrap().as_millis()
);
}
client
.block_rm(&resolved.path)
.await
.expect("error removing last");
if verbosity > 1 {
println!(
"block_rm(old) done @ {} ms",
begintime.elapsed().unwrap().as_millis()
);
}
client
.name_publish(&cid, false, Some("12h"), None, Some(ipnskey))
.await
.expect("error publishing name");
if verbosity > 1 {
println!(
"published {} @ {} ms",
&cid,
begintime.elapsed().unwrap().as_millis()
);
}
return cid;
return "".to_string();
//~ client.pin_add(&cid, false).await.expect("pin error");
//~ if verbosity > 1 {
//~ println!(
//~ "pinned @ {} ms",
//~ begintime.elapsed().unwrap().as_millis()
//~ );
//~ }
//~ let _ = client.pin_rm(&resolved.path, false).await;
//~ if verbosity > 1 {
//~ println!(
//~ "unpinned old @ {} ms",
//~ begintime.elapsed().unwrap().as_millis()
//~ );
//~ }
//~ client
//~ .block_rm(&resolved.path)
//~ .await
//~ .expect("error removing last");
//~ if verbosity > 1 {
//~ println!(
//~ "block_rm(old) done @ {} ms",
//~ begintime.elapsed().unwrap().as_millis()
//~ );
//~ }
//~ client
//~ .name_publish(&cid, false, Some("12h"), None, Some(ipnskey))
//~ .await
//~ .expect("error publishing name");
//~ if verbosity > 1 {
//~ println!(
//~ "published {} @ {} ms",
//~ &cid,
//~ begintime.elapsed().unwrap().as_millis()
//~ );
//~ }
//~ return cid;
}
Err(e) => {
eprintln!("error reading dag node: {}", e);
Expand Down Expand Up @@ -166,7 +193,7 @@ fn main() {
)
.arg(
Arg::with_name("query")
.help("insert or select")
.help("create, insert or select")
.required(true)
.index(2),
)
Expand All @@ -193,7 +220,8 @@ fn main() {

let _result = match matches.value_of("query").unwrap() {
"insert" => insert_from_json(key, io::stdin()),
_ => "only insert is supported so far".to_string(),
"create" => create(key, io::stdin()),
_ => "only create and insert are supported so far".to_string(),
};
//~ println!("{}", result);
}
38 changes: 38 additions & 0 deletions uradmonitor-schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"_timestamp": {
"type": "u64"
},
"ch2o": {
"type": "f32"
},
"co2": {
"type": "u16"
},
"cpm":{
"type": "u16"
},
"humidity": {
"type": "f32"
},
"noise":{
"type": "f32"
},
"pm25":{
"type": "u16"
},
"pressure": {
"type": "u32"
},
"temperature": {
"type": "f32"
},
"uptime": {
"type": "u32"
},
"voc": {
"type": "u32"
},
"voltage": {
"type": "u16"
}
}

0 comments on commit f37f850

Please sign in to comment.