-
Notifications
You must be signed in to change notification settings - Fork 189
/
Copy pathQuick.Data.InfluxDB.pas
262 lines (224 loc) · 7.96 KB
/
Quick.Data.InfluxDB.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
{ ***************************************************************************
Copyright (c) 2016-2020 Kike Pérez
Unit : Quick.Data.InfluxDB
Description : InfluxDB data provider
Author : Kike Pérez
Version : 1.0
Created : 05/04/2019
Modified : 21/04/2020
This file is part of QuickLogger: https://github.com/exilon/QuickLogger
***************************************************************************
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*************************************************************************** }
unit Quick.Data.InfluxDB;
{$i QuickLib.inc}
interface
uses
Classes,
SysUtils,
DateUtils,
Quick.Collections,
Quick.HttpClient,
Quick.Commons,
Quick.Value,
Quick.Arrays,
Quick.Data.Custom;
type
TInfluxDBData = class(TDataProvider)
private
fHTTPClient : TJsonHTTPClient;
fURL : string;
fFullURL : string;
fDataBase : string;
fUserName : string;
fPassword : string;
fUserAgent : string;
fTags : TPairArray;
fCreateDataBaseIfNotExists : Boolean;
procedure CreateDataBase;
function GenerateWriteQuery(const aMeasurement : string; aTagPairs : IList<TPair>; aFieldPairs : IList<TFlexPair>; aTime : TDateTime): string;
procedure EscapeData(var aTags : string);
procedure SetWriteURL;
procedure SetPassword(const Value: string);
procedure SetUserName(const Value: string);
procedure Write(const aLine: string); overload;
public
constructor Create; override;
destructor Destroy; override;
property URL : string read fURL write fURL;
property DataBase : string read fDataBase write fDataBase;
property UserName : string read fUserName write SetUserName;
property Password : string read fPassword write SetPassword;
property CreateDataBaseIfNotExists : Boolean read fCreateDataBaseIfNotExists write fCreateDataBaseIfNotExists;
property UserAgent : string read fUserAgent write fUserAgent;
property Tags : TPairArray read fTags write fTags;
procedure Init; override;
procedure Restart; override;
procedure Stop; override;
procedure Write(const aMeasurement : string; aFieldPairs : IList<TFlexPair>; aTime : TDateTime = 0); overload;
procedure Write(const aMeasurement: string; aTagPairs : IList<TPair>; aFieldPairs: IList<TFlexPair>; aTime: TDateTime); overload;
procedure Write(const aMeasurement: string; const aFieldKey : string; aFieldValue : TFlexValue; aTime: TDateTime); overload;
end;
EInfluxDBData = class(Exception);
implementation
constructor TInfluxDBData.Create;
begin
inherited;
fURL := 'http://localhost:8086';
fDataBase := 'db';
fUserName := '';
fPassword := '';
fCreateDataBaseIfNotExists := True;
OutputOptions.UseUTCTime := True;
fUserAgent := DEF_USER_AGENT;
end;
destructor TInfluxDBData.Destroy;
begin
if Assigned(fHTTPClient) then fHTTPClient.Free;
inherited;
end;
procedure TInfluxDBData.Init;
begin
if fInitiated then Stop;
SetWriteURL;
fHTTPClient := TJsonHTTPClient.Create;
fHTTPClient.ContentType := 'application/json';
fHTTPClient.UserAgent := fUserAgent;
fHTTPClient.HandleRedirects := True;
if fCreateDataBaseIfNotExists then CreateDataBase;
inherited;
end;
procedure TInfluxDBData.Restart;
begin
Stop;
if Assigned(fHTTPClient) then FreeAndNil(fHTTPClient);
Init;
end;
procedure TInfluxDBData.SetPassword(const Value: string);
begin
if fPassword <> Value then
begin
fPassword := Value;
SetWriteURL;
end;
end;
procedure TInfluxDBData.SetWriteURL;
begin
if fUserName+fPassword <> '' then fFullURL := Format('%s/write?db=%s&u=%s&p=%s&precision=ms',[fURL,fDataBase,fUserName,fPassword])
else fFullURL := Format('%s/write?db=%s&precision=ms',[fURL,fDataBase]);
end;
procedure TInfluxDBData.Stop;
begin
inherited;
if Assigned(fHTTPClient) then FreeAndNil(fHTTPClient);
end;
procedure TInfluxDBData.Write(const aMeasurement: string; const aFieldKey : string; aFieldValue : TFlexValue; aTime: TDateTime);
var
fields : IList<TFlexPair>;
begin
fields := TxList<TFlexPair>.Create;
fields.Add(TFlexPair.Create(aFieldKey,aFieldValue));
if atime <> 0 then Write(GenerateWriteQuery(aMeasurement,nil,fields,aTime))
else Write(GenerateWriteQuery(aMeasurement,nil,fields,Now()));
end;
procedure TInfluxDBData.Write(const aMeasurement: string; aTagPairs : IList<TPair>; aFieldPairs: IList<TFlexPair>; aTime: TDateTime);
begin
if atime <> 0 then Write(GenerateWriteQuery(aMeasurement,aTagPairs,aFieldPairs,aTime))
else Write(GenerateWriteQuery(aMeasurement,aTagPairs,aFieldPairs,Now()));
end;
procedure TInfluxDBData.Write(const aMeasurement: string; aFieldPairs: IList<TFlexPair>; aTime: TDateTime);
begin
if atime <> 0 then Write(GenerateWriteQuery(aMeasurement,nil,aFieldPairs,aTime))
else Write(GenerateWriteQuery(aMeasurement,nil,aFieldPairs,Now()));
end;
procedure TInfluxDBData.SetUserName(const Value: string);
begin
if fUserName <> Value then
begin
fUserName := Value;
SetWriteURL;
end;
end;
procedure TInfluxDBData.CreateDataBase;
var
resp : IHttpRequestResponse;
begin
try
resp := fHTTPClient.Post(Format('%s/query?q=CREATE DATABASE %s',[fURL,fDatabase]),'');
except
on E : Exception do raise EInfluxDBData.CreateFmt('[TInfluxDBData] Creating DB: %s',[e.Message]);
end;
if not (resp.StatusCode in [200,204]) then
raise EInfluxDBData.Create(Format('[TInfluxDBData] : Response %d : %s trying to create database',[resp.StatusCode,resp.StatusText]));
end;
procedure TInfluxDBData.EscapeData(var aTags : string);
begin
aTags := StringReplace(aTags,' ','\ ',[rfReplaceAll]);
end;
function TInfluxDBData.GenerateWriteQuery(const aMeasurement : string; aTagPairs : IList<TPair>; aFieldPairs : IList<TFlexPair>; aTime : TDateTime): string;
var
incinfo : TStringList;
tags : string;
fields : string;
tagpair : TPair;
flexpair : TFlexPair;
begin
incinfo := TStringList.Create;
try
//add global tags
for tagpair in fTags do
begin
if not tagpair.Value.IsEmpty then incinfo.Add(Format('%s=%s',[tagpair.Name,tagpair.Value]));
end;
//add current query tags
if aTagPairs <> nil then
begin
for tagpair in aTagPairs do
begin
if not tagpair.Value.IsEmpty then incinfo.Add(Format('%s=%s',[tagpair.Name,tagpair.Value]));
end;
end;
tags := CommaText(incinfo);
EscapeData(tags);
incinfo.Clear;
for flexpair in aFieldPairs do
begin
if flexpair.Value.IsInteger then incinfo.Add(Format('%s=%d',[flexpair.Name,flexpair.Value.AsInt64]))
else if flexpair.Value.IsFloating then incinfo.Add(Format('%s=%f',[flexpair.Name,flexpair.Value.AsExtended]))
else incinfo.Add(Format('%s="%s"',[flexpair.Name,flexpair.Value.AsString]));
end;
fields := CommaText(incinfo);
Result := Format('%s,%s %s %d',[aMeasurement,tags,fields,DateTimeToUnix(LocalTimeToUTC(aTime){$IFNDEF FPC},True{$ENDIF})*1000]);
finally
incinfo.Free;
end;
end;
procedure TInfluxDBData.Write(const aLine : string);
var
resp : IHttpRequestResponse;
stream : TStringStream;
begin
if not fInitiated then Init;
stream := TStringStream.Create(aLine,TEncoding.UTF8);
var a := aline;
try
try
resp := fHTTPClient.Post(fFullURL,stream);
except
on E : Exception do raise EInfluxDBData.CreateFmt('[TInfluxDBData] Write Error: %s',[e.Message]);
end;
finally
stream.Free;
end;
if not (resp.StatusCode in [200,204]) then
raise EInfluxDBData.Create(Format('[TInfluxDBData] : Response %d : %s trying to post event',[resp.StatusCode,resp.StatusText]));
end;
end.