Skip to content

Commit 03bcdb0

Browse files
authored
implement tree rendering for StreamingTableExec (#15085)
* feat: support tree rendering for StreamingTableExec Signed-off-by: Alan Tang <jmtangcs@gmail.com> * feat: simpler expr for streamingExec Signed-off-by: Alan Tang <jmtangcs@gmail.com> chore: Describe more precisely Signed-off-by: Alan Tang <jmtangcs@gmail.com> --------- Signed-off-by: Alan Tang <jmtangcs@gmail.com>
1 parent f0b86fc commit 03bcdb0

File tree

2 files changed

+237
-4
lines changed

2 files changed

+237
-4
lines changed

datafusion/physical-plan/src/streaming.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,16 @@ impl DisplayAs for StreamingTableExec {
209209
Ok(())
210210
}
211211
DisplayFormatType::TreeRender => {
212-
// TODO: collect info
213-
write!(f, "")
212+
if self.infinite {
213+
writeln!(f, "infinite={}", self.infinite)?;
214+
}
215+
if let Some(limit) = self.limit {
216+
write!(f, "limit={limit}")?;
217+
} else {
218+
write!(f, "limit=None")?;
219+
}
220+
221+
Ok(())
214222
}
215223
}
216224
}

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 227 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,7 +1011,10 @@ physical_plan
10111011
08)└─────────────┬─────────────┘
10121012
09)┌─────────────┴─────────────┐
10131013
10)│ StreamingTableExec │
1014-
11)└───────────────────────────┘
1014+
11)│ -------------------- │
1015+
12)│ infinite: true │
1016+
13)│ limit: None │
1017+
14)└───────────────────────────┘
10151018

10161019
query TT
10171020
EXPLAIN SELECT *
@@ -1035,7 +1038,10 @@ physical_plan
10351038
10)└─────────────┬─────────────┘
10361039
11)┌─────────────┴─────────────┐
10371040
12)│ StreamingTableExec │
1038-
13)└───────────────────────────┘
1041+
13)│ -------------------- │
1042+
14)│ infinite: true │
1043+
15)│ limit: None │
1044+
16)└───────────────────────────┘
10391045

10401046
# Query with hash join.
10411047
query TT
@@ -1271,3 +1277,222 @@ drop table table4;
12711277

12721278
statement ok
12731279
drop table table5;
1280+
1281+
# Test on StreamingTableExec
1282+
# prepare table
1283+
statement ok
1284+
CREATE UNBOUNDED EXTERNAL TABLE data (
1285+
"date" DATE,
1286+
"ticker" VARCHAR,
1287+
"time" TIMESTAMP,
1288+
) STORED AS CSV
1289+
WITH ORDER ("date", "ticker", "time")
1290+
LOCATION './a.parquet';
1291+
1292+
1293+
# query
1294+
query TT
1295+
explain SELECT * FROM data
1296+
WHERE ticker = 'A'
1297+
ORDER BY "date", "time";
1298+
----
1299+
logical_plan
1300+
01)Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST
1301+
02)--Filter: data.ticker = Utf8("A")
1302+
03)----TableScan: data projection=[date, ticker, time]
1303+
physical_plan
1304+
01)┌───────────────────────────┐
1305+
02)│ SortPreservingMergeExec │
1306+
03)└─────────────┬─────────────┘
1307+
04)┌─────────────┴─────────────┐
1308+
05)│ CoalesceBatchesExec │
1309+
06)└─────────────┬─────────────┘
1310+
07)┌─────────────┴─────────────┐
1311+
08)│ FilterExec │
1312+
09)│ -------------------- │
1313+
10)│ predicate: │
1314+
11)│ ticker@1 = A │
1315+
12)└─────────────┬─────────────┘
1316+
13)┌─────────────┴─────────────┐
1317+
14)│ RepartitionExec │
1318+
15)└─────────────┬─────────────┘
1319+
16)┌─────────────┴─────────────┐
1320+
17)│ StreamingTableExec │
1321+
18)│ -------------------- │
1322+
19)│ infinite: true │
1323+
20)│ limit: None │
1324+
21)└───────────────────────────┘
1325+
1326+
1327+
# constant ticker, CAST(time AS DATE) = time, order by time
1328+
query TT
1329+
explain SELECT * FROM data
1330+
WHERE ticker = 'A' AND CAST(time AS DATE) = date
1331+
ORDER BY "time"
1332+
----
1333+
logical_plan
1334+
01)Sort: data.time ASC NULLS LAST
1335+
02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
1336+
03)----TableScan: data projection=[date, ticker, time]
1337+
physical_plan
1338+
01)┌───────────────────────────┐
1339+
02)│ SortPreservingMergeExec │
1340+
03)└─────────────┬─────────────┘
1341+
04)┌─────────────┴─────────────┐
1342+
05)│ CoalesceBatchesExec │
1343+
06)└─────────────┬─────────────┘
1344+
07)┌─────────────┴─────────────┐
1345+
08)│ FilterExec │
1346+
09)│ -------------------- │
1347+
10)│ predicate: │
1348+
11)│ ticker@1 = A AND CAST(time│
1349+
12)│ @2 AS Date32) = date@0 │
1350+
13)└─────────────┬─────────────┘
1351+
14)┌─────────────┴─────────────┐
1352+
15)│ RepartitionExec │
1353+
16)└─────────────┬─────────────┘
1354+
17)┌─────────────┴─────────────┐
1355+
18)│ StreamingTableExec │
1356+
19)│ -------------------- │
1357+
20)│ infinite: true │
1358+
21)│ limit: None │
1359+
22)└───────────────────────────┘
1360+
1361+
# same thing but order by date
1362+
query TT
1363+
explain SELECT * FROM data
1364+
WHERE ticker = 'A' AND CAST(time AS DATE) = date
1365+
ORDER BY "date"
1366+
----
1367+
logical_plan
1368+
01)Sort: data.date ASC NULLS LAST
1369+
02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
1370+
03)----TableScan: data projection=[date, ticker, time]
1371+
physical_plan
1372+
01)┌───────────────────────────┐
1373+
02)│ SortPreservingMergeExec │
1374+
03)└─────────────┬─────────────┘
1375+
04)┌─────────────┴─────────────┐
1376+
05)│ CoalesceBatchesExec │
1377+
06)└─────────────┬─────────────┘
1378+
07)┌─────────────┴─────────────┐
1379+
08)│ FilterExec │
1380+
09)│ -------------------- │
1381+
10)│ predicate: │
1382+
11)│ ticker@1 = A AND CAST(time│
1383+
12)│ @2 AS Date32) = date@0 │
1384+
13)└─────────────┬─────────────┘
1385+
14)┌─────────────┴─────────────┐
1386+
15)│ RepartitionExec │
1387+
16)└─────────────┬─────────────┘
1388+
17)┌─────────────┴─────────────┐
1389+
18)│ StreamingTableExec │
1390+
19)│ -------------------- │
1391+
20)│ infinite: true │
1392+
21)│ limit: None │
1393+
22)└───────────────────────────┘
1394+
1395+
# same thing but order by ticker
1396+
query TT
1397+
explain SELECT * FROM data
1398+
WHERE ticker = 'A' AND CAST(time AS DATE) = date
1399+
ORDER BY "ticker"
1400+
----
1401+
logical_plan
1402+
01)Sort: data.ticker ASC NULLS LAST
1403+
02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
1404+
03)----TableScan: data projection=[date, ticker, time]
1405+
physical_plan
1406+
01)┌───────────────────────────┐
1407+
02)│ CoalescePartitionsExec │
1408+
03)└─────────────┬─────────────┘
1409+
04)┌─────────────┴─────────────┐
1410+
05)│ CoalesceBatchesExec │
1411+
06)└─────────────┬─────────────┘
1412+
07)┌─────────────┴─────────────┐
1413+
08)│ FilterExec │
1414+
09)│ -------------------- │
1415+
10)│ predicate: │
1416+
11)│ ticker@1 = A AND CAST(time│
1417+
12)│ @2 AS Date32) = date@0 │
1418+
13)└─────────────┬─────────────┘
1419+
14)┌─────────────┴─────────────┐
1420+
15)│ RepartitionExec │
1421+
16)└─────────────┬─────────────┘
1422+
17)┌─────────────┴─────────────┐
1423+
18)│ StreamingTableExec │
1424+
19)│ -------------------- │
1425+
20)│ infinite: true │
1426+
21)│ limit: None │
1427+
22)└───────────────────────────┘
1428+
1429+
1430+
# same thing but order by time, date
1431+
query TT
1432+
explain SELECT * FROM data
1433+
WHERE ticker = 'A' AND CAST(time AS DATE) = date
1434+
ORDER BY "time", "date";
1435+
----
1436+
logical_plan
1437+
01)Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST
1438+
02)--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
1439+
03)----TableScan: data projection=[date, ticker, time]
1440+
physical_plan
1441+
01)┌───────────────────────────┐
1442+
02)│ SortPreservingMergeExec │
1443+
03)└─────────────┬─────────────┘
1444+
04)┌─────────────┴─────────────┐
1445+
05)│ CoalesceBatchesExec │
1446+
06)└─────────────┬─────────────┘
1447+
07)┌─────────────┴─────────────┐
1448+
08)│ FilterExec │
1449+
09)│ -------------------- │
1450+
10)│ predicate: │
1451+
11)│ ticker@1 = A AND CAST(time│
1452+
12)│ @2 AS Date32) = date@0 │
1453+
13)└─────────────┬─────────────┘
1454+
14)┌─────────────┴─────────────┐
1455+
15)│ RepartitionExec │
1456+
16)└─────────────┬─────────────┘
1457+
17)┌─────────────┴─────────────┐
1458+
18)│ StreamingTableExec │
1459+
19)│ -------------------- │
1460+
20)│ infinite: true │
1461+
21)│ limit: None │
1462+
22)└───────────────────────────┘
1463+
1464+
1465+
1466+
1467+
# query
1468+
query TT
1469+
explain SELECT * FROM data
1470+
WHERE date = '2006-01-02'
1471+
ORDER BY "ticker", "time";
1472+
----
1473+
logical_plan
1474+
01)Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST
1475+
02)--Filter: data.date = Date32("2006-01-02")
1476+
03)----TableScan: data projection=[date, ticker, time]
1477+
physical_plan
1478+
01)┌───────────────────────────┐
1479+
02)│ SortPreservingMergeExec │
1480+
03)└─────────────┬─────────────┘
1481+
04)┌─────────────┴─────────────┐
1482+
05)│ CoalesceBatchesExec │
1483+
06)└─────────────┬─────────────┘
1484+
07)┌─────────────┴─────────────┐
1485+
08)│ FilterExec │
1486+
09)│ -------------------- │
1487+
10)│ predicate: │
1488+
11)│ date@0 = 2006-01-02 │
1489+
12)└─────────────┬─────────────┘
1490+
13)┌─────────────┴─────────────┐
1491+
14)│ RepartitionExec │
1492+
15)└─────────────┬─────────────┘
1493+
16)┌─────────────┴─────────────┐
1494+
17)│ StreamingTableExec │
1495+
18)│ -------------------- │
1496+
19)│ infinite: true │
1497+
20)│ limit: None │
1498+
21)└───────────────────────────┘

0 commit comments

Comments
 (0)