Skip to content

Commit

Permalink
Merge pull request #202 from tobiasschuerg/feat-precision-only
Browse files Browse the repository at this point in the history
Feat: allow sending precision only
  • Loading branch information
vlastahajek authored Oct 14, 2022
2 parents 64e5e3e + b475961 commit d430851
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 28 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog
## unreleased
### Features
- [202](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/202) - Added option to specify timestamp precision and do not send timestamp. Set using `WriteOption::useServerTimestamptrue)`.

### Fixes
- [200](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/200) - Backward compatible compilation. Solves _marked 'override', but does not override_ errors.

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ The client has to be configured with a time precision. The default settings is t
client.setWriteOptions(WriteOptions().writePrecision(WritePrecision::MS));
```
When a write precision is configured, the client will automatically assign the current time to the timestamp of each written point which doesn't have a timestamp assigned.
Automated assigning of timestamp can be turned off by using `WriteOption::useServerTimestamp(true)`. Client will still specify a timestamp precision for a server.

If you want to manage timestamp on your own, there are several ways to set the timestamp explicitly.
- `setTime(WritePrecision writePrecision)` - Sets the timestamp to the actual time in the desired precision. The same precision must set in WriteOptions.
Expand Down
25 changes: 16 additions & 9 deletions platformio.ini
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
; CanAirIO Sensorlib
; InfluxDB Arduino Client
;
; Full guide and details: https://github.com/kike-canaries/canairio_sensorlib


[platformio]
src_dir = ./test/
Expand All @@ -11,19 +9,24 @@ framework = arduino
upload_speed = 1500000
monitor_speed = 115200
monitor_filters = time
lib_ignore = WiFiNINA
build_flags =
-D CORE_DEBUG_LEVEL=0
lib_deps =
https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino.git
lib_extra_dirs =
../

[esp32_common]
platform = espressif32
board = esp32dev
framework = ${env.framework}
upload_speed = ${env.upload_speed}
monitor_speed = ${env.monitor_speed}
lib_deps = ${env.lib_deps}
build_flags =
lib_ignore = ${env.lib_ignore}
lib_extra_dirs = ${env.lib_extra_dirs}
lib_deps =
WiFi
HTTPClient
build_flags =
${env.build_flags}

[esp8266_common]
Expand All @@ -33,8 +36,12 @@ board = esp12e
monitor_speed = ${env.monitor_speed}
build_flags =
${env.build_flags}
lib_deps =
${env.lib_deps}
lib_deps =
ESP8266WiFi
ESP8266HTTPClient
lib_ignore = ${env.lib_ignore}
lib_extra_dirs =
${env.lib_extra_dirs}

[env:esp8266BasicTest]
extends = esp8266_common
Expand Down
3 changes: 2 additions & 1 deletion src/InfluxDbClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ bool InfluxDBClient::setWriteOptions(const WriteOptions & writeOptions) {
_writeOptions._maxRetryInterval = writeOptions._maxRetryInterval;
_writeOptions._maxRetryAttempts = writeOptions._maxRetryAttempts;
_writeOptions._defaultTags = writeOptions._defaultTags;
_writeOptions._useServerTimestamp = writeOptions._useServerTimestamp;
return true;
}

Expand Down Expand Up @@ -531,7 +532,7 @@ void InfluxDBClient::dropCurrentBatch() {
}

String InfluxDBClient::pointToLineProtocol(const Point& point) {
return point.createLineProtocol(_writeOptions._defaultTags);
return point.createLineProtocol(_writeOptions._defaultTags, _writeOptions._useServerTimestamp);
}

bool InfluxDBClient::validateConnection() {
Expand Down
2 changes: 1 addition & 1 deletion src/InfluxDbClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class InfluxDBClient {
// Returns true if setting was successful. Otherwise check getLastErrorMessage() for an error.
// Example:
// client.setHTTPOptions(HTTPOptions().httpReadTimeout(20000)).
bool setHTTPOptions(const HTTPOptions &httpOptions);
bool setHTTPOptions(const HTTPOptions &httpOptions);
// Sets connection parameters for InfluxDB 2
// Must be called before calling any method initiating a connection to server.
// serverUrl - url of the InfluxDB 2 server (e.g. https//localhost:8086)
Expand Down
24 changes: 22 additions & 2 deletions src/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ class WriteOptions {
// Maximum count of retry attempts of failed writes, default 3
uint16_t _maxRetryAttempts;
// Default tags. Default tags are added to every written point.
// There cannot be duplicate tags in default tags and tags included in a point.
// There cannot be the same tags in the default tags and among the tags included with a point.
String _defaultTags;
// Let server assign timestamp in given precision. Do not sent timestamp.
bool _useServerTimestamp;
public:
WriteOptions():
_writePrecision(WritePrecision::NoTime),
Expand All @@ -72,17 +74,33 @@ class WriteOptions {
_flushInterval(60),
_retryInterval(5),
_maxRetryInterval(300),
_maxRetryAttempts(3) {
_maxRetryAttempts(3),
_useServerTimestamp(false) {
}
// Sets timestamp precision. If timestamp precision is set, but a point does not have a timestamp, timestamp is automatically assigned from the device clock.
// If useServerTimestamp is set to true, timestamp is not sent, only precision is specified for the server.
WriteOptions& writePrecision(WritePrecision precision) { _writePrecision = precision; return *this; }
// Sets number of points that will be written to the databases at once. Points are added one by one and when number reaches batch size there are sent to server.
WriteOptions& batchSize(uint16_t batchSize) { _batchSize = batchSize; return *this; }
// Sets size of the write buffer to control maximum number of record to keep in case of write failures.
// When max size is reached, oldest records are overwritten.
WriteOptions& bufferSize(uint16_t bufferSize) { _bufferSize = bufferSize; return *this; }
// Sets interval in seconds after whitch points will be written to the db. If
WriteOptions& flushInterval(uint16_t flushIntervalSec) { _flushInterval = flushIntervalSec; return *this; }
// Sets default retry interval in sec. This is used in case of network failure or if server is bussy and doesn't specify retry interval.
// Setting to zero disables retrying.
WriteOptions& retryInterval(uint16_t retryIntervalSec) { _retryInterval = retryIntervalSec; return *this; }
// Sets maximum retry interval in sec.
WriteOptions& maxRetryInterval(uint16_t maxRetryIntervalSec) { _maxRetryInterval = maxRetryIntervalSec; return *this; }
// Sets maximum number of retry attempts of failed writes.
WriteOptions& maxRetryAttempts(uint16_t maxRetryAttempts) { _maxRetryAttempts = maxRetryAttempts; return *this; }
// Adds new default tag. Default tags are added to every written point.
// There cannot be the same tag in the default tags and in the tags included with a point.
WriteOptions& addDefaultTag(const String &name, const String &value);
// Clears default tag list
WriteOptions& clearDefaultTags() { _defaultTags = (char *)nullptr; return *this; }
// If timestamp precision is set and useServerTimestamp is true, timestamp from point is not sent, or assigned.
WriteOptions& useServerTimestamp(bool useServerTimestamp) { _useServerTimestamp = useServerTimestamp; return *this; }
};

/**
Expand All @@ -105,7 +123,9 @@ class HTTPOptions {
_connectionReuse(false),
_httpReadTimeout(5000) {
}
// Set true if HTTP connection should be kept open. Usable for frequent writes.
HTTPOptions& connectionReuse(bool connectionReuse) { _connectionReuse = connectionReuse; return *this; }
// Sets timeout after which HTTP stops reading
HTTPOptions& httpReadTimeout(int httpReadTimeoutMs) { _httpReadTimeout = httpReadTimeoutMs; return *this; }
};

Expand Down
6 changes: 3 additions & 3 deletions src/Point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ String Point::toLineProtocol(const String &includeTags) const {
return createLineProtocol(includeTags);
}

String Point::createLineProtocol(const String &incTags) const {
String Point::createLineProtocol(const String &incTags, bool excludeTimestamp) const {
String line;
line.reserve(strLen(_data->measurement) + 1 + incTags.length() + 1 + _data->tags.length() + 1 + _data->fields.length() + 1 + strLen(_data->timestamp));
line += _data->measurement;
Expand All @@ -162,14 +162,14 @@ String Point::createLineProtocol(const String &incTags) const {
line += " ";
line += _data->fields;
}
if(hasTime()) {
if(hasTime() && !excludeTimestamp) {
line += " ";
line += _data->timestamp;
}
return line;
}

void Point::setTime(WritePrecision precision) {
void Point::setTime(WritePrecision precision) {
struct timeval tv;
gettimeofday(&tv, NULL);

Expand Down
2 changes: 1 addition & 1 deletion src/Point.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@ friend class InfluxDBClient;
// set timestamp
void setTime(char *timestamp);
// Creates line protocol string
String createLineProtocol(const String &incTags) const;
String createLineProtocol(const String &incTags, bool excludeTimestamp = false) const;
};
#endif //_POINT_H_
102 changes: 91 additions & 11 deletions test/Test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ void Test::run() {
testFluxTypes();
testFluxTypesSerialization();
testTimestampAdjustment();
testUseServerTimestamp();
testFluxParserEmpty();
testFluxParserSingleTable();
testFluxParserNilValue();
Expand Down Expand Up @@ -109,16 +110,10 @@ void Test::testOptions() {
TEST_ASSERT(defWO._maxRetryInterval == 300);
TEST_ASSERT(defWO._maxRetryAttempts == 3);
TEST_ASSERT(defWO._defaultTags.length() == 0);
TEST_ASSERT(!defWO._useServerTimestamp);

//Test max batch size
// defWO = WriteOptions().batchSize(1<<14);
// #if defined(ESP8266)
// TEST_ASSERT(defWO._batchSize == 255);
// #elif defined(ESP32)
// TEST_ASSERT(defWO._batchSize == 2047);
// #endif

defWO = WriteOptions().writePrecision(WritePrecision::NS).batchSize(32000).bufferSize(20).flushInterval(120).retryInterval(1).maxRetryInterval(20).maxRetryAttempts(5).addDefaultTag("tag1","val1").addDefaultTag("tag2","val2");
defWO = WriteOptions().writePrecision(WritePrecision::NS).batchSize(32000).bufferSize(20).flushInterval(120).retryInterval(1).maxRetryInterval(20).maxRetryAttempts(5).addDefaultTag("tag1","val1").addDefaultTag("tag2","val2").useServerTimestamp(true);
TEST_ASSERT(defWO._writePrecision == WritePrecision::NS);
TEST_ASSERT(defWO._batchSize == 32000);
TEST_ASSERT(defWO._bufferSize == 20);
Expand All @@ -127,6 +122,7 @@ void Test::testOptions() {
TEST_ASSERT(defWO._maxRetryInterval == 20);
TEST_ASSERT(defWO._maxRetryAttempts == 5);
TEST_ASSERT(defWO._defaultTags == "tag1=val1,tag2=val2");
TEST_ASSERT(defWO._useServerTimestamp);

HTTPOptions defHO;
TEST_ASSERT(!defHO._connectionReuse);
Expand Down Expand Up @@ -469,7 +465,9 @@ void Test::testLineProtocol() {
String testLine = "test,tag1=tagvalue fieldInt=-23i,fieldBool=true,fieldFloat1=1.12,fieldFloat2=1.12345,fieldDouble1=1.12,fieldDouble2=1.12345,fieldChar=\"A\",fieldUChar=1i,fieldUInt=23i,fieldLong=123456i,fieldULong=123456i,fieldLongLong=9123456789i,fieldULongLong=9123456789i,fieldString=\"text test\"";
TEST_ASSERTM(line == testLine, line);

client.setWriteOptions(WriteOptions().addDefaultTag("dtag","val"));
auto opts = WriteOptions().addDefaultTag("dtag","val");

client.setWriteOptions(opts);

line = client.pointToLineProtocol(p);
testLine = "test,dtag=val,tag1=tagvalue fieldInt=-23i,fieldBool=true,fieldFloat1=1.12,fieldFloat2=1.12345,fieldDouble1=1.12,fieldDouble2=1.12345,fieldChar=\"A\",fieldUChar=1i,fieldUInt=23i,fieldLong=123456i,fieldULong=123456i,fieldLongLong=9123456789i,fieldULongLong=9123456789i,fieldString=\"text test\"";
Expand Down Expand Up @@ -542,6 +540,12 @@ void Test::testLineProtocol() {
TEST_ASSERT(parts[2].length() == snow.length() + 9);
delete[] parts;

client.setWriteOptions(opts.useServerTimestamp(true));
line = client.pointToLineProtocol(p);
parts = getParts(line, ' ', partsCount);
TEST_ASSERT(partsCount == 2);
delete [] parts;

TEST_END();
}

Expand Down Expand Up @@ -580,14 +584,90 @@ void Test::testBasicFunction() {
TEST_ASSERTM( count == 5, String(count) + " vs 5"); //5 points

// test precision
for (int i = (int)WritePrecision::NoTime; i <= (int)WritePrecision::NS; i++) {
for (uint8_t i = (int)WritePrecision::NoTime; i <= (int)WritePrecision::NS; i++) {
client.setWriteOptions((WritePrecision)i, 1);
Point *p = createPoint("test1");
p->addField("index", i);
TEST_ASSERTM(client.writePoint(*p), String("i=") + i);
delete p;
}


TEST_END();
deleteAll(Test::apiUrl);
}

bool checkLinesParts(InfluxDBClient &client, size_t lineCount, int partCount) {
bool res = false;
do {
String query = "select";
FluxQueryResult q = client.query(query);
TEST_ASSERTM(!q.getError().length(), q.getError());
std::vector<String> lines = getLines(q);
auto count = lines.size();
TEST_ASSERTM( count == lineCount, String(count) + " vs " + String(lineCount));
for(size_t i=0;i<count;i++) {
int partsCount;
String *parts = getParts(lines[i], ',', partsCount);
TEST_ASSERTM(partsCount == partCount, String(i) + ":" + lines[i]);
delete[] parts;
}
res = true;
} while(0);
end:
deleteAll(Test::apiUrl);
return res;
}


void Test::testUseServerTimestamp() {
TEST_INIT("testUseServerTimestamp");

InfluxDBClient client(Test::apiUrl, Test::orgName, Test::bucketName, Test::token);

TEST_ASSERT(waitServer(Test::managementUrl, true));

// test no precision, no timestamp
Point *p = createPoint("test1");
TEST_ASSERT(client.writePoint(*p));

TEST_ASSERT(checkLinesParts(client, 1, 9));

// Test no precision, custom timestamp
auto opts = WriteOptions().batchSize(2);
client.setWriteOptions(opts);

Point *dir = new Point("dir");
dir->addTag("direction", "check-precision");
dir->addTag("precision", "no");
dir->addField("a","a");
p->setTime("1234567890");
TEST_ASSERT(client.writePoint(*dir));
delete dir;
TEST_ASSERT(client.writePoint(*p));

TEST_ASSERT(checkLinesParts(client, 1, 10));

// Test writerecitions + ts
client.setWriteOptions(opts.writePrecision(WritePrecision::S));

dir = new Point("dir");
dir->addTag("direction", "check-precision");
dir->addTag("precision", "s");
dir->addField("a","a");
TEST_ASSERT(client.writePoint(*dir));
TEST_ASSERT(client.writePoint(*p));

TEST_ASSERT(checkLinesParts(client, 1, 10));
//test sending only precision
client.setWriteOptions(opts.useServerTimestamp(true));

TEST_ASSERT(client.writePoint(*dir));
TEST_ASSERT(client.writePoint(*p));
delete dir;
delete p;
TEST_ASSERT(checkLinesParts(client, 1, 9));

TEST_END();
deleteAll(Test::apiUrl);
}
Expand Down Expand Up @@ -2467,7 +2547,7 @@ void Test::testLargeBatch() {
const char *line = "test1,SSID=Bonitoo-ng,deviceId=4288982576 temperature=17,humidity=28i";
uint32_t free = ESP.getFreeHeap();
#if defined(ESP8266)
int batchSize = 330;
int batchSize = 320;
#elif defined(ESP32)
// 2.0.4. introduces a memory hog which causes original 2048 lines cannot be sent
int batchSize = 1950;
Expand Down
1 change: 1 addition & 0 deletions test/Test.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Test : public TestBase {
static void testOldAPI();
static void testBatch();
static void testLineProtocol();
static void testUseServerTimestamp();
static void testFluxTypes();
static void testFluxTypesSerialization();
static void testFluxParserEmpty();
Expand Down
6 changes: 6 additions & 0 deletions test/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ app.post(prefix + '/api/v2/write', (req,res) => {
console.log("Set permanentError: " + permanentError);
res.status(permanentError).send("bad request");
break;
case 'check-precision':
const precision = req.query['precision'];
if(precision !== point.tags['precision'] && !(!precision && point.tags['precision']=='no')) {
res.status(400).send("bad precision " + precision);
}
break;
}

}
Expand Down

0 comments on commit d430851

Please sign in to comment.