Skip to content

Commit 6e2c057

Browse files
committed
Fix RpcDataSet deadlock, timezone compatibility, and column index issues
- Add async NextAsync()/FetchResultsAsync() to RpcDataSet, mark sync Next() as obsolete - Add HasNextAsync() to SessionDataSet for non-blocking iteration - Fix TimeZoneNotFoundException by adding IANA-to-Windows timezone mapping - Fix GetDateByTsBlockColumnIndex/GetIntByTsBlockColumnIndex for Time column (index -1) - Fix RowRecord obsolete constructor usage in RpcDataSet.GetRow() and IoTDBCommand - Migrate all samples from deprecated SessionPool(host,port,poolSize) to Builder pattern - Migrate all samples from deprecated RowRecord 3-arg constructor to 4-arg with TSDataType
1 parent 6f451bd commit 6e2c057

16 files changed

+416
-141
lines changed

Apache-IoTDB-Client-CSharp-UserCase/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ static async Task ExecuteQueryStatement()
9999
await session_pool.Open(false);
100100
var res = await session_pool.ExecuteQueryStatementAsync("select * from root.ln.wf01.wt01");
101101
res.ShowTableNames();
102-
while (res.HasNext())
102+
while (await res.HasNextAsync())
103103
{
104104
Console.WriteLine(res.Next());
105105
}

samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ public partial class SessionPoolTest
2828
{
2929
public async Task TestInsertAlignedRecord()
3030
{
31-
var session_pool = new SessionPool(host, port, poolSize);
31+
var session_pool = new SessionPool.Builder()
32+
.SetHost(host)
33+
.SetPort(port)
34+
.SetPoolSize(poolSize)
35+
.Build();
3236
int status;
3337
await session_pool.Open(false);
3438
if (debug) session_pool.OpenDebugMode();
@@ -51,7 +55,7 @@ public async Task TestInsertAlignedRecord()
5155
var start_ms = DateTime.Now.Ticks / 10000;
5256
for (var timestamp = 1; timestamp <= fetchSize * processedSize; timestamp++)
5357
{
54-
var rowRecord = new RowRecord(timestamp, values, measures);
58+
var rowRecord = new RowRecord(timestamp, values, measures, new List<TSDataType> { TSDataType.TEXT, TSDataType.BOOLEAN, TSDataType.INT32 });
5559
var task = session_pool.InsertAlignedRecordAsync(
5660
string.Format("{0}.{1}", testDatabaseName, testDevice), rowRecord);
5761
tasks.Add(task);
@@ -65,7 +69,11 @@ public async Task TestInsertAlignedRecord()
6569
}
6670
public async Task TestInsertAlignedStringRecord()
6771
{
68-
var session_pool = new SessionPool(host, port, poolSize);
72+
var session_pool = new SessionPool.Builder()
73+
.SetHost(host)
74+
.SetPort(port)
75+
.SetPoolSize(poolSize)
76+
.Build();
6977
var status = 0;
7078
await session_pool.Open(false);
7179
if (debug) session_pool.OpenDebugMode();
@@ -98,7 +106,7 @@ public async Task TestInsertAlignedStringRecord()
98106
Console.WriteLine(string.Format("total insert aligned string record time is {0}", end_ms - start_ms));
99107
var res = await session_pool.ExecuteQueryStatementAsync("select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice));
100108
var res_cnt = 0;
101-
while (res.HasNext())
109+
while (await res.HasNextAsync())
102110
{
103111
res_cnt++;
104112
res.Next();
@@ -111,7 +119,11 @@ public async Task TestInsertAlignedStringRecord()
111119
}
112120
public async Task TestInsertAlignedRecords()
113121
{
114-
var session_pool = new SessionPool(host, port, poolSize);
122+
var session_pool = new SessionPool.Builder()
123+
.SetHost(host)
124+
.SetPort(port)
125+
.SetPoolSize(poolSize)
126+
.Build();
115127
await session_pool.Open(false);
116128
if (debug) session_pool.OpenDebugMode();
117129

@@ -168,6 +180,10 @@ public async Task TestInsertAlignedRecords()
168180
testMeasurements[5],
169181
testMeasurements[6]
170182
});
183+
var dataTypes_lst = new List<List<TSDataType>>() { };
184+
dataTypes_lst.Add(new List<TSDataType>() { TSDataType.BOOLEAN, TSDataType.INT32 });
185+
dataTypes_lst.Add(new List<TSDataType>() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE });
186+
dataTypes_lst.Add(new List<TSDataType>() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.TEXT });
171187
var values_lst = new List<List<object>>() { };
172188
values_lst.Add(new List<object>() { true, (int)123 });
173189
values_lst.Add(new List<object>() { true, (int)123, (long)456, (double)1.1 });
@@ -177,15 +193,15 @@ public async Task TestInsertAlignedRecords()
177193
var rowRecords = new List<RowRecord>() { };
178194
for (var i = 0; i < 3; i++)
179195
{
180-
var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i]);
196+
var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i], dataTypes_lst[i]);
181197
rowRecords.Add(rowRecord);
182198
}
183199

184200
status = await session_pool.InsertAlignedRecordsAsync(device_id, rowRecords);
185201
System.Diagnostics.Debug.Assert(status == 0);
186202
var res = await session_pool.ExecuteQueryStatementAsync(
187203
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10");
188-
SessionPoolTest.PrintDataSetByString(res);
204+
await SessionPoolTest.PrintDataSetByString(res);
189205
Console.WriteLine(rowRecords);
190206

191207
System.Diagnostics.Debug.Assert(true);
@@ -201,7 +217,8 @@ public async Task TestInsertAlignedRecords()
201217
{
202218
device_id.Add(string.Format("{0}.{1}", testDatabaseName, testDevice));
203219
rowRecords.Add(new RowRecord(timestamp, new List<object>() { true, (int)123 },
204-
new List<string>() { testMeasurements[1], testMeasurements[2] }));
220+
new List<string>() { testMeasurements[1], testMeasurements[2] },
221+
new List<TSDataType>() { TSDataType.BOOLEAN, TSDataType.INT32 }));
205222
if (timestamp % fetchSize == 0)
206223
{
207224
tasks.Add(session_pool.InsertAlignedRecordsAsync(device_id, rowRecords));
@@ -216,7 +233,7 @@ public async Task TestInsertAlignedRecords()
216233
res.ShowTableNames();
217234
var record_count = fetchSize * processedSize;
218235
var res_count = 0;
219-
while (res.HasNext())
236+
while (await res.HasNextAsync())
220237
{
221238
res_count += 1;
222239
Console.WriteLine(res.Next());
@@ -234,7 +251,11 @@ public async Task TestInsertAlignedRecords()
234251
}
235252
public async Task TestInsertAlignedStringRecords()
236253
{
237-
var session_pool = new SessionPool(host, port, poolSize);
254+
var session_pool = new SessionPool.Builder()
255+
.SetHost(host)
256+
.SetPort(port)
257+
.SetPoolSize(poolSize)
258+
.Build();
238259
await session_pool.Open(false);
239260
if (debug) session_pool.OpenDebugMode();
240261

@@ -267,7 +288,7 @@ public async Task TestInsertAlignedStringRecords()
267288
System.Diagnostics.Debug.Assert(status == 0);
268289
var res = await session_pool.ExecuteQueryStatementAsync(
269290
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10");
270-
SessionPoolTest.PrintDataSetByString(res);
291+
await SessionPoolTest.PrintDataSetByString(res);
271292

272293
await res.Close();
273294

@@ -298,7 +319,7 @@ public async Task TestInsertAlignedStringRecords()
298319
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice));
299320
res.ShowTableNames();
300321
var res_count = 0;
301-
while (res.HasNext())
322+
while (await res.HasNextAsync())
302323
{
303324
Console.WriteLine(res.Next());
304325
res_count += 1;
@@ -314,7 +335,11 @@ public async Task TestInsertAlignedStringRecords()
314335
}
315336
public async Task TestInsertAlignedRecordsOfOneDevice()
316337
{
317-
var session_pool = new SessionPool(host, port, poolSize);
338+
var session_pool = new SessionPool.Builder()
339+
.SetHost(host)
340+
.SetPort(port)
341+
.SetPoolSize(poolSize)
342+
.Build();
318343
await session_pool.Open(false);
319344
if (debug) session_pool.OpenDebugMode();
320345

@@ -377,25 +402,30 @@ public async Task TestInsertAlignedRecordsOfOneDevice()
377402
values_lst.Add(new List<object>()
378403
{true, (int) 123, (long) 456, (double) 1.1, (float) 10001.1, "test_record"});
379404
var timestamp_lst = new List<long>() { 1, 2, 3 };
405+
var dataTypes_lst = new List<List<TSDataType>>() { };
406+
dataTypes_lst.Add(new List<TSDataType>() { TSDataType.BOOLEAN, TSDataType.INT32 });
407+
dataTypes_lst.Add(new List<TSDataType>() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE });
408+
dataTypes_lst.Add(new List<TSDataType>() { TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.TEXT });
380409
var rowRecords = new List<RowRecord>() { };
381410
for (var i = 0; i < 3; i++)
382411
{
383-
var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i]);
412+
var rowRecord = new RowRecord(timestamp_lst[i], values_lst[i], measurements_lst[i], dataTypes_lst[i]);
384413
rowRecords.Add(rowRecord);
385414
}
386415
status = await session_pool.InsertAlignedRecordsOfOneDeviceAsync(device_id, rowRecords);
387416
System.Diagnostics.Debug.Assert(status == 0);
388417
var res = await session_pool.ExecuteQueryStatementAsync(
389418
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10");
390-
SessionPoolTest.PrintDataSetByString(res);
419+
await SessionPoolTest.PrintDataSetByString(res);
391420

392421
await res.Close();
393422
rowRecords = new List<RowRecord>() { };
394423
var tasks = new List<Task<int>>();
395424
for (var timestamp = 4; timestamp <= fetchSize * processedSize; timestamp++)
396425
{
397426
rowRecords.Add(new RowRecord(timestamp, new List<object>() { true, (int)123 },
398-
new List<string>() { testMeasurements[1], testMeasurements[2] }));
427+
new List<string>() { testMeasurements[1], testMeasurements[2] },
428+
new List<TSDataType>() { TSDataType.BOOLEAN, TSDataType.INT32 }));
399429
if (timestamp % fetchSize == 0)
400430
{
401431
tasks.Add(session_pool.InsertAlignedRecordsOfOneDeviceAsync(device_id, rowRecords));
@@ -407,7 +437,7 @@ public async Task TestInsertAlignedRecordsOfOneDevice()
407437
res = await session_pool.ExecuteQueryStatementAsync(
408438
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice));
409439
var res_count = 0;
410-
while (res.HasNext())
440+
while (await res.HasNextAsync())
411441
{
412442
res_count += 1;
413443
res.Next();
@@ -423,7 +453,11 @@ public async Task TestInsertAlignedRecordsOfOneDevice()
423453
}
424454
public async Task TestInsertAlignedStringRecordsOfOneDevice()
425455
{
426-
var session_pool = new SessionPool(host, port, poolSize);
456+
var session_pool = new SessionPool.Builder()
457+
.SetHost(host)
458+
.SetPort(port)
459+
.SetPoolSize(poolSize)
460+
.Build();
427461
await session_pool.Open(false);
428462
if (debug) session_pool.OpenDebugMode();
429463

@@ -454,7 +488,7 @@ public async Task TestInsertAlignedStringRecordsOfOneDevice()
454488
System.Diagnostics.Debug.Assert(status == 0);
455489
var res = await session_pool.ExecuteQueryStatementAsync(
456490
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10");
457-
SessionPoolTest.PrintDataSetByString(res);
491+
await SessionPoolTest.PrintDataSetByString(res);
458492

459493
await res.Close();
460494
// large data test
@@ -480,7 +514,7 @@ public async Task TestInsertAlignedStringRecordsOfOneDevice()
480514
res = await session_pool.ExecuteQueryStatementAsync(
481515
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice));
482516
var res_count = 0;
483-
while (res.HasNext())
517+
while (await res.HasNextAsync())
484518
{
485519
res_count += 1;
486520
res.Next();

samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ public partial class SessionPoolTest
2929
{
3030
public async Task TestInsertAlignedTablet()
3131
{
32-
var session_pool = new SessionPool(host, port, poolSize);
32+
var session_pool = new SessionPool.Builder()
33+
.SetHost(host)
34+
.SetPort(port)
35+
.SetPoolSize(poolSize)
36+
.Build();
3337
var status = 0;
3438
await session_pool.Open(false);
3539
if (debug) session_pool.OpenDebugMode();
@@ -54,7 +58,7 @@ public async Task TestInsertAlignedTablet()
5458
System.Diagnostics.Debug.Assert(status == 0);
5559
var res = await session_pool.ExecuteQueryStatementAsync(
5660
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<15");
57-
SessionPoolTest.PrintDataSetByString(res);
61+
await SessionPoolTest.PrintDataSetByString(res);
5862

5963
await res.Close();
6064
// large data test
@@ -83,7 +87,7 @@ public async Task TestInsertAlignedTablet()
8387
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice));
8488
res.ShowTableNames();
8589
var res_count = 0;
86-
while (res.HasNext())
90+
while (await res.HasNextAsync())
8791
{
8892
res_count += 1;
8993
res.Next();
@@ -100,7 +104,11 @@ public async Task TestInsertAlignedTablet()
100104

101105
public async Task TestInsertAlignedTablets()
102106
{
103-
var session_pool = new SessionPool(host, port, poolSize);
107+
var session_pool = new SessionPool.Builder()
108+
.SetHost(host)
109+
.SetPort(port)
110+
.SetPoolSize(poolSize)
111+
.Build();
104112
var status = 0;
105113
await session_pool.Open(false);
106114
if (debug) session_pool.OpenDebugMode();
@@ -148,7 +156,7 @@ public async Task TestInsertAlignedTablets()
148156
System.Diagnostics.Debug.Assert(status == 0);
149157
var res = await session_pool.ExecuteQueryStatementAsync(
150158
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]) + " where time<15");
151-
SessionPoolTest.PrintDataSetByString(res);
159+
await SessionPoolTest.PrintDataSetByString(res);
152160

153161
// large data test
154162
var tasks = new List<Task<int>>();
@@ -175,7 +183,7 @@ public async Task TestInsertAlignedTablets()
175183
"select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]));
176184
res.ShowTableNames();
177185
var res_count = 0;
178-
while (res.HasNext())
186+
while (await res.HasNextAsync())
179187
{
180188
res_count += 1;
181189
res.Next();

0 commit comments

Comments
 (0)